38 if (strcmp(type_str,
"null") == 0)
return AVRO_NULL;
39 if (strcmp(type_str,
"boolean") == 0)
return AVRO_BOOLEAN;
40 if (strcmp(type_str,
"int") == 0)
return AVRO_INT;
41 if (strcmp(type_str,
"long") == 0)
return AVRO_LONG;
42 if (strcmp(type_str,
"float") == 0)
return AVRO_FLOAT;
43 if (strcmp(type_str,
"double") == 0)
return AVRO_DOUBLE;
44 if (strcmp(type_str,
"bytes") == 0)
return AVRO_BYTES;
45 if (strcmp(type_str,
"string") == 0)
return AVRO_STRING;
81 if (cJSON_IsString(json)) {
87 if (cJSON_IsArray(json)) {
89 int nb = cJSON_GetArraySize(json);
93 for (
int i = 0; i < nb; i++) {
100 if (cJSON_IsObject(json)) {
101 const cJSON* type_node = cJSON_GetObjectItemCaseSensitive(json,
"type");
104 const char* type_str = cJSON_GetStringValue(type_node);
115 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json,
"name");
116 if (name_node && cJSON_IsString(name_node)) {
122 if (strcmp(type_str,
"record") == 0) {
125 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json,
"name");
126 if (name_node && cJSON_IsString(name_node)) {
130 const cJSON* ns_node = cJSON_GetObjectItemCaseSensitive(json,
"namespace");
131 if (ns_node && cJSON_IsString(ns_node)) {
132 schema->namespace =
local_strdup(cJSON_GetStringValue(ns_node));
135 const cJSON* fields_node = cJSON_GetObjectItemCaseSensitive(json,
"fields");
136 if (fields_node && cJSON_IsArray(fields_node)) {
137 int nb = cJSON_GetArraySize(fields_node);
142 for (
int i = 0; i < nb; i++) {
143 const cJSON* field = cJSON_GetArrayItem(fields_node, i);
144 const cJSON* fname = cJSON_GetObjectItemCaseSensitive(field,
"name");
145 const cJSON* ftype = cJSON_GetObjectItemCaseSensitive(field,
"type");
147 if (fname && cJSON_IsString(fname)) {
153 }
else if (strcmp(type_str,
"array") == 0) {
155 const cJSON* items_node = cJSON_GetObjectItemCaseSensitive(json,
"items");
157 }
else if (strcmp(type_str,
"map") == 0) {
159 const cJSON* values_node = cJSON_GetObjectItemCaseSensitive(json,
"values");
161 }
else if (strcmp(type_str,
"enum") == 0) {
164 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json,
"name");
165 if (name_node && cJSON_IsString(name_node)) {
169 const cJSON* ns_node = cJSON_GetObjectItemCaseSensitive(json,
"namespace");
170 if (ns_node && cJSON_IsString(ns_node)) {
171 schema->namespace =
local_strdup(cJSON_GetStringValue(ns_node));
174 const cJSON* symbols_node = cJSON_GetObjectItemCaseSensitive(json,
"symbols");
175 if (symbols_node && cJSON_IsArray(symbols_node)) {
176 int nb = cJSON_GetArraySize(symbols_node);
180 for (
int i = 0; i < nb; i++) {
181 const cJSON* sym = cJSON_GetArrayItem(symbols_node, i);
182 if (cJSON_IsString(sym)) {
187 }
else if (strcmp(type_str,
"fixed") == 0) {
190 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json,
"name");
191 if (name_node && cJSON_IsString(name_node)) {
195 const cJSON* size_node = cJSON_GetObjectItemCaseSensitive(json,
"size");
196 if (size_node && cJSON_IsNumber(size_node)) {
197 schema->
fixed_size = (size_t)cJSON_GetNumberValue(size_node);
214 cJSON* json = cJSON_Parse(json_str);
223 if (!schema_ptr || !*schema_ptr)
return;
231 for (
size_t i = 0; i < schema->
nb_fields; i++) {
249 for (
size_t i = 0; i < schema->
nb_symbols; i++) {
263 switch (schema->
type) {
277 json = cJSON_CreateObject();
278 cJSON_AddStringToObject(json,
"type",
"record");
280 cJSON_AddStringToObject(json,
"name", schema->
name);
282 if (schema->namespace) {
283 cJSON_AddStringToObject(json,
"namespace", schema->namespace);
285 cJSON* fields_arr = cJSON_AddArrayToObject(json,
"fields");
286 for (
size_t i = 0; i < schema->
nb_fields; i++) {
287 cJSON* field = cJSON_CreateObject();
288 cJSON_AddStringToObject(field,
"name", schema->
fields[i].
name);
291 cJSON* ftype = cJSON_Parse(ftype_str);
292 cJSON_AddItemToObject(field,
"type", ftype);
295 cJSON_AddItemToArray(fields_arr, field);
301 json = cJSON_CreateObject();
302 cJSON_AddStringToObject(json,
"type",
"array");
305 cJSON* items = cJSON_Parse(items_str);
306 cJSON_AddItemToObject(json,
"items", items);
313 json = cJSON_CreateObject();
314 cJSON_AddStringToObject(json,
"type",
"map");
317 cJSON* values = cJSON_Parse(values_str);
318 cJSON_AddItemToObject(json,
"values", values);
325 json = cJSON_CreateObject();
326 cJSON_AddStringToObject(json,
"type",
"enum");
328 cJSON_AddStringToObject(json,
"name", schema->
name);
330 if (schema->namespace) {
331 cJSON_AddStringToObject(json,
"namespace", schema->namespace);
333 cJSON* syms = cJSON_AddArrayToObject(json,
"symbols");
334 for (
size_t i = 0; i < schema->
nb_symbols; i++) {
335 cJSON_AddItemToArray(syms, cJSON_CreateString(schema->
symbols[i]));
341 json = cJSON_CreateArray();
345 cJSON* branch = cJSON_Parse(branch_str);
346 cJSON_AddItemToArray(json, branch);
354 json = cJSON_CreateObject();
355 cJSON_AddStringToObject(json,
"type",
"fixed");
357 cJSON_AddStringToObject(json,
"name", schema->
name);
359 cJSON_AddNumberToObject(json,
"size", (
double)schema->
fixed_size);
366 result = cJSON_PrintUnformatted(json);
380 uint64_t n = ((uint64_t)value << 1) ^ -(uint64_t)(value < 0);
382 unsigned char buf[10];
385 buf[len++] = (
unsigned char)((n & 0x7F) | 0x80);
388 buf[len++] = (
unsigned char)n;
401 while (reader->
pos < reader->
size) {
402 unsigned char b = reader->
data[reader->
pos++];
403 n |= ((uint64_t)(b & 0x7F)) << shift;
404 if ((b & 0x80) == 0) {
406 *value = (int64_t)((n >> 1) ^ (~(n & 1) + 1));
416 n_log(
LOG_ERR,
"unexpected end of data while reading varint");
427 if (ret != TRUE)
return ret;
438 switch (branch->
type) {
440 if (cJSON_IsNull(json))
return (
int)i;
443 if (cJSON_IsBool(json))
return (
int)i;
449 if (cJSON_IsNumber(json))
return (
int)i;
452 if (cJSON_IsString(json))
return (
int)i;
455 if (cJSON_IsString(json))
return (
int)i;
459 if (cJSON_IsObject(json))
return (
int)i;
462 if (cJSON_IsArray(json))
return (
int)i;
465 if (cJSON_IsString(json))
return (
int)i;
479 switch (schema->
type) {
485 unsigned char b = cJSON_IsTrue(json) ? 1 : 0;
492 if (cJSON_IsNumber(json)) {
493 val = (int32_t)cJSON_GetNumberValue(json);
500 if (cJSON_IsNumber(json)) {
501 val = (int64_t)cJSON_GetNumberValue(json);
508 if (cJSON_IsNumber(json)) {
509 val = (float)cJSON_GetNumberValue(json);
512 unsigned char buf[4];
513 memcpy(buf, &val, 4);
514#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
529 if (cJSON_IsNumber(json)) {
530 val = cJSON_GetNumberValue(json);
533 unsigned char buf[8];
534 memcpy(buf, &val, 8);
535#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
555 const char* str = cJSON_GetStringValue((cJSON*)json);
563 const char* str = cJSON_GetStringValue((cJSON*)json);
571 if (!cJSON_IsObject(json)) {
575 for (
size_t i = 0; i < schema->
nb_fields; i++) {
576 const cJSON* field_val = cJSON_GetObjectItemCaseSensitive(json, schema->
fields[i].
name);
580 memset(&null_val, 0,
sizeof(null_val));
581 null_val.type = cJSON_NULL;
595 const char* sym = cJSON_GetStringValue((cJSON*)json);
600 for (
size_t i = 0; i < schema->
nb_symbols; i++) {
601 if (strcmp(sym, schema->
symbols[i]) == 0) {
610 if (!cJSON_IsArray(json)) {
614 int count = cJSON_GetArraySize(json);
618 for (
int i = 0; i < count; i++) {
629 if (!cJSON_IsObject(json)) {
633 int count = cJSON_GetArraySize(json);
636 const cJSON* item = NULL;
637 cJSON_ArrayForEach(item, json) {
639 const char*
key = item->string;
654 if (branch_idx < 0) {
655 n_log(
LOG_ERR,
"no matching union branch for JSON value");
663 const char* str = cJSON_GetStringValue((cJSON*)json);
668 size_t len = strlen(str);
674 if (len < schema->fixed_size) {
676 unsigned char* zeros = NULL;
677 Malloc(zeros,
unsigned char, pad);
702 *out_len = (size_t)len;
703 if ((
size_t)len > reader->
size - reader->
pos) {
704 n_log(
LOG_ERR,
"not enough data for bytes: need %zu, have %zu", (
size_t)len, reader->
size - reader->
pos);
707 Malloc(*out,
unsigned char, (
size_t)len + 1);
710 memcpy(*out, reader->
data + reader->
pos, (
size_t)len);
712 (*out)[(size_t)len] =
'\0';
713 reader->
pos += (size_t)len;
721 switch (schema->
type) {
723 return cJSON_CreateNull();
726 if (reader->
pos >= reader->
size) {
727 n_log(
LOG_ERR,
"unexpected end of data reading boolean");
730 unsigned char b = reader->
data[reader->
pos++];
731 return cJSON_CreateBool(b != 0);
737 return cJSON_CreateNumber((
double)(int32_t)val);
743 return cJSON_CreateNumber((
double)val);
747 if (reader->
pos + 4 > reader->
size) {
751 unsigned char buf[4];
752 memcpy(buf, reader->
data + reader->
pos, 4);
754#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
764 memcpy(&val, buf, 4);
765 return cJSON_CreateNumber((
double)val);
769 if (reader->
pos + 8 > reader->
size) {
770 n_log(
LOG_ERR,
"unexpected end of data reading double");
773 unsigned char buf[8];
774 memcpy(buf, reader->
data + reader->
pos, 8);
776#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
792 memcpy(&val, buf, 8);
793 return cJSON_CreateNumber(val);
797 unsigned char* data = NULL;
800 cJSON* result = cJSON_CreateString((
const char*)data);
806 unsigned char* data = NULL;
809 cJSON* result = cJSON_CreateString((
const char*)data);
815 cJSON* obj = cJSON_CreateObject();
816 for (
size_t i = 0; i < schema->
nb_fields; i++) {
822 cJSON_AddItemToObject(obj, schema->
fields[i].
name, field_val);
830 if (idx < 0 || (
size_t)idx >= schema->
nb_symbols) {
834 return cJSON_CreateString(schema->
symbols[(
size_t)idx]);
838 cJSON* arr = cJSON_CreateArray();
839 int64_t block_count = 0;
845 if (block_count == 0)
break;
846 if (block_count < 0) {
848 block_count = -block_count;
849 int64_t block_size = 0;
856 for (int64_t i = 0; i < block_count; i++) {
862 cJSON_AddItemToArray(arr, item);
869 cJSON* obj = cJSON_CreateObject();
870 int64_t block_count = 0;
876 if (block_count == 0)
break;
877 if (block_count < 0) {
878 block_count = -block_count;
879 int64_t block_size = 0;
886 for (int64_t i = 0; i < block_count; i++) {
888 unsigned char*
key = NULL;
901 cJSON_AddItemToObject(obj, (
const char*)
key, val);
909 int64_t branch_idx = 0;
911 if (branch_idx < 0 || (
size_t)branch_idx >= schema->
nb_branches) {
929 cJSON* result = cJSON_CreateString(buf);
946 if (!cJSON_IsArray(records)) {
951 N_STR* output = NULL;
967 const char* key1 =
"avro.schema";
974 const char* key2 =
"avro.codec";
977 const char* codec =
"null";
988 srand((
unsigned)time(NULL));
990 sync[i] = (
unsigned char)(rand() % 256);
995 int count = cJSON_GetArraySize(records);
997 N_STR* block_data = NULL;
1001 for (
int i = 0; i < count; i++) {
1002 const cJSON* record = cJSON_GetArrayItem(records, i);
1031 reader.
data = (
const unsigned char*)avro_data->
data;
1047 int64_t block_count = 0;
1050 if (block_count == 0)
break;
1051 if (block_count < 0) {
1053 int64_t block_size = 0;
1055 if (block_size < 0 || (
size_t)block_size > reader.
size - reader.
pos) {
1059 reader.
pos += (size_t)block_size;
1063 for (int64_t i = 0; i < block_count; i++) {
1064 unsigned char*
key = NULL;
1068 unsigned char* val = NULL;
1077 n_log(
LOG_ERR,
"unexpected end of data reading sync marker");
1085 cJSON* all_records = cJSON_CreateArray();
1087 while (reader.
pos < reader.
size) {
1088 int64_t obj_count = 0;
1090 cJSON_Delete(all_records);
1093 if (obj_count <= 0)
break;
1095 int64_t block_byte_size = 0;
1097 cJSON_Delete(all_records);
1101 if (block_byte_size < 0 || (
size_t)block_byte_size > reader.
size - reader.
pos) {
1102 n_log(
LOG_ERR,
"invalid data block size: %" PRId64, block_byte_size);
1103 cJSON_Delete(all_records);
1107 size_t block_end = reader.
pos + (size_t)block_byte_size;
1109 for (int64_t i = 0; i < obj_count; i++) {
1113 cJSON_Delete(all_records);
1116 cJSON_AddItemToArray(all_records, record);
1120 if (reader.
pos != block_end) {
1121 n_log(
LOG_WARNING,
"block size mismatch: expected pos %zu, got %zu", block_end, reader.
pos);
1122 reader.
pos = block_end;
1127 n_log(
LOG_ERR,
"unexpected end of data reading block sync marker");
1128 cJSON_Delete(all_records);
1133 cJSON_Delete(all_records);
1146int avro_json_to_file(
const char* avro_filename,
const char* schema_filename,
const char* json_filename) {
1166 cJSON* json = cJSON_Parse(json_str->
data);
1169 n_log(
LOG_ERR,
"failed to parse JSON file %s: %s", json_filename,
_str(cJSON_GetErrorPtr()));
1175 cJSON* records = json;
1177 if (cJSON_IsObject(json)) {
1178 records = cJSON_CreateArray();
1179 cJSON_AddItemToArray(records, cJSON_Duplicate(json, 1));
1187 cJSON_Delete(records);
1198 int ret =
nstr_to_file(avro_data, (
char*)avro_filename);
1202 n_log(
LOG_ERR,
"failed to write Avro file %s", avro_filename);
1210int avro_file_to_json(
const char* avro_filename,
const char* schema_filename,
const char* json_filename) {
1236 n_log(
LOG_ERR,
"failed to decode Avro file %s", avro_filename);
1241 char* json_output = cJSON_Print(records);
1242 cJSON_Delete(records);
1256 int ret =
nstr_to_file(json_nstr, (
char*)json_filename);
1260 n_log(
LOG_ERR,
"failed to write JSON file %s", json_filename);
1288 cJSON* json = cJSON_Parse(json_nstr->
data);
1296 cJSON* records = json;
1298 if (cJSON_IsObject(json)) {
1299 records = cJSON_CreateArray();
1300 cJSON_AddItemToArray(records, cJSON_Duplicate(json, 1));
1307 cJSON_Delete(records);
1332 char* json_output = cJSON_Print(records);
1333 cJSON_Delete(records);
size_t nb_symbols
number of enum symbols
AVRO_SCHEMA ** union_branches
union branch schemas
char * name
name (for record, enum, fixed)
AVRO_SCHEMA * items
item schema (for array)
AVRO_SCHEMA * values
value schema (for map)
AVRO_FIELD * fields
fields (for record)
size_t pos
current position
size_t fixed_size
fixed size
AVRO_SCHEMA * schema
field schema
AVRO_TYPE type
schema type
const unsigned char * data
data buffer
size_t nb_branches
number of union branches
char ** symbols
enum symbols
size_t nb_fields
number of fields (for record)
N_STR * avro_nstr_json_to_avro(const N_STR *schema_nstr, const N_STR *json_nstr)
Convert JSON N_STR to Avro N_STR using schema N_STR (all in-memory)
#define AVRO_SYNC_LEN
Avro sync marker length.
AVRO_TYPE
Avro schema type enumeration.
#define AVRO_MAGIC_LEN
Avro magic length.
int avro_file_to_json(const char *avro_filename, const char *schema_filename, const char *json_filename)
Read an Avro object container file and produce a JSON file using a schema file.
N_STR * avro_encode_container(const AVRO_SCHEMA *schema, const cJSON *records)
Encode a cJSON array of records into Avro container format N_STR.
AVRO_SCHEMA * avro_schema_from_cjson(const cJSON *json)
Parse an Avro schema from a cJSON object.
int avro_encode_datum(N_STR **dest, const AVRO_SCHEMA *schema, const cJSON *json)
Encode a cJSON value as Avro binary according to schema.
AVRO_SCHEMA * avro_schema_parse_nstr(const N_STR *schema_nstr)
Parse schema from N_STR.
AVRO_SCHEMA * avro_schema_parse(const char *json_str)
Parse an Avro schema from a JSON string.
int avro_decode_long(AVRO_READER *reader, int64_t *value)
Decode a zig-zag varint from reader into a 64-bit signed integer.
cJSON * avro_decode_container(const AVRO_SCHEMA *schema, const N_STR *avro_data)
Decode an Avro container format N_STR into a cJSON array of records.
cJSON * avro_decode_datum(AVRO_READER *reader, const AVRO_SCHEMA *schema)
Decode an Avro binary datum into cJSON according to schema.
N_STR * avro_nstr_avro_to_json(const N_STR *schema_nstr, const N_STR *avro_nstr)
Convert Avro N_STR to JSON N_STR using schema N_STR (all in-memory)
char * avro_schema_to_json(const AVRO_SCHEMA *schema)
Convert an Avro schema back to a JSON string (caller must free)
void avro_schema_free(AVRO_SCHEMA **schema_ptr)
Free an Avro schema.
int avro_encode_long(N_STR **dest, int64_t value)
Encode a 64-bit signed integer as zig-zag varint into N_STR.
#define AVRO_MAGIC
Avro object container file magic bytes.
int avro_json_to_file(const char *avro_filename, const char *schema_filename, const char *json_filename)
Write an Avro object container file from a JSON file and schema file.
Avro schema field (for records)
Avro read cursor for decoding.
#define FreeNoLog(__ptr)
Free Handler without log.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
Safe accessor for printing a possibly NULL string pointer
#define Free(__ptr)
Free Handler to get errors.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_ERR
error conditions
#define LOG_WARNING
warning conditions
#define LOG_INFO
informational
size_t written
size of the written data inside the string
size_t NSTRBYTE
N_STR base unit.
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
#define local_strdup(__src_)
Local strdup replacement: duplicate a string
int nstr_to_file(N_STR *str, char *filename)
Write the content of an N_STR into a file.
N_STR * nstrcat_ex(N_STR **dest, void *src, NSTRBYTE size, int resize_flag)
Append data into N_STR using internal N_STR size and cursor position.
N_STR * char_to_nstr(const char *src)
Convert a C string into an N_STR, short version.
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
A box including a string and its length.
static int avro_encode_bytes_raw(N_STR **dest, const unsigned char *data, size_t len)
helper: encode raw bytes with length prefix
static AVRO_TYPE avro_type_from_string(const char *type_str)
helper to get AVRO_TYPE from a type name string
static int avro_decode_bytes_raw(AVRO_READER *reader, unsigned char **out, size_t *out_len)
helper: read raw bytes with length prefix
static const char * avro_type_to_string(AVRO_TYPE type)
helper to get type name from AVRO_TYPE
static int avro_find_union_branch(const AVRO_SCHEMA *schema, const cJSON *json)
helper: find which union branch matches a cJSON value
Avro binary format encoding/decoding with JSON conversion.