75#define MODE_SINGLE_INLINE 0
76#define MODE_SINGLE_POOL 1
156 arg->client = client;
157 arg->cb_data = cb_data;
172 pthread_mutex_lock(&cb_data->
lock);
175 pthread_mutex_unlock(&cb_data->
lock);
200 "Usage: ex_accept_pool_server [options]\n"
201 " -p PORT port to listen on (required)\n"
202 " -a ADDR address to bind to (optional, default: all)\n"
203 " -m MODE 'single-inline', 'single-pool', or 'pooled' (default: single-inline)\n"
204 " -n COUNT number of connections to handle (default: 500)\n"
205 " -t THREADS number of threads for pool/accept (default: 4)\n"
206 " -V LEVEL log level: LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_ERR (default: LOG_NOTICE)\n"
207 " -h show this help\n");
210int main(
int argc,
char** argv) {
213 char* mode_str =
"single-inline";
218 while ((opt = getopt(argc, argv,
"hp:a:m:n:t:V:")) != -1) {
221 port = strdup(optarg);
224 addr = strdup(optarg);
233 nb_threads = atoi(optarg);
236 if (!strcmp(optarg,
"LOG_DEBUG"))
238 else if (!strcmp(optarg,
"LOG_INFO"))
240 else if (!strcmp(optarg,
"LOG_NOTICE"))
242 else if (!strcmp(optarg,
"LOG_ERR"))
253 fprintf(stderr,
"Error: -p PORT is required\n");
259 if (strcmp(mode_str,
"single-pool") == 0) {
261 }
else if (strcmp(mode_str,
"pooled") == 0) {
263 }
else if (strcmp(mode_str,
"single-inline") != 0) {
264 fprintf(stderr,
"Error: unknown mode '%s'\n", mode_str);
274 signal(SIGPIPE, SIG_IGN);
287 cb_data.worker_pool = NULL;
288 pthread_mutex_init(&cb_data.lock, NULL);
290 struct timespec t_start, t_end;
291 clock_gettime(CLOCK_MONOTONIC, &t_start);
293 const char* mode_names[] = {
"SINGLE-INLINE",
"SINGLE-POOL",
"POOLED"};
300 n_log(
LOG_NOTICE,
"1 thread: accept + handle inline (fully serialized)");
303 pthread_mutex_lock(&cb_data.lock);
304 size_t h = cb_data.handled;
305 pthread_mutex_unlock(&cb_data.lock);
323 if (!cb_data.worker_pool) {
328 int accepted_count = 0;
346 n_log(
LOG_NOTICE,
"%d accept threads, %d worker threads", nb_threads, nb_threads);
349 if (!cb_data.worker_pool) {
371 pthread_mutex_lock(&cb_data.lock);
372 size_t h = cb_data.handled;
373 pthread_mutex_unlock(&cb_data.lock);
381 n_log(
LOG_INFO,
"stats: accepted=%zu errors=%zu timeouts=%zu active_threads=%zu",
403 clock_gettime(CLOCK_MONOTONIC, &t_end);
404 double elapsed = (double)(t_end.tv_sec - t_start.tv_sec) +
405 (double)(t_end.tv_nsec - t_start.tv_nsec) / 1e9;
407 n_log(
LOG_NOTICE,
"=== DONE: %zu connections in %.3f seconds (%.1f conn/sec) ===",
408 cb_data.handled, elapsed, (
double)cb_data.handled / elapsed);
411 pthread_mutex_destroy(&cb_data.lock);
static void * worker_handle_client(void *ptr)
Worker thread function: handles a client and updates counter.
size_t handled
atomic counter of handled connections
static void sighandler(int sig)
pthread_mutex_t lock
mutex for counter
static volatile int server_running
NETWORK * client
the accepted connection
static void handle_inline_and_count(NETWORK *client, CALLBACK_DATA *cb_data)
Handle client inline in the accept thread and update counter.
THREAD_POOL * worker_pool
thread pool for dispatching (modes single-pool and pooled)
static void on_accept_pooled(NETWORK *conn, void *user_data)
Accept pool callback: dispatches to thread pool.
#define MODE_SINGLE_INLINE
mode constants
CALLBACK_DATA * cb_data
back pointer to callback data for counting
static void handle_client(NETWORK *client)
Handle a single client: read one message, echo it back, close.
static void dispatch_to_pool(NETWORK *client, CALLBACK_DATA *cb_data)
Dispatch a client to the thread pool for handling.
data passed to accept callback and worker threads
data passed to worker thread function
size_t total_errors
total accept errors
size_t active_threads
number of accept threads currently running
size_t total_timeouts
total accept timeouts (no connection available)
size_t total_accepted
total connections successfully accepted
NETW_ACCEPT_POOL * netw_accept_pool_create(NETWORK *server, size_t nb_threads, int accept_timeout, netw_accept_callback_t callback, void *user_data)
Create a new accept pool.
int netw_accept_pool_get_stats(NETW_ACCEPT_POOL *pool, NETW_ACCEPT_POOL_STATS *stats)
Get a snapshot of pool statistics.
int netw_accept_pool_wait(NETW_ACCEPT_POOL *pool, int timeout_sec)
Wait for all accept threads to finish after a stop request.
int netw_accept_pool_destroy(NETW_ACCEPT_POOL **pool)
Destroy an accept pool.
int netw_accept_pool_stop(NETW_ACCEPT_POOL *pool)
Request the accept pool to stop.
int netw_accept_pool_start(NETW_ACCEPT_POOL *pool)
Start the accept pool (launches accept threads)
Structure of a parallel accept pool.
Statistics for the accept pool.
#define FreeNoLog(__ptr)
Free Handler without log.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
#define LOG_NOTICE
normal but significant condition
#define LOG_INFO
informational
A box including a string and his lenght.
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
int netw_add_msg(NETWORK *netw, N_STR *msg)
Add a message to send in aimed NETWORK.
int netw_make_listening(NETWORK **netw, char *addr, char *port, int nbpending, int ip_version)
Make a NETWORK be a Listening network.
int netw_start_thr_engine(NETWORK *netw)
Start the NETWORK netw Threaded Engine.
NETWORK * netw_accept_from_ex(NETWORK *from, size_t send_list_limit, size_t recv_list_limit, int blocking, int *retval)
make a normal 'accept' .
#define NETWORK_IPALL
Flag for auto detection by OS of ip version to use.
int netw_close(NETWORK **netw)
Closing a specified Network, destroy queues, free the structure.
N_STR * netw_wait_msg(NETWORK *netw, unsigned int refresh, size_t timeout)
Wait a message from aimed NETWORK.
#define NORMAL_PROC
processing mode for added func, synced start, can be queued
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int wait_for_threaded_pool(THREAD_POOL *thread_pool)
Wait for the thread pool to become idle (no active threads, empty waiting list), blocking without pol...
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
Structure of a thread pool.
Common headers and low-level functions & define.
Accept pool for parallel connection acceptance (nginx-style)