Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_network_accept_pool.c
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
28
29#include <errno.h>
30#include <string.h>
31
39static void* netw_accept_pool_thread_func(void* arg) {
41
42 __n_assert(pool, return NULL);
43 __n_assert(pool->server, return NULL);
44 __n_assert(pool->callback, return NULL);
45
46 pthread_mutex_lock(&pool->stats_lock);
47 pool->stats.active_threads++;
48 pthread_mutex_unlock(&pool->stats_lock);
49
51 int retval = 0;
52 NETWORK* accepted = netw_accept_from_ex(pool->server, 0, 0, pool->accept_timeout, &retval);
53
54 /* check if we should exit */
56 if (accepted) {
57 netw_close(&accepted);
58 }
59 break;
60 }
61
62 if (accepted) {
63 pthread_mutex_lock(&pool->stats_lock);
64 pool->stats.total_accepted++;
65 pthread_mutex_unlock(&pool->stats_lock);
66
67 pool->callback(accepted, pool->user_data);
68 } else {
69 if (retval == EINTR) {
70 /* interrupted, just retry */
71 continue;
72 }
73 if (retval == EAGAIN || retval == EWOULDBLOCK || retval == 0) {
74 /* timeout or no connection available */
75 pthread_mutex_lock(&pool->stats_lock);
76 pool->stats.total_timeouts++;
77 pthread_mutex_unlock(&pool->stats_lock);
78 /* small sleep to avoid busy-looping on non-blocking accept */
79 if (pool->accept_timeout == -1) {
80 u_sleep(1000);
81 }
82 } else {
83 /* actual error */
84 pthread_mutex_lock(&pool->stats_lock);
85 pool->stats.total_errors++;
86 pthread_mutex_unlock(&pool->stats_lock);
87 n_log(LOG_ERR, "accept pool thread: accept error, retval=%d", retval);
88 }
89 }
90 }
91
92 pthread_mutex_lock(&pool->stats_lock);
93 if (pool->stats.active_threads > 0) {
94 pool->stats.active_threads--;
95 }
96 pthread_mutex_unlock(&pool->stats_lock);
97
98 return NULL;
99}
100
110NETW_ACCEPT_POOL* netw_accept_pool_create(NETWORK* server, size_t nb_threads, int accept_timeout, netw_accept_callback_t callback, void* user_data) {
111 __n_assert(server, return NULL);
112 __n_assert(callback, return NULL);
113
114 if (nb_threads == 0) {
115 n_log(LOG_ERR, "netw_accept_pool_create: nb_threads must be > 0");
116 return NULL;
117 }
118
119 NETW_ACCEPT_POOL* pool = NULL;
121 __n_assert(pool, return NULL);
122
123 pool->server = server;
124 pool->nb_accept_threads = nb_threads;
125 pool->accept_timeout = accept_timeout;
126 pool->callback = callback;
127 pool->user_data = user_data;
128
130
131 memset(&pool->stats, 0, sizeof(NETW_ACCEPT_POOL_STATS));
132
133 if (pthread_mutex_init(&pool->stats_lock, NULL) != 0) {
134 n_log(LOG_ERR, "netw_accept_pool_create: failed to init stats mutex");
135 Free(pool);
136 return NULL;
137 }
138
139 Malloc(pool->accept_threads, pthread_t, nb_threads);
140 if (!pool->accept_threads) {
141 n_log(LOG_ERR, "netw_accept_pool_create: failed to allocate thread array");
142 pthread_mutex_destroy(&pool->stats_lock);
143 Free(pool);
144 return NULL;
145 }
146
147 return pool;
148}
149
156 __n_assert(pool, return FALSE);
157
158 uint32_t current = netw_accept_pool_atomic_read_state(pool);
159 if (current != NETW_ACCEPT_POOL_IDLE && current != NETW_ACCEPT_POOL_STOPPED) {
160 n_log(LOG_ERR, "netw_accept_pool_start: pool is not idle or stopped (state=%u)", current);
161 return FALSE;
162 }
163
164 /* reset stats */
165 pthread_mutex_lock(&pool->stats_lock);
166 memset(&pool->stats, 0, sizeof(NETW_ACCEPT_POOL_STATS));
167 clock_gettime(CLOCK_MONOTONIC, &pool->stats.start_time);
168 pthread_mutex_unlock(&pool->stats_lock);
169
171
172 for (size_t i = 0; i < pool->nb_accept_threads; i++) {
173 int err = pthread_create(&pool->accept_threads[i], NULL, netw_accept_pool_thread_func, pool);
174 if (err != 0) {
175 n_log(LOG_ERR, "netw_accept_pool_start: failed to create thread %zu: %s", i, strerror(err));
176 /* stop already-created threads */
178 for (size_t j = 0; j < i; j++) {
179 pthread_join(pool->accept_threads[j], NULL);
180 }
182 return FALSE;
183 }
184 }
185
186 n_log(LOG_NOTICE, "accept pool started with %zu threads", pool->nb_accept_threads);
187 return TRUE;
188}
189
196 __n_assert(pool, return FALSE);
197
198 uint32_t current = netw_accept_pool_atomic_read_state(pool);
199 if (current != NETW_ACCEPT_POOL_RUNNING) {
200 n_log(LOG_WARNING, "netw_accept_pool_stop: pool is not running (state=%u)", current);
201 return FALSE;
202 }
203
204 n_log(LOG_NOTICE, "accept pool: stop requested");
206
207 /* shutdown the listening socket to unblock any threads stuck in select()/accept() */
208 if (pool->server && pool->server->link.sock != INVALID_SOCKET) {
209 shutdown(pool->server->link.sock, SHUT_RDWR);
210 }
211
212 return TRUE;
213}
214
222 __n_assert(pool, return FALSE);
223
224 uint32_t current = netw_accept_pool_atomic_read_state(pool);
225 if (current == NETW_ACCEPT_POOL_IDLE || current == NETW_ACCEPT_POOL_STOPPED) {
226 return TRUE;
227 }
228
229 if (current == NETW_ACCEPT_POOL_RUNNING) {
230 n_log(LOG_WARNING, "netw_accept_pool_wait: pool still running, calling stop first");
232 }
233
234 (void)timeout_sec;
235 for (size_t i = 0; i < pool->nb_accept_threads; i++) {
236 pthread_join(pool->accept_threads[i], NULL);
237 }
238
240 n_log(LOG_NOTICE, "accept pool: all threads joined");
241
242 return TRUE;
243}
244
252 __n_assert(pool, return FALSE);
253 __n_assert(stats, return FALSE);
254
255 pthread_mutex_lock(&pool->stats_lock);
256 memcpy(stats, &pool->stats, sizeof(NETW_ACCEPT_POOL_STATS));
257 pthread_mutex_unlock(&pool->stats_lock);
258
259 return TRUE;
260}
261
268 __n_assert(pool, return FALSE);
269 __n_assert((*pool), return FALSE);
270
271 uint32_t current = netw_accept_pool_atomic_read_state(*pool);
272 if (current == NETW_ACCEPT_POOL_RUNNING || current == NETW_ACCEPT_POOL_STOPPING) {
273 n_log(LOG_WARNING, "netw_accept_pool_destroy: pool still active, stopping and waiting");
276 }
277
278 pthread_mutex_destroy(&(*pool)->stats_lock);
279 Free((*pool)->accept_threads);
280 Free(*pool);
281
282 return TRUE;
283}
static NETWORK_POOL * pool
NETWORK * server
#define NETW_ACCEPT_POOL_RUNNING
accept pool state: running and accepting connections
void(* netw_accept_callback_t)(NETWORK *accepted, void *user_data)
callback type for accepted connections.
#define NETW_ACCEPT_POOL_IDLE
accept pool state: idle, not yet started
NETW_ACCEPT_POOL * netw_accept_pool_create(NETWORK *server, size_t nb_threads, int accept_timeout, netw_accept_callback_t callback, void *user_data)
Create a new accept pool.
int netw_accept_pool_get_stats(NETW_ACCEPT_POOL *pool, NETW_ACCEPT_POOL_STATS *stats)
Get a snapshot of pool statistics.
int netw_accept_pool_wait(NETW_ACCEPT_POOL *pool, int timeout_sec)
Wait for all accept threads to finish after a stop request.
#define netw_accept_pool_atomic_read_state(pool)
Lock-free atomic read of the accept pool state.
int netw_accept_pool_destroy(NETW_ACCEPT_POOL **pool)
Destroy an accept pool.
int netw_accept_pool_stop(NETW_ACCEPT_POOL *pool)
Request the accept pool to stop.
int netw_accept_pool_start(NETW_ACCEPT_POOL *pool)
Start the accept pool (launches accept threads)
#define netw_accept_pool_atomic_write_state(pool, val)
Lock-free atomic write of the accept pool state.
#define NETW_ACCEPT_POOL_STOPPING
accept pool state: stop requested
#define NETW_ACCEPT_POOL_STOPPED
accept pool state: fully stopped
Structure of a parallel accept pool.
Statistics for the accept pool.
#define Malloc(__ptr, __struct, __size)
Malloc handler that reports errors and sets the allocated memory to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:262
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_ERR
error conditions
Definition n_log.h:75
#define LOG_NOTICE
normal but significant condition
Definition n_log.h:79
#define LOG_WARNING
warning conditions
Definition n_log.h:77
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition n_time.c:53
NETWORK * netw_accept_from_ex(NETWORK *from, size_t send_list_limit, size_t recv_list_limit, int blocking, int *retval)
Make a normal 'accept'.
Definition n_network.c:2713
int netw_close(NETWORK **netw)
Closing a specified Network, destroy queues, free the structure.
Definition n_network.c:2041
Structure of a NETWORK.
Definition n_network.h:258
static void * netw_accept_pool_thread_func(void *arg)
Thread function for accept pool workers.
Accept pool for parallel connection acceptance (nginx-style)