46 pthread_mutex_lock(&
pool->stats_lock);
47 pool->stats.active_threads++;
48 pthread_mutex_unlock(&
pool->stats_lock);
63 pthread_mutex_lock(&
pool->stats_lock);
64 pool->stats.total_accepted++;
65 pthread_mutex_unlock(&
pool->stats_lock);
67 pool->callback(accepted,
pool->user_data);
69 if (retval == EINTR) {
73 if (retval == EAGAIN || retval == EWOULDBLOCK || retval == 0) {
75 pthread_mutex_lock(&
pool->stats_lock);
76 pool->stats.total_timeouts++;
77 pthread_mutex_unlock(&
pool->stats_lock);
79 if (
pool->accept_timeout == -1) {
84 pthread_mutex_lock(&
pool->stats_lock);
85 pool->stats.total_errors++;
86 pthread_mutex_unlock(&
pool->stats_lock);
87 n_log(
LOG_ERR,
"accept pool thread: accept error, retval=%d", retval);
92 pthread_mutex_lock(&
pool->stats_lock);
93 if (
pool->stats.active_threads > 0) {
94 pool->stats.active_threads--;
96 pthread_mutex_unlock(&
pool->stats_lock);
114 if (nb_threads == 0) {
115 n_log(
LOG_ERR,
"netw_accept_pool_create: nb_threads must be > 0");
124 pool->nb_accept_threads = nb_threads;
125 pool->accept_timeout = accept_timeout;
126 pool->callback = callback;
127 pool->user_data = user_data;
133 if (pthread_mutex_init(&
pool->stats_lock, NULL) != 0) {
134 n_log(
LOG_ERR,
"netw_accept_pool_create: failed to init stats mutex");
139 Malloc(
pool->accept_threads, pthread_t, nb_threads);
140 if (!
pool->accept_threads) {
141 n_log(
LOG_ERR,
"netw_accept_pool_create: failed to allocate thread array");
142 pthread_mutex_destroy(&
pool->stats_lock);
160 n_log(
LOG_ERR,
"netw_accept_pool_start: pool is not idle or stopped (state=%u)", current);
165 pthread_mutex_lock(&
pool->stats_lock);
167 clock_gettime(CLOCK_MONOTONIC, &
pool->stats.start_time);
168 pthread_mutex_unlock(&
pool->stats_lock);
172 for (
size_t i = 0; i <
pool->nb_accept_threads; i++) {
175 n_log(
LOG_ERR,
"netw_accept_pool_start: failed to create thread %zu: %s", i, strerror(err));
178 for (
size_t j = 0; j < i; j++) {
179 pthread_join(
pool->accept_threads[j], NULL);
200 n_log(
LOG_WARNING,
"netw_accept_pool_stop: pool is not running (state=%u)", current);
208 if (
pool->server &&
pool->server->link.sock != INVALID_SOCKET) {
209 shutdown(
pool->server->link.sock, SHUT_RDWR);
230 n_log(
LOG_WARNING,
"netw_accept_pool_wait: pool still running, calling stop first");
235 for (
size_t i = 0; i <
pool->nb_accept_threads; i++) {
236 pthread_join(
pool->accept_threads[i], NULL);
255 pthread_mutex_lock(&
pool->stats_lock);
257 pthread_mutex_unlock(&
pool->stats_lock);
273 n_log(
LOG_WARNING,
"netw_accept_pool_destroy: pool still active, stopping and waiting");
278 pthread_mutex_destroy(&(*pool)->stats_lock);
279 Free((*pool)->accept_threads);
static NETWORK_POOL * pool
#define NETW_ACCEPT_POOL_RUNNING
accept pool state: running and accepting connections
void(* netw_accept_callback_t)(NETWORK *accepted, void *user_data)
callback type for accepted connections.
#define NETW_ACCEPT_POOL_IDLE
accept pool state: idle, not yet started
NETW_ACCEPT_POOL * netw_accept_pool_create(NETWORK *server, size_t nb_threads, int accept_timeout, netw_accept_callback_t callback, void *user_data)
Create a new accept pool.
int netw_accept_pool_get_stats(NETW_ACCEPT_POOL *pool, NETW_ACCEPT_POOL_STATS *stats)
Get a snapshot of pool statistics.
int netw_accept_pool_wait(NETW_ACCEPT_POOL *pool, int timeout_sec)
Wait for all accept threads to finish after a stop request.
#define netw_accept_pool_atomic_read_state(pool)
Lock-free atomic read of the accept pool state.
int netw_accept_pool_destroy(NETW_ACCEPT_POOL **pool)
Destroy an accept pool.
int netw_accept_pool_stop(NETW_ACCEPT_POOL *pool)
Request the accept pool to stop.
int netw_accept_pool_start(NETW_ACCEPT_POOL *pool)
Start the accept pool (launches the accept threads).
#define netw_accept_pool_atomic_write_state(pool, val)
Lock-free atomic write of the accept pool state.
#define NETW_ACCEPT_POOL_STOPPING
accept pool state: stop requested
#define NETW_ACCEPT_POOL_STOPPED
accept pool state: fully stopped
Structure of a parallel accept pool.
Statistics for the accept pool.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_ERR
error conditions
#define LOG_NOTICE
normal but significant condition
#define LOG_WARNING
warning conditions
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
NETWORK * netw_accept_from_ex(NETWORK *from, size_t send_list_limit, size_t recv_list_limit, int blocking, int *retval)
Make a normal 'accept'.
int netw_close(NETWORK **netw)
Closing a specified Network, destroy queues, free the structure.
static void * netw_accept_pool_thread_func(void *arg)
Thread function for accept pool workers.
Accept pool for parallel connection acceptance (nginx-style)