Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
ex_kafka.c

Nilorea Library kafka event test.

Nilorea Library kafka event test

Author
Castagnier Mickael
Version
1.0
Date
26/05/2025
/*
* Nilorea Library
* Copyright (C) 2005-2026 Castagnier Mickael
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <locale.h>
#include <libgen.h>
#include <errno.h>
#include "nilorea/n_log.h"
#include "cJSON.h"
#include "rdkafka.h"
#define OK 0              /* generic success return code */
#define ERROR -1          /* generic failure return code */
#define NB_TEST_EVENTS 10 /* number of test events; NOTE(review): unused in the visible code — confirm */
char *config_file = NULL,    /* -c: kafka configuration file path (required) */
    *event_string = NULL,    /* -s: event payload given inline on the command line */
    *event_file = NULL,      /* -f: path of a file containing the event payload */
    *event_log_file = NULL,  /* -o: optional log file (default is stderr/stdout) */
    *log_prefix = NULL;      /* -p: optional prefix prepended to log messages */
int log_level = LOG_ERR, /* default log level */
    getoptret = 0, /* getopt return value */
    KAFKA_MODE = -1, /* RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER; -1 = not chosen yet */
    run = 1; /* cleared by the SIGINT handler to stop the poll loop */
// help func
/**
 * Print command-line usage information for ex_kafka to stderr.
 *
 * Fixes vs previous revision: the -C and -P lines were missing their
 * trailing '\n' (so three options were glued onto one output line),
 * "Kproducer"/"optionnal" typos are corrected, and the accepted -H
 * option is now documented.
 */
void usage(void) {
    fprintf(stderr,
            "Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
            " -v version: print version and exit\n"
            " -c config_file: [required] Kafka producer config file\n"
            " -s : string of the event to send\n"
            " -f : file containing the event to send\n"
            " -C : start a consumer (default output received in terminal)\n"
            " -P : start a producer and produce event\n"
            " -H name=value: add a message header (can be repeated)\n"
            " -o : optional, set a log file instead of default (stderr/stdout)\n"
            " -p : optional, set a log prefix\n"
            " -V verbosity: specify a log level for console output\n"
            " Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n");
}
// stop handler
/* SIGINT handler: request a clean shutdown of the polling loop by
 * clearing the global `run` flag that main()'s do/while checks. */
static void stop(int signum) {
    (void)signum; /* unused; only the fact that a signal arrived matters */
    run = 0;
}
/**
 * Entry point of the Kafka example: parses options, then either produces
 * one event (-P mode) or consumes and logs incoming events (-C mode)
 * until SIGINT or the kafka handle reports an error.
 *
 * NOTE(review): this listing is a doc-extraction artifact — several
 * statements and opening braces are missing (unbalanced '}' below), so
 * the code does not compile as shown. Every gap is flagged inline;
 * confirm against the original ex_kafka.c in the repository.
 */
int main(int argc, char* argv[]) {
    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);
    /* temporary header structure */
    rd_kafka_headers_t* headers = NULL;
    /* Analysing arguments */
    while ((getoptret = getopt(argc, argv, "vhCPH:c:s:f:V:o:p:")) != -1) {
        switch (getoptret) {
            case 'v':
                /* print build stamp and exit */
                fprintf(stderr, " Version compiled on %s at %s\n", __DATE__, __TIME__);
                exit(TRUE);
            case 'h':
                usage();
                exit(0);
                break;
            case 'C':
                /* consumer mode; mutually exclusive with -P */
                if (KAFKA_MODE == -1) {
                    KAFKA_MODE = RD_KAFKA_CONSUMER;
                } else {
                    /* NOTE(review): "ame" is a typo for "same" in this user-facing message */
                    fprintf(stderr, "-C and -P can not be used at the ame time!");
                    exit(TRUE);
                }
                break;
            case 'P':
                /* producer mode; mutually exclusive with -C */
                if (KAFKA_MODE == -1) {
                    KAFKA_MODE = RD_KAFKA_PRODUCER;
                } else {
                    /* NOTE(review): same "ame"/"same" typo as the -C branch */
                    fprintf(stderr, "-C and -P can not be used at the ame time!");
                    exit(TRUE);
                }
                break;
            case 'H': {
                /* -H name=value : queue a kafka message header for the produced event */
                char *name = NULL, *val = NULL;
                ssize_t name_sz = -1;
                name = optarg;
                val = strchr(name, '=');
                if (val) {
                    name_sz = val - name;
                    val++; /* past the '=' */
                }
                /* lazily create the header list on first -H */
                if (!headers)
                    headers = rd_kafka_headers_new(16);
                int err = rd_kafka_header_add(headers, name, name_sz, val, -1);
                if (err) {
                    fprintf(stderr,
                            "%% Failed to add header %s: %s\n",
                            name, rd_kafka_err2str(err));
                    exit(1);
                }
            } break;
            case 'c':
                /* NOTE(review): body missing (doc extraction) — presumably
                 * copied optarg into config_file; confirm against repo */
                break;
            case 's':
                /* inline event payload from the command line */
                Malloc(event_string, char, strlen(optarg) + 1);
                strcpy(event_string, optarg);
                break;
            case 'f':
                /* NOTE(review): body missing — presumably set event_file from optarg */
                break;
            case 'o':
                /* NOTE(review): body missing — presumably set event_log_file from optarg */
                break;
            case 'p':
                /* NOTE(review): body missing — presumably set log_prefix from optarg */
                break;
            case 'V':
                /* map a LOG_* name to a verbosity level; the empty branches
                 * suggest set_log_level() calls were lost by the doc extractor */
                if (!strncmp("LOG_NULL", optarg, 8)) {
                } else if (!strncmp("LOG_NOTICE", optarg, 10)) {
                } else if (!strncmp("LOG_INFO", optarg, 8)) {
                } else if (!strncmp("LOG_ERR", optarg, 7)) {
                } else if (!strncmp("LOG_DEBUG", optarg, 9)) {
                } else {
                    /* message is in French: "%s is not a valid log level." */
                    fprintf(stderr, "%s n'est pas un niveau de log valide.\n", optarg);
                    exit(-1);
                }
                break;
            case '?':
                /* getopt signalled a missing argument for an option */
                if (optopt == 'c' || optopt == 's' || optopt == 'f') {
                    fprintf(stderr, "Option -%c need a parameter\n", optopt);
                    exit(FALSE);
                }
                /* NOTE(review): unmarked fall through into default — likely intentional */
            default:
                usage();
                exit(-1);
                break;
        }
    }
    /* one of -C / -P is mandatory */
    if (KAFKA_MODE == -1) {
        n_log(LOG_ERR, "consumer (-C) or producer (-P) mode is not defined !", log_prefix);
        exit(1);
    }
    if (!log_prefix) {
        log_prefix = strdup("");
    }
    int log_file_ret = set_log_file(event_log_file);
    n_log(LOG_DEBUG, "%s log to file: %s , %d , %p", log_prefix, event_log_file, log_file_ret, get_log_file());
    } /* NOTE(review): unbalanced '}' — an enclosing `if (event_log_file) {`
       * around the two lines above was likely dropped by the doc extractor */
    /* testing parameters */
    if (!config_file) {
        n_log(LOG_ERR, "%s parameter config_file needs to be set !", log_prefix);
        exit(1);
    }
    if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
        n_log(LOG_ERR, "%s one of (event_string|event_file) needs to be set !", log_prefix);
        exit(1);
    }
    } /* NOTE(review): unbalanced '}' — an inner test such as
       * `if (!event_string && !event_file) {` was likely dropped */
    n_log(LOG_ERR, "%s do not define event_string AND event_file, only one needs to be set !", log_prefix);
    exit(1);
    } /* NOTE(review): unbalanced '}' — a guarding
       * `if (event_string && event_file) {` was likely dropped */
    // load kafka config file
    int exit_code = 0;
    /* NOTE(review): kafka_handle is used below but never declared in this
     * listing — a `N_KAFKA* kafka_handle = n_kafka_load_config(config_file,
     * KAFKA_MODE);` (and probably n_kafka_start_polling_thread) appears to
     * have been dropped by the doc extractor */
    __n_assert(kafka_handle, n_log(LOG_ERR, "kafka handle is NULL !!"); exit(1));
    size_t nb_queued = 0;  /* events queued for sending */
    size_t nb_waiting = 0; /* events sent, waiting for broker ack */
    size_t nb_error = 0;   /* events on error */
    int poll_status = 1;
    N_KAFKA_EVENT* event = NULL;
    if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
        // create a new kafka event from a string or from a file
        if (event_string) {
            /* NOTE(review): body missing — likely n_kafka_new_event_from_char() */
        }
        if (event_file) {
            /* NOTE(review): body missing — likely n_kafka_new_event_from_file() */
        }
        // set headers if any
        if (headers && event) {
            event->rd_kafka_headers = rd_kafka_headers_copy(headers);
            // clean them if no more needed
            rd_kafka_headers_destroy(headers);
        }
        // produce the event, API is charging itself of destroying it
        if (n_kafka_produce(kafka_handle, event) == FALSE) {
            n_log(LOG_ERR, "n_kafka_produce returned an error for event %p", event);
        } else {
            n_log(LOG_INFO, "n_kafka_produce returned OK for event %p", event);
        }
    }
    // loop on poll
    do {
        poll_status = n_kafka_get_status(kafka_handle, &nb_queued, &nb_waiting, &nb_error);
        /* NOTE(review): size_t arguments formatted with %d — should be %zu */
        n_log(LOG_DEBUG, "polling kafka handle, status: %d, %d in queue, %d waiting for ack, %d on error", poll_status, nb_queued, nb_waiting, nb_error);
        // if we were waiting only for producing elemens we could use a test like this to break out of the loop
        if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
            usleep(30000);
            /* all queues drained: nothing left to wait for */
            if (nb_queued == 0 && nb_waiting == 0 && nb_error == 0)
                break;
        } else {
            event = n_kafka_get_event(kafka_handle);
            if (event) {
                /* schema-encoded payloads carry a 4-byte schema-id prefix,
                 * hence the `data + 4` skip below */
                if (kafka_handle->schema_id != -1)
                    n_log(LOG_INFO, "received event schema id %d string:\n%s", event->schema_id, _str(event->event_string->data + 4));
                else
                    n_log(LOG_INFO, "received event string:\n%s", event->event_string->data);
                list_foreach(node, event->received_headers) {
                    N_STR* header = (N_STR*)node->ptr;
                    n_log(LOG_INFO, "headers: %s", _nstr(header));
                }
                /* NOTE(review): the consumed event is not destroyed here —
                 * leak unless ownership stays with the handle; confirm with
                 * n_kafka_get_event()'s contract */
            } else {
                /* nothing received: back off briefly */
                usleep(30000);
            }
        }
    } while (run && poll_status > 0);
    n_log(LOG_INFO, "kafka_handle: %d queued, %d waiting ack, %d on error", nb_queued, nb_waiting, nb_error);
    if (nb_error > 0 || nb_waiting > 0) {
        n_log(LOG_ERR, "kafka_handle: %d events are still waiting for ack, and %d are on error !", nb_waiting, nb_error);
        n_kafka_dump_unprocessed(kafka_handle, "DATAS/kafka/unprocessed");
    }
    // log unprocessed events
    list_foreach(node, kafka_handle->received_events) {
        N_KAFKA_EVENT* unprocessed_event = (N_KAFKA_EVENT*)node->ptr;
        if (unprocessed_event) {
            if (kafka_handle->schema_id != -1)
                n_log(LOG_INFO, "[unprocessed]received event schema id %d string:\n%s", unprocessed_event->schema_id, unprocessed_event->event_string->data + 4);
            else
                n_log(LOG_INFO, "[unprocessed]received event string:\n%s", unprocessed_event->event_string->data);
        }
    }
    // closing kafka handle
    n_kafka_delete(kafka_handle);
    exit(exit_code);
}
static void usage(void)
int main(void)
int getoptret
Definition ex_fluid.c:60
int log_level
Definition ex_fluid.c:61
int PRODUCER_MODE
Definition ex_kafka.c:54
static void stop(int sig)
Definition ex_kafka.c:74
char * log_prefix
Definition ex_kafka.c:49
int KAFKA_MODE
Definition ex_kafka.c:53
char * event_log_file
Definition ex_kafka.c:48
char * event_file
Definition ex_kafka.c:47
char * event_string
Definition ex_kafka.c:46
char * config_file
Definition ex_kafka.c:45
int run
Definition ex_kafka.c:55
#define FALL_THROUGH
set windows if true
Definition n_common.h:71
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define _str(__PTR)
define true
Definition n_common.h:192
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition n_common.h:198
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper, safe for node removal during iteration.
Definition n_list.h:88
FILE * get_log_file(void)
return the current log_file
Definition n_log.c:197
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_DEBUG
debug-level messages
Definition n_log.h:83
#define LOG_ERR
error conditions
Definition n_log.h:75
int set_log_file(char *file)
Set the logging to a file instead of stderr.
Definition n_log.c:167
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
Definition n_log.c:120
#define LOG_NOTICE
normal but significant condition
Definition n_log.h:79
#define LOG_NULL
no log output
Definition n_log.h:45
#define LOG_INFO
informational
Definition n_log.h:81
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:89
int schema_id
kafka schema id in network order
Definition n_kafka.h:111
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1182
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1256
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:298
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:92
int n_kafka_start_polling_thread(N_KAFKA *kafka)
start the polling thread of a kafka handle
Definition n_kafka.c:1051
N_KAFKA_EVENT * n_kafka_new_event_from_char(const char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:756
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition n_kafka.c:852
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:682
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:809
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:185
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:85
structure of a KAFKA message
Definition n_kafka.h:65
#define local_strdup(__src_)
Do tar(1) matching rules, which ignore a trailing slash?
Definition n_str.h:77
A box including a string and his lenght.
Definition n_str.h:60
Kafka generic produce and consume event header.
Generic log system.
Network Engine.