Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_thread_pool.c
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
27#include <unistd.h>
28#include "nilorea/n_common.h"
29#include "nilorea/n_log.h"
31#include "nilorea/n_time.h"
32
33#ifdef __linux__
34#include <sys/sysinfo.h>
35#endif
36#include <pthread.h>
37#include <string.h>
38#include <errno.h>
39
/**
 * @brief Get the number of processor cores of the current system.
 *
 * When __windows__ is defined the count is read from GetSystemInfo();
 * otherwise it is the value of sysconf(_SC_NPROCESSORS_ONLN), i.e. the
 * processors currently online.
 *
 * @return the number of cores. On the sysconf path the raw sysconf result is
 *         returned unchanged, so a failure shows up as -1 (POSIX error value):
 *         callers should treat any value below 1 as "unknown".
 */
long int get_nb_cpu_cores(void) {
#ifdef __windows__
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return (long int)sysinfo.dwNumberOfProcessors;
#else
    return sysconf(_SC_NPROCESSORS_ONLN);
#endif
}
55
/*
 * Body of thread_pool_processing_function(void* param): the loop executed by each pool thread
 * (it is the start routine handed to pthread_create in new_thread_pool).
 * NOTE(review): the signature line and source lines 109 and 116 are absent from this listing
 * (the embedded numbering jumps over them), so only the visible statements are described.
 *
 * param is cast to the THREAD_POOL_NODE served by this thread. Each pass blocks on
 * node->th_start, snapshots node->thread_state under node->lock and, when the snapshot is
 * RUNNING_THREAD, runs node->func(node->param). The loop ends once the snapshot is
 * EXITING_THREAD. Always returns NULL.
 */
62 THREAD_POOL_NODE* node = (THREAD_POOL_NODE*)param;
63
64 if (!node) {
65 n_log(LOG_ERR, "Thread fatal error, no valid payload found, exiting thread function !");
66 pthread_exit(NULL);
67 return NULL;
68 }
69
/* NOTE(review): node->thr is a pthread_t; printing it with %ld assumes it is a long-sized integer */
70 n_log(LOG_DEBUG, "Thread %ld started", node->thr);
71
72 int thread_state = 0;
73 do {
74 n_log(LOG_DEBUG, "Thread pool processing func waiting");
75
76 // note: direct procs will automatically post th_start
77 sem_wait(&node->th_start);
78
/* take a private copy of the thread state so the tests below do not need the lock */
79 pthread_mutex_lock(&node->lock);
80 thread_state = node->thread_state;
81 pthread_mutex_unlock(&node->lock);
82
83 if (thread_state == RUNNING_THREAD) {
/* NOTE(review): node->func is read for this log line before node->lock is taken */
84 n_log(LOG_INFO, "Thread pool running proc %p", node->func);
85 void* (*func_to_run)(void* param) = NULL;
86 void* param_to_run = NULL;
/* mark the node busy and copy the job out, so the job itself runs without holding node->lock */
87 pthread_mutex_lock(&node->lock);
88 node->state = RUNNING_PROC;
89 func_to_run = node->func;
90 param_to_run = node->param;
91 pthread_mutex_unlock(&node->lock);
92
93 if (func_to_run) {
94 func_to_run(param_to_run);
95 }
96 n_log(LOG_INFO, "Thread pool end proc %p", func_to_run);
97
/* job finished: clear the slot and put the node back to IDLE_PROC */
98 pthread_mutex_lock(&node->lock);
99 node->func = NULL;
100 node->param = NULL;
101 node->state = IDLE_PROC;
102 int type = node->type;
103 node->type = -1;
104 // NORMAL_PROC or DIRECT_PROC do not need to post th_end
105 if (type & SYNCED_PROC)
106 sem_post(&node->th_end);
107 pthread_mutex_unlock(&node->lock);
108
/* NOTE(review): source line 109 is missing from this listing at this point */
110 }
111 } while (thread_state != EXITING_THREAD);
112
113 n_log(LOG_DEBUG, "Thread %ld exiting...", node->thr);
114
/* NOTE(review): the statement executed under this lock (source line 116) is missing from this listing */
115 pthread_mutex_lock(&node->lock);
117 pthread_mutex_unlock(&node->lock);
118
119 n_log(LOG_DEBUG, "Thread %ld exited", node->thr);
120
121 pthread_exit(NULL);
122
123 return NULL;
124} /* thread_pool_processing_function */
125
/*
 * new_thread_pool: create a pool of nbmaxthr worker threads.
 * NOTE(review): this listing omits many source lines of the function (the embedded numbering
 * has gaps, e.g. 135, 141, 145, 149-150, 152-153, 171, 173-175), including the allocations and
 * part of the error handling, so only the visible statements are described here.
 *
 * nbmaxthr: number of slots allocated in thread_list and number of threads started; stored in
 *           thread_pool->max_threads.
 * nb_max_waiting: stored in thread_pool->nb_max_waiting.
 * Returns the new pool, or NULL on each of the visible failure paths.
 */
132THREAD_POOL* new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting) {
133 THREAD_POOL* thread_pool = NULL;
134
136 if (!thread_pool)
137 return NULL;
138
139 thread_pool->max_threads = nbmaxthr;
140 thread_pool->nb_max_waiting = nb_max_waiting;
142
143 thread_pool->thread_list = (THREAD_POOL_NODE**)malloc(nbmaxthr * sizeof(THREAD_POOL_NODE*));
144 if (!thread_pool->thread_list) {
146 return NULL;
147 }
148
151 n_log(LOG_ERR, "Unable to initialize wait list");
154 return NULL;
155 }
156
157 pthread_mutex_init(&thread_pool->lock, NULL);
158
/* nb_tasks is created with an initial value of 1 */
159 if (sem_init(&thread_pool->nb_tasks, 0, 1) == -1) {
160 int error = errno;
161 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> nb_tasks", strerror(error));
163 pthread_mutex_destroy(&thread_pool->lock);
166 return NULL;
167 }
168
/* `it` is declared outside the loop because cleanup_error uses it as the count of completed nodes */
169 size_t it = 0;
170 for (it = 0; it < nbmaxthr; it++) {
172 thread_pool->thread_list[it]->type = -1;
176
/* th_start and th_end both start at 0: the worker blocks in sem_wait(th_start) until it is posted */
177 if (sem_init(&thread_pool->thread_list[it]->th_start, 0, 0) == -1) {
178 int error = errno;
179 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %zu ] -> th_start", strerror(error), it);
181 goto cleanup_error;
182 }
183 if (sem_init(&thread_pool->thread_list[it]->th_end, 0, 0) == -1) {
184 int error = errno;
185 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %zu] -> th_end", strerror(error), it);
186 sem_destroy(&thread_pool->thread_list[it]->th_start);
188 goto cleanup_error;
189 }
190
191 thread_pool->thread_list[it]->func = NULL;
192 thread_pool->thread_list[it]->param = NULL;
193
194 pthread_mutex_init(&thread_pool->thread_list[it]->lock, NULL);
195
/* NOTE(review): pthread_create reports failure through its return value; errno is not
 * specified to be set by it, so strerror(errno) below may describe an unrelated error */
196 if (pthread_create(&thread_pool->thread_list[it]->thr, NULL, thread_pool_processing_function, (void*)thread_pool->thread_list[it]) != 0) {
197 n_log(LOG_ERR, "pthread_create failed : %s for it %zu", strerror(errno), it);
198 pthread_mutex_destroy(&thread_pool->thread_list[it]->lock);
199 sem_destroy(&thread_pool->thread_list[it]->th_start);
200 sem_destroy(&thread_pool->thread_list[it]->th_end);
202 goto cleanup_error;
203 }
204 }
205 return thread_pool;
206
/* unwind the nodes [0, it) that were fully set up before the failure: post th_start to wake
 * each worker, join it, then destroy its mutex and semaphores; finally release the pool-level
 * semaphore and mutex. NOTE(review): source lines 210, 217, 219 and 222-223 are not shown. */
207cleanup_error:
208 for (size_t j = 0; j < it; j++) {
209 pthread_mutex_lock(&thread_pool->thread_list[j]->lock);
211 sem_post(&thread_pool->thread_list[j]->th_start);
212 pthread_mutex_unlock(&thread_pool->thread_list[j]->lock);
213 pthread_join(thread_pool->thread_list[j]->thr, NULL);
214 pthread_mutex_destroy(&thread_pool->thread_list[j]->lock);
215 sem_destroy(&thread_pool->thread_list[j]->th_start);
216 sem_destroy(&thread_pool->thread_list[j]->th_end);
218 }
220 sem_destroy(&thread_pool->nb_tasks);
221 pthread_mutex_destroy(&thread_pool->lock);
224 return NULL;
225} /* new_thread_pool */
226
/*
 * add_threaded_process: hand func_ptr(param) to a free worker of thread_pool, or queue it.
 * NOTE(review): source lines 258, 266, 269-270 and 306 are absent from this listing (gaps in
 * the embedded numbering), so a few conditions that open the braces below are not visible.
 *
 * thread_pool: target pool; NULL or a pool without thread_list is rejected with FALSE.
 * func_ptr, param: the job and its argument, stored in the chosen node or in a waiting-list item.
 * mode: exactly one of NORMAL_PROC, SYNCED_PROC, DIRECT_PROC, optionally combined with
 *       NO_LOCK (do not take thread_pool->lock here) and NO_QUEUE (never use the waiting list).
 * Returns TRUE when the job was assigned to a thread or queued, FALSE otherwise.
 */
235int add_threaded_process(THREAD_POOL* thread_pool, void* (*func_ptr)(void* param), void* param, int mode) {
236 if (!thread_pool) {
237 n_log(LOG_ERR, "thread_pool is not allocated, can't add processes to it !");
238 return FALSE;
239 }
240
241 if (!thread_pool->thread_list) {
242 n_log(LOG_ERR, "thread_pool thread_list is not allocated, can't add processes to it !");
243 return FALSE;
244 }
245
246 /* validate that exactly one of NORMAL_PROC, SYNCED_PROC, DIRECT_PROC is set */
247 int proc_mode = mode & (NORMAL_PROC | SYNCED_PROC | DIRECT_PROC);
248 if (proc_mode != NORMAL_PROC && proc_mode != SYNCED_PROC && proc_mode != DIRECT_PROC) {
249 n_log(LOG_ERR, "invalid mode %d: exactly one of NORMAL_PROC, SYNCED_PROC, DIRECT_PROC must be set", mode);
250 return FALSE;
251 }
252
253 if (!(mode & NO_LOCK)) pthread_mutex_lock(&thread_pool->lock);
254
/* scan the nodes in order, locking each one in turn; the break leaves the loop with the
 * current node still locked (the test guarding the break is on missing source line 258) */
255 size_t it = 0;
256 while (it < thread_pool->max_threads) {
257 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
259 break;
260 }
261 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
262 it++;
263 }
264 // if the scan stopped early we have a free thread slot, and still hold the lock on it
265 if (it < thread_pool->max_threads) {
267 thread_pool->thread_list[it]->func = func_ptr;
268 thread_pool->thread_list[it]->param = param;
271 } else {
272 n_log(LOG_ERR, "unknown mode %d for thread %zu", mode, it);
273 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
274 if (!(mode & NO_LOCK))
275 pthread_mutex_unlock(&thread_pool->lock);
276 return FALSE;
277 }
/* NORMAL_PROC and DIRECT_PROC wake the worker right away; a SYNCED_PROC is not posted here */
278 if (mode & NORMAL_PROC || mode & DIRECT_PROC)
279 sem_post(&thread_pool->thread_list[it]->th_start);
280 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
281 n_log(LOG_DEBUG, "proc %p(%p) added on thread %zu", func_ptr, param, it);
282 } else {
283 // all threads are occupied -> try the waiting list. No thread_list[ it ] lock is held here: every node lock taken by the scan was released again
284
285 // NO_QUEUE (job already coming from the queue), SYNCED_PROC and DIRECT_PROC are never queued: give up and return FALSE
286 // this is logged as an error for SYNCED_PROC and DIRECT_PROC, and only as a debug message for NO_QUEUE
287 int cancel_and_return = FALSE;
288 if (mode & NO_QUEUE) {
289 n_log(LOG_DEBUG, "Thread pool active threads are all busy and mode is NO_QUEUE, cannot add %p(%p) to pool %p", func_ptr, param, thread_pool);
290 cancel_and_return = TRUE;
291 } else if (mode & SYNCED_PROC) {
292 n_log(LOG_ERR, "Thread pool active threads are all busy, cannot add SYNCED_PROC %p(%p) to pool %p", func_ptr, param, thread_pool);
293 cancel_and_return = TRUE;
294 } else if (mode & DIRECT_PROC) {
295 n_log(LOG_ERR, "Thread pool active threads are all busy, cannot add DIRECT_PROC %p(%p) to pool %p", func_ptr, param, thread_pool);
296 cancel_and_return = TRUE;
297 }
298
299 if (cancel_and_return) {
300 if (!(mode & NO_LOCK))
301 pthread_mutex_unlock(&thread_pool->lock);
302 return FALSE;
303 }
304
305 // try adding to wait list (the test that opens this branch is on missing source line 306)
307 THREAD_WAITING_PROC* proc = NULL;
308 Malloc(proc, THREAD_WAITING_PROC, 1);
309 if (!proc) {
310 n_log(LOG_ERR, "Failed to allocate THREAD_WAITING_PROC");
311 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
312 return FALSE;
313 }
314 proc->func = func_ptr;
315 proc->param = param;
/* the list receives `free` as the destructor for the queued item */
316 list_push(thread_pool->waiting_list, proc, free);
317 n_log(LOG_DEBUG, "Adding %p %p to waitlist", proc->func, proc->param);
318 } else {
319 n_log(LOG_ERR, "proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param, thread_pool);
320 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
321 return FALSE;
322 }
323 }
324
325 /* consume idle signal: pool now has work in progress */
/* EAGAIN only means nb_tasks was already 0, so it is not reported as an error */
326 if (sem_trywait(&thread_pool->nb_tasks) != 0 && errno != EAGAIN) {
327 int error = errno;
328 n_log(LOG_ERR, "sem_trywait nb_tasks error in thread_pool %p: %s", thread_pool, strerror(error));
329 }
330
331 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
332
333 return TRUE;
334} /* add_threaded_process */
335
/*
 * Body of start_threaded_pool(THREAD_POOL* thread_pool): post th_start on the nodes selected
 * by the per-node test below, which wakes their worker threads.
 * NOTE(review): the signature line and source lines 345 and 354 (the tests guarding the second
 * `return FALSE` and `to_run = 1`) are absent from this listing.
 *
 * Returns FALSE when thread_pool is NULL, when the missing test on line 345 fails, or when
 * any sem_post fails; TRUE otherwise.
 */
342 if (!thread_pool)
343 return FALSE;
344
346 return FALSE;
347
348 int retval = TRUE;
349
350 pthread_mutex_lock(&thread_pool->lock);
351 for (size_t it = 0; it < thread_pool->max_threads; it++) {
/* decide under the node lock, but post after releasing it */
352 int to_run = 0;
353 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
355 to_run = 1;
356 }
357 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
358 if (to_run == 1) {
/* a failed post is logged and remembered, the remaining nodes are still processed */
359 if (sem_post(&thread_pool->thread_list[it]->th_start) != 0) {
360 int error = errno;
361 n_log(LOG_ERR, "sem_post th_start error in thread_pool %p , thread_list[ %zu ] : %s", thread_pool, it, strerror(error));
362 retval = FALSE;
363 }
364 }
365 }
366 pthread_mutex_unlock(&thread_pool->lock);
367
368 return retval;
369} /* start_threaded_pool */
370
/*
 * Body of wait_for_synced_threaded_pool(THREAD_POOL* thread_pool): for every node flagged by
 * the per-node test below, block on its th_end semaphore (posted by the worker loop after a
 * SYNCED_PROC job finishes).
 * NOTE(review): the signature line and source line 384 (the test guarding `is_synced = 1`)
 * are absent from this listing.
 *
 * Returns FALSE if an assertion fails or any sem_wait reports an error, TRUE otherwise.
 * NOTE(review): unlike wait_for_threaded_pool, an EINTR from sem_wait is not retried here.
 */
377 __n_assert(thread_pool, return FALSE);
378 __n_assert(thread_pool->thread_list, return FALSE);
379
380 int retval = TRUE;
381 for (size_t it = 0; it < thread_pool->max_threads; it++) {
/* decide under the node lock, but wait after releasing it so the worker can take the lock */
382 int is_synced = 0;
383 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
385 is_synced = 1;
386 }
387 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
388
389 if (is_synced) {
390 if (sem_wait(&thread_pool->thread_list[it]->th_end) == -1) {
391 int error = errno;
392 n_log(LOG_ERR, "sem_wait th_end error in thread_pool %p , thread_list[ %zu ] : %s", thread_pool, it, strerror(error));
393 retval = FALSE;
394 }
395 }
396 }
397 return retval;
398} /* wait_for_synced_threaded_pool */
399
/*
 * Body of wait_for_threaded_pool(THREAD_POOL* thread_pool): block on the pool's nb_tasks
 * semaphore, then post it back so the same state can be observed again.
 * NOTE(review): the signature line and source line 410 (the statement announced by the
 * "kick off draining" comment) are absent from this listing.
 *
 * Returns TRUE once nb_tasks was obtained, FALSE if an assertion fails or sem_wait fails with
 * an error other than EINTR.
 */
406 __n_assert(thread_pool, return FALSE);
407 __n_assert(thread_pool->thread_list, return FALSE);
408
409 /* kick off draining the wait list before blocking */
411
/* retry when the wait is merely interrupted by a signal (EINTR); any other error is fatal */
412 while (sem_wait(&thread_pool->nb_tasks) == -1) {
413 int error = errno;
414 if (error == EINTR) {
415 continue;
416 }
417 n_log(LOG_ERR, "sem_wait nb_tasks error in thread_pool %p: %s", thread_pool, strerror(error));
418 return FALSE;
419 }
420 /* restore the idle signal so it can be waited on again or checked */
421 sem_post(&thread_pool->nb_tasks);
422 return TRUE;
423} /* wait_for_threaded_pool */
424
431int destroy_threaded_pool(THREAD_POOL** pool, unsigned int delay) {
432 __n_assert(pool && (*pool), return FALSE);
433 __n_assert((*pool)->thread_list, return FALSE);
434
435 int DONE = 0;
436 int max_retries = 1000;
437
438 while (!DONE) {
439 DONE = 1;
440 pthread_mutex_lock(&(*pool)->lock);
441 for (size_t it = 0; it < (*pool)->max_threads; it++) {
442 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
443 int state = (*pool)->thread_list[it]->state;
444 int thread_state = (*pool)->thread_list[it]->thread_state;
445 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
446
447 if (thread_state == EXITING_THREAD || thread_state == EXITED_THREAD) {
448 continue;
449 }
450
451 if (state == IDLE_PROC) {
452 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
453 (*pool)->thread_list[it]->thread_state = EXITING_THREAD;
454 sem_post(&(*pool)->thread_list[it]->th_start);
455 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
456 } else {
457 DONE = 0;
458 }
459 }
460 pthread_mutex_unlock(&(*pool)->lock);
461
462 if (!DONE) {
463 max_retries--;
464 if (max_retries <= 0) {
465 pthread_mutex_lock(&(*pool)->lock);
466 for (size_t it = 0; it < (*pool)->max_threads; it++) {
467 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
468 if ((*pool)->thread_list[it]->thread_state != EXITING_THREAD && (*pool)->thread_list[it]->thread_state != EXITED_THREAD) {
469 (*pool)->thread_list[it]->thread_state = EXITING_THREAD;
470 sem_post(&(*pool)->thread_list[it]->th_start);
471 }
472 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
473 }
474 pthread_mutex_unlock(&(*pool)->lock);
475 break;
476 }
477 }
478
479 u_sleep(delay);
480 }
481
482 /* join threads without holding pool lock to avoid deadlock:
483 * threads may call refresh_thread_pool() which needs pool->lock */
484 for (size_t it = 0; it < (*pool)->max_threads; it++) {
485 pthread_join((*pool)->thread_list[it]->thr, NULL);
486 }
487
488 /* all threads have exited, safe to clean up without locking */
489 for (size_t it = 0; it < (*pool)->max_threads; it++) {
490 pthread_mutex_destroy(&(*pool)->thread_list[it]->lock);
491 sem_destroy(&(*pool)->thread_list[it]->th_start);
492 sem_destroy(&(*pool)->thread_list[it]->th_end);
493 Free((*pool)->thread_list[it]);
494 }
495 Free((*pool)->thread_list);
496 list_destroy(&(*pool)->waiting_list);
497
498 sem_destroy(&(*pool)->nb_tasks);
499
500 pthread_mutex_destroy(&(*pool)->lock);
501
502 Free((*pool));
503
504 return TRUE;
505} /* destroy_threaded_pool */
506
/*
 * Body of refresh_thread_pool(THREAD_POOL* thread_pool): under thread_pool->lock, re-submit
 * queued jobs to the workers for as long as add_threaded_process accepts them, then update the
 * pool's idle signal (nb_tasks).
 * NOTE(review): the signature line and source lines 519, 522, 524, 529, 547, 550-551 and 556
 * are absent from this listing (gaps in the embedded numbering); the declarations of `node`
 * and `proc`, the node removal and the idle test are therefore not visible.
 *
 * Returns FALSE if an assertion fails, TRUE otherwise.
 */
513 __n_assert(thread_pool, return FALSE);
514 __n_assert(thread_pool->waiting_list, return FALSE);
515
516 /* Trying to empty the wait list */
517 int push_status = 0;
518 pthread_mutex_lock(&thread_pool->lock);
520 push_status = 1;
521 while (push_status == 1) {
523 if (node && node->ptr) {
525 if (proc) { // cppcheck-suppress knownConditionTrueFalse -- defensive check after cast
/* NO_QUEUE: a refused job must not be queued a second time; NO_LOCK: the pool lock is already held here */
526 if (add_threaded_process(thread_pool, proc->func, proc->param, NORMAL_PROC | NO_QUEUE | NO_LOCK) == TRUE) {
527 THREAD_WAITING_PROC* procptr = NULL;
528 LIST_NODE* next_node = node->next;
530 n_log(LOG_DEBUG, "waitlist: adding %p,%p to %p", procptr->func, procptr->param, thread_pool);
531 Free(procptr);
532 (void)next_node; /* advance past removed node (consumed by next iteration) */
533 } else {
/* no free worker: stop draining, the remaining jobs stay queued */
534 n_log(LOG_DEBUG, "waitlist: cannot add proc %p from waiting list of %p, all active threads are busy !", proc, thread_pool);
535 push_status = 0;
536 }
537 } else {
538 n_log(LOG_ERR, "waitlist: trying to add invalid NULL proc on thread pool %p !", thread_pool);
539 push_status = 0;
540 }
541 } else {
/* no node, or a node without payload: nothing (more) to drain */
542 push_status = 0;
543 }
544 } // while( push_status == 1 )
545
546 // update statistics: count threads with assigned work (waiting to run or running)
548 for (size_t it = 0; it < thread_pool->max_threads; it++) {
549 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
552 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
553 }
554
555 /* signal idle: no active/waiting threads and waiting list is empty */
/* nb_tasks is only posted when it currently reads 0, so this code never raises it above 1 */
557 int value = 0;
558 sem_getvalue(&thread_pool->nb_tasks, &value);
559 if (value == 0) {
560 sem_post(&thread_pool->nb_tasks);
561 }
562 }
563
564 pthread_mutex_unlock(&thread_pool->lock);
565
566 return TRUE;
567} // refresh_thread_pool()
THREAD_POOL * thread_pool
Definition ex_fluid.c:77
int DONE
Definition ex_fluid.c:59
static int mode
static NETWORK_POOL * pool
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:262
void * ptr
void pointer to store
Definition n_list.h:45
LIST_NODE * start
pointer to the start of the list
Definition n_list.h:65
size_t nb_items
number of item currently in the list
Definition n_list.h:60
struct LIST_NODE * next
pointer to the next node
Definition n_list.h:51
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:227
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:97
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:547
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
Definition n_list.c:36
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Definition n_list.h:74
Structure of a generic list node.
Definition n_list.h:43
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_DEBUG
debug-level messages
Definition n_log.h:83
#define LOG_ERR
error conditions
Definition n_log.h:75
#define LOG_INFO
informational
Definition n_log.h:81
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition n_time.c:53
size_t nb_max_waiting
Maximum number of waiting procedures in the list, 0 for unlimited.
int state
state of the proc , RUNNING_PROC when it is busy processing func( param) , IDLE_PROC when it waits fo...
int type
SYNCED or DIRECT process start.
size_t max_threads
Maximum number of running threads in the list.
void * param
if not NULL , passed as argument
void *(* func)(void *param)
function to call in the thread
sem_t th_end
thread ending semaphore
pthread_mutex_t lock
mutex to prevent mutual access of node parameters
struct THREAD_POOL * thread_pool
pointer to assigned thread pool
sem_t th_start
thread starting semaphore
pthread_t thr
thread id
THREAD_POOL_NODE ** thread_list
Dynamically allocated but fixed size thread array.
LIST * waiting_list
Waiting list handling.
size_t nb_actives
number of threads actually doing a proc
sem_t nb_tasks
semaphore signaling pool idle state: value 0 = work in progress, value 1 = pool is idle (no active th...
void *(* func)(void *param)
function to call in the thread
int thread_state
state of the managing thread , RUNNING_THREAD, EXITING_THREAD, EXITED_THREAD
pthread_mutex_t lock
mutex to prevent mutual access of waiting_list parameters
void * param
if not NULL , passed as argument
#define NORMAL_PROC
processing mode for added func, direct start (th_start is posted as soon as the func is assigned to a thread), can be queued
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for execution in the thread pool.
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start, not queued
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
#define NO_QUEUE
special processing mode for waiting_list: do not add the work to the queue since it's coming from the queue...
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to move procs from the waiting list onto free thread slots (they are re-submitted as NORMAL_PROC), else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int wait_for_threaded_pool(THREAD_POOL *thread_pool)
Wait for the thread pool to become idle (no active threads, empty waiting list), blocking without pol...
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
long int get_nb_cpu_cores()
get the number of cores of the current system
#define DIRECT_PROC
processing mode for added func, direct start, not queued
#define RUNNING_PROC
status of a thread which proc is currently running
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread which has a proc waiting to be processed
Structure of a thread pool.
A thread pool node.
Structure of a waiting process item.
Common headers and low-level functions & define.
Generic log system.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.
Thread pool declaration.
Timing utilities.