40 uint32_t raw_schema_id = 0;
41 memcpy(&raw_schema_id,
string + 1,
sizeof(uint32_t));
43 return (int32_t)ntohl(raw_schema_id);
66 uint32_t schema_id_htonl = htonl((uint32_t)schema_id);
67 memcpy(
string + 1, &schema_id_htonl,
sizeof(uint32_t));
123 if (rkmessage->err) {
124 n_log(
LOG_ERR,
"message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
129 if (event->parent_table)
133 if (event->parent_table) {
134 event->parent_table->nb_waiting--;
135 event->parent_table->nb_error++;
137 if (event->parent_table)
138 unlock(event->parent_table->rwlock);
140 n_log(
LOG_DEBUG,
"message delivered (%ld bytes, partition %d)", rkmessage->len, rkmessage->partition);
143 if (event->parent_table)
146 if (event->event_files_to_delete) {
147 char** files_to_delete =
split(
_nstr(event->event_files_to_delete),
";", 0);
148 if (files_to_delete) {
151 while (files_to_delete[index]) {
152 int ret = unlink(files_to_delete[index]);
155 n_log(
LOG_DEBUG,
"deleted on produce ack: %s", files_to_delete[index]);
157 n_log(
LOG_ERR,
"couldn't delete \"%s\": %s", files_to_delete[index], strerror(error));
170 if (event->parent_table)
171 unlock(event->parent_table->rwlock);
201 if (kafka->
mode == RD_KAFKA_CONSUMER) {
206 rd_kafka_topic_partition_list_destroy(kafka->
subscription);
208 if (kafka->
mode == RD_KAFKA_PRODUCER) {
284 n_log(
LOG_ERR,
"could not init kafka rwlock in kafka structure at address %p", kafka);
310 N_STR* config_string = NULL;
312 if (!config_string) {
318 json = cJSON_Parse(
_nstrp(config_string));
328 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++) {
329 cJSON* entry = cJSON_GetArrayItem(json, jsonIndex);
331 if (!entry)
continue;
333 if (!entry->valuestring) {
338 if (entry->string[0] !=
'-') {
340 if (strcmp(
"topic", entry->string) != 0 &&
341 strcmp(
"topics", entry->string) != 0 &&
342 strcmp(
"event_cmd", entry->string) != 0 &&
343 strcmp(
"value.schema.id", entry->string) != 0 &&
344 strcmp(
"value.schema.type", entry->string) != 0 &&
345 strcmp(
"poll.interval", entry->string) != 0 &&
346 strcmp(
"poll.timeout", entry->string) != 0 &&
347 strcmp(
"group.id.autogen", entry->string) != 0 &&
348 strcmp(
"monitored.directory.interval", entry->string) != 0) {
349 if (!strcmp(
"group.id", entry->string)) {
351 if (
mode == RD_KAFKA_PRODUCER)
353 kafka->
groupid = strdup(entry->valuestring);
359 n_log(
LOG_DEBUG,
"kafka config enabled: %s => %s", entry->string, entry->valuestring);
363 n_log(
LOG_DEBUG,
"kafka disabled config: %s => %s", entry->string, entry->valuestring);
370 jstr = cJSON_GetObjectItem(json,
"topic");
371 if (jstr && jstr->valuestring) {
372 kafka->
topic = strdup(jstr->valuestring);
375 if (
mode == RD_KAFKA_PRODUCER) {
383 jstr = cJSON_GetObjectItem(json,
"topics");
384 if (jstr && jstr->valuestring) {
388 if (
mode == RD_KAFKA_CONSUMER) {
397 jstr = cJSON_GetObjectItem(json,
"event_cmd");
398 if (jstr && jstr->valuestring) {
399 kafka->
event_cmd = strdup(jstr->valuestring);
400 n_log(
LOG_DEBUG,
"kafka consumer event_cmd: %s", jstr->valuestring);
404 jstr = cJSON_GetObjectItem(json,
"value.schema.id");
405 if (jstr && jstr->valuestring) {
406 int schem_v = atoi(jstr->valuestring);
407 if (schem_v < -1 || schem_v > 9999) {
418 jstr = cJSON_GetObjectItem(json,
"poll.interval");
419 if (jstr && jstr->valuestring) {
425 jstr = cJSON_GetObjectItem(json,
"poll.timeout");
426 if (jstr && jstr->valuestring) {
432 jstr = cJSON_GetObjectItem(json,
"monitored.directory.interval");
433 if (jstr && jstr->valuestring) {
439 jstr = cJSON_GetObjectItem(json,
"bootstrap.servers");
440 if (jstr && jstr->valuestring) {
445 if (
mode == RD_KAFKA_PRODUCER) {
465 kafka->
mode = RD_KAFKA_PRODUCER;
466 }
else if (
mode == RD_KAFKA_CONSUMER) {
480 char computer_name[1024] =
"";
485 jstr = cJSON_GetObjectItem(json,
"group.id.autogen");
486 if (jstr && jstr->valuestring) {
487 if (strcmp(jstr->valuestring,
"host-topic-group") == 0) {
489 nstrprintf(groupid,
"%s_%s", computer_name, topics);
490 }
else if (strcmp(jstr->valuestring,
"unique-group") == 0) {
492 nstrprintf(groupid,
"%s_%s_%d", computer_name, topics, getpid());
497 nstrprintf(groupid,
"%s_%s_%d", computer_name, topics, getpid());
498 n_log(
LOG_DEBUG,
"group.id is not set and group.id.autogen is not set, generated unique group id: %s",
_nstr(groupid));
502 groupid->
data = NULL;
541 kafka->
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
542 for (
int i = 0; i < topic_cnt; i++)
546 RD_KAFKA_PARTITION_UA);
559 n_log(
LOG_ERR,
"kafka consumer: failed to subscribe to %d topics: %s", kafka->
subscription->cnt, rd_kafka_err2str(err));
565 n_log(
LOG_DEBUG,
"kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka->
subscription->cnt);
567 kafka->
mode = RD_KAFKA_CONSUMER;
589 event->event_string = NULL;
590 event->event_files_to_delete = NULL;
591 event->from_topic = NULL;
592 event->rd_kafka_headers = NULL;
593 event->received_headers = NULL;
594 event->schema_id = schema_id;
596 event->parent_table = NULL;
614 event->rd_kafka_headers = rd_kafka_headers_new(count);
617 n_log(
LOG_ERR,
"event headers already allocated for event %p", event);
638 if (key_length < 1 || key_length > SSIZE_MAX) {
639 n_log(
LOG_ERR,
"Invalid key length (%zu) for header in event %p", key_length, event);
643 if (value_length < 1 || value_length > SSIZE_MAX) {
644 n_log(
LOG_ERR,
"Invalid value length (%zu) for key '%s' in event %p", value_length,
key, event);
648 rd_kafka_resp_err_t err = rd_kafka_header_add(event->
rd_kafka_headers,
key, (ssize_t)key_length, value, (ssize_t)value_length);
651 n_log(
LOG_ERR,
"Failed to add header [%s:%zu=%s:%zu] to event %p: %s",
652 key, key_length, value, value_length, event, rd_kafka_err2str(err));
686 event->parent_table = kafka;
707 size_t event_length = 0;
709 event->parent_table = kafka;
712 event_length =
event->event_string->written;
715 rd_kafka_headers_t* hdrs_copy;
718 rd_kafka_resp_err_t err = rd_kafka_producev(
720 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
721 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
723 RD_KAFKA_V_HEADERS(hdrs_copy),
724 RD_KAFKA_V_OPAQUE((
void*)event),
728 rd_kafka_headers_destroy(hdrs_copy);
730 n_log(
LOG_ERR,
"failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s", event, event->rd_kafka_headers, kafka->
rd_kafka_handle, kafka->
topic, rd_kafka_err2str(err));
734 if (rd_kafka_produce(kafka->
rd_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
event_string, event_length, NULL, 0, event) == -1) {
767 __n_assert(event->event_string, free(event);
return NULL);
770 size_t length = written + offset;
771 Malloc(event->event_string->data,
char, length);
772 __n_assert(event->event_string->data, free(event->event_string); free(event);
return NULL);
773 event->event_string->length = length;
776 memcpy(event->event_string->data + offset,
string, written);
777 event->event_string->written = written + offset;
829 if (event->event_string)
832 if (event->event_files_to_delete)
833 free_nstr(&event->event_files_to_delete);
837 if (event->rd_kafka_headers)
838 rd_kafka_headers_destroy(event->rd_kafka_headers);
840 if (event->received_headers)
867 int event_consumption_enabled;
868 int event_production_enabled;
876 if (kafka->
mode == RD_KAFKA_PRODUCER) {
877 if (event_production_enabled) {
913 usleep((
unsigned int)(sleep_us > (uint64_t)UINT_MAX ? (uint64_t)UINT_MAX : sleep_us));
915 }
else if (kafka->
mode == RD_KAFKA_CONSUMER) {
916 if (event_consumption_enabled) {
917 rd_kafka_message_t* rkm = NULL;
926 n_log(
LOG_ERR,
"consumer: %s", rd_kafka_message_errstr(rkm));
927 rd_kafka_message_destroy(rkm);
929 usleep((
unsigned int)(sleep_us > (uint64_t)UINT_MAX ? (uint64_t)UINT_MAX : sleep_us));
933 n_log(
LOG_DEBUG,
"Message on %s [%" PRId32
"] at offset %" PRId64
" (leader epoch %" PRId32
")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
936 if (rkm->key && rkm->key_len > 0)
937 n_log(
LOG_DEBUG,
"Key: %.*s", (
int)rkm->key_len, (
const char*)rkm->key);
941 if (rkm->payload && rkm->len > 0) {
946 event->parent_table = kafka;
948 rd_kafka_headers_t* hdrs = NULL;
949 if (!rd_kafka_message_headers(rkm, &hdrs)) {
951 const char* name = NULL;
952 const void* val = NULL;
955 while (!rd_kafka_header_get_all(hdrs, idx, &name, &val, &size)) {
956 N_STR* header_entry = NULL;
963 event->from_topic = strdup(rd_kafka_topic_name(rkm->rkt));
967 n_log(
LOG_DEBUG,
"Consumer received event of (%d bytes) from topic %s", (
int)rkm->len, event->from_topic);
970 rd_kafka_message_destroy(rkm);
974 usleep((
unsigned int)(sleep_us > (uint64_t)UINT_MAX ? (uint64_t)UINT_MAX : sleep_us));
993 if (kafka->
mode == RD_KAFKA_PRODUCER)
995 if (kafka->
mode == RD_KAFKA_CONSUMER) {
1003 int64_t remaining_time = (int64_t)kafka->
poll_timeout * 1000;
1004 while (status == 1) {
1006 if (kafka->
mode == RD_KAFKA_PRODUCER) {
1008 }
else if (kafka->
topics) {
1022 int64_t elapsed_time =
get_usec(&chrono);
1024 remaining_time -= elapsed_time;
1025 if (remaining_time < 0) {
1026 if (kafka->
mode == RD_KAFKA_PRODUCER) {
1028 }
else if (kafka->
mode == RD_KAFKA_CONSUMER) {
1059 n_log(
LOG_ERR,
"kafka polling thread already started for handle %p", kafka);
1067 n_log(
LOG_ERR,
"unable to create polling_thread for kafka handle %p", kafka);
1086 if (kafka->
mode == RD_KAFKA_CONSUMER) {
1094 if (polling_thread_status == 0) {
1095 n_log(
LOG_DEBUG,
"kafka polling thread already stopped for handle %p", kafka);
1098 if (polling_thread_status == 2) {
1099 n_log(
LOG_DEBUG,
"kafka polling ask for stop thread already done for handle %p", kafka);
1112 clock_gettime(CLOCK_REALTIME, &ts);
1117 n_log(
LOG_ERR,
"polling thread did not stop in 10s, force stop !");
1187 size_t nb_todump = 0;
1192 n_log(
LOG_ERR,
"kafka handle %p thread polling func is still running, aborting dump", kafka);
1196 if (nb_todump == 0) {
1197 n_log(
LOG_DEBUG,
"kafka handle %p: nothing to dump, all events processed correctly", kafka);
1204 N_STR* dumpstr = NULL;
1211 if (event->schema_id != -1)
1214 N_STR* filename = NULL;
1218 dumpstr->
data =
event->event_string->data + offset;
1219 dumpstr->
written =
event->event_string->written - offset;
1220 dumpstr->
length =
event->event_string->length - offset;
1227 dumpstr->
data = NULL;
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
#define FreeNoLog(__ptr)
Free Handler without log.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
define true
int get_computer_name(char *computer_name, size_t len)
get the computer name
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
#define unlock(__rwlock_mutex)
Macro for releasing read/write lock a rwlock mutex.
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
#define Free(__ptr)
Free Handler to get errors.
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
void * ptr
void pointer to store
LIST_NODE * start
pointer to the start of the list
struct LIST_NODE * next
pointer to the next node
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper, safe for node removal during iteration.
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
int32_t event_production_enabled
bool flag to suspend or restart event production
int32_t poll_interval
poll interval in msecs
rd_kafka_topic_partition_list_t * subscription
subscribed topics
int polling_thread_status
polling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
cJSON * configuration
kafka json configuration holder
size_t nb_waiting
number of events waiting for an ack in the waiting list
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
char ** topics
list of topics to subscribe to
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
pthread_rwlock_t rwlock
access lock
N_STR * errstr
kafka error string holder
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
LIST * received_events
list of received N_KAFKA_EVENT
LIST * events_to_send
list of N_KAFKA_EVENT to send
int32_t poll_timeout
poll timeout in msecs
pthread_t polling_thread
polling thread id
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
char * event_cmd
eventual custom event_cmd
int32_t monitored_directory_interval
monitored directory refresh interval in msecs
char * groupid
consumer group id
int32_t event_consumption_enabled
bool flag to suspend or restart event consumption
int schema_id
kafka schema id in network order
size_t nb_queued
number of waiting events in the producer waiting list
size_t nb_error
number of errored events
char * bootstrap_servers
kafka bootstrap servers string
char * topic
kafka topic string
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
#define N_KAFKA_EVENT_OK
state of an OK event
int n_kafka_stop_polling_thread(N_KAFKA *kafka)
stop the polling thread of a kafka handle
#define N_KAFKA_EVENT_ERROR
state of an errored event
int32_t n_kafka_get_schema_from_nstr(const N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
int n_kafka_enable_event_production(N_KAFKA *kafka)
enable event production
int n_kafka_disable_event_consumption(N_KAFKA *kafka)
disable event consumption
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
int n_kafka_disable_event_production(N_KAFKA *kafka)
disable event production
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
int n_kafka_start_polling_thread(N_KAFKA *kafka)
start the polling thread of a kafka handle
N_KAFKA_EVENT * n_kafka_new_event_from_char(const char *string, size_t written, int schema_id)
make a new event from a char *string
N_KAFKA_EVENT * n_kafka_new_event_from_string(const N_STR *string, int schema_id)
make a new event from a N_STR *string
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
int n_kafka_enable_event_consumption(N_KAFKA *kafka)
enable event consumption
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
int32_t n_kafka_get_schema_from_char(const char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
int n_kafka_load_unprocessed(N_KAFKA *kafka, const char *directory)
load unprocessed/unset events
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a file
N_KAFKA * n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len)
allocate a new kafka handle
#define N_KAFKA_EVENT_QUEUED
state of a queued event
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
structure of a KAFKA consumer or producer handle
structure of a KAFKA message
size_t written
size of the written data inside the string
size_t length
length of string (in case we want to keep information after the terminating 0 of the string)
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
int split_count(char **split_result)
Count split elements.
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
char * join(char **splitresult, const char *delim)
join the array into a string
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
char ** split(const char *str, const char *delim, int empty)
split the string into an array of char *pointers, terminated by a NULL entry.
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
int free_split_result(char ***tab)
Free a split result allocated array.
A box including a string and its length.
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
Base64 encoding and decoding functions using N_STR.
Common headers and low-level functions & define.
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
void * n_kafka_polling_thread(void *ptr)
kafka produce or consume polling thread function
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
Kafka generic produce and consume event header.