Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_kafka.c
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
27#include "nilorea/n_kafka.h"
28#include "nilorea/n_common.h"
29#include "nilorea/n_base64.h"
30#include <limits.h>
31
37int32_t n_kafka_get_schema_from_char(const char* string) {
38 __n_assert(string, return -1);
39
40 uint32_t raw_schema_id = 0;
41 memcpy(&raw_schema_id, string + 1, sizeof(uint32_t));
42
43 return (int32_t)ntohl(raw_schema_id);
44}
45
51int32_t n_kafka_get_schema_from_nstr(const N_STR* string) {
52 __n_assert(string, return -1);
53 __n_assert(string->data, return -1);
54 __n_assert((string->written >= sizeof(int32_t)), return -1);
55 return n_kafka_get_schema_from_char(string->data);
56}
57
64int n_kafka_put_schema_in_char(char* string, int schema_id) {
65 __n_assert(string, return FALSE);
66 uint32_t schema_id_htonl = htonl((uint32_t)schema_id); // cast to unsigned to avoid warning
67 memcpy(string + 1, &schema_id_htonl, sizeof(uint32_t));
68 return TRUE;
69}
70
77int n_kafka_put_schema_in_nstr(N_STR* string, int schema_id) {
78 __n_assert(string, return FALSE);
79 __n_assert(string->data, return FALSE);
80 __n_assert((string->written >= sizeof(int32_t)), return FALSE);
81 return n_kafka_put_schema_in_char(string->data, schema_id);
82}
83
92int n_kafka_get_status(N_KAFKA* kafka, size_t* nb_queued, size_t* nb_waiting, size_t* nb_error) {
93 __n_assert(kafka, return FALSE);
94 __n_assert(nb_waiting, return FALSE);
95 __n_assert(nb_error, return FALSE);
96 read_lock(kafka->rwlock);
97 int status = kafka->polling_thread_status;
98 *nb_queued = kafka->nb_queued;
99 *nb_waiting = kafka->nb_waiting;
100 *nb_error = kafka->nb_error;
101 unlock(kafka->rwlock);
102 return status;
103}
104
114// cppcheck-suppress constParameterCallback -- callback signature must match rd_kafka typedef
115static void n_kafka_delivery_message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
116 (void)opaque;
117
118 __n_assert(rk, n_log(LOG_ERR, "rk=NULL is not a valid kafka handle"); return);
119 __n_assert(rkmessage, n_log(LOG_ERR, "rkmessage=NULL is not a valid kafka message"); return);
120
121 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)rkmessage->_private;
122
123 if (rkmessage->err) {
124 n_log(LOG_ERR, "message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
125 if (!event) {
126 n_log(LOG_ERR, "fatal: event is NULL");
127 return;
128 }
129 if (event->parent_table)
130 write_lock(event->parent_table->rwlock);
131 event->status = N_KAFKA_EVENT_ERROR;
132 /* adjust counters: event was WAITING_ACK, now it's ERROR */
133 if (event->parent_table) {
134 event->parent_table->nb_waiting--;
135 event->parent_table->nb_error++;
136 }
137 if (event->parent_table)
138 unlock(event->parent_table->rwlock);
139 } else {
140 n_log(LOG_DEBUG, "message delivered (%ld bytes, partition %d)", rkmessage->len, rkmessage->partition);
141 if (event) {
142 // lock
143 if (event->parent_table)
144 write_lock(event->parent_table->rwlock);
145 // delete produce event linked files
146 if (event->event_files_to_delete) {
147 char** files_to_delete = split(_nstr(event->event_files_to_delete), ";", 0);
148 if (files_to_delete) {
149 if (split_count(files_to_delete) > 0) {
150 int index = 0;
151 while (files_to_delete[index]) {
152 int ret = unlink(files_to_delete[index]);
153 int error = errno;
154 if (ret == 0) {
155 n_log(LOG_DEBUG, "deleted on produce ack: %s", files_to_delete[index]);
156 } else {
157 n_log(LOG_ERR, "couldn't delete \"%s\": %s", files_to_delete[index], strerror(error));
158 }
159 index++;
160 }
161 } else {
162 n_log(LOG_ERR, "split result is empty !");
163 }
164 }
165 free_split_result(&files_to_delete);
166 }
167 // set status
168 event->status = N_KAFKA_EVENT_OK;
169 // unlock
170 if (event->parent_table)
171 unlock(event->parent_table->rwlock);
172 n_log(LOG_INFO, "kafka event %p received an ack !", event);
173 } else {
174 n_log(LOG_ERR, "fatal: event is NULL");
175 }
176 }
177 return;
178 /* The rkmessage is destroyed automatically by librdkafka */
179}
180
186 __n_assert(kafka, return);
187
188 FreeNoLog(kafka->topic);
189 free_split_result(&kafka->topics);
190 FreeNoLog(kafka->event_cmd);
191 FreeNoLog(kafka->groupid);
192
194
195 /* rd_kafka_topic_destroy must be called before rd_kafka_destroy */
196 if (kafka->rd_kafka_topic)
197 rd_kafka_topic_destroy(kafka->rd_kafka_topic);
198 kafka->rd_kafka_topic = NULL;
199
200 if (kafka->rd_kafka_handle) {
201 if (kafka->mode == RD_KAFKA_CONSUMER) {
202 /* close the consumer. May already be closed by stop_polling_thread,
203 * calling it again is safe (returns an error but does not crash) */
204 rd_kafka_consumer_close(kafka->rd_kafka_handle);
205 if (kafka->subscription)
206 rd_kafka_topic_partition_list_destroy(kafka->subscription);
207 }
208 if (kafka->mode == RD_KAFKA_PRODUCER) {
209 rd_kafka_flush(kafka->rd_kafka_handle, kafka->poll_timeout);
210 }
211 rd_kafka_destroy(kafka->rd_kafka_handle);
212 n_log(LOG_DEBUG, "kafka handle destroyed");
213 }
214
215 if (kafka->rd_kafka_conf) {
216 rd_kafka_conf_destroy(kafka->rd_kafka_conf);
217 }
218
219 if (kafka->configuration)
220 cJSON_Delete(kafka->configuration);
221
222 if (kafka->errstr)
223 free_nstr(&kafka->errstr);
224
225 if (kafka->events_to_send)
227
228 if (kafka->received_events)
230
231 rw_lock_destroy(kafka->rwlock);
232
233 Free(kafka->bootstrap_servers);
234
235 Free(kafka);
236 return;
237}
238
246N_KAFKA* n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len) {
247 N_KAFKA* kafka = NULL;
248 Malloc(kafka, N_KAFKA, 1);
249 __n_assert(kafka, return NULL);
250
251 kafka->errstr = new_nstr(errstr_len);
252 __n_assert(kafka->errstr, Free(kafka); return NULL);
253
254 kafka->events_to_send = NULL;
255 kafka->received_events = NULL;
256 kafka->rd_kafka_conf = NULL;
257 kafka->rd_kafka_handle = NULL;
258 kafka->configuration = NULL;
259 kafka->groupid = NULL;
260 kafka->topics = NULL;
261 kafka->event_cmd = NULL;
262 kafka->subscription = NULL;
263 kafka->topic = NULL;
264 kafka->mode = -1;
265 kafka->schema_id = -1;
266 kafka->poll_timeout = poll_timeout;
267 kafka->poll_interval = poll_interval;
268 kafka->monitored_directory_interval = 3000; // 3000 msecs as a default
269 kafka->nb_queued = 0;
270 kafka->nb_waiting = 0;
271 kafka->nb_error = 0;
272 kafka->event_consumption_enabled = TRUE;
273 kafka->event_production_enabled = TRUE;
274 kafka->polling_thread_status = 0;
275 kafka->bootstrap_servers = NULL;
276
278 __n_assert(kafka->events_to_send, n_kafka_delete(kafka); return NULL);
279
281 __n_assert(kafka->received_events, n_kafka_delete(kafka); return NULL);
282
283 if (init_lock(kafka->rwlock) != 0) {
284 n_log(LOG_ERR, "could not init kafka rwlock in kafka structure at address %p", kafka);
285 n_kafka_delete(kafka);
286 return NULL;
287 }
288
289 return kafka;
290}
291
299 __n_assert(config_file, return NULL);
300
301 N_KAFKA* kafka = NULL;
302
303 kafka = n_kafka_new(-1, 100, 1024);
304 __n_assert(kafka, return NULL);
305
306 // initialize kafka object
307 kafka->rd_kafka_conf = rd_kafka_conf_new();
308
309 // load config file
310 N_STR* config_string = NULL;
311 config_string = file_to_nstr(config_file);
312 if (!config_string) {
313 n_log(LOG_ERR, "unable to read config from file %s !", config_file);
314 n_kafka_delete(kafka);
315 return NULL;
316 }
317 cJSON* json = NULL;
318 json = cJSON_Parse(_nstrp(config_string));
319 /* config_string is no longer needed after parsing */
320 free_nstr(&config_string);
321 if (!json) {
322 n_log(LOG_ERR, "unable to parse json from file %s", config_file);
323 n_kafka_delete(kafka);
324 return NULL;
325 }
326
327 int jsonIndex;
328 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++) {
329 cJSON* entry = cJSON_GetArrayItem(json, jsonIndex);
330
331 if (!entry) continue;
332 __n_assert(entry->string, continue);
333 if (!entry->valuestring) {
334 n_log(LOG_DEBUG, "no valuestring for entry %s", _str(entry->string));
335 continue;
336 }
337
338 if (entry->string[0] != '-') {
339 // if it's not one of the optionnal parameters not managed by kafka, then we can use rd_kafka_conf_set on them
340 if (strcmp("topic", entry->string) != 0 &&
341 strcmp("topics", entry->string) != 0 &&
342 strcmp("event_cmd", entry->string) != 0 &&
343 strcmp("value.schema.id", entry->string) != 0 &&
344 strcmp("value.schema.type", entry->string) != 0 &&
345 strcmp("poll.interval", entry->string) != 0 &&
346 strcmp("poll.timeout", entry->string) != 0 &&
347 strcmp("group.id.autogen", entry->string) != 0 &&
348 strcmp("monitored.directory.interval", entry->string) != 0) {
349 if (!strcmp("group.id", entry->string)) {
350 // exclude group id for producer
351 if (mode == RD_KAFKA_PRODUCER)
352 continue;
353 kafka->groupid = strdup(entry->valuestring);
354 }
355
356 if (rd_kafka_conf_set(kafka->rd_kafka_conf, entry->string, entry->valuestring, _nstr(kafka->errstr), kafka->errstr->length) != RD_KAFKA_CONF_OK) {
357 n_log(LOG_ERR, "kafka config: %s", _nstr(kafka->errstr));
358 } else {
359 n_log(LOG_DEBUG, "kafka config enabled: %s => %s", entry->string, entry->valuestring);
360 }
361 }
362 } else {
363 n_log(LOG_DEBUG, "kafka disabled config: %s => %s", entry->string, entry->valuestring);
364 }
365 }
366
367 // other parameters, not directly managed by kafka API (will cause an error if used along rd_kafka_conf_set )
368 // producer topic
369 cJSON* jstr = NULL;
370 jstr = cJSON_GetObjectItem(json, "topic");
371 if (jstr && jstr->valuestring) {
372 kafka->topic = strdup(jstr->valuestring);
373 n_log(LOG_DEBUG, "kafka producer topic: %s", kafka->topic);
374 } else {
375 if (mode == RD_KAFKA_PRODUCER) {
376 n_log(LOG_ERR, "no topic configured !");
377 cJSON_Delete(json);
378 n_kafka_delete(kafka);
379 return NULL;
380 }
381 }
382 // consumer topics
383 jstr = cJSON_GetObjectItem(json, "topics");
384 if (jstr && jstr->valuestring) {
385 kafka->topics = split(jstr->valuestring, ",", 0);
386 n_log(LOG_DEBUG, "kafka consumer topics: %s", jstr->valuestring);
387 } else {
388 if (mode == RD_KAFKA_CONSUMER) {
389 n_log(LOG_ERR, "no topics configured !");
390 cJSON_Delete(json);
391 n_kafka_delete(kafka);
392 return NULL;
393 }
394 }
395
396 // eventual consumer custom event_cmd
397 jstr = cJSON_GetObjectItem(json, "event_cmd");
398 if (jstr && jstr->valuestring) {
399 kafka->event_cmd = strdup(jstr->valuestring);
400 n_log(LOG_DEBUG, "kafka consumer event_cmd: %s", jstr->valuestring);
401 }
402
403 // schema id if any
404 jstr = cJSON_GetObjectItem(json, "value.schema.id");
405 if (jstr && jstr->valuestring) {
406 int schem_v = atoi(jstr->valuestring);
407 if (schem_v < -1 || schem_v > 9999) {
408 n_log(LOG_ERR, "invalid schema id %d", schem_v);
409 cJSON_Delete(json);
410 n_kafka_delete(kafka);
411 return NULL;
412 }
413 n_log(LOG_DEBUG, "kafka schema id: %d", schem_v);
414 kafka->schema_id = schem_v;
415 }
416
417 // kafka broker poll interval
418 jstr = cJSON_GetObjectItem(json, "poll.interval");
419 if (jstr && jstr->valuestring) {
420 kafka->poll_interval = atoi(jstr->valuestring);
421 n_log(LOG_DEBUG, "kafka poll interval: %d", kafka->poll_interval);
422 }
423
424 // kafka broker poll timeout
425 jstr = cJSON_GetObjectItem(json, "poll.timeout");
426 if (jstr && jstr->valuestring) {
427 kafka->poll_timeout = atoi(jstr->valuestring);
428 n_log(LOG_DEBUG, "kafka poll timeout: %d", kafka->poll_timeout);
429 }
430
431 // local directory poll interval
432 jstr = cJSON_GetObjectItem(json, "monitored.directory.interval");
433 if (jstr && jstr->valuestring) {
434 kafka->monitored_directory_interval = atoi(jstr->valuestring);
435 n_log(LOG_DEBUG, "kafka monitored directory interval: %d", kafka->monitored_directory_interval);
436 }
437
438 // bootstrap servers
439 jstr = cJSON_GetObjectItem(json, "bootstrap.servers");
440 if (jstr && jstr->valuestring) {
441 kafka->bootstrap_servers = strdup(jstr->valuestring);
442 n_log(LOG_DEBUG, "kafka bootstrap server: %s", kafka->bootstrap_servers);
443 }
444
445 if (mode == RD_KAFKA_PRODUCER) {
446 // set delivery callback
447 rd_kafka_conf_set_dr_msg_cb(kafka->rd_kafka_conf, n_kafka_delivery_message_callback);
448
449 kafka->rd_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, kafka->rd_kafka_conf, _nstr(kafka->errstr), kafka->errstr->length);
450 if (!kafka->rd_kafka_handle) {
451 n_log(LOG_ERR, "failed to create new producer: %s", _nstr(kafka->errstr));
452 cJSON_Delete(json);
453 n_kafka_delete(kafka);
454 return NULL;
455 }
456 // conf is now owned by kafka handle
457 kafka->rd_kafka_conf = NULL;
458 // Create topic object
459 kafka->rd_kafka_topic = rd_kafka_topic_new(kafka->rd_kafka_handle, kafka->topic, NULL);
460 if (!kafka->rd_kafka_topic) {
461 cJSON_Delete(json);
462 n_kafka_delete(kafka);
463 return NULL;
464 }
465 kafka->mode = RD_KAFKA_PRODUCER;
466 } else if (mode == RD_KAFKA_CONSUMER) {
467 /* If there is no previously committed offset for a partition
468 * the auto.offset.reset strategy will be used to decide where
469 * in the partition to start fetching messages.
470 * By setting this to earliest the consumer will read all messages
471 * in the partition if there was no previously committed offset. */
472 /*if (rd_kafka_conf_set(kafka -> rd_kafka_conf, "auto.offset.reset", "earliest", _nstr( kafka -> errstr ) , kafka -> errstr -> length) != RD_KAFKA_CONF_OK) {
473 n_log( LOG_ERR , "kafka conf set: %s", kafka -> errstr);
474 n_kafka_delete( kafka );
475 return NULL ;
476 }*/
477
478 // if groupid is not set, generate a unique one
479 if (!kafka->groupid) {
480 char computer_name[1024] = "";
481 get_computer_name(computer_name, 1024);
482 N_STR* groupid = new_nstr(1024);
483 char* topics = join(kafka->topics, "_");
484 // generating group id
485 jstr = cJSON_GetObjectItem(json, "group.id.autogen");
486 if (jstr && jstr->valuestring) {
487 if (strcmp(jstr->valuestring, "host-topic-group") == 0) {
488 // nstrprintf( groupid , "%s_%s_%s" , computer_name , topics ,kafka -> bootstrap_servers );
489 nstrprintf(groupid, "%s_%s", computer_name, topics);
490 } else if (strcmp(jstr->valuestring, "unique-group") == 0) {
491 // nstrprintf( groupid , "%s_%s_%s_%d" , computer_name , topics , kafka -> bootstrap_servers , getpid() );
492 nstrprintf(groupid, "%s_%s_%d", computer_name, topics, getpid());
493 }
494 } else // default unique group
495 {
496 // nstrprintf( groupid , "%s_%s_%s_%d" , computer_name , topics , kafka -> bootstrap_servers , getpid() );
497 nstrprintf(groupid, "%s_%s_%d", computer_name, topics, getpid());
498 n_log(LOG_DEBUG, "group.id is not set and group.id.autogen is not set, generated unique group id: %s", _nstr(groupid));
499 }
500 free(topics);
501 kafka->groupid = groupid->data;
502 groupid->data = NULL;
503 free_nstr(&groupid);
504 }
505 if (rd_kafka_conf_set(kafka->rd_kafka_conf, "group.id", kafka->groupid, _nstr(kafka->errstr), kafka->errstr->length) != RD_KAFKA_CONF_OK) {
506 n_log(LOG_ERR, "kafka consumer group.id error: %s", _nstr(kafka->errstr));
507 cJSON_Delete(json);
508 n_kafka_delete(kafka);
509 return NULL;
510 } else {
511 n_log(LOG_DEBUG, "kafka consumer group.id => %s", kafka->groupid);
512 }
513
514 /* Create consumer instance.
515 * NOTE: rd_kafka_new() takes ownership of the conf object
516 * and the application must not reference it again after
517 * this call.
518 */
519 kafka->rd_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, kafka->rd_kafka_conf, _nstr(kafka->errstr), kafka->errstr->length);
520 if (!kafka->rd_kafka_handle) {
521 n_log(LOG_ERR, "%% Failed to create new consumer: %s", kafka->errstr);
522 cJSON_Delete(json);
523 n_kafka_delete(kafka);
524 return NULL;
525 }
526 // conf is now owned by kafka handle
527 kafka->rd_kafka_conf = NULL;
528
529 /* Redirect all messages from per-partition queues to
530 * the main queue so that messages can be consumed with one
531 * call from all assigned partitions.
532 *
533 * The alternative is to poll the main queue (for events)
534 * and each partition queue separately, which requires setting
535 * up a rebalance callback and keeping track of the assignment:
536 * but that is more complex and typically not recommended. */
537 rd_kafka_poll_set_consumer(kafka->rd_kafka_handle);
538
539 /* Convert the list of topics to a format suitable for librdkafka */
540 int topic_cnt = split_count(kafka->topics);
541 kafka->subscription = rd_kafka_topic_partition_list_new(topic_cnt);
542 for (int i = 0; i < topic_cnt; i++)
543 rd_kafka_topic_partition_list_add(kafka->subscription, kafka->topics[i],
544 /* the partition is ignored
545 * by subscribe() */
546 RD_KAFKA_PARTITION_UA);
547 /* Assign the topic. This method is disabled as it does noat allow dynamic partition assignement
548 int err = rd_kafka_assign(kafka -> rd_kafka_handle, kafka -> subscription);
549 if( err )
550 {
551 n_log( LOG_ERR , "kafka consumer: failed to assign %d topics: %s", kafka -> subscription->cnt, rd_kafka_err2str(err));
552 n_kafka_delete( kafka );
553 return NULL;
554 } */
555
556 /* Subscribe to the list of topics */
557 int err = rd_kafka_subscribe(kafka->rd_kafka_handle, kafka->subscription);
558 if (err) {
559 n_log(LOG_ERR, "kafka consumer: failed to subscribe to %d topics: %s", kafka->subscription->cnt, rd_kafka_err2str(err));
560 cJSON_Delete(json);
561 n_kafka_delete(kafka);
562 return NULL;
563 }
564
565 n_log(LOG_DEBUG, "kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka->subscription->cnt);
566
567 kafka->mode = RD_KAFKA_CONSUMER;
568 } else {
569 n_log(LOG_ERR, "invalid mode %d", mode);
570 cJSON_Delete(json);
571 n_kafka_delete(kafka);
572 return NULL;
573 }
574
575 cJSON_Delete(json);
576 return kafka;
577} /* n_kafka_load_config */
578
585 N_KAFKA_EVENT* event = NULL;
586 Malloc(event, N_KAFKA_EVENT, 1);
587 __n_assert(event, return NULL);
588
589 event->event_string = NULL;
590 event->event_files_to_delete = NULL;
591 event->from_topic = NULL;
592 event->rd_kafka_headers = NULL;
593 event->received_headers = NULL;
594 event->schema_id = schema_id;
595 event->status = N_KAFKA_EVENT_CREATED;
596 event->parent_table = NULL;
597
598 return event;
599} /* n_kafka_new_event */
600
607int n_kafka_new_headers(N_KAFKA_EVENT* event, size_t count) {
608 __n_assert(event, return FALSE);
609
610 if (count == 0)
611 count = 1;
612
613 if (!event->rd_kafka_headers) {
614 event->rd_kafka_headers = rd_kafka_headers_new(count);
615 __n_assert(event->rd_kafka_headers, return FALSE);
616 } else {
617 n_log(LOG_ERR, "event headers already allocated for event %p", event);
618 return FALSE;
619 }
620 return TRUE;
621}
622
632int n_kafka_add_header_ex(N_KAFKA_EVENT* event, char* key, size_t key_length, char* value, size_t value_length) {
633 __n_assert(event, return FALSE);
634 __n_assert(event->rd_kafka_headers, return FALSE);
635 __n_assert(key, return FALSE);
636 __n_assert(value, return FALSE);
637
638 if (key_length < 1 || key_length > SSIZE_MAX) {
639 n_log(LOG_ERR, "Invalid key length (%zu) for header in event %p", key_length, event);
640 return FALSE;
641 }
642
643 if (value_length < 1 || value_length > SSIZE_MAX) {
644 n_log(LOG_ERR, "Invalid value length (%zu) for key '%s' in event %p", value_length, key, event);
645 return FALSE;
646 }
647
648 rd_kafka_resp_err_t err = rd_kafka_header_add(event->rd_kafka_headers, key, (ssize_t)key_length, value, (ssize_t)value_length);
649
650 if (err) {
651 n_log(LOG_ERR, "Failed to add header [%s:%zu=%s:%zu] to event %p: %s",
652 key, key_length, value, value_length, event, rd_kafka_err2str(err));
653 return FALSE;
654 }
655
656 return TRUE;
657}
658
667 __n_assert(event, return FALSE);
668 __n_assert(key, return FALSE);
669 __n_assert(key->data, return FALSE);
670 __n_assert(value, return FALSE);
671 __n_assert(value->data, return FALSE);
672
673 return n_kafka_add_header_ex(event, key->data, key->written, value->data, value->written);
674}
675
683 __n_assert(kafka, return FALSE);
684 __n_assert(event, return FALSE);
685
686 event->parent_table = kafka;
687
688 write_lock(kafka->rwlock);
689 kafka->nb_queued++;
691 unlock(kafka->rwlock);
692
693 // Success to *enqueue* event
694 n_log(LOG_DEBUG, "successfully enqueued event %p in producer %p waitlist, topic: %s", event, kafka->rd_kafka_handle, kafka->topic);
695
696 return TRUE;
697} /* n_kafka_produce */
698
706 char* event_string = NULL;
707 size_t event_length = 0;
708
709 event->parent_table = kafka;
710
711 event_string = event->event_string->data;
712 event_length = event->event_string->written;
713
714 if (event->rd_kafka_headers) {
715 rd_kafka_headers_t* hdrs_copy;
716 hdrs_copy = rd_kafka_headers_copy(event->rd_kafka_headers);
717
718 rd_kafka_resp_err_t err = rd_kafka_producev(
719 kafka->rd_kafka_handle, RD_KAFKA_V_RKT(kafka->rd_kafka_topic),
720 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
721 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
722 RD_KAFKA_V_VALUE(event_string, event_length),
723 RD_KAFKA_V_HEADERS(hdrs_copy),
724 RD_KAFKA_V_OPAQUE((void*)event),
725 RD_KAFKA_V_END);
726
727 if (err) {
728 rd_kafka_headers_destroy(hdrs_copy);
729 event->status = N_KAFKA_EVENT_ERROR;
730 n_log(LOG_ERR, "failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s", event, event->rd_kafka_headers, kafka->rd_kafka_handle, kafka->topic, rd_kafka_err2str(err));
731 return FALSE;
732 }
733 } else {
734 if (rd_kafka_produce(kafka->rd_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, event_string, event_length, NULL, 0, event) == -1) {
735 int error = errno;
736 event->status = N_KAFKA_EVENT_ERROR;
737 n_log(LOG_ERR, "failed to produce event: %p, producer: %p, topic: %s, error: %s", event, kafka->rd_kafka_handle, kafka->topic, strerror(error));
738 return FALSE;
739 }
740 }
741 // Success to *enqueue* event
742 n_log(LOG_DEBUG, "successfully enqueued event %p in local producer %p : %s", event, kafka->rd_kafka_handle, kafka->topic);
743
744 event->status = N_KAFKA_EVENT_WAITING_ACK;
745
746 return TRUE;
747} /* n_kafka_produce_ex */
748
756N_KAFKA_EVENT* n_kafka_new_event_from_char(const char* string, size_t written, int schema_id) {
757 __n_assert(string, return NULL);
758
759 size_t offset = 0;
760 if (schema_id != -1)
761 offset = 5;
762
763 N_KAFKA_EVENT* event = n_kafka_new_event(schema_id);
764 __n_assert(event, return NULL);
765
766 Malloc(event->event_string, N_STR, 1);
767 __n_assert(event->event_string, free(event); return NULL);
768
769 // allocate the size of the event + (the size of the schema id + magic byte) + one ending \0
770 size_t length = written + offset;
771 Malloc(event->event_string->data, char, length);
772 __n_assert(event->event_string->data, free(event->event_string); free(event); return NULL);
773 event->event_string->length = length;
774
775 // copy incomming body
776 memcpy(event->event_string->data + offset, string, written);
777 event->event_string->written = written + offset;
778
779 if (schema_id != -1)
780 n_kafka_put_schema_in_nstr(event->event_string, schema_id);
781
782 // set status and schema id
783 event->status = N_KAFKA_EVENT_QUEUED;
784
785 return event;
786} /* n_kafka_new_event_from_char */
787
794N_KAFKA_EVENT* n_kafka_new_event_from_string(const N_STR* string, int schema_id) {
795 __n_assert(string, return NULL);
796 __n_assert(string->data, return NULL);
797
798 N_KAFKA_EVENT* event = n_kafka_new_event_from_char(string->data, string->written, schema_id);
799
800 return event;
801} /* n_kafka_new_event_from_string */
802
809N_KAFKA_EVENT* n_kafka_new_event_from_file(char* filename, int schema_id) {
810 __n_assert(filename, return NULL);
811
812 N_STR* from = file_to_nstr(filename);
813 __n_assert(from, return NULL);
814
815 N_KAFKA_EVENT* event = n_kafka_new_event_from_string(from, schema_id);
816 free_nstr(&from);
817 return event;
818} /* n_kafka_new_event_from_file */
819
824void n_kafka_event_destroy_ptr(void* event_ptr) {
825 __n_assert(event_ptr, return);
826 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)event_ptr;
827 __n_assert(event, return);
828
829 if (event->event_string)
830 free_nstr(&event->event_string);
831
832 if (event->event_files_to_delete)
833 free_nstr(&event->event_files_to_delete);
834
835 FreeNoLog(event->from_topic);
836
837 if (event->rd_kafka_headers)
838 rd_kafka_headers_destroy(event->rd_kafka_headers);
839
840 if (event->received_headers)
841 list_destroy(&event->received_headers);
842
843 free(event);
844 return;
845} /* n_kafka_event_destroy_ptr */
846
853 __n_assert(event && (*event), return FALSE);
855 (*event) = NULL;
856 return TRUE;
857} /* n_kafka_event_destroy */
858
865 __n_assert(kafka, return FALSE);
866
867 int event_consumption_enabled;
868 int event_production_enabled;
869
870 read_lock(kafka->rwlock);
871 event_consumption_enabled = kafka->event_consumption_enabled;
872 event_production_enabled = kafka->event_production_enabled;
873 unlock(kafka->rwlock);
874
875 // wait poll interval msecs for kafka response
876 if (kafka->mode == RD_KAFKA_PRODUCER) {
877 if (event_production_enabled) {
878 int nb_events = rd_kafka_poll(kafka->rd_kafka_handle, kafka->poll_interval);
879 (void)nb_events;
880 write_lock(kafka->rwlock);
881 // check events status in event table
882 LIST_NODE* node = kafka->events_to_send->start;
883 while (node) {
884 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)node->ptr;
885 if (event->status == N_KAFKA_EVENT_OK) {
886 kafka->nb_waiting--;
887 n_log(LOG_DEBUG, "removing event OK %p", event);
888 LIST_NODE* node_to_kill = node;
889 node = node->next;
890 N_KAFKA_EVENT* event_to_kill = remove_list_node(kafka->events_to_send, node_to_kill, N_KAFKA_EVENT);
891 n_kafka_event_destroy(&event_to_kill);
892 continue;
893 } else if (event->status == N_KAFKA_EVENT_QUEUED) {
894 if (n_kafka_produce_ex(kafka, event) == FALSE) {
895 /* produce_ex already set status to ERROR,
896 * adjust counters: QUEUED -> ERROR */
897 kafka->nb_queued--;
898 kafka->nb_error++;
899 } else {
900 kafka->nb_waiting++;
901 kafka->nb_queued--;
902 }
903 } else if (event->status == N_KAFKA_EVENT_ERROR) {
904 /* counters already adjusted at transition time
905 * (in produce_ex failure above or in delivery callback) */
906 /* TODO: implement retry of errored events after error_timeout has elapsed */
907 }
908 node = node->next;
909 }
910 unlock(kafka->rwlock);
911 } else {
912 uint64_t sleep_us = (kafka->poll_interval > 0) ? ((uint64_t)(uint32_t)kafka->poll_interval * 1000ULL) : 0ULL;
913 usleep((unsigned int)(sleep_us > (uint64_t)UINT_MAX ? (uint64_t)UINT_MAX : sleep_us)); // production is disabled, sleep instead of producing
914 }
915 } else if (kafka->mode == RD_KAFKA_CONSUMER) {
916 if (event_consumption_enabled) {
917 rd_kafka_message_t* rkm = NULL;
918 while (event_consumption_enabled && (rkm = rd_kafka_consumer_poll(kafka->rd_kafka_handle, kafka->poll_interval))) {
919 read_lock(kafka->rwlock);
920 event_consumption_enabled = kafka->event_consumption_enabled;
921 unlock(kafka->rwlock);
922 if (rkm->err) {
923 /* Consumer errors are generally to be considered
924 * informational as the consumer will automatically
925 * try to recover from all types of errors. */
926 n_log(LOG_ERR, "consumer: %s", rd_kafka_message_errstr(rkm));
927 rd_kafka_message_destroy(rkm);
928 uint64_t sleep_us = (kafka->poll_interval > 0) ? ((uint64_t)(uint32_t)kafka->poll_interval * 1000ULL) : 0ULL;
929 usleep((unsigned int)(sleep_us > (uint64_t)UINT_MAX ? (uint64_t)UINT_MAX : sleep_us));
930 return FALSE;
931 }
932 // Reminder of rkm contents
933 n_log(LOG_DEBUG, "Message on %s [%" PRId32 "] at offset %" PRId64 " (leader epoch %" PRId32 ")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
934
935 // Print the message key
936 if (rkm->key && rkm->key_len > 0)
937 n_log(LOG_DEBUG, "Key: %.*s", (int)rkm->key_len, (const char*)rkm->key);
938 else if (rkm->key)
939 n_log(LOG_DEBUG, "Key: (%d bytes)", (int)rkm->key_len);
940
941 if (rkm->payload && rkm->len > 0) {
942 write_lock(kafka->rwlock);
943 // make a copy of the event for further processing
944 N_KAFKA_EVENT* event = NULL;
945 event = n_kafka_new_event_from_char(rkm->payload, rkm->len, -1); // no schema id because we want a full raw copy here
946 event->parent_table = kafka;
947 // test if there are headers, save them
948 rd_kafka_headers_t* hdrs = NULL;
949 if (!rd_kafka_message_headers(rkm, &hdrs)) {
950 size_t idx = 0;
951 const char* name = NULL;
952 const void* val = NULL;
953 size_t size = 0;
954 event->received_headers = new_generic_list(MAX_LIST_ITEMS);
955 while (!rd_kafka_header_get_all(hdrs, idx, &name, &val, &size)) {
956 N_STR* header_entry = NULL;
957 nstrprintf(header_entry, "%s=%s", _str(name), _str((char*)val));
958 list_push(event->received_headers, header_entry, &free_nstr_ptr);
959 idx++;
960 }
961 }
962 // save originating topic (can help sorting if there are multiples one)
963 event->from_topic = strdup(rd_kafka_topic_name(rkm->rkt));
964 if (kafka->schema_id != -1)
965 event->schema_id = n_kafka_get_schema_from_nstr(event->event_string);
967 n_log(LOG_DEBUG, "Consumer received event of (%d bytes) from topic %s", (int)rkm->len, event->from_topic);
968 unlock(kafka->rwlock);
969 }
970 rd_kafka_message_destroy(rkm);
971 }
972 } else {
973 uint64_t sleep_us = (kafka->poll_interval > 0) ? ((uint64_t)(uint32_t)kafka->poll_interval * 1000ULL) : 0ULL;
974 usleep((unsigned int)(sleep_us > (uint64_t)UINT_MAX ? (uint64_t)UINT_MAX : sleep_us)); // consumption is disabled, sleep instead of consuming
975 }
976 }
977 // n_log( LOG_DEBUG , "kafka poll for handle %p returned %d elements" , kafka -> rd_kafka_handle , nb_events );
978 return TRUE;
979} /* n_kafka_poll */
980
986void* n_kafka_polling_thread(void* ptr) {
987 N_KAFKA* kafka = (N_KAFKA*)ptr;
988
989 int status = 1;
990
991 N_TIME chrono;
992
993 if (kafka->mode == RD_KAFKA_PRODUCER)
994 n_log(LOG_DEBUG, "starting polling thread for kafka handler %p mode PRODUCER (%d) topic %s", kafka->rd_kafka_handle, RD_KAFKA_PRODUCER, kafka->topic);
995 if (kafka->mode == RD_KAFKA_CONSUMER) {
996 char* topiclist = join(kafka->topics, ",");
997 n_log(LOG_DEBUG, "starting polling thread for kafka handler %p mode CONSUMER (%d) topic %s", kafka->rd_kafka_handle, RD_KAFKA_CONSUMER, _str(topiclist));
998 FreeNoLog(topiclist);
999 }
1000
1001 start_HiTimer(&chrono);
1002
1003 int64_t remaining_time = (int64_t)kafka->poll_timeout * 1000;
1004 while (status == 1) {
1005 if (n_kafka_poll(kafka) == FALSE) {
1006 if (kafka->mode == RD_KAFKA_PRODUCER) {
1007 n_log(LOG_ERR, "failed to poll kafka producer handle %p with topic %s", kafka->rd_kafka_handle, rd_kafka_topic_name(kafka->rd_kafka_topic));
1008 } else if (kafka->topics) {
1009 char* topiclist = join(kafka->topics, ",");
1010 n_log(LOG_ERR, "failed to poll kafka consumer handle %p with topic %s", kafka->rd_kafka_handle, _str(topiclist));
1011 FreeNoLog(topiclist);
1012 }
1013 }
1014
1015 read_lock(kafka->rwlock);
1016 status = kafka->polling_thread_status;
1017 unlock(kafka->rwlock);
1018
1019 if (status == 2)
1020 break;
1021
1022 int64_t elapsed_time = get_usec(&chrono);
1023 if (kafka->poll_timeout != -1) {
1024 remaining_time -= elapsed_time;
1025 if (remaining_time < 0) {
1026 if (kafka->mode == RD_KAFKA_PRODUCER) {
1027 n_log(LOG_DEBUG, "timeouted on kafka handle %p", kafka->rd_kafka_handle);
1028 } else if (kafka->mode == RD_KAFKA_CONSUMER) {
1029 n_log(LOG_DEBUG, "timeouted on kafka handle %p", kafka->rd_kafka_handle);
1030 }
1031 break;
1032 }
1033 }
1034 // n_log( LOG_DEBUG , "remaining time: %d on kafka handle %p" , remaining_time , kafka -> rd_kafka_handle );
1035 }
1036
1037 write_lock(kafka->rwlock);
1038 kafka->polling_thread_status = 0;
1039 unlock(kafka->rwlock);
1040
1041 n_log(LOG_DEBUG, "exiting polling thread for kafka handler %p mode %s", kafka->rd_kafka_handle, (kafka->mode == RD_KAFKA_PRODUCER) ? "PRODUCER" : "CONSUMER");
1042 pthread_exit(NULL);
1043 return NULL;
1044} /* n_kafka_polling_thread */
1045
1052 __n_assert(kafka, return FALSE);
1053
1054 read_lock(kafka->rwlock);
1055 int status = kafka->polling_thread_status;
1056 unlock(kafka->rwlock);
1057
1058 if (status == 1) {
1059 n_log(LOG_ERR, "kafka polling thread already started for handle %p", kafka);
1060 return FALSE;
1061 }
1062
1063 write_lock(kafka->rwlock);
1064 kafka->polling_thread_status = 1;
1065
1066 if (pthread_create(&kafka->polling_thread, NULL, n_kafka_polling_thread, (void*)kafka) != 0) {
1067 n_log(LOG_ERR, "unable to create polling_thread for kafka handle %p", kafka);
1068 unlock(kafka->rwlock);
1069 return FALSE;
1070 }
1071 unlock(kafka->rwlock);
1072
1073 n_log(LOG_DEBUG, "pthread_create sucess for kafka handle %p->%p", kafka, kafka->rd_kafka_handle);
1074
1075 return TRUE;
1076} /* n_kafka_start_polling_thread */
1077
/*
 * n_kafka_stop_polling_thread body.
 * NOTE(review): the extraction dropped the opening signature line; per the
 * symbol index it is `int n_kafka_stop_polling_thread(N_KAFKA* kafka) {`
 * (original line 1083) — confirm against the repository.
 *
 * Requests the polling thread to stop (status = 2), closes the consumer
 * handle if any, then waits up to 10 seconds for the thread to join.
 * Returns TRUE on success, FALSE when the thread is not running, a stop was
 * already requested, or the join timed out.
 */
    __n_assert(kafka, return FALSE);

    if (kafka->mode == RD_KAFKA_CONSUMER) {
        /* NOTE(review): a statement was lost here in the extraction (original
         * line 1087) — presumably consumer-specific teardown; verify against
         * the repository before relying on this listing. */
    }

    /* snapshot the current thread status under the read lock */
    read_lock(kafka->rwlock);
    int polling_thread_status = kafka->polling_thread_status;
    unlock(kafka->rwlock);

    if (polling_thread_status == 0) {
        n_log(LOG_DEBUG, "kafka polling thread already stopped for handle %p", kafka);
        return FALSE;
    }
    if (polling_thread_status == 2) {
        n_log(LOG_DEBUG, "kafka polling ask for stop thread already done for handle %p", kafka);
        return FALSE;
    }

    /* request stop: the polling thread resets 2 back to 0 when it exits */
    write_lock(kafka->rwlock);
    kafka->polling_thread_status = 2;
    unlock(kafka->rwlock);

    if (kafka->rd_kafka_handle && kafka->mode == RD_KAFKA_CONSUMER) {
        rd_kafka_consumer_close(kafka->rd_kafka_handle);
    }

    /* bounded join: give the thread 10 seconds of wall-clock time to exit */
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += 10;

    int rc = pthread_timedjoin_np(kafka->polling_thread, NULL, &ts);
    if (rc != 0) {
        /* NOTE(review): on timeout the status stays at 2 and the thread is
         * neither joined nor cancelled — "force stop" is only a log message */
        n_log(LOG_ERR, "polling thread did not stop in 10s, force stop !");
        return FALSE;
    }

    return TRUE;
} /* n_kafka_stop_polling_thread */
1123
1130 __n_assert(kafka, return FALSE);
1131 write_lock(kafka->rwlock);
1132 kafka->event_consumption_enabled = TRUE;
1133 unlock(kafka->rwlock);
1134 return TRUE;
1135} /* n_kafka_set_consumer_event_polling */
1136
1143 __n_assert(kafka, return FALSE);
1144 write_lock(kafka->rwlock);
1145 kafka->event_consumption_enabled = FALSE;
1146 unlock(kafka->rwlock);
1147 return TRUE;
1148} /* n_kafka_set_consumer_event_polling */
1149
1156 __n_assert(kafka, return FALSE);
1157 write_lock(kafka->rwlock);
1158 kafka->event_production_enabled = TRUE;
1159 unlock(kafka->rwlock);
1160 return TRUE;
1161} /* n_kafka_set_consumer_event_polling */
1162
1169 __n_assert(kafka, return FALSE);
1170 write_lock(kafka->rwlock);
1171 kafka->event_production_enabled = FALSE;
1172 unlock(kafka->rwlock);
1173 return TRUE;
1174} /* n_kafka_set_consumer_event_polling */
1175
/**
 * @brief dump unprocessed/unsent events to files in a directory
 * @param kafka the N_KAFKA handle to inspect
 * @param directory destination directory for the dump files
 * @return TRUE on success or nothing to dump, FALSE on error or if the
 *         polling thread is still running
 *
 * Must be called after the polling thread has stopped: refuses to run while
 * polling_thread_status != 0. Each non-OK event in events_to_send is written
 * to "<directory>/<topic>+<event pointer>", skipping the 5-byte schema-id
 * prefix when the event carries one.
 */
int n_kafka_dump_unprocessed(N_KAFKA* kafka, char* directory) {
    __n_assert(kafka, return FALSE);
    __n_assert(directory, return FALSE);

    int status = 0;
    size_t nb_todump = 0;
    /* hold the lock for the whole dump so the lists cannot change underneath */
    read_lock(kafka->rwlock);
    status = kafka->polling_thread_status;
    nb_todump = kafka->nb_queued + kafka->nb_waiting + kafka->nb_error;
    if (status != 0) {
        n_log(LOG_ERR, "kafka handle %p thread polling func is still running, aborting dump", kafka);
        unlock(kafka->rwlock);
        return FALSE;
    }
    if (nb_todump == 0) {
        n_log(LOG_DEBUG, "kafka handle %p: nothing to dump, all events processed correctly", kafka);
        unlock(kafka->rwlock);
        return TRUE;
    }

    /* use a plain struct to alias event data without allocating a data buffer
     * that would leak when overwritten */
    N_STR* dumpstr = NULL;
    Malloc(dumpstr, N_STR, 1);
    __n_assert(dumpstr, unlock(kafka->rwlock); return FALSE);
    list_foreach(node, kafka->events_to_send) {
        N_KAFKA_EVENT* event = node->ptr;
        if (event->status != N_KAFKA_EVENT_OK) {
            /* skip the 1-byte magic + 4-byte schema id when present */
            size_t offset = 0;
            if (event->schema_id != -1)
                offset = 5;

            /* NOTE(review): assumes kafka->topic and event->event_string are
             * non-NULL for queued events — confirm against the producer path */
            N_STR* filename = NULL;
            nstrprintf(filename, "%s/%s+%p", directory, kafka->topic, event);
            n_log(LOG_DEBUG, "Dumping unprocessed events to %s", _nstr(filename));
            // dump event here: alias the data pointer, do not own it
            dumpstr->data = event->event_string->data + offset;
            dumpstr->written = event->event_string->written - offset;
            dumpstr->length = event->event_string->length - offset;
            nstr_to_file(dumpstr, _nstr(filename));
            free_nstr(&filename);
        }
    }
    unlock(kafka->rwlock);
    /* clear aliased pointer before freeing the wrapper struct */
    dumpstr->data = NULL;
    free_nstr(&dumpstr);
    return TRUE;
} /* n_kafka_dump_unprocessed */
1231
/**
 * @brief load unprocessed/unsent events from a directory
 * @param kafka the N_KAFKA handle to load events into
 * @param directory source directory holding previously dumped events
 * @return TRUE, or FALSE when kafka or directory is NULL
 *
 * NOTE(review): unimplemented stub — it only acquires and releases the write
 * lock and reports success without loading anything.
 */
int n_kafka_load_unprocessed(N_KAFKA* kafka, const char* directory) {
    __n_assert(kafka, return FALSE);
    __n_assert(directory, return FALSE);

    write_lock(kafka->rwlock);
    /* TODO: load events from filename using base64decode( filename ) split( result , '+' )
       to get brokersname+topic to check against what's saved with the event
       and what's inside kafka's handle conf */
    unlock(kafka->rwlock);

    return TRUE;
} /* n_kafka_load_unprocessed */
1250
/*
 * n_kafka_get_event body — pops the oldest received event, or NULL if none.
 * NOTE(review): the extraction dropped two lines here: the signature
 * (`N_KAFKA_EVENT* n_kafka_get_event(N_KAFKA* kafka) {`, original line 1256)
 * and the statement controlled by the if (original line 1263, presumably the
 * remove_list_node() call detaching the first received event). As listed, the
 * if would wrongly guard unlock() — consult the repository for the real body.
 */
    __n_assert(kafka, return NULL);

    N_KAFKA_EVENT* event = NULL;

    /* write lock: popping from received_events mutates the list */
    write_lock(kafka->rwlock);
    if (kafka->received_events->start)
        /* (dropped statement — see NOTE above) */
    unlock(kafka->rwlock);

    return event;
} /* n_kafka_get_event */
static int mode
char * event_string
Definition ex_kafka.c:46
char * config_file
Definition ex_kafka.c:45
char * key
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
Definition n_common.h:349
#define FreeNoLog(__ptr)
Free Handler without log.
Definition n_common.h:271
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define _str(__PTR)
define true
Definition n_common.h:192
int get_computer_name(char *computer_name, size_t len)
get the computer name
Definition n_common.c:69
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
Definition n_common.h:408
#define unlock(__rwlock_mutex)
Macro for releasing read/write lock a rwlock mutex.
Definition n_common.h:395
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
Definition n_common.h:200
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
Definition n_common.h:381
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:262
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
Definition n_common.h:367
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition n_common.h:198
void * ptr
void pointer to store
Definition n_list.h:45
LIST_NODE * start
pointer to the start of the list
Definition n_list.h:65
struct LIST_NODE * next
pointer to the next node
Definition n_list.h:51
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:227
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper, safe for node removal during iteration.
Definition n_list.h:88
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:97
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:547
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
Definition n_list.c:36
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Definition n_list.h:74
Structure of a generic list node.
Definition n_list.h:43
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_DEBUG
debug-level messages
Definition n_log.h:83
#define LOG_ERR
error conditions
Definition n_log.h:75
#define LOG_INFO
informational
Definition n_log.h:81
int32_t event_production_enabled
bool flag to suspend or restart event production
Definition n_kafka.h:137
int32_t poll_interval
poll interval in msecs
Definition n_kafka.h:121
rd_kafka_topic_partition_list_t * subscription
subscribed topics
Definition n_kafka.h:97
int polling_thread_status
polling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
Definition n_kafka.h:125
cJSON * configuration
kafka json configuration holder
Definition n_kafka.h:117
size_t nb_waiting
number of events waiting for an ack in the waiting list
Definition n_kafka.h:131
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition n_kafka.h:115
char ** topics
list of topics to subscribe to
Definition n_kafka.h:93
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
Definition n_kafka.h:107
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
Definition n_kafka.h:75
pthread_rwlock_t rwlock
access lock
Definition n_kafka.h:113
N_STR * errstr
kafka error string holder
Definition n_kafka.h:109
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
Definition n_kafka.h:99
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:89
LIST * events_to_send
list of N_KAFKA_EVENT to send
Definition n_kafka.h:87
int32_t poll_timeout
poll timeout in msecs
Definition n_kafka.h:119
pthread_t polling_thread
polling thread id
Definition n_kafka.h:127
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
Definition n_kafka.h:101
char * event_cmd
eventual custom event_cmd
Definition n_kafka.h:95
int32_t monitored_directory_interval
monitored directory refresh interval in msecs
Definition n_kafka.h:123
char * groupid
consumer group id
Definition n_kafka.h:91
int32_t event_consumption_enabled
bool flag to suspend or restart event consumption
Definition n_kafka.h:135
int schema_id
kafka schema id in network order
Definition n_kafka.h:111
size_t nb_queued
number of waiting events in the producer waiting list
Definition n_kafka.h:129
size_t nb_error
number of errored events
Definition n_kafka.h:133
char * bootstrap_servers
kafka bootstrap servers string
Definition n_kafka.h:105
char * topic
kafka topic string
Definition n_kafka.h:103
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
Definition n_kafka.c:632
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1182
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1256
#define N_KAFKA_EVENT_OK
state of an OK event
Definition n_kafka.h:60
int n_kafka_stop_polling_thread(N_KAFKA *kafka)
stop the polling thread of a kafka handle
Definition n_kafka.c:1083
#define N_KAFKA_EVENT_ERROR
state of an errored event
Definition n_kafka.h:58
int32_t n_kafka_get_schema_from_nstr(const N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
Definition n_kafka.c:51
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
Definition n_kafka.c:607
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
Definition n_kafka.c:77
int n_kafka_enable_event_production(N_KAFKA *kafka)
enable event production
Definition n_kafka.c:1155
int n_kafka_disable_event_consumption(N_KAFKA *kafka)
disable event consumption
Definition n_kafka.c:1142
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:298
int n_kafka_disable_event_production(N_KAFKA *kafka)
disable event production
Definition n_kafka.c:1168
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:92
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
Definition n_kafka.h:56
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
Definition n_kafka.c:64
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
Definition n_kafka.c:864
int n_kafka_start_polling_thread(N_KAFKA *kafka)
start the polling thread of a kafka handle
Definition n_kafka.c:1051
N_KAFKA_EVENT * n_kafka_new_event_from_char(const char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:756
N_KAFKA_EVENT * n_kafka_new_event_from_string(const N_STR *string, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:794
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition n_kafka.c:852
int n_kafka_enable_event_consumption(N_KAFKA *kafka)
enable event consumption
Definition n_kafka.c:1129
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
Definition n_kafka.c:824
int32_t n_kafka_get_schema_from_char(const char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
Definition n_kafka.c:37
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
Definition n_kafka.c:666
int n_kafka_load_unprocessed(N_KAFKA *kafka, const char *directory)
load unprocessed/unset events
Definition n_kafka.c:1238
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:682
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
Definition n_kafka.c:584
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
Definition n_kafka.h:62
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a file
Definition n_kafka.c:809
N_KAFKA * n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len)
allocate a new kafka handle
Definition n_kafka.c:246
#define N_KAFKA_EVENT_QUEUED
state of a queued event
Definition n_kafka.h:54
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:185
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:85
structure of a KAFKA message
Definition n_kafka.h:65
size_t written
size of the written data inside the string
Definition n_str.h:66
char * data
the string
Definition n_str.h:62
size_t length
length of string (in case we wanna keep information after the 0 end of string value)
Definition n_str.h:64
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
Definition n_str.c:69
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
Definition n_str.h:201
int split_count(char **split_result)
Count split elements.
Definition n_str.c:992
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
Definition n_str.c:416
char * join(char **splitresult, const char *delim)
join the array into a string
Definition n_str.c:1029
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
Definition n_str.c:206
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
Definition n_str.h:115
char ** split(const char *str, const char *delim, int empty)
split the strings into a an array of char *pointer , ended by a NULL one.
Definition n_str.c:912
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
Definition n_str.c:286
int free_split_result(char ***tab)
Free a split result allocated array.
Definition n_str.c:1008
A box including a string and its length.
Definition n_str.h:60
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
Definition n_time.c:84
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
Definition n_time.c:106
Timing Structure.
Definition n_time.h:48
Base64 encoding and decoding functions using N_STR.
Common headers and low-level functions & define.
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
Definition n_kafka.c:115
void * n_kafka_polling_thread(void *ptr)
kafka produce or consume polling thread function
Definition n_kafka.c:986
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
Definition n_kafka.c:705
Kafka generic produce and consume event header.