34#include <sys/sysinfo.h>
45 long int nb_procs = 0;
48 GetSystemInfo(&sysinfo);
49 nb_procs = (
long int)sysinfo.dwNumberOfProcessors;
51 nb_procs = sysconf(_SC_NPROCESSORS_ONLN);
65 n_log(
LOG_ERR,
"Thread fatal error, no valid payload found, exiting thread function !");
79 pthread_mutex_lock(&node->
lock);
81 pthread_mutex_unlock(&node->
lock);
85 void* (*func_to_run)(
void* param) = NULL;
86 void* param_to_run = NULL;
87 pthread_mutex_lock(&node->
lock);
89 func_to_run = node->
func;
90 param_to_run = node->
param;
91 pthread_mutex_unlock(&node->
lock);
94 func_to_run(param_to_run);
98 pthread_mutex_lock(&node->
lock);
102 int type = node->
type;
107 pthread_mutex_unlock(&node->
lock);
115 pthread_mutex_lock(&node->
lock);
117 pthread_mutex_unlock(&node->
lock);
161 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> nb_tasks", strerror(error));
170 for (it = 0; it < nbmaxthr; it++) {
179 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %zu ] -> th_start", strerror(error), it);
185 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %zu] -> th_end", strerror(error), it);
197 n_log(
LOG_ERR,
"pthread_create failed : %s for it %zu", strerror(errno), it);
208 for (
size_t j = 0; j < it; j++) {
237 n_log(
LOG_ERR,
"thread_pool is not allocated, can't add processes to it !");
242 n_log(
LOG_ERR,
"thread_pool thread_list is not allocated, can't add processes to it !");
249 n_log(
LOG_ERR,
"invalid mode %d: exactly one of NORMAL_PROC, SYNCED_PROC, DIRECT_PROC must be set",
mode);
256 while (it < thread_pool->max_threads) {
265 if (it < thread_pool->max_threads) {
281 n_log(
LOG_DEBUG,
"proc %p(%p) added on thread %zu", func_ptr, param, it);
287 int cancel_and_return = FALSE;
289 n_log(
LOG_DEBUG,
"Thread pool active threads are all busy and mode is NO_QUEUE, cannot add %p(%p) to pool %p", func_ptr, param,
thread_pool);
290 cancel_and_return = TRUE;
292 n_log(
LOG_ERR,
"Thread pool active threads are all busy, cannot add SYNCED_PROC %p(%p) to pool %p", func_ptr, param,
thread_pool);
293 cancel_and_return = TRUE;
295 n_log(
LOG_ERR,
"Thread pool active threads are all busy, cannot add DIRECT_PROC %p(%p) to pool %p", func_ptr, param,
thread_pool);
296 cancel_and_return = TRUE;
299 if (cancel_and_return) {
310 n_log(
LOG_ERR,
"Failed to allocate THREAD_WAITING_PROC");
314 proc->
func = func_ptr;
319 n_log(
LOG_ERR,
"proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param,
thread_pool);
361 n_log(
LOG_ERR,
"sem_post th_start error in thread_pool %p , thread_list[ %zu ] : %s",
thread_pool, it, strerror(error));
392 n_log(
LOG_ERR,
"sem_wait th_end error in thread_pool %p , thread_list[ %zu ] : %s",
thread_pool, it, strerror(error));
414 if (error == EINTR) {
433 __n_assert((*pool)->thread_list,
return FALSE);
436 int max_retries = 1000;
440 pthread_mutex_lock(&(*pool)->lock);
441 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
442 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
443 int state = (*pool)->thread_list[it]->state;
444 int thread_state = (*pool)->thread_list[it]->thread_state;
445 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
452 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
454 sem_post(&(*pool)->thread_list[it]->th_start);
455 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
460 pthread_mutex_unlock(&(*pool)->lock);
464 if (max_retries <= 0) {
465 pthread_mutex_lock(&(*pool)->lock);
466 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
467 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
470 sem_post(&(*pool)->thread_list[it]->th_start);
472 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
474 pthread_mutex_unlock(&(*pool)->lock);
484 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
485 pthread_join((*pool)->thread_list[it]->thr, NULL);
489 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
490 pthread_mutex_destroy(&(*pool)->thread_list[it]->lock);
491 sem_destroy(&(*pool)->thread_list[it]->th_start);
492 sem_destroy(&(*pool)->thread_list[it]->th_end);
493 Free((*pool)->thread_list[it]);
495 Free((*pool)->thread_list);
498 sem_destroy(&(*pool)->nb_tasks);
500 pthread_mutex_destroy(&(*pool)->lock);
521 while (push_status == 1) {
523 if (node && node->
ptr) {
534 n_log(
LOG_DEBUG,
"waitlist: cannot add proc %p from waiting list of %p, all active threads are busy !", proc,
thread_pool);
THREAD_POOL * thread_pool
static NETWORK_POOL * pool
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
void * ptr
void pointer to store
LIST_NODE * start
pointer to the start of the list
size_t nb_items
number of items currently in the list
struct LIST_NODE * next
pointer to the next node
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of items in a list
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
size_t nb_max_waiting
Maximum number of waiting procedures in the list, 0 for unlimited.
int state
state of the proc , RUNNING_PROC when it is busy processing func( param) , IDLE_PROC when it waits fo...
int type
SYNCED or DIRECT process start.
size_t max_threads
Maximum number of running threads in the list.
void * param
if not NULL , passed as argument
void *(* func)(void *param)
function to call in the thread
sem_t th_end
thread ending semaphore
pthread_mutex_t lock
mutex to prevent concurrent access to node parameters
struct THREAD_POOL * thread_pool
pointer to assigned thread pool
sem_t th_start
thread starting semaphore
THREAD_POOL_NODE ** thread_list
Dynamically allocated but fixed size thread array.
LIST * waiting_list
Waiting list handling.
size_t nb_actives
number of threads actually doing a proc
sem_t nb_tasks
semaphore signaling pool idle state: value 0 = work in progress, value 1 = pool is idle (no active th...
void *(* func)(void *param)
function to call in the thread
int thread_state
state of the managing thread , RUNNING_THREAD, EXITING_THREAD, EXITED_THREAD
pthread_mutex_t lock
mutex to prevent concurrent access to waiting_list parameters
void * param
if not NULL , passed as argument
#define NORMAL_PROC
processing mode for added func, synced start, can be queued
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for execution in the thread pool.
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start, not queued
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
#define NO_QUEUE
special processing mode for waiting_list: do not add the work in queue since it's coming from the queu...
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int wait_for_threaded_pool(THREAD_POOL *thread_pool)
Wait for the thread pool to become idle (no active threads, empty waiting list), blocking without pol...
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched processes, blocking but light on the CPU as there is no polling
long int get_nb_cpu_cores()
get the number of cores of the current system
#define DIRECT_PROC
processing mode for added func, direct start, not queued
#define RUNNING_PROC
status of a thread whose proc is currently running
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread which has a proc waiting to be processed
Structure of a thread pool.
Structure of a waiting process item.
Common headers and low-level functions & define.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.