Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
ex_kafka.c
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
27#include <stdio.h>
28#include <stdlib.h>
29#include <string.h>
30#include <locale.h>
31#include <libgen.h>
32#include <errno.h>
33
34#include "nilorea/n_log.h"
35#include "nilorea/n_network.h"
36#include "nilorea/n_kafka.h"
37#include "cJSON.h"
38
39#include "rdkafka.h"
40
41#define OK 0
42#define ERROR -1
43#define NB_TEST_EVENTS 10
44
45char *config_file = NULL,
46 *event_string = NULL,
47 *event_file = NULL,
49 *log_prefix = NULL;
50
51int log_level = LOG_ERR, /* default log level */
52 getoptret = 0, /* getopt return value */
55 run = 1;
56
57// help func
// help func: print the command-line syntax and all supported options to stderr
void usage(void) {
    fprintf(stderr,
            "Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
            "        -v version: print version and exit\n"
            "        -c config_file: [required] Kafka producer config file\n"
            "        -s : string of the event to send\n"
            "        -f : file containing the event to send\n"
            "        -C : start a consumer (default output received in terminal)\n" /* fixed: missing newline */
            "        -P : start a producer and produce event\n"                     /* fixed: missing newline */
            "        -H name=value : add a header to produced events (repeatable)\n"
            "        -o : optional, set a log file instead of default (stderr/stdout)\n"
            "        -p : optional, set a log prefix\n"
            "        -V verbosity: specify a log level for console output\n"
            "             Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n");
}
72
73// stop handler
74static void stop(int sig) {
75 (void)sig;
76 run = 0;
77}
78
79int main(int argc, char* argv[]) {
80 /* Signal handler for clean shutdown */
81 signal(SIGINT, stop);
82
83 /* temporary header structure */
84 rd_kafka_headers_t* headers = NULL;
85
86 /* Analysing arguments */
87 while ((getoptret = getopt(argc, argv, "vhCPH:c:s:f:V:o:p:")) != -1) {
88 switch (getoptret) {
89 case 'v':
90 fprintf(stderr, " Version compiled on %s at %s\n", __DATE__, __TIME__);
91 exit(TRUE);
92 case 'h':
93 usage();
94 exit(0);
95 break;
96 case 'C':
97 if (KAFKA_MODE == -1) {
98 KAFKA_MODE = RD_KAFKA_CONSUMER;
99 } else {
100 fprintf(stderr, "-C and -P can not be used at the ame time!");
101 exit(TRUE);
102 }
103 break;
104 case 'P':
105 if (KAFKA_MODE == -1) {
106 KAFKA_MODE = RD_KAFKA_PRODUCER;
107 } else {
108 fprintf(stderr, "-C and -P can not be used at the ame time!");
109 exit(TRUE);
110 }
111 break;
112 case 'H': {
113 char *name = NULL, *val = NULL;
114 ssize_t name_sz = -1;
115
116 name = optarg;
117 val = strchr(name, '=');
118 if (val) {
119 name_sz = val - name;
120 val++; /* past the '=' */
121 }
122
123 if (!headers)
124 headers = rd_kafka_headers_new(16);
125
126 int err = rd_kafka_header_add(headers, name, name_sz, val, -1);
127 if (err) {
128 fprintf(stderr,
129 "%% Failed to add header %s: %s\n",
130 name, rd_kafka_err2str(err));
131 exit(1);
132 }
133 } break;
134 case 'c':
135 config_file = local_strdup(optarg);
136 break;
137 case 's':
138 Malloc(event_string, char, strlen(optarg) + 1);
139 strcpy(event_string, optarg);
140 break;
141 case 'f':
142 event_file = local_strdup(optarg);
143 break;
144 case 'o':
146 break;
147 case 'p':
148 log_prefix = local_strdup(optarg);
149 break;
150 case 'V':
151 if (!strncmp("LOG_NULL", optarg, 8)) {
153 } else if (!strncmp("LOG_NOTICE", optarg, 10)) {
155 } else if (!strncmp("LOG_INFO", optarg, 8)) {
157 } else if (!strncmp("LOG_ERR", optarg, 7)) {
159 } else if (!strncmp("LOG_DEBUG", optarg, 9)) {
161 } else {
162 fprintf(stderr, "%s n'est pas un niveau de log valide.\n", optarg);
163 exit(-1);
164 }
165 break;
166 case '?':
167 if (optopt == 'c' || optopt == 's' || optopt == 'f') {
168 fprintf(stderr, "Option -%c need a parameter\n", optopt);
169 exit(FALSE);
170 }
172 default:
173 usage();
174 exit(-1);
175 break;
176 }
177 }
178
179 if (KAFKA_MODE == -1) {
180 n_log(LOG_ERR, "consumer (-C) or producer (-P) mode is not defined !", log_prefix);
181 exit(1);
182 }
183
184 if (!log_prefix) {
185 log_prefix = strdup("");
186 }
187
189 if (event_log_file) {
190 int log_file_ret = set_log_file(event_log_file);
191 n_log(LOG_DEBUG, "%s log to file: %s , %d , %p", log_prefix, event_log_file, log_file_ret, get_log_file());
192 }
193
194 /* testing parameters */
195 if (!config_file) {
196 n_log(LOG_ERR, "%s parameter config_file needs to be set !", log_prefix);
197 exit(1);
198 }
199
200 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
201 if (!event_string && !event_file) {
202 n_log(LOG_ERR, "%s one of (event_string|event_file) needs to be set !", log_prefix);
203 exit(1);
204 }
205 }
206
207 if (event_string && event_file) {
208 n_log(LOG_ERR, "%s do not define event_string AND event_file, only one needs to be set !", log_prefix);
209 exit(1);
210 }
211
212 // load kafka config file
213 int exit_code = 0;
215 __n_assert(kafka_handle, n_log(LOG_ERR, "kafka handle is NULL !!"); exit(1));
216
217 n_kafka_start_polling_thread(kafka_handle);
218
219 size_t nb_queued = 0;
220 size_t nb_waiting = 0;
221 size_t nb_error = 0;
222 int poll_status = 1;
223 N_KAFKA_EVENT* event = NULL;
224
225 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
226 // create a new kafka event from a string or from a file
227 if (event_string) {
228 event = n_kafka_new_event_from_char(event_string, strlen(event_string), kafka_handle->schema_id);
229 }
230 if (event_file) {
231 event = n_kafka_new_event_from_file(event_file, kafka_handle->schema_id);
232 }
233 // set headers if any
234 if (headers && event) {
235 event->rd_kafka_headers = rd_kafka_headers_copy(headers);
236 // clean them if no more needed
237 rd_kafka_headers_destroy(headers);
238 }
239 // produce the event, API is charging itself of destroying it
240 if (n_kafka_produce(kafka_handle, event) == FALSE) {
241 n_log(LOG_ERR, "n_kafka_produce returned an error for event %p", event);
242 } else {
243 n_log(LOG_INFO, "n_kafka_produce returned OK for event %p", event);
244 }
245 }
246
247 // loop on poll
248 do {
249 poll_status = n_kafka_get_status(kafka_handle, &nb_queued, &nb_waiting, &nb_error);
250 n_log(LOG_DEBUG, "polling kafka handle, status: %d, %d in queue, %d waiting for ack, %d on error", poll_status, nb_queued, nb_waiting, nb_error);
251
252 // if we were waiting only for producing elemens we could use a test like this to break out of the loop
253 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
254 usleep(30000);
255 if (nb_queued == 0 && nb_waiting == 0 && nb_error == 0)
256 break;
257 } else {
258 event = n_kafka_get_event(kafka_handle);
259 if (event) {
260 if (kafka_handle->schema_id != -1)
261 n_log(LOG_INFO, "received event schema id %d string:\n%s", event->schema_id, _str(event->event_string->data + 4));
262 else
263 n_log(LOG_INFO, "received event string:\n%s", event->event_string->data);
264
265 list_foreach(node, event->received_headers) {
266 N_STR* header = (N_STR*)node->ptr;
267 n_log(LOG_INFO, "headers: %s", _nstr(header));
268 }
269 n_kafka_event_destroy(&event);
270 } else {
271 usleep(30000);
272 }
273 }
274 } while (run && poll_status > 0);
275
276 n_log(LOG_INFO, "kafka_handle: %d queued, %d waiting ack, %d on error", nb_queued, nb_waiting, nb_error);
277 if (nb_error > 0 || nb_waiting > 0) {
278 n_log(LOG_ERR, "kafka_handle: %d events are still waiting for ack, and %d are on error !", nb_waiting, nb_error);
279 n_kafka_dump_unprocessed(kafka_handle, "DATAS/kafka/unprocessed");
280 }
281
282 // log unprocessed events
283 list_foreach(node, kafka_handle->received_events) {
284 N_KAFKA_EVENT* unprocessed_event = (N_KAFKA_EVENT*)node->ptr;
285 if (unprocessed_event) {
286 if (kafka_handle->schema_id != -1)
287 n_log(LOG_INFO, "[unprocessed]received event schema id %d string:\n%s", unprocessed_event->schema_id, unprocessed_event->event_string->data + 4);
288 else
289 n_log(LOG_INFO, "[unprocessed]received event string:\n%s", unprocessed_event->event_string->data);
290 }
291 }
292
293 // closing kafka handle
294 n_kafka_delete(kafka_handle);
295
296 exit(exit_code);
297}
static void usage(void)
int main(void)
int getoptret
Definition ex_fluid.c:60
int log_level
Definition ex_fluid.c:61
int PRODUCER_MODE
Definition ex_kafka.c:54
static void stop(int sig)
Definition ex_kafka.c:74
char * log_prefix
Definition ex_kafka.c:49
int KAFKA_MODE
Definition ex_kafka.c:53
char * event_log_file
Definition ex_kafka.c:48
char * event_file
Definition ex_kafka.c:47
char * event_string
Definition ex_kafka.c:46
char * config_file
Definition ex_kafka.c:45
int run
Definition ex_kafka.c:55
#define FALL_THROUGH
set windows if true
Definition n_common.h:71
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define _str(__PTR)
define true
Definition n_common.h:192
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition n_common.h:198
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper, safe for node removal during iteration.
Definition n_list.h:88
FILE * get_log_file(void)
return the current log_file
Definition n_log.c:197
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_DEBUG
debug-level messages
Definition n_log.h:83
#define LOG_ERR
error conditions
Definition n_log.h:75
int set_log_file(char *file)
Set the logging to a file instead of stderr.
Definition n_log.c:167
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
Definition n_log.c:120
#define LOG_NOTICE
normal but significant condition
Definition n_log.h:79
#define LOG_NULL
no log output
Definition n_log.h:45
#define LOG_INFO
informational
Definition n_log.h:81
int schema_id
kafka schema_id
Definition n_kafka.h:79
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:89
N_STR * event_string
string containing the topic id + payload
Definition n_kafka.h:67
int schema_id
kafka schema id in network order
Definition n_kafka.h:111
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1182
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1256
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:298
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:92
int n_kafka_start_polling_thread(N_KAFKA *kafka)
start the polling thread of a kafka handle
Definition n_kafka.c:1051
N_KAFKA_EVENT * n_kafka_new_event_from_char(const char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:756
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition n_kafka.c:852
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:682
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:809
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:185
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:85
structure of a KAFKA message
Definition n_kafka.h:65
char * data
the string
Definition n_str.h:62
#define local_strdup(__src_)
Do tar(1) matching rules, which ignore a trailing slash?
Definition n_str.h:77
A box including a string and his lenght.
Definition n_str.h:60
Kafka generic produce and consume event header.
Generic log system.
Network Engine.