Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_kafka.h
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
27#ifndef __N_KAFKA
28#define __N_KAFKA
29
30#ifdef __cplusplus
31extern "C" {
32#endif
33
39#include "nilorea/n_log.h"
40#include "nilorea/n_network.h"
41#include "cJSON.h"
42
43#include <stdio.h>
44#include <stdlib.h>
45#include <string.h>
46#include <locale.h>
47#include <libgen.h>
48#include <errno.h>
49#include <unistd.h>
50
51#include "rdkafka.h"
52
54#define N_KAFKA_EVENT_QUEUED 0
56#define N_KAFKA_EVENT_WAITING_ACK 1
58#define N_KAFKA_EVENT_ERROR 2
60#define N_KAFKA_EVENT_OK 4
62#define N_KAFKA_EVENT_CREATED 5
63
83
139
141int32_t n_kafka_get_schema_from_char(const char* string);
143int32_t n_kafka_get_schema_from_nstr(const N_STR* string);
145int n_kafka_put_schema_in_char(char* string, int schema_id);
147int n_kafka_put_schema_in_nstr(N_STR* string, int schema_id);
148
150int n_kafka_get_status(N_KAFKA* kafka, size_t* nb_queued, size_t* nb_waiting, size_t* nb_error);
152void n_kafka_delete(N_KAFKA* kafka);
154N_KAFKA* n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len);
157
159int n_kafka_new_headers(N_KAFKA_EVENT* event, size_t count);
161int n_kafka_add_header_ex(N_KAFKA_EVENT* event, char* key, size_t key_length, char* value, size_t value_length);
163int n_kafka_add_header(N_KAFKA_EVENT* event, N_STR* key, N_STR* value);
164
166int n_kafka_produce(N_KAFKA* kafka, N_KAFKA_EVENT* event);
167
169N_KAFKA_EVENT* n_kafka_new_event(int schema_id);
171N_KAFKA_EVENT* n_kafka_new_event_from_char(const char* string, size_t written, int schema_id);
173N_KAFKA_EVENT* n_kafka_new_event_from_string(const N_STR* string, int schema_id);
175N_KAFKA_EVENT* n_kafka_new_event_from_file(char* filename, int schema_id);
177void n_kafka_event_destroy_ptr(void* event);
180
182int n_kafka_poll(N_KAFKA* kafka);
196int n_kafka_dump_unprocessed(N_KAFKA* kafka, char* directory);
198int n_kafka_load_unprocessed(N_KAFKA* kafka, const char* directory);
199
202
207#ifdef __cplusplus
208}
209#endif
210
211#endif // header guard
static int mode
char * config_file
Definition ex_kafka.c:45
char * key
Structure of a generic LIST container.
Definition n_list.h:58
int32_t event_production_enabled
bool flag to suspend or restart event production
Definition n_kafka.h:137
int32_t poll_interval
poll interval in msecs
Definition n_kafka.h:121
rd_kafka_topic_partition_list_t * subscription
subscribed topics
Definition n_kafka.h:97
int polling_thread_status
polling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
Definition n_kafka.h:125
cJSON * configuration
kafka json configuration holder
Definition n_kafka.h:117
size_t nb_waiting
number of events waiting for an ack in the waiting list
Definition n_kafka.h:131
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition n_kafka.h:115
char ** topics
list of topics to subscribe to
Definition n_kafka.h:93
int schema_id
kafka schema_id
Definition n_kafka.h:79
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
Definition n_kafka.h:107
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
Definition n_kafka.h:75
pthread_rwlock_t rwlock
access lock
Definition n_kafka.h:113
N_STR * errstr
kafka error string holder
Definition n_kafka.h:109
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
Definition n_kafka.h:99
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:89
LIST * events_to_send
list of N_KAFKA_EVENT to send
Definition n_kafka.h:87
int32_t poll_timeout
poll timeout in msecs
Definition n_kafka.h:119
pthread_t polling_thread
polling thread id
Definition n_kafka.h:127
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
Definition n_kafka.h:101
LIST * received_headers
kafka consume event headers structure handle
Definition n_kafka.h:77
N_STR * event_string
string containing the topic id + payload
Definition n_kafka.h:67
char * event_cmd
eventual custom event_cmd
Definition n_kafka.h:95
int32_t monitored_directory_interval
monitored directory refresh interval in msecs
Definition n_kafka.h:123
char * groupid
consumer group id
Definition n_kafka.h:91
N_STR * event_files_to_delete
string containing the original event source file name if it is to be deleted when event is produced.
Definition n_kafka.h:69
int32_t event_consumption_enabled
bool flag to suspend or restart event consumption
Definition n_kafka.h:135
int schema_id
kafka schema id in network order
Definition n_kafka.h:111
struct N_KAFKA * parent_table
access lock
Definition n_kafka.h:81
size_t nb_queued
number of waiting events in the producer waiting list
Definition n_kafka.h:129
size_t nb_error
number of errored events
Definition n_kafka.h:133
char * bootstrap_servers
kafka bootstrap servers string
Definition n_kafka.h:105
char * from_topic
in case of received event, else NULL
Definition n_kafka.h:71
unsigned int status
state of the event: N_KAFKA_EVENT_CREATED ,N_KAFKA_EVENT_QUEUED , N_KAFKA_EVENT_WAITING_ACK ,...
Definition n_kafka.h:73
char * topic
kafka topic string
Definition n_kafka.h:103
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header to an event (extended version)
Definition n_kafka.c:632
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed events to a directory
Definition n_kafka.c:1182
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get the next received event
Definition n_kafka.c:1256
int n_kafka_stop_polling_thread(N_KAFKA *kafka)
stop the kafka polling thread
Definition n_kafka.c:1083
int32_t n_kafka_get_schema_from_nstr(const N_STR *string)
get schema id from an N_STR string
Definition n_kafka.c:51
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
create new headers for an event
Definition n_kafka.c:607
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put schema id in an N_STR string
Definition n_kafka.c:77
int n_kafka_enable_event_production(N_KAFKA *kafka)
enable event production
Definition n_kafka.c:1155
int n_kafka_disable_event_consumption(N_KAFKA *kafka)
disable event consumption
Definition n_kafka.c:1142
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load kafka configuration from file
Definition n_kafka.c:298
int n_kafka_disable_event_production(N_KAFKA *kafka)
disable event production
Definition n_kafka.c:1168
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
get kafka queue status counters
Definition n_kafka.c:92
int n_kafka_put_schema_in_char(char *string, int schema_id)
put schema id in a char string
Definition n_kafka.c:64
int n_kafka_poll(N_KAFKA *kafka)
poll kafka for events
Definition n_kafka.c:864
int n_kafka_start_polling_thread(N_KAFKA *kafka)
start the kafka polling thread
Definition n_kafka.c:1051
N_KAFKA_EVENT * n_kafka_new_event_from_char(const char *string, size_t written, int schema_id)
create a new kafka event from a char string
Definition n_kafka.c:756
N_KAFKA_EVENT * n_kafka_new_event_from_string(const N_STR *string, int schema_id)
create a new kafka event from an N_STR string
Definition n_kafka.c:794
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event
Definition n_kafka.c:852
int n_kafka_enable_event_consumption(N_KAFKA *kafka)
enable event consumption
Definition n_kafka.c:1129
void n_kafka_event_destroy_ptr(void *event)
destroy a kafka event (void pointer version for list destructor)
Definition n_kafka.c:824
int32_t n_kafka_get_schema_from_char(const char *string)
get schema id from a char string
Definition n_kafka.c:37
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header to an event
Definition n_kafka.c:666
int n_kafka_load_unprocessed(N_KAFKA *kafka, const char *directory)
load unprocessed events from a directory
Definition n_kafka.c:1238
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce a kafka event
Definition n_kafka.c:682
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty kafka event
Definition n_kafka.c:584
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
create a new kafka event from a file
Definition n_kafka.c:809
N_KAFKA * n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len)
create a new kafka handle
Definition n_kafka.c:246
void n_kafka_delete(N_KAFKA *kafka)
delete a kafka handle
Definition n_kafka.c:185
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:85
structure of a KAFKA message
Definition n_kafka.h:65
A box including a string and his lenght.
Definition n_str.h:60
Generic log system.
Network Engine.