Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
ex_accept_pool_server.c
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
58#include <stdio.h>
59#include <stdlib.h>
60#include <string.h>
61#include <errno.h>
62#include <getopt.h>
63#include <signal.h>
64
65#include "nilorea/n_common.h"
66#include "nilorea/n_log.h"
67#include "nilorea/n_network.h"
70
/* Run flag for the serving loops in main(); sighandler() sets it to 0. */
71static volatile int server_running = 1;
/* Number of connections to handle before stopping; main() overwrites it from the -n option. */
72static int total_target = 500;
73
/* Mode constants, selected in main() from the -m option; also used to index mode_names[] there. */
75#define MODE_SINGLE_INLINE 0
76#define MODE_SINGLE_POOL 1
77#define MODE_POOLED 2
78
/*
 * Data passed to the accept callback and to the worker threads.
 * NOTE(review): the remaining members (the code elsewhere uses a `worker_pool`
 * member) and the closing of this typedef are not present in this listing.
 */
80typedef struct CALLBACK_DATA {
/* count of handled connections; the functions in this file modify it while holding `lock` */
82 size_t handled;
/* mutex protecting `handled` */
84 pthread_mutex_t lock;
88
/**
 * Handle a single client: wait for one message, queue it back on the same
 * connection (echo), then close the connection.
 *
 * NOTE(review): original line 96 is absent from this listing, so any statement
 * between the NULL check and the wait is not visible here.
 *
 * @param client the accepted connection; if NULL the function returns at once
 */
93static void handle_client(NETWORK* client) {
94 __n_assert(client, return);
95
97
/* arguments are (netw, refresh, timeout) per netw_wait_msg(); units presumably microseconds - TODO confirm */
98 N_STR* msg = netw_wait_msg(client, 10000, 20000000);
99 if (msg) {
/* queue the received message for sending back to the same client */
100 netw_add_msg(client, msg);
/* presumably leaves time for the reply to be sent before closing - TODO confirm */
101 u_sleep(50000);
102 }
103
/* the connection is closed whether or not a message was received */
104 netw_close(&client);
105}
106
114
120static void* worker_handle_client(void* ptr) {
121 WORKER_ARG* arg = (WORKER_ARG*)ptr;
122 __n_assert(arg, return NULL);
123
124 handle_client(arg->client);
125
126 pthread_mutex_lock(&arg->cb_data->lock);
127 arg->cb_data->handled++;
128 size_t h = arg->cb_data->handled;
129 pthread_mutex_unlock(&arg->cb_data->lock);
130
131 if (h % 100 == 0) {
132 n_log(LOG_NOTICE, "handled %zu connections", h);
133 }
134
135 Free(arg);
136 return NULL;
137}
138
/**
 * Dispatch a client to the thread pool for handling.
 *
 * Allocates a WORKER_ARG, stores the client and the callback data in it.
 * NOTE(review): original line 159 is absent from this listing; presumably it
 * submits `arg` to cb_data->worker_pool (worker_handle_client() frees the
 * argument it receives) - confirm against the full source.
 *
 * @param client  the accepted connection; closed here on every visible error path except client == NULL
 * @param cb_data shared callback data; must have a non-NULL worker_pool
 */
144static void dispatch_to_pool(NETWORK* client, CALLBACK_DATA* cb_data) {
145 __n_assert(client, return);
/* on a missing cb_data or pool the connection is closed rather than left open */
146 __n_assert(cb_data, netw_close(&client); return);
147 __n_assert(cb_data->worker_pool, netw_close(&client); return);
148
149 WORKER_ARG* arg = NULL;
150 Malloc(arg, WORKER_ARG, 1);
151 if (!arg) {
152 n_log(LOG_ERR, "failed to allocate worker arg");
153 netw_close(&client);
154 return;
155 }
156 arg->client = client;
157 arg->cb_data = cb_data;
158
160}
161
167static void handle_inline_and_count(NETWORK* client, CALLBACK_DATA* cb_data) {
168 __n_assert(cb_data, netw_close(&client); return);
169
170 handle_client(client);
171
172 pthread_mutex_lock(&cb_data->lock);
173 cb_data->handled++;
174 size_t h = cb_data->handled;
175 pthread_mutex_unlock(&cb_data->lock);
176
177 if (h % 100 == 0) {
178 n_log(LOG_NOTICE, "handled %zu connections", h);
179 }
180}
181
187static void on_accept_pooled(NETWORK* conn, void* user_data) {
188 CALLBACK_DATA* data = (CALLBACK_DATA*)user_data;
189 __n_assert(data, netw_close(&conn); return);
190 dispatch_to_pool(conn, data);
191}
192
193static void sighandler(int sig) {
194 (void)sig;
195 server_running = 0;
196}
197
/**
 * Print the command-line help of ex_accept_pool_server on stderr.
 */
static void usage(void) {
    static const char help_text[] =
        "Usage: ex_accept_pool_server [options]\n"
        " -p PORT port to listen on (required)\n"
        " -a ADDR address to bind to (optional, default: all)\n"
        " -m MODE 'single-inline', 'single-pool', or 'pooled' (default: single-inline)\n"
        " -n COUNT number of connections to handle (default: 500)\n"
        " -t THREADS number of threads for pool/accept (default: 4)\n"
        " -V LEVEL log level: LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_ERR (default: LOG_NOTICE)\n"
        " -h show this help\n";

    /* the text holds no conversion specifiers, so fputs emits it verbatim */
    fputs(help_text, stderr);
}
209
/**
 * Entry point of the accept-pool example server.
 *
 * Parses the -p/-a/-m/-n/-t/-V/-h options with getopt(), creates a listening
 * socket with netw_make_listening(), then serves connections in one of three
 * modes (single-inline, single-pool, pooled) until total_target connections
 * were handled/accepted or server_running is cleared by sighandler() on
 * SIGINT/SIGTERM. It finally logs the elapsed time and the throughput.
 *
 * NOTE(review): this listing is missing several original lines (237-243,
 * 258-262, 269, 354, 379, 412-415), so the log-level assignments and the
 * declarations/assignments of `mode`, `accept_pool` and `stats` are not
 * visible here.
 *
 * @param argc number of command-line arguments
 * @param argv command-line argument vector
 * @return 0 on the normal exit path; -h, an invalid option, a missing port, an
 *         unknown mode or a listen failure terminate through exit(1)
 */
210int main(int argc, char** argv) {
211 char* addr = NULL;
212 char* port = NULL;
213 char* mode_str = "single-inline";
214 int nb_threads = 4;
215 int log_level = LOG_NOTICE;
216 int opt;
217
/* parse the command line; p, a, m, n, t and V take an argument */
218 while ((opt = getopt(argc, argv, "hp:a:m:n:t:V:")) != -1) {
219 switch (opt) {
220 case 'p':
/* NOTE(review): a failed strdup() leaves NULL, which is indistinguishable from the option being absent */
221 port = strdup(optarg);
222 break;
223 case 'a':
224 addr = strdup(optarg);
225 break;
226 case 'm':
/* mode_str points into argv; it is not duplicated */
227 mode_str = optarg;
228 break;
229 case 'n':
/* NOTE(review): atoi() reports no errors; non-numeric, zero or negative input is used as-is */
230 total_target = atoi(optarg);
231 break;
232 case 't':
/* NOTE(review): same atoi() caveat; the value is later cast to size_t for the pool sizes */
233 nb_threads = atoi(optarg);
234 break;
235 case 'V':
/* NOTE(review): the statement following each test below (original lines 237, 239, 241, 243) is missing from this listing */
236 if (!strcmp(optarg, "LOG_DEBUG"))
238 else if (!strcmp(optarg, "LOG_INFO"))
240 else if (!strcmp(optarg, "LOG_NOTICE"))
242 else if (!strcmp(optarg, "LOG_ERR"))
244 break;
245 case 'h':
/* -h shares the default branch: usage is printed and the process exits with status 1 */
246 default:
247 usage();
248 exit(1);
249 }
250 }
251
252 if (!port) {
253 fprintf(stderr, "Error: -p PORT is required\n");
254 usage();
255 exit(1);
256 }
257
/* NOTE(review): the declaration of `mode` and the branch bodies (original lines 258, 260, 262) are missing from this listing; any other mode string is rejected below */
259 if (strcmp(mode_str, "single-pool") == 0) {
261 } else if (strcmp(mode_str, "pooled") == 0) {
263 } else if (strcmp(mode_str, "single-inline") != 0) {
264 fprintf(stderr, "Error: unknown mode '%s'\n", mode_str);
265 usage();
266 exit(1);
267 }
268
270
/* SIGINT/SIGTERM clear server_running through sighandler(); SIGPIPE is ignored on Linux builds */
271 signal(SIGINT, sighandler);
272 signal(SIGTERM, sighandler);
273#ifdef __linux__
274 signal(SIGPIPE, SIG_IGN);
275#endif
276
277 /* create listening socket */
278 NETWORK* server = NULL;
279 if (netw_make_listening(&server, addr, port, 1024, NETWORK_IPALL) == FALSE) {
280 n_log(LOG_ERR, "Failed to create listening socket on %s:%s", addr ? addr : "*", port);
281 exit(1);
282 }
283 n_log(LOG_NOTICE, "Listening on %s:%s (backlog=1024)", addr ? addr : "*", port);
284
/* shared state for the callbacks/workers; worker_pool stays NULL in single-inline mode */
285 CALLBACK_DATA cb_data;
286 cb_data.handled = 0;
287 cb_data.worker_pool = NULL;
288 pthread_mutex_init(&cb_data.lock, NULL);
289
/* the measured interval starts before the mode-specific setup (pool creation included) */
290 struct timespec t_start, t_end;
291 clock_gettime(CLOCK_MONOTONIC, &t_start);
292
/* indexed by the MODE_* constants */
293 const char* mode_names[] = {"SINGLE-INLINE", "SINGLE-POOL", "POOLED"};
294 n_log(LOG_NOTICE, "=== %s MODE: target %d connections ===", mode_names[mode], total_target);
295
296 if (mode == MODE_SINGLE_INLINE) {
297 /* ---- SINGLE-INLINE MODE ----
298 * One thread doing accept + handle client in the same thread.
299 * Both accept and handling are fully serialized. */
300 n_log(LOG_NOTICE, "1 thread: accept + handle inline (fully serialized)");
301
302 while (server_running) {
/* snapshot the counter under the lock and stop once the target is reached */
303 pthread_mutex_lock(&cb_data.lock);
304 size_t h = cb_data.handled;
305 pthread_mutex_unlock(&cb_data.lock);
306 if ((int)h >= total_target) {
307 break;
308 }
309
310 int retval = 0;
/* 500 is passed as the `blocking` argument; presumably a timeout so the loop can re-check server_running - TODO confirm */
311 NETWORK* client = netw_accept_from_ex(server, 0, 0, 500, &retval);
312 if (client) {
313 handle_inline_and_count(client, &cb_data);
314 }
315 }
316 } else if (mode == MODE_SINGLE_POOL) {
317 /* ---- SINGLE-POOL MODE ----
318 * One thread doing accept, dispatching to a worker thread pool.
319 * Accept is serialized, but client handling is parallelized. */
320 n_log(LOG_NOTICE, "1 accept thread, %d worker threads", nb_threads);
321
/* nb_threads workers, waiting list sized to total_target + 16 entries */
322 cb_data.worker_pool = new_thread_pool((size_t)nb_threads, (size_t)(total_target + 16));
323 if (!cb_data.worker_pool) {
324 n_log(LOG_ERR, "Failed to create worker thread pool");
325 goto cleanup;
326 }
327
/* this mode counts accepted (dispatched) connections, not handled ones */
328 int accepted_count = 0;
329 while (server_running && accepted_count < total_target) {
330 int retval = 0;
331 NETWORK* client = netw_accept_from_ex(server, 0, 0, 500, &retval);
332 if (client) {
333 dispatch_to_pool(client, &cb_data);
334 accepted_count++;
335 }
336 }
337
338 /* wait for all workers to finish */
339 n_log(LOG_NOTICE, "all connections accepted, waiting for workers...");
340 wait_for_threaded_pool(cb_data.worker_pool);
341 destroy_threaded_pool(&cb_data.worker_pool, 500000);
342 } else {
343 /* ---- POOLED MODE ----
344 * N accept threads (accept pool) + worker thread pool.
345 * Both accept and handling are parallelized. */
346 n_log(LOG_NOTICE, "%d accept threads, %d worker threads", nb_threads, nb_threads);
347
348 cb_data.worker_pool = new_thread_pool((size_t)nb_threads, (size_t)(total_target + 16));
349 if (!cb_data.worker_pool) {
350 n_log(LOG_ERR, "Failed to create worker thread pool");
351 goto cleanup;
352 }
353
/* NOTE(review): original line 354 (the start of this statement) is missing from this listing; presumably it declares accept_pool from netw_accept_pool_create(), whose arguments would be (server, nb_threads, accept_timeout, callback, user_data) - confirm */
355 server, (size_t)nb_threads, 500, on_accept_pooled, &cb_data);
356 if (!accept_pool) {
357 n_log(LOG_ERR, "Failed to create accept pool");
358 destroy_threaded_pool(&cb_data.worker_pool, 500000);
359 goto cleanup;
360 }
361
362 if (netw_accept_pool_start(accept_pool) == FALSE) {
363 n_log(LOG_ERR, "Failed to start accept pool");
364 netw_accept_pool_destroy(&accept_pool);
365 destroy_threaded_pool(&cb_data.worker_pool, 500000);
366 goto cleanup;
367 }
368
369 /* wait until we've handled enough connections or user interrupts */
370 while (server_running) {
371 pthread_mutex_lock(&cb_data.lock);
372 size_t h = cb_data.handled;
373 pthread_mutex_unlock(&cb_data.lock);
374
375 if ((int)h >= total_target) {
376 break;
377 }
378
/* NOTE(review): the declaration of `stats` (original line 379) is missing from this listing */
380 netw_accept_pool_get_stats(accept_pool, &stats);
381 n_log(LOG_INFO, "stats: accepted=%zu errors=%zu timeouts=%zu active_threads=%zu",
382 stats.total_accepted, stats.total_errors, stats.total_timeouts, stats.active_threads);
383
/* polling interval between two counter checks */
384 u_sleep(100000);
385 }
386
/* stop accepting first, then wait for the accept threads (second argument is timeout_sec) */
387 netw_accept_pool_stop(accept_pool);
388 netw_accept_pool_wait(accept_pool, 10);
389
390 NETW_ACCEPT_POOL_STATS final_stats;
391 netw_accept_pool_get_stats(accept_pool, &final_stats);
392 n_log(LOG_NOTICE, "Pool stats: accepted=%zu errors=%zu timeouts=%zu",
393 final_stats.total_accepted, final_stats.total_errors, final_stats.total_timeouts);
394
395 netw_accept_pool_destroy(&accept_pool);
396
397 /* wait for all workers to finish */
398 n_log(LOG_NOTICE, "accept pool stopped, waiting for workers...");
399 wait_for_threaded_pool(cb_data.worker_pool);
400 destroy_threaded_pool(&cb_data.worker_pool, 500000);
401 }
402
/* elapsed wall time in seconds from the monotonic clock */
403 clock_gettime(CLOCK_MONOTONIC, &t_end);
404 double elapsed = (double)(t_end.tv_sec - t_start.tv_sec) +
405 (double)(t_end.tv_nsec - t_start.tv_nsec) / 1e9;
406
/* NOTE(review): handled is read without the lock here; the pooled branches have waited on the worker pool before this point */
407 n_log(LOG_NOTICE, "=== DONE: %zu connections in %.3f seconds (%.1f conn/sec) ===",
408 cb_data.handled, elapsed, (double)cb_data.handled / elapsed);
409
/* the failure paths above jump here and therefore skip the timing summary */
410cleanup:
411 pthread_mutex_destroy(&cb_data.lock);
/* NOTE(review): original lines 412, 414 and 415 are missing from this listing; the release of server, addr and port is not visible here */
413
416
417 netw_unload();
418 n_log(LOG_NOTICE, "Server exited cleanly");
419
420 return 0;
421}
static void usage(void)
#define MODE_SINGLE_POOL
static void * worker_handle_client(void *ptr)
Worker thread function: handles a client and updates counter.
static int total_target
size_t handled
counter of handled connections (a plain size_t protected by the lock mutex, not an atomic type)
static void sighandler(int sig)
pthread_mutex_t lock
mutex for counter
static volatile int server_running
NETWORK * client
the accepted connection
static void handle_inline_and_count(NETWORK *client, CALLBACK_DATA *cb_data)
Handle client inline in the accept thread and update counter.
THREAD_POOL * worker_pool
thread pool for dispatching (modes single-pool and pooled)
static void on_accept_pooled(NETWORK *conn, void *user_data)
Accept pool callback: dispatches to thread pool.
#define MODE_SINGLE_INLINE
mode constants
CALLBACK_DATA * cb_data
back pointer to callback data for counting
static void handle_client(NETWORK *client)
Handle a single client: read one message, echo it back, close.
#define MODE_POOLED
static void dispatch_to_pool(NETWORK *client, CALLBACK_DATA *cb_data)
Dispatch a client to the thread pool for handling.
data passed to accept callback and worker threads
data passed to worker thread function
int main(void)
int log_level
Definition ex_fluid.c:61
static int mode
NETWORK * server
char * addr
char * port
size_t total_errors
total accept errors
size_t active_threads
number of accept threads currently running
size_t total_timeouts
total accept timeouts (no connection available)
size_t total_accepted
total connections successfully accepted
NETW_ACCEPT_POOL * netw_accept_pool_create(NETWORK *server, size_t nb_threads, int accept_timeout, netw_accept_callback_t callback, void *user_data)
Create a new accept pool.
int netw_accept_pool_get_stats(NETW_ACCEPT_POOL *pool, NETW_ACCEPT_POOL_STATS *stats)
Get a snapshot of pool statistics.
int netw_accept_pool_wait(NETW_ACCEPT_POOL *pool, int timeout_sec)
Wait for all accept threads to finish after a stop request.
int netw_accept_pool_destroy(NETW_ACCEPT_POOL **pool)
Destroy an accept pool.
int netw_accept_pool_stop(NETW_ACCEPT_POOL *pool)
Request the accept pool to stop.
int netw_accept_pool_start(NETW_ACCEPT_POOL *pool)
Start the accept pool (launches accept threads)
Structure of a parallel accept pool.
Statistics for the accept pool.
#define FreeNoLog(__ptr)
Free Handler without log.
Definition n_common.h:271
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:262
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_DEBUG
debug-level messages
Definition n_log.h:83
#define LOG_ERR
error conditions
Definition n_log.h:75
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
Definition n_log.c:120
#define LOG_NOTICE
normal but significant condition
Definition n_log.h:79
#define LOG_INFO
informational
Definition n_log.h:81
A box including a string and its length.
Definition n_str.h:60
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition n_time.c:53
int netw_add_msg(NETWORK *netw, N_STR *msg)
Add a message to send in aimed NETWORK.
Definition n_network.c:2914
int netw_make_listening(NETWORK **netw, char *addr, char *port, int nbpending, int ip_version)
Make a NETWORK be a Listening network.
Definition n_network.c:2240
int netw_start_thr_engine(NETWORK *netw)
Start the NETWORK netw Threaded Engine.
Definition n_network.c:3050
NETWORK * netw_accept_from_ex(NETWORK *from, size_t send_list_limit, size_t recv_list_limit, int blocking, int *retval)
Make a normal 'accept'.
Definition n_network.c:2713
#define NETWORK_IPALL
Flag for auto detection by OS of ip version to use.
Definition n_network.h:47
int netw_close(NETWORK **netw)
Closing a specified Network, destroy queues, free the structure.
Definition n_network.c:2041
N_STR * netw_wait_msg(NETWORK *netw, unsigned int refresh, size_t timeout)
Wait a message from aimed NETWORK.
Definition n_network.c:2998
Structure of a NETWORK.
Definition n_network.h:258
#define NORMAL_PROC
processing mode for added func, synced start, can be queued
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int wait_for_threaded_pool(THREAD_POOL *thread_pool)
Wait for the thread pool to become idle (no active threads, empty waiting list), blocking without pol...
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
Structure of a thread pool.
Common headers and low-level functions & define.
Generic log system.
Network Engine.
Accept pool for parallel connection acceptance (nginx-style)
Thread pool declaration.