Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
KAFKA: generic event producer and consumer functions

Data Structures

struct  N_KAFKA
 structure of a KAFKA consumer or producer handle More...
 
struct  N_KAFKA_EVENT
 structure of a KAFKA message More...
 

Macros

#define N_KAFKA_EVENT_CREATED   5
 state of a freshly created event
 
#define N_KAFKA_EVENT_ERROR   2
 state of an errored event
 
#define N_KAFKA_EVENT_OK   4
 state of an OK event
 
#define N_KAFKA_EVENT_QUEUED   0
 state of a queued event
 
#define N_KAFKA_EVENT_WAITING_ACK   1
 state of a sent event waiting for acknowledgement
 

Functions

int n_kafka_add_header (N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
 add a header to an event
 
int n_kafka_add_header_ex (N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
 add a header to an event (extended version)
 
void n_kafka_delete (N_KAFKA *kafka)
 delete a kafka handle
 
int n_kafka_disable_event_consumption (N_KAFKA *kafka)
 disable event consumption
 
int n_kafka_disable_event_production (N_KAFKA *kafka)
 disable event production
 
int n_kafka_dump_unprocessed (N_KAFKA *kafka, char *directory)
 dump unprocessed events to a directory
 
int n_kafka_enable_event_consumption (N_KAFKA *kafka)
 enable event consumption
 
int n_kafka_enable_event_production (N_KAFKA *kafka)
 enable event production
 
int n_kafka_event_destroy (N_KAFKA_EVENT **event)
 destroy a kafka event
 
void n_kafka_event_destroy_ptr (void *event)
 destroy a kafka event (void pointer version for list destructor)
 
N_KAFKA_EVENTn_kafka_get_event (N_KAFKA *kafka)
 get the next received event
 
int32_t n_kafka_get_schema_from_char (const char *string)
 get schema id from a char string
 
int32_t n_kafka_get_schema_from_nstr (const N_STR *string)
 get schema id from an N_STR string
 
int n_kafka_get_status (N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
 get kafka queue status counters
 
N_KAFKAn_kafka_load_config (char *config_file, int mode)
 load kafka configuration from file
 
int n_kafka_load_unprocessed (N_KAFKA *kafka, const char *directory)
 load unprocessed events from a directory
 
N_KAFKAn_kafka_new (int32_t poll_timeout, int32_t poll_interval, size_t errstr_len)
 create a new kafka handle
 
N_KAFKA_EVENTn_kafka_new_event (int schema_id)
 create a new empty kafka event
 
N_KAFKA_EVENTn_kafka_new_event_from_char (const char *string, size_t written, int schema_id)
 create a new kafka event from a char string
 
N_KAFKA_EVENTn_kafka_new_event_from_file (char *filename, int schema_id)
 create a new kafka event from a file
 
N_KAFKA_EVENTn_kafka_new_event_from_string (const N_STR *string, int schema_id)
 create a new kafka event from an N_STR string
 
int n_kafka_new_headers (N_KAFKA_EVENT *event, size_t count)
 create new headers for an event
 
int n_kafka_poll (N_KAFKA *kafka)
 poll kafka for events
 
int n_kafka_produce (N_KAFKA *kafka, N_KAFKA_EVENT *event)
 produce a kafka event
 
int n_kafka_put_schema_in_char (char *string, int schema_id)
 put schema id in a char string
 
int n_kafka_put_schema_in_nstr (N_STR *string, int schema_id)
 put schema id in an N_STR string
 
int n_kafka_start_polling_thread (N_KAFKA *kafka)
 start the kafka polling thread
 
int n_kafka_stop_polling_thread (N_KAFKA *kafka)
 stop the kafka polling thread
 

Detailed Description


Data Structure Documentation

◆ N_KAFKA

struct N_KAFKA

structure of a KAFKA consumer or producer handle

Examples
ex_kafka.c.

Definition at line 85 of file n_kafka.h.

+ Collaboration diagram for N_KAFKA:
Data Fields
char * bootstrap_servers kafka bootstrap servers string
cJSON * configuration kafka json configuration holder
N_STR * errstr kafka error string holder
char * event_cmd eventual custom event_cmd
int32_t event_consumption_enabled bool flag to suspend or restart event consumption
int32_t event_production_enabled bool flag to suspend or restart event production
LIST * events_to_send list of N_KAFKA_EVENT to send
char * groupid consumer group id
int mode kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
int32_t monitored_directory_interval monitored directory refresh interval in msecs
size_t nb_error number of errored events
size_t nb_queued number of waiting events in the producer waiting list
size_t nb_waiting number of events waiting for an ack in the waiting list
int32_t poll_interval poll interval in msecs
int32_t poll_timeout poll timeout in msecs
pthread_t polling_thread polling thread id
int polling_thread_status polling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting polling thread
rd_kafka_conf_t * rd_kafka_conf kafka structure handle
rd_kafka_t * rd_kafka_handle kafka handle (producer or consumer)
rd_kafka_topic_t * rd_kafka_topic kafka topic handle
LIST * received_events list of received N_KAFKA_EVENT
pthread_rwlock_t rwlock access lock
int schema_id kafka schema id in network order
rd_kafka_topic_partition_list_t * subscription subscribed topics
char * topic kafka topic string
char ** topics list of topics to subscribe to

◆ N_KAFKA_EVENT

struct N_KAFKA_EVENT

structure of a KAFKA message

Examples
ex_kafka.c.

Definition at line 65 of file n_kafka.h.

+ Collaboration diagram for N_KAFKA_EVENT:
Data Fields
N_STR * event_files_to_delete string containing the original event source file name if it is to be deleted when event is produced.

List separated with ',' else it's NULL

N_STR * event_string string containing the topic id + payload
char * from_topic in case of received event, else NULL
struct N_KAFKA * parent_table access lock
rd_kafka_headers_t * rd_kafka_headers kafka produce event headers structure handle
LIST * received_headers kafka consume event headers structure handle
int schema_id kafka schema_id
unsigned int status state of the event: N_KAFKA_EVENT_CREATED ,N_KAFKA_EVENT_QUEUED , N_KAFKA_EVENT_WAITING_ACK , N_KAFKA_EVENT_ERROR , N_KAFKA_EVENT_OK

Macro Definition Documentation

◆ N_KAFKA_EVENT_CREATED

#define N_KAFKA_EVENT_CREATED   5

state of a freshly created event

Definition at line 62 of file n_kafka.h.

◆ N_KAFKA_EVENT_ERROR

#define N_KAFKA_EVENT_ERROR   2

state of an errored event

Definition at line 58 of file n_kafka.h.

◆ N_KAFKA_EVENT_OK

#define N_KAFKA_EVENT_OK   4

state of an OK event

Definition at line 60 of file n_kafka.h.

◆ N_KAFKA_EVENT_QUEUED

#define N_KAFKA_EVENT_QUEUED   0

state of a queued event

Definition at line 54 of file n_kafka.h.

◆ N_KAFKA_EVENT_WAITING_ACK

#define N_KAFKA_EVENT_WAITING_ACK   1

state of a sent event waiting for acknowledgement

Definition at line 56 of file n_kafka.h.

Function Documentation

◆ n_kafka_add_header()

int n_kafka_add_header ( N_KAFKA_EVENT event,
N_STR key,
N_STR value 
)

add a header to an event

add a header to an event

headers array must have been allocated before

Parameters
eventtarget event
keyN_STR *string of the key
valueN_STR *string of the value
Returns
TRUE or FALSE

Definition at line 666 of file n_kafka.c.

References __n_assert, N_STR::data, key, n_kafka_add_header_ex(), and N_STR::written.

+ Here is the call graph for this function:

◆ n_kafka_add_header_ex()

int n_kafka_add_header_ex ( N_KAFKA_EVENT event,
char *  key,
size_t  key_length,
char *  value,
size_t  value_length 
)

add a header to an event (extended version)

add a header to an event (extended version)

headers array must have been allocated before

Parameters
eventtarget event
keystring of the key
key_lengthsize of the key string
valuestring of the value
value_lengthsize of the value string
Returns
TRUE or FALSE

Definition at line 632 of file n_kafka.c.

References __n_assert, key, LOG_ERR, n_log, and N_KAFKA_EVENT::rd_kafka_headers.

Referenced by n_kafka_add_header().

+ Here is the caller graph for this function:

◆ n_kafka_delete()

void n_kafka_delete ( N_KAFKA kafka)

delete a kafka handle

delete a kafka handle

Parameters
kafkaN_KAFKA handle to delete

Definition at line 185 of file n_kafka.c.

References __n_assert, bootstrap_servers, configuration, errstr, event_cmd, events_to_send, Free, free_nstr, free_split_result(), FreeNoLog, groupid, list_destroy(), LOG_DEBUG, mode, n_kafka_stop_polling_thread(), n_log, poll_timeout, rd_kafka_conf, rd_kafka_handle, rd_kafka_topic, received_events, rw_lock_destroy, rwlock, subscription, topic, and topics.

Referenced by main(), n_kafka_load_config(), and n_kafka_new().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_disable_event_consumption()

int n_kafka_disable_event_consumption ( N_KAFKA kafka)

disable event consumption

Parameters
kafkathe N_KAFKA instance
Returns
TRUE or FALSE

Definition at line 1142 of file n_kafka.c.

References __n_assert, event_consumption_enabled, rwlock, unlock, and write_lock.

Referenced by n_kafka_stop_polling_thread().

+ Here is the caller graph for this function:

◆ n_kafka_disable_event_production()

int n_kafka_disable_event_production ( N_KAFKA kafka)

disable event production

Parameters
kafkathe N_KAFKA instance
Returns
TRUE or FALSE

Definition at line 1168 of file n_kafka.c.

References __n_assert, event_production_enabled, rwlock, unlock, and write_lock.

◆ n_kafka_dump_unprocessed()

int n_kafka_dump_unprocessed ( N_KAFKA kafka,
char *  directory 
)

dump unprocessed events to a directory

dump unprocessed events to a directory

Parameters
kafkakafka handle to use
directorythe directory in which to dump the events
Returns
TRUE or FALSE

Definition at line 1182 of file n_kafka.c.

References __n_assert, _nstr, N_STR::data, events_to_send, free_nstr, N_STR::length, list_foreach, LOG_DEBUG, LOG_ERR, Malloc, N_KAFKA_EVENT_OK, n_log, nb_error, nb_queued, nb_waiting, nstr_to_file(), nstrprintf, polling_thread_status, read_lock, rwlock, topic, unlock, and N_STR::written.

Referenced by main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_enable_event_consumption()

int n_kafka_enable_event_consumption ( N_KAFKA kafka)

enable event consumption

Parameters
kafkathe N_KAFKA instance
Returns
TRUE or FALSE

Definition at line 1129 of file n_kafka.c.

References __n_assert, event_consumption_enabled, rwlock, unlock, and write_lock.

◆ n_kafka_enable_event_production()

int n_kafka_enable_event_production ( N_KAFKA kafka)

enable event production

Parameters
kafkathe N_KAFKA instance
Returns
TRUE or FALSE

Definition at line 1155 of file n_kafka.c.

References __n_assert, event_production_enabled, rwlock, unlock, and write_lock.

◆ n_kafka_event_destroy()

int n_kafka_event_destroy ( N_KAFKA_EVENT **  event)

destroy a kafka event

destroy a kafka event

Parameters
eventevent to delete
Returns
TRUE or FALSE

Definition at line 852 of file n_kafka.c.

References __n_assert, and n_kafka_event_destroy_ptr().

Referenced by main(), and n_kafka_poll().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_event_destroy_ptr()

void n_kafka_event_destroy_ptr ( void *  event_ptr)

destroy a kafka event (void pointer version for list destructor)

destroy a kafka event (void pointer version for list destructor)

Parameters
event_ptrvoid pointer to target

Definition at line 824 of file n_kafka.c.

References __n_assert, free_nstr, FreeNoLog, and list_destroy().

Referenced by n_kafka_event_destroy(), n_kafka_poll(), and n_kafka_produce().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_get_event()

N_KAFKA_EVENT * n_kafka_get_event ( N_KAFKA kafka)

get the next received event

get the next received event

Parameters
kafkakafka handle to use
Returns
a received event ror NULL

Definition at line 1256 of file n_kafka.c.

References __n_assert, received_events, remove_list_node, rwlock, LIST::start, unlock, and write_lock.

Referenced by main().

+ Here is the caller graph for this function:

◆ n_kafka_get_schema_from_char()

int32_t n_kafka_get_schema_from_char ( const char *  string)

get schema id from a char string

get schema id from a char string

Parameters
stringchar *source from where to read schema id
Returns
zero or positive number on success, -1 on error

Definition at line 37 of file n_kafka.c.

References __n_assert.

Referenced by n_kafka_get_schema_from_nstr().

+ Here is the caller graph for this function:

◆ n_kafka_get_schema_from_nstr()

int32_t n_kafka_get_schema_from_nstr ( const N_STR string)

get schema id from an N_STR string

get schema id from an N_STR string

Parameters
stringN_STR *source from where to read schema id
Returns
zero or positive number on success, -1 on error

Definition at line 51 of file n_kafka.c.

References __n_assert, N_STR::data, n_kafka_get_schema_from_char(), and N_STR::written.

Referenced by n_kafka_poll().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_get_status()

int n_kafka_get_status ( N_KAFKA kafka,
size_t *  nb_queued,
size_t *  nb_waiting,
size_t *  nb_error 
)

get kafka queue status counters

get kafka queue status counters

Parameters
kafkahandler to use
nb_queuedpointer to queue number holder
nb_waitingpointer to waiting number holder
nb_errorpointer to error number holder
Returns
TRUE or FALSE

Definition at line 92 of file n_kafka.c.

References __n_assert, nb_error, nb_queued, nb_waiting, polling_thread_status, read_lock, rwlock, and unlock.

Referenced by main().

+ Here is the caller graph for this function:

◆ n_kafka_load_config()

N_KAFKA * n_kafka_load_config ( char *  config_file,
int  mode 
)

load kafka configuration from file

load kafka configuration from file

Parameters
config_filepath and filename of the config file
modeRD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Returns
an allocated and configured N_KAFKA *kafka handle or NULL

Definition at line 298 of file n_kafka.c.

References __n_assert, _nstr, _nstrp, _str, bootstrap_servers, config_file, N_STR::data, errstr, event_cmd, file_to_nstr(), free_nstr, get_computer_name(), groupid, join(), N_STR::length, LOG_DEBUG, LOG_ERR, mode, mode, monitored_directory_interval, n_kafka_delete(), n_kafka_delivery_message_callback(), n_kafka_new(), n_log, new_nstr(), nstrprintf, poll_interval, poll_timeout, rd_kafka_conf, rd_kafka_handle, rd_kafka_topic, schema_id, split(), split_count(), subscription, topic, and topics.

Referenced by main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_load_unprocessed()

int n_kafka_load_unprocessed ( N_KAFKA kafka,
const char *  directory 
)

load unprocessed events from a directory

load unprocessed events from a directory

Parameters
kafkakafka handle to use
directorythe directory from which to load the events
Returns
TRUE or FALSE

Definition at line 1238 of file n_kafka.c.

References __n_assert, rwlock, unlock, and write_lock.

◆ n_kafka_new()

N_KAFKA * n_kafka_new ( int32_t  poll_timeout,
int32_t  poll_interval,
size_t  errstr_len 
)

create a new kafka handle

create a new kafka handle

Parameters
poll_intervalset polling interval
poll_timeoutset polling
errstr_lenset the size of the error string buffer
Returns
a new empty N_KAFKA *kafka handle or NULL

Definition at line 246 of file n_kafka.c.

References __n_assert, bootstrap_servers, configuration, errstr, event_cmd, event_consumption_enabled, event_production_enabled, events_to_send, Free, groupid, init_lock, LOG_ERR, Malloc, MAX_LIST_ITEMS, mode, monitored_directory_interval, n_kafka_delete(), n_log, nb_error, nb_queued, nb_waiting, new_generic_list(), new_nstr(), poll_interval, poll_timeout, polling_thread_status, rd_kafka_conf, rd_kafka_handle, received_events, rwlock, schema_id, subscription, topic, and topics.

Referenced by n_kafka_load_config().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_new_event()

N_KAFKA_EVENT * n_kafka_new_event ( int  schema_id)

create a new empty kafka event

create a new empty kafka event

Parameters
schema_idschema id that the event is using, -1 if not using avro schema_id formatted events
Returns
a new empty N_KAFKA_EVENT *event

Definition at line 584 of file n_kafka.c.

References __n_assert, Malloc, and N_KAFKA_EVENT_CREATED.

Referenced by n_kafka_new_event_from_char().

+ Here is the caller graph for this function:

◆ n_kafka_new_event_from_char()

N_KAFKA_EVENT * n_kafka_new_event_from_char ( const char *  string,
size_t  written,
int  schema_id 
)

create a new kafka event from a char string

create a new kafka event from a char string

Parameters
stringsource string
writtenlenght of the string
schema_idthe schema id to use. Pass -1 for non avro formatted events
Returns
a new N_KAFKA *event or NULL

Definition at line 756 of file n_kafka.c.

References __n_assert, Malloc, N_KAFKA_EVENT_QUEUED, n_kafka_new_event(), and n_kafka_put_schema_in_nstr().

Referenced by main(), n_kafka_new_event_from_string(), and n_kafka_poll().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_new_event_from_file()

N_KAFKA_EVENT * n_kafka_new_event_from_file ( char *  filename,
int  schema_id 
)

create a new kafka event from a file

create a new kafka event from a file

Parameters
filenamesource file path and filename
schema_idthe schema id to use. Pass -1 for non avro formatted events
Returns
a new N_KAFKA *event or NULL

Definition at line 809 of file n_kafka.c.

References __n_assert, file_to_nstr(), free_nstr, and n_kafka_new_event_from_string().

Referenced by main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_new_event_from_string()

N_KAFKA_EVENT * n_kafka_new_event_from_string ( const N_STR string,
int  schema_id 
)

create a new kafka event from an N_STR string

create a new kafka event from an N_STR string

Parameters
stringsource string
schema_idthe schema id to use. Pass -1 for non avro formatted events
Returns
a new N_KAFKA *event or NULL

Definition at line 794 of file n_kafka.c.

References __n_assert, N_STR::data, n_kafka_new_event_from_char(), and N_STR::written.

Referenced by n_kafka_new_event_from_file().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_new_headers()

int n_kafka_new_headers ( N_KAFKA_EVENT event,
size_t  count 
)

create new headers for an event

create new headers for an event

Parameters
eventtarget event
countsize in elements of the created headers array
Returns
TRUE or FALSE

Definition at line 607 of file n_kafka.c.

References __n_assert, LOG_ERR, n_log, and N_KAFKA_EVENT::rd_kafka_headers.

◆ n_kafka_poll()

◆ n_kafka_produce()

int n_kafka_produce ( N_KAFKA kafka,
N_KAFKA_EVENT event 
)

produce a kafka event

produce a kafka event

Parameters
kafkathe producer N_KAFKA handle to use
eventevent to send. Will be owned by N_KAFKA *kafka handle once done
Returns
TRUE or FALSE

Definition at line 682 of file n_kafka.c.

References __n_assert, events_to_send, list_push(), LOG_DEBUG, n_kafka_event_destroy_ptr(), n_log, nb_queued, rd_kafka_handle, rwlock, topic, unlock, and write_lock.

Referenced by main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_put_schema_in_char()

int n_kafka_put_schema_in_char ( char *  string,
int  schema_id 
)

put schema id in a char string

put schema id in a char string

Parameters
stringchar *destination where to write schema id
schema_idthe schema_id to write
Returns
zero or positive number on success, -1 on error

Definition at line 64 of file n_kafka.c.

References __n_assert.

Referenced by n_kafka_put_schema_in_nstr().

+ Here is the caller graph for this function:

◆ n_kafka_put_schema_in_nstr()

int n_kafka_put_schema_in_nstr ( N_STR string,
int  schema_id 
)

put schema id in an N_STR string

put schema id in an N_STR string

Parameters
stringN_STR *destination where to write schema id
schema_idthe schema_id to write
Returns
zero or positive number on success, -1 on error

Definition at line 77 of file n_kafka.c.

References __n_assert, N_STR::data, n_kafka_put_schema_in_char(), and N_STR::written.

Referenced by n_kafka_new_event_from_char().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_start_polling_thread()

int n_kafka_start_polling_thread ( N_KAFKA kafka)

start the kafka polling thread

start the kafka polling thread

Parameters
kafkakafka handle to use
Returns
TRUE or FALSE

Definition at line 1051 of file n_kafka.c.

References __n_assert, LOG_DEBUG, LOG_ERR, n_kafka_polling_thread(), n_log, polling_thread, polling_thread_status, rd_kafka_handle, read_lock, rwlock, unlock, and write_lock.

Referenced by main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_stop_polling_thread()

int n_kafka_stop_polling_thread ( N_KAFKA kafka)

stop the kafka polling thread

stop the kafka polling thread

Parameters
kafkatarget kafka handle
Returns
TRUE or FALSE

Definition at line 1083 of file n_kafka.c.

References __n_assert, LOG_DEBUG, LOG_ERR, mode, n_kafka_disable_event_consumption(), n_log, polling_thread, polling_thread_status, rd_kafka_handle, read_lock, rwlock, unlock, and write_lock.

Referenced by n_kafka_delete().

+ Here is the call graph for this function:
+ Here is the caller graph for this function: