Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_avro.c
Go to the documentation of this file.
1/*
2 * Nilorea Library
3 * Copyright (C) 2005-2026 Castagnier Mickael
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
27#include "nilorea/n_avro.h"
28
29#include <string.h>
30#include <time.h>
31
32/* ========================================================================== */
33/* Schema parsing */
34/* ========================================================================== */
35
37static AVRO_TYPE avro_type_from_string(const char* type_str) {
38 if (strcmp(type_str, "null") == 0) return AVRO_NULL;
39 if (strcmp(type_str, "boolean") == 0) return AVRO_BOOLEAN;
40 if (strcmp(type_str, "int") == 0) return AVRO_INT;
41 if (strcmp(type_str, "long") == 0) return AVRO_LONG;
42 if (strcmp(type_str, "float") == 0) return AVRO_FLOAT;
43 if (strcmp(type_str, "double") == 0) return AVRO_DOUBLE;
44 if (strcmp(type_str, "bytes") == 0) return AVRO_BYTES;
45 if (strcmp(type_str, "string") == 0) return AVRO_STRING;
46 return AVRO_NULL;
47}
48
50static const char* avro_type_to_string(AVRO_TYPE type) {
51 switch (type) {
52 case AVRO_NULL:
53 return "null";
54 case AVRO_BOOLEAN:
55 return "boolean";
56 case AVRO_INT:
57 return "int";
58 case AVRO_LONG:
59 return "long";
60 case AVRO_FLOAT:
61 return "float";
62 case AVRO_DOUBLE:
63 return "double";
64 case AVRO_BYTES:
65 return "bytes";
66 case AVRO_STRING:
67 return "string";
68 default:
69 return "null";
70 }
71}
72
74 __n_assert(json, return NULL);
75
76 AVRO_SCHEMA* schema = NULL;
77 Malloc(schema, AVRO_SCHEMA, 1);
78 __n_assert(schema, return NULL);
79
80 /* primitive type as string: "null", "boolean", "int", etc. */
81 if (cJSON_IsString(json)) {
82 schema->type = avro_type_from_string(cJSON_GetStringValue((cJSON*)json));
83 return schema;
84 }
85
86 /* union: JSON array of schemas */
87 if (cJSON_IsArray(json)) {
88 schema->type = AVRO_UNION;
89 int nb = cJSON_GetArraySize(json);
90 schema->nb_branches = (size_t)nb;
91 Malloc(schema->union_branches, AVRO_SCHEMA*, (size_t)nb);
92 __n_assert(schema->union_branches, Free(schema); return NULL);
93 for (int i = 0; i < nb; i++) {
94 schema->union_branches[i] = avro_schema_from_cjson(cJSON_GetArrayItem(json, i));
95 }
96 return schema;
97 }
98
99 /* complex type as object */
100 if (cJSON_IsObject(json)) {
101 const cJSON* type_node = cJSON_GetObjectItemCaseSensitive(json, "type");
102 __n_assert(type_node, Free(schema); return NULL);
103
104 const char* type_str = cJSON_GetStringValue(type_node);
105
106 /* type field can itself be a complex type (array or object) */
107 if (!type_str) {
108 /* nested complex type in the "type" field */
109 AVRO_SCHEMA* inner = avro_schema_from_cjson(type_node);
110 if (inner) {
111 memcpy(schema, inner, sizeof(AVRO_SCHEMA));
112 Free(inner);
113 }
114 /* copy name/namespace from outer object if present */
115 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json, "name");
116 if (name_node && cJSON_IsString(name_node)) {
117 schema->name = local_strdup(cJSON_GetStringValue(name_node));
118 }
119 return schema;
120 }
121
122 if (strcmp(type_str, "record") == 0) {
123 schema->type = AVRO_RECORD;
124
125 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json, "name");
126 if (name_node && cJSON_IsString(name_node)) {
127 schema->name = local_strdup(cJSON_GetStringValue(name_node));
128 }
129
130 const cJSON* ns_node = cJSON_GetObjectItemCaseSensitive(json, "namespace");
131 if (ns_node && cJSON_IsString(ns_node)) {
132 schema->namespace = local_strdup(cJSON_GetStringValue(ns_node));
133 }
134
135 const cJSON* fields_node = cJSON_GetObjectItemCaseSensitive(json, "fields");
136 if (fields_node && cJSON_IsArray(fields_node)) {
137 int nb = cJSON_GetArraySize(fields_node);
138 schema->nb_fields = (size_t)nb;
139 Malloc(schema->fields, AVRO_FIELD, (size_t)nb);
140 __n_assert(schema->fields, avro_schema_free(&schema); return NULL);
141
142 for (int i = 0; i < nb; i++) {
143 const cJSON* field = cJSON_GetArrayItem(fields_node, i);
144 const cJSON* fname = cJSON_GetObjectItemCaseSensitive(field, "name");
145 const cJSON* ftype = cJSON_GetObjectItemCaseSensitive(field, "type");
146
147 if (fname && cJSON_IsString(fname)) {
148 schema->fields[i].name = local_strdup(cJSON_GetStringValue(fname));
149 }
150 schema->fields[i].schema = avro_schema_from_cjson(ftype);
151 }
152 }
153 } else if (strcmp(type_str, "array") == 0) {
154 schema->type = AVRO_ARRAY;
155 const cJSON* items_node = cJSON_GetObjectItemCaseSensitive(json, "items");
156 schema->items = avro_schema_from_cjson(items_node);
157 } else if (strcmp(type_str, "map") == 0) {
158 schema->type = AVRO_MAP;
159 const cJSON* values_node = cJSON_GetObjectItemCaseSensitive(json, "values");
160 schema->values = avro_schema_from_cjson(values_node);
161 } else if (strcmp(type_str, "enum") == 0) {
162 schema->type = AVRO_ENUM;
163
164 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json, "name");
165 if (name_node && cJSON_IsString(name_node)) {
166 schema->name = local_strdup(cJSON_GetStringValue(name_node));
167 }
168
169 const cJSON* ns_node = cJSON_GetObjectItemCaseSensitive(json, "namespace");
170 if (ns_node && cJSON_IsString(ns_node)) {
171 schema->namespace = local_strdup(cJSON_GetStringValue(ns_node));
172 }
173
174 const cJSON* symbols_node = cJSON_GetObjectItemCaseSensitive(json, "symbols");
175 if (symbols_node && cJSON_IsArray(symbols_node)) {
176 int nb = cJSON_GetArraySize(symbols_node);
177 schema->nb_symbols = (size_t)nb;
178 Malloc(schema->symbols, char*, (size_t)nb);
179 __n_assert(schema->symbols, avro_schema_free(&schema); return NULL);
180 for (int i = 0; i < nb; i++) {
181 const cJSON* sym = cJSON_GetArrayItem(symbols_node, i);
182 if (cJSON_IsString(sym)) {
183 schema->symbols[i] = local_strdup(cJSON_GetStringValue(sym));
184 }
185 }
186 }
187 } else if (strcmp(type_str, "fixed") == 0) {
188 schema->type = AVRO_FIXED;
189
190 const cJSON* name_node = cJSON_GetObjectItemCaseSensitive(json, "name");
191 if (name_node && cJSON_IsString(name_node)) {
192 schema->name = local_strdup(cJSON_GetStringValue(name_node));
193 }
194
195 const cJSON* size_node = cJSON_GetObjectItemCaseSensitive(json, "size");
196 if (size_node && cJSON_IsNumber(size_node)) {
197 schema->fixed_size = (size_t)cJSON_GetNumberValue(size_node);
198 }
199 } else {
200 /* primitive type name in object form */
201 schema->type = avro_type_from_string(type_str);
202 }
203 return schema;
204 }
205
206 /* fallback */
207 schema->type = AVRO_NULL;
208 return schema;
209}
210
211AVRO_SCHEMA* avro_schema_parse(const char* json_str) {
212 __n_assert(json_str, return NULL);
213
214 cJSON* json = cJSON_Parse(json_str);
215 __n_assert(json, return NULL);
216
217 AVRO_SCHEMA* schema = avro_schema_from_cjson(json);
218 cJSON_Delete(json);
219 return schema;
220}
221
222void avro_schema_free(AVRO_SCHEMA** schema_ptr) {
223 if (!schema_ptr || !*schema_ptr) return;
224
225 AVRO_SCHEMA* schema = *schema_ptr;
226
227 FreeNoLog(schema->name);
228 FreeNoLog(schema->namespace);
229
230 if (schema->fields) {
231 for (size_t i = 0; i < schema->nb_fields; i++) {
232 FreeNoLog(schema->fields[i].name);
233 avro_schema_free(&schema->fields[i].schema);
234 }
235 FreeNoLog(schema->fields);
236 }
237
238 avro_schema_free(&schema->items);
239 avro_schema_free(&schema->values);
240
241 if (schema->union_branches) {
242 for (size_t i = 0; i < schema->nb_branches; i++) {
243 avro_schema_free(&schema->union_branches[i]);
244 }
245 FreeNoLog(schema->union_branches);
246 }
247
248 if (schema->symbols) {
249 for (size_t i = 0; i < schema->nb_symbols; i++) {
250 FreeNoLog(schema->symbols[i]);
251 }
252 FreeNoLog(schema->symbols);
253 }
254
255 FreeNoLog(*schema_ptr);
256}
257
258char* avro_schema_to_json(const AVRO_SCHEMA* schema) {
259 __n_assert(schema, return NULL);
260
261 cJSON* json = NULL;
262
263 switch (schema->type) {
264 case AVRO_NULL:
265 case AVRO_BOOLEAN:
266 case AVRO_INT:
267 case AVRO_LONG:
268 case AVRO_FLOAT:
269 case AVRO_DOUBLE:
270 case AVRO_BYTES:
271 case AVRO_STRING:
272 /* for simple types used inside complex structures, return just the string */
273 json = cJSON_CreateString(avro_type_to_string(schema->type));
274 break;
275
276 case AVRO_RECORD: {
277 json = cJSON_CreateObject();
278 cJSON_AddStringToObject(json, "type", "record");
279 if (schema->name) {
280 cJSON_AddStringToObject(json, "name", schema->name);
281 }
282 if (schema->namespace) {
283 cJSON_AddStringToObject(json, "namespace", schema->namespace);
284 }
285 cJSON* fields_arr = cJSON_AddArrayToObject(json, "fields");
286 for (size_t i = 0; i < schema->nb_fields; i++) {
287 cJSON* field = cJSON_CreateObject();
288 cJSON_AddStringToObject(field, "name", schema->fields[i].name);
289 char* ftype_str = avro_schema_to_json(schema->fields[i].schema);
290 if (ftype_str) {
291 cJSON* ftype = cJSON_Parse(ftype_str);
292 cJSON_AddItemToObject(field, "type", ftype);
293 Free(ftype_str);
294 }
295 cJSON_AddItemToArray(fields_arr, field);
296 }
297 break;
298 }
299
300 case AVRO_ARRAY: {
301 json = cJSON_CreateObject();
302 cJSON_AddStringToObject(json, "type", "array");
303 char* items_str = avro_schema_to_json(schema->items);
304 if (items_str) {
305 cJSON* items = cJSON_Parse(items_str);
306 cJSON_AddItemToObject(json, "items", items);
307 Free(items_str);
308 }
309 break;
310 }
311
312 case AVRO_MAP: {
313 json = cJSON_CreateObject();
314 cJSON_AddStringToObject(json, "type", "map");
315 char* values_str = avro_schema_to_json(schema->values);
316 if (values_str) {
317 cJSON* values = cJSON_Parse(values_str);
318 cJSON_AddItemToObject(json, "values", values);
319 Free(values_str);
320 }
321 break;
322 }
323
324 case AVRO_ENUM: {
325 json = cJSON_CreateObject();
326 cJSON_AddStringToObject(json, "type", "enum");
327 if (schema->name) {
328 cJSON_AddStringToObject(json, "name", schema->name);
329 }
330 if (schema->namespace) {
331 cJSON_AddStringToObject(json, "namespace", schema->namespace);
332 }
333 cJSON* syms = cJSON_AddArrayToObject(json, "symbols");
334 for (size_t i = 0; i < schema->nb_symbols; i++) {
335 cJSON_AddItemToArray(syms, cJSON_CreateString(schema->symbols[i]));
336 }
337 break;
338 }
339
340 case AVRO_UNION: {
341 json = cJSON_CreateArray();
342 for (size_t i = 0; i < schema->nb_branches; i++) {
343 char* branch_str = avro_schema_to_json(schema->union_branches[i]);
344 if (branch_str) {
345 cJSON* branch = cJSON_Parse(branch_str);
346 cJSON_AddItemToArray(json, branch);
347 Free(branch_str);
348 }
349 }
350 break;
351 }
352
353 case AVRO_FIXED: {
354 json = cJSON_CreateObject();
355 cJSON_AddStringToObject(json, "type", "fixed");
356 if (schema->name) {
357 cJSON_AddStringToObject(json, "name", schema->name);
358 }
359 cJSON_AddNumberToObject(json, "size", (double)schema->fixed_size);
360 break;
361 }
362 }
363
364 char* result = NULL;
365 if (json) {
366 result = cJSON_PrintUnformatted(json);
367 cJSON_Delete(json);
368 }
369 return result;
370}
371
372/* ========================================================================== */
373/* Zig-zag varint encoding/decoding */
374/* ========================================================================== */
375
376int avro_encode_long(N_STR** dest, int64_t value) {
377 __n_assert(dest, return FALSE);
378
379 /* zig-zag encode: portable sign-extension without signed shift UB */
380 uint64_t n = ((uint64_t)value << 1) ^ -(uint64_t)(value < 0);
381
382 unsigned char buf[10];
383 int len = 0;
384 while (n > 0x7F) {
385 buf[len++] = (unsigned char)((n & 0x7F) | 0x80);
386 n >>= 7;
387 }
388 buf[len++] = (unsigned char)n;
389
390 nstrcat_ex(dest, buf, (NSTRBYTE)len, 1);
391 return TRUE;
392}
393
394int avro_decode_long(AVRO_READER* reader, int64_t* value) {
395 __n_assert(reader, return FALSE);
396 __n_assert(value, return FALSE);
397
398 uint64_t n = 0;
399 int shift = 0;
400
401 while (reader->pos < reader->size) {
402 unsigned char b = reader->data[reader->pos++];
403 n |= ((uint64_t)(b & 0x7F)) << shift;
404 if ((b & 0x80) == 0) {
405 /* zig-zag decode */
406 *value = (int64_t)((n >> 1) ^ (~(n & 1) + 1));
407 return TRUE;
408 }
409 shift += 7;
410 if (shift >= 70) {
411 n_log(LOG_ERR, "varint too long");
412 return FALSE;
413 }
414 }
415
416 n_log(LOG_ERR, "unexpected end of data while reading varint");
417 return FALSE;
418}
419
420/* ========================================================================== */
421/* Binary encoding */
422/* ========================================================================== */
423
425static int avro_encode_bytes_raw(N_STR** dest, const unsigned char* data, size_t len) {
426 int ret = avro_encode_long(dest, (int64_t)len);
427 if (ret != TRUE) return ret;
428 if (len > 0) {
429 nstrcat_ex(dest, (void*)data, (NSTRBYTE)len, 1);
430 }
431 return TRUE;
432}
433
435static int avro_find_union_branch(const AVRO_SCHEMA* schema, const cJSON* json) {
436 for (size_t i = 0; i < schema->nb_branches; i++) {
437 const AVRO_SCHEMA* branch = schema->union_branches[i];
438 switch (branch->type) {
439 case AVRO_NULL:
440 if (cJSON_IsNull(json)) return (int)i;
441 break;
442 case AVRO_BOOLEAN:
443 if (cJSON_IsBool(json)) return (int)i;
444 break;
445 case AVRO_INT:
446 case AVRO_LONG:
447 case AVRO_FLOAT:
448 case AVRO_DOUBLE:
449 if (cJSON_IsNumber(json)) return (int)i;
450 break;
451 case AVRO_STRING:
452 if (cJSON_IsString(json)) return (int)i;
453 break;
454 case AVRO_BYTES:
455 if (cJSON_IsString(json)) return (int)i;
456 break;
457 case AVRO_RECORD:
458 case AVRO_MAP:
459 if (cJSON_IsObject(json)) return (int)i;
460 break;
461 case AVRO_ARRAY:
462 if (cJSON_IsArray(json)) return (int)i;
463 break;
464 case AVRO_ENUM:
465 if (cJSON_IsString(json)) return (int)i;
466 break;
467 case AVRO_UNION:
468 case AVRO_FIXED:
469 break;
470 }
471 }
472 return -1;
473}
474
475int avro_encode_datum(N_STR** dest, const AVRO_SCHEMA* schema, const cJSON* json) {
476 __n_assert(dest, return FALSE);
477 __n_assert(schema, return FALSE);
478
479 switch (schema->type) {
480 case AVRO_NULL:
481 /* null is zero bytes */
482 return TRUE;
483
484 case AVRO_BOOLEAN: {
485 unsigned char b = cJSON_IsTrue(json) ? 1 : 0;
486 nstrcat_ex(dest, &b, 1, 1);
487 return TRUE;
488 }
489
490 case AVRO_INT: {
491 int32_t val = 0;
492 if (cJSON_IsNumber(json)) {
493 val = (int32_t)cJSON_GetNumberValue(json);
494 }
495 return avro_encode_long(dest, (int64_t)val);
496 }
497
498 case AVRO_LONG: {
499 int64_t val = 0;
500 if (cJSON_IsNumber(json)) {
501 val = (int64_t)cJSON_GetNumberValue(json);
502 }
503 return avro_encode_long(dest, val);
504 }
505
506 case AVRO_FLOAT: {
507 float val = 0.0f;
508 if (cJSON_IsNumber(json)) {
509 val = (float)cJSON_GetNumberValue(json);
510 }
511 /* Avro float is 4 bytes little-endian IEEE 754 */
512 unsigned char buf[4];
513 memcpy(buf, &val, 4);
514#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
515 unsigned char tmp;
516 tmp = buf[0];
517 buf[0] = buf[3];
518 buf[3] = tmp;
519 tmp = buf[1];
520 buf[1] = buf[2];
521 buf[2] = tmp;
522#endif
523 nstrcat_ex(dest, buf, 4, 1);
524 return TRUE;
525 }
526
527 case AVRO_DOUBLE: {
528 double val = 0.0;
529 if (cJSON_IsNumber(json)) {
530 val = cJSON_GetNumberValue(json);
531 }
532 /* Avro double is 8 bytes little-endian IEEE 754 */
533 unsigned char buf[8];
534 memcpy(buf, &val, 8);
535#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
536 unsigned char tmp;
537 tmp = buf[0];
538 buf[0] = buf[7];
539 buf[7] = tmp;
540 tmp = buf[1];
541 buf[1] = buf[6];
542 buf[6] = tmp;
543 tmp = buf[2];
544 buf[2] = buf[5];
545 buf[5] = tmp;
546 tmp = buf[3];
547 buf[3] = buf[4];
548 buf[4] = tmp;
549#endif
550 nstrcat_ex(dest, buf, 8, 1);
551 return TRUE;
552 }
553
554 case AVRO_BYTES: {
555 const char* str = cJSON_GetStringValue((cJSON*)json);
556 if (str) {
557 return avro_encode_bytes_raw(dest, (const unsigned char*)str, strlen(str));
558 }
559 return avro_encode_bytes_raw(dest, NULL, 0);
560 }
561
562 case AVRO_STRING: {
563 const char* str = cJSON_GetStringValue((cJSON*)json);
564 if (str) {
565 return avro_encode_bytes_raw(dest, (const unsigned char*)str, strlen(str));
566 }
567 return avro_encode_bytes_raw(dest, (const unsigned char*)"", 0);
568 }
569
570 case AVRO_RECORD: {
571 if (!cJSON_IsObject(json)) {
572 n_log(LOG_ERR, "expected JSON object for record type");
573 return FALSE;
574 }
575 for (size_t i = 0; i < schema->nb_fields; i++) {
576 const cJSON* field_val = cJSON_GetObjectItemCaseSensitive(json, schema->fields[i].name);
577 if (!field_val) {
578 /* try with a null placeholder */
579 cJSON null_val;
580 memset(&null_val, 0, sizeof(null_val));
581 null_val.type = cJSON_NULL;
582 if (avro_encode_datum(dest, schema->fields[i].schema, &null_val) != TRUE) {
583 return FALSE;
584 }
585 } else {
586 if (avro_encode_datum(dest, schema->fields[i].schema, field_val) != TRUE) {
587 return FALSE;
588 }
589 }
590 }
591 return TRUE;
592 }
593
594 case AVRO_ENUM: {
595 const char* sym = cJSON_GetStringValue((cJSON*)json);
596 if (!sym) {
597 n_log(LOG_ERR, "expected string for enum type");
598 return FALSE;
599 }
600 for (size_t i = 0; i < schema->nb_symbols; i++) {
601 if (strcmp(sym, schema->symbols[i]) == 0) {
602 return avro_encode_long(dest, (int64_t)i);
603 }
604 }
605 n_log(LOG_ERR, "unknown enum symbol: %s", sym);
606 return FALSE;
607 }
608
609 case AVRO_ARRAY: {
610 if (!cJSON_IsArray(json)) {
611 n_log(LOG_ERR, "expected JSON array for array type");
612 return FALSE;
613 }
614 int count = cJSON_GetArraySize(json);
615 if (count > 0) {
616 /* write block with count */
617 if (avro_encode_long(dest, (int64_t)count) != TRUE) return FALSE;
618 for (int i = 0; i < count; i++) {
619 if (avro_encode_datum(dest, schema->items, cJSON_GetArrayItem(json, i)) != TRUE) {
620 return FALSE;
621 }
622 }
623 }
624 /* terminating zero-count block */
625 return avro_encode_long(dest, 0);
626 }
627
628 case AVRO_MAP: {
629 if (!cJSON_IsObject(json)) {
630 n_log(LOG_ERR, "expected JSON object for map type");
631 return FALSE;
632 }
633 int count = cJSON_GetArraySize(json);
634 if (count > 0) {
635 if (avro_encode_long(dest, (int64_t)count) != TRUE) return FALSE;
636 const cJSON* item = NULL;
637 cJSON_ArrayForEach(item, json) {
638 /* encode key as string */
639 const char* key = item->string;
640 if (avro_encode_bytes_raw(dest, (const unsigned char*)key, strlen(key)) != TRUE) {
641 return FALSE;
642 }
643 /* encode value */
644 if (avro_encode_datum(dest, schema->values, item) != TRUE) {
645 return FALSE;
646 }
647 }
648 }
649 return avro_encode_long(dest, 0);
650 }
651
652 case AVRO_UNION: {
653 int branch_idx = avro_find_union_branch(schema, json);
654 if (branch_idx < 0) {
655 n_log(LOG_ERR, "no matching union branch for JSON value");
656 return FALSE;
657 }
658 if (avro_encode_long(dest, (int64_t)branch_idx) != TRUE) return FALSE;
659 return avro_encode_datum(dest, schema->union_branches[branch_idx], json);
660 }
661
662 case AVRO_FIXED: {
663 const char* str = cJSON_GetStringValue((cJSON*)json);
664 if (!str) {
665 n_log(LOG_ERR, "expected string for fixed type");
666 return FALSE;
667 }
668 size_t len = strlen(str);
669 if (len > schema->fixed_size) {
670 len = schema->fixed_size;
671 }
672 /* write exactly fixed_size bytes, pad with zeros if needed */
673 nstrcat_ex(dest, (void*)str, (NSTRBYTE)len, 1);
674 if (len < schema->fixed_size) {
675 size_t pad = schema->fixed_size - len;
676 unsigned char* zeros = NULL;
677 Malloc(zeros, unsigned char, pad);
678 if (zeros) {
679 nstrcat_ex(dest, zeros, (NSTRBYTE)pad, 1);
680 Free(zeros);
681 }
682 }
683 return TRUE;
684 }
685 }
686
687 return FALSE;
688}
689
690/* ========================================================================== */
691/* Binary decoding */
692/* ========================================================================== */
693
695static int avro_decode_bytes_raw(AVRO_READER* reader, unsigned char** out, size_t* out_len) {
696 int64_t len = 0;
697 if (avro_decode_long(reader, &len) != TRUE) return FALSE;
698 if (len < 0) {
699 n_log(LOG_ERR, "negative byte length: %" PRId64, len);
700 return FALSE;
701 }
702 *out_len = (size_t)len;
703 if ((size_t)len > reader->size - reader->pos) {
704 n_log(LOG_ERR, "not enough data for bytes: need %zu, have %zu", (size_t)len, reader->size - reader->pos);
705 return FALSE;
706 }
707 Malloc(*out, unsigned char, (size_t)len + 1);
708 __n_assert(*out, return FALSE);
709 if (len > 0) {
710 memcpy(*out, reader->data + reader->pos, (size_t)len);
711 }
712 (*out)[(size_t)len] = '\0';
713 reader->pos += (size_t)len;
714 return TRUE;
715}
716
717cJSON* avro_decode_datum(AVRO_READER* reader, const AVRO_SCHEMA* schema) {
718 __n_assert(reader, return NULL);
719 __n_assert(schema, return NULL);
720
721 switch (schema->type) {
722 case AVRO_NULL:
723 return cJSON_CreateNull();
724
725 case AVRO_BOOLEAN: {
726 if (reader->pos >= reader->size) {
727 n_log(LOG_ERR, "unexpected end of data reading boolean");
728 return NULL;
729 }
730 unsigned char b = reader->data[reader->pos++];
731 return cJSON_CreateBool(b != 0);
732 }
733
734 case AVRO_INT: {
735 int64_t val = 0;
736 if (avro_decode_long(reader, &val) != TRUE) return NULL;
737 return cJSON_CreateNumber((double)(int32_t)val);
738 }
739
740 case AVRO_LONG: {
741 int64_t val = 0;
742 if (avro_decode_long(reader, &val) != TRUE) return NULL;
743 return cJSON_CreateNumber((double)val);
744 }
745
746 case AVRO_FLOAT: {
747 if (reader->pos + 4 > reader->size) {
748 n_log(LOG_ERR, "unexpected end of data reading float");
749 return NULL;
750 }
751 unsigned char buf[4];
752 memcpy(buf, reader->data + reader->pos, 4);
753 reader->pos += 4;
754#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
755 unsigned char tmp;
756 tmp = buf[0];
757 buf[0] = buf[3];
758 buf[3] = tmp;
759 tmp = buf[1];
760 buf[1] = buf[2];
761 buf[2] = tmp;
762#endif
763 float val;
764 memcpy(&val, buf, 4);
765 return cJSON_CreateNumber((double)val);
766 }
767
768 case AVRO_DOUBLE: {
769 if (reader->pos + 8 > reader->size) {
770 n_log(LOG_ERR, "unexpected end of data reading double");
771 return NULL;
772 }
773 unsigned char buf[8];
774 memcpy(buf, reader->data + reader->pos, 8);
775 reader->pos += 8;
776#if BYTEORDER_ENDIAN == BYTEORDER_BIG_ENDIAN
777 unsigned char tmp;
778 tmp = buf[0];
779 buf[0] = buf[7];
780 buf[7] = tmp;
781 tmp = buf[1];
782 buf[1] = buf[6];
783 buf[6] = tmp;
784 tmp = buf[2];
785 buf[2] = buf[5];
786 buf[5] = tmp;
787 tmp = buf[3];
788 buf[3] = buf[4];
789 buf[4] = tmp;
790#endif
791 double val;
792 memcpy(&val, buf, 8);
793 return cJSON_CreateNumber(val);
794 }
795
796 case AVRO_BYTES: {
797 unsigned char* data = NULL;
798 size_t len = 0;
799 if (avro_decode_bytes_raw(reader, &data, &len) != TRUE) return NULL;
800 cJSON* result = cJSON_CreateString((const char*)data);
801 Free(data);
802 return result;
803 }
804
805 case AVRO_STRING: {
806 unsigned char* data = NULL;
807 size_t len = 0;
808 if (avro_decode_bytes_raw(reader, &data, &len) != TRUE) return NULL;
809 cJSON* result = cJSON_CreateString((const char*)data);
810 Free(data);
811 return result;
812 }
813
814 case AVRO_RECORD: {
815 cJSON* obj = cJSON_CreateObject();
816 for (size_t i = 0; i < schema->nb_fields; i++) {
817 cJSON* field_val = avro_decode_datum(reader, schema->fields[i].schema);
818 if (!field_val) {
819 cJSON_Delete(obj);
820 return NULL;
821 }
822 cJSON_AddItemToObject(obj, schema->fields[i].name, field_val);
823 }
824 return obj;
825 }
826
827 case AVRO_ENUM: {
828 int64_t idx = 0;
829 if (avro_decode_long(reader, &idx) != TRUE) return NULL;
830 if (idx < 0 || (size_t)idx >= schema->nb_symbols) {
831 n_log(LOG_ERR, "enum index %" PRId64 " out of range (0-%zu)", idx, schema->nb_symbols - 1);
832 return NULL;
833 }
834 return cJSON_CreateString(schema->symbols[(size_t)idx]);
835 }
836
837 case AVRO_ARRAY: {
838 cJSON* arr = cJSON_CreateArray();
839 int64_t block_count = 0;
840 for (;;) {
841 if (avro_decode_long(reader, &block_count) != TRUE) {
842 cJSON_Delete(arr);
843 return NULL;
844 }
845 if (block_count == 0) break;
846 if (block_count < 0) {
847 /* negative count means block size follows */
848 block_count = -block_count;
849 int64_t block_size = 0;
850 if (avro_decode_long(reader, &block_size) != TRUE) {
851 cJSON_Delete(arr);
852 return NULL;
853 }
854 (void)block_size;
855 }
856 for (int64_t i = 0; i < block_count; i++) {
857 cJSON* item = avro_decode_datum(reader, schema->items);
858 if (!item) {
859 cJSON_Delete(arr);
860 return NULL;
861 }
862 cJSON_AddItemToArray(arr, item);
863 }
864 }
865 return arr;
866 }
867
868 case AVRO_MAP: {
869 cJSON* obj = cJSON_CreateObject();
870 int64_t block_count = 0;
871 for (;;) {
872 if (avro_decode_long(reader, &block_count) != TRUE) {
873 cJSON_Delete(obj);
874 return NULL;
875 }
876 if (block_count == 0) break;
877 if (block_count < 0) {
878 block_count = -block_count;
879 int64_t block_size = 0;
880 if (avro_decode_long(reader, &block_size) != TRUE) {
881 cJSON_Delete(obj);
882 return NULL;
883 }
884 (void)block_size;
885 }
886 for (int64_t i = 0; i < block_count; i++) {
887 /* decode key */
888 unsigned char* key = NULL;
889 size_t key_len = 0;
890 if (avro_decode_bytes_raw(reader, &key, &key_len) != TRUE) {
891 cJSON_Delete(obj);
892 return NULL;
893 }
894 /* decode value */
895 cJSON* val = avro_decode_datum(reader, schema->values);
896 if (!val) {
897 Free(key);
898 cJSON_Delete(obj);
899 return NULL;
900 }
901 cJSON_AddItemToObject(obj, (const char*)key, val);
902 Free(key);
903 }
904 }
905 return obj;
906 }
907
908 case AVRO_UNION: {
909 int64_t branch_idx = 0;
910 if (avro_decode_long(reader, &branch_idx) != TRUE) return NULL;
911 if (branch_idx < 0 || (size_t)branch_idx >= schema->nb_branches) {
912 n_log(LOG_ERR, "union branch index %" PRId64 " out of range (0-%zu)", branch_idx, schema->nb_branches - 1);
913 return NULL;
914 }
915 return avro_decode_datum(reader, schema->union_branches[(size_t)branch_idx]);
916 }
917
918 case AVRO_FIXED: {
919 if (reader->pos + schema->fixed_size > reader->size) {
920 n_log(LOG_ERR, "unexpected end of data reading fixed(%zu)", schema->fixed_size);
921 return NULL;
922 }
923 char* buf = NULL;
924 Malloc(buf, char, schema->fixed_size + 1);
925 __n_assert(buf, return NULL);
926 memcpy(buf, reader->data + reader->pos, schema->fixed_size);
927 buf[schema->fixed_size] = '\0';
928 reader->pos += schema->fixed_size;
929 cJSON* result = cJSON_CreateString(buf);
930 Free(buf);
931 return result;
932 }
933 }
934
935 return NULL;
936}
937
938/* ========================================================================== */
939/* Object Container File format */
940/* ========================================================================== */
941
942N_STR* avro_encode_container(const AVRO_SCHEMA* schema, const cJSON* records) {
943 __n_assert(schema, return NULL);
944 __n_assert(records, return NULL);
945
946 if (!cJSON_IsArray(records)) {
947 n_log(LOG_ERR, "records must be a JSON array");
948 return NULL;
949 }
950
951 N_STR* output = NULL;
952 nstrprintf(output, "%s", "");
953 __n_assert(output, return NULL);
954
955 /* 1. Magic bytes */
956 nstrcat_ex(&output, (void*)AVRO_MAGIC, AVRO_MAGIC_LEN, 1);
957
958 /* 2. File metadata as Avro map */
959 char* schema_json = avro_schema_to_json(schema);
960 __n_assert(schema_json, free_nstr(&output); return NULL);
961
962 /* metadata map: 1 block with 2 entries, then 0 block */
963 /* block count = 2 (avro.schema + avro.codec) */
964 avro_encode_long(&output, 2);
965
966 /* entry 1: avro.schema */
967 const char* key1 = "avro.schema";
968 avro_encode_long(&output, (int64_t)strlen(key1));
969 nstrcat_ex(&output, (void*)key1, (NSTRBYTE)strlen(key1), 1);
970 avro_encode_long(&output, (int64_t)strlen(schema_json));
971 nstrcat_ex(&output, (void*)schema_json, (NSTRBYTE)strlen(schema_json), 1);
972
973 /* entry 2: avro.codec = "null" (no compression) */
974 const char* key2 = "avro.codec";
975 avro_encode_long(&output, (int64_t)strlen(key2));
976 nstrcat_ex(&output, (void*)key2, (NSTRBYTE)strlen(key2), 1);
977 const char* codec = "null";
978 avro_encode_long(&output, (int64_t)strlen(codec));
979 nstrcat_ex(&output, (void*)codec, (NSTRBYTE)strlen(codec), 1);
980
981 /* end of metadata map */
982 avro_encode_long(&output, 0);
983
984 Free(schema_json);
985
986 /* 3. Generate sync marker (16 bytes) */
987 unsigned char sync[AVRO_SYNC_LEN];
988 srand((unsigned)time(NULL));
989 for (int i = 0; i < AVRO_SYNC_LEN; i++) {
990 sync[i] = (unsigned char)(rand() % 256);
991 }
992 nstrcat_ex(&output, sync, AVRO_SYNC_LEN, 1);
993
994 /* 4. Data block: encode all records into a single block */
995 int count = cJSON_GetArraySize(records);
996
997 N_STR* block_data = NULL;
998 nstrprintf(block_data, "%s", "");
999 __n_assert(block_data, free_nstr(&output); return NULL);
1000
1001 for (int i = 0; i < count; i++) {
1002 const cJSON* record = cJSON_GetArrayItem(records, i);
1003 if (avro_encode_datum(&block_data, schema, record) != TRUE) {
1004 n_log(LOG_ERR, "failed to encode record %d", i);
1005 free_nstr(&block_data);
1006 free_nstr(&output);
1007 return NULL;
1008 }
1009 }
1010
1011 /* write block header: object count, then byte size of serialized data */
1012 avro_encode_long(&output, (int64_t)count);
1013 avro_encode_long(&output, (int64_t)block_data->written);
1014
1015 /* write serialized data */
1016 nstrcat_ex(&output, block_data->data, (NSTRBYTE)block_data->written, 1);
1017 free_nstr(&block_data);
1018
1019 /* write sync marker */
1020 nstrcat_ex(&output, sync, AVRO_SYNC_LEN, 1);
1021
1022 return output;
1023}
1024
1025cJSON* avro_decode_container(const AVRO_SCHEMA* schema, const N_STR* avro_data) {
1026 __n_assert(schema, return NULL);
1027 __n_assert(avro_data, return NULL);
1028 __n_assert(avro_data->data, return NULL);
1029
1030 AVRO_READER reader;
1031 reader.data = (const unsigned char*)avro_data->data;
1032 reader.size = avro_data->written;
1033 reader.pos = 0;
1034
1035 /* 1. Check magic */
1036 if (reader.size < AVRO_MAGIC_LEN + AVRO_SYNC_LEN) {
1037 n_log(LOG_ERR, "data too small for Avro container");
1038 return NULL;
1039 }
1040 if (memcmp(reader.data, AVRO_MAGIC, AVRO_MAGIC_LEN) != 0) {
1041 n_log(LOG_ERR, "invalid Avro magic bytes");
1042 return NULL;
1043 }
1044 reader.pos = AVRO_MAGIC_LEN;
1045
1046 /* 2. Read metadata map (skip it, we use the provided schema) */
1047 int64_t block_count = 0;
1048 for (;;) {
1049 if (avro_decode_long(&reader, &block_count) != TRUE) return NULL;
1050 if (block_count == 0) break;
1051 if (block_count < 0) {
1052 /* negative count: skip block_size bytes */
1053 int64_t block_size = 0;
1054 if (avro_decode_long(&reader, &block_size) != TRUE) return NULL;
1055 if (block_size < 0 || (size_t)block_size > reader.size - reader.pos) {
1056 n_log(LOG_ERR, "invalid metadata block size");
1057 return NULL;
1058 }
1059 reader.pos += (size_t)block_size;
1060 continue;
1061 }
1062 /* positive count: read key-value pairs */
1063 for (int64_t i = 0; i < block_count; i++) {
1064 unsigned char* key = NULL;
1065 size_t key_len = 0;
1066 if (avro_decode_bytes_raw(&reader, &key, &key_len) != TRUE) return NULL;
1067 Free(key);
1068 unsigned char* val = NULL;
1069 size_t val_len = 0;
1070 if (avro_decode_bytes_raw(&reader, &val, &val_len) != TRUE) return NULL;
1071 Free(val);
1072 }
1073 }
1074
1075 /* 3. Read sync marker */
1076 if (reader.pos + AVRO_SYNC_LEN > reader.size) {
1077 n_log(LOG_ERR, "unexpected end of data reading sync marker");
1078 return NULL;
1079 }
1080 unsigned char sync[AVRO_SYNC_LEN];
1081 memcpy(sync, reader.data + reader.pos, AVRO_SYNC_LEN);
1082 reader.pos += AVRO_SYNC_LEN;
1083
1084 /* 4. Read data blocks */
1085 cJSON* all_records = cJSON_CreateArray();
1086
1087 while (reader.pos < reader.size) {
1088 int64_t obj_count = 0;
1089 if (avro_decode_long(&reader, &obj_count) != TRUE) {
1090 cJSON_Delete(all_records);
1091 return NULL;
1092 }
1093 if (obj_count <= 0) break;
1094
1095 int64_t block_byte_size = 0;
1096 if (avro_decode_long(&reader, &block_byte_size) != TRUE) {
1097 cJSON_Delete(all_records);
1098 return NULL;
1099 }
1100
1101 if (block_byte_size < 0 || (size_t)block_byte_size > reader.size - reader.pos) {
1102 n_log(LOG_ERR, "invalid data block size: %" PRId64, block_byte_size);
1103 cJSON_Delete(all_records);
1104 return NULL;
1105 }
1106
1107 size_t block_end = reader.pos + (size_t)block_byte_size;
1108
1109 for (int64_t i = 0; i < obj_count; i++) {
1110 cJSON* record = avro_decode_datum(&reader, schema);
1111 if (!record) {
1112 n_log(LOG_ERR, "failed to decode record %" PRId64, i);
1113 cJSON_Delete(all_records);
1114 return NULL;
1115 }
1116 cJSON_AddItemToArray(all_records, record);
1117 }
1118
1119 /* verify we consumed exactly the block */
1120 if (reader.pos != block_end) {
1121 n_log(LOG_WARNING, "block size mismatch: expected pos %zu, got %zu", block_end, reader.pos);
1122 reader.pos = block_end;
1123 }
1124
1125 /* read and verify sync marker */
1126 if (reader.pos + AVRO_SYNC_LEN > reader.size) {
1127 n_log(LOG_ERR, "unexpected end of data reading block sync marker");
1128 cJSON_Delete(all_records);
1129 return NULL;
1130 }
1131 if (memcmp(reader.data + reader.pos, sync, AVRO_SYNC_LEN) != 0) {
1132 n_log(LOG_ERR, "sync marker mismatch");
1133 cJSON_Delete(all_records);
1134 return NULL;
1135 }
1136 reader.pos += AVRO_SYNC_LEN;
1137 }
1138
1139 return all_records;
1140}
1141
1142/* ========================================================================== */
1143/* File-level convenience functions */
1144/* ========================================================================== */
1145
1146int avro_json_to_file(const char* avro_filename, const char* schema_filename, const char* json_filename) {
1147 __n_assert(avro_filename, return FALSE);
1148 __n_assert(schema_filename, return FALSE);
1149 __n_assert(json_filename, return FALSE);
1150
1151 /* load schema file */
1152 N_STR* schema_str = file_to_nstr((char*)schema_filename);
1153 __n_assert(schema_str, return FALSE);
1154
1155 AVRO_SCHEMA* schema = avro_schema_parse(schema_str->data);
1156 free_nstr(&schema_str);
1157 __n_assert(schema, return FALSE);
1158
1159 /* load JSON file */
1160 N_STR* json_str = file_to_nstr((char*)json_filename);
1161 if (!json_str) {
1162 avro_schema_free(&schema);
1163 return FALSE;
1164 }
1165
1166 cJSON* json = cJSON_Parse(json_str->data);
1167 free_nstr(&json_str);
1168 if (!json) {
1169 n_log(LOG_ERR, "failed to parse JSON file %s: %s", json_filename, _str(cJSON_GetErrorPtr()));
1170 avro_schema_free(&schema);
1171 return FALSE;
1172 }
1173
1174 /* if json is a single object, wrap it in an array */
1175 cJSON* records = json;
1176 int wrapped = 0;
1177 if (cJSON_IsObject(json)) {
1178 records = cJSON_CreateArray();
1179 cJSON_AddItemToArray(records, cJSON_Duplicate(json, 1));
1180 wrapped = 1;
1181 }
1182
1183 /* encode to Avro container */
1184 N_STR* avro_data = avro_encode_container(schema, records);
1185
1186 if (wrapped) {
1187 cJSON_Delete(records);
1188 }
1189 cJSON_Delete(json);
1190 avro_schema_free(&schema);
1191
1192 if (!avro_data) {
1193 n_log(LOG_ERR, "failed to encode Avro container");
1194 return FALSE;
1195 }
1196
1197 /* write Avro file */
1198 int ret = nstr_to_file(avro_data, (char*)avro_filename);
1199 free_nstr(&avro_data);
1200
1201 if (ret != TRUE) {
1202 n_log(LOG_ERR, "failed to write Avro file %s", avro_filename);
1203 return FALSE;
1204 }
1205
1206 n_log(LOG_INFO, "wrote Avro file %s", avro_filename);
1207 return TRUE;
1208}
1209
1210int avro_file_to_json(const char* avro_filename, const char* schema_filename, const char* json_filename) {
1211 __n_assert(avro_filename, return FALSE);
1212 __n_assert(schema_filename, return FALSE);
1213 __n_assert(json_filename, return FALSE);
1214
1215 /* load schema file */
1216 N_STR* schema_str = file_to_nstr((char*)schema_filename);
1217 __n_assert(schema_str, return FALSE);
1218
1219 AVRO_SCHEMA* schema = avro_schema_parse(schema_str->data);
1220 free_nstr(&schema_str);
1221 __n_assert(schema, return FALSE);
1222
1223 /* load Avro file */
1224 N_STR* avro_data = file_to_nstr((char*)avro_filename);
1225 if (!avro_data) {
1226 avro_schema_free(&schema);
1227 return FALSE;
1228 }
1229
1230 /* decode from Avro container */
1231 cJSON* records = avro_decode_container(schema, avro_data);
1232 free_nstr(&avro_data);
1233 avro_schema_free(&schema);
1234
1235 if (!records) {
1236 n_log(LOG_ERR, "failed to decode Avro file %s", avro_filename);
1237 return FALSE;
1238 }
1239
1240 /* write JSON file */
1241 char* json_output = cJSON_Print(records);
1242 cJSON_Delete(records);
1243
1244 if (!json_output) {
1245 n_log(LOG_ERR, "failed to serialize JSON");
1246 return FALSE;
1247 }
1248
1249 N_STR* json_nstr = char_to_nstr(json_output);
1250 free(json_output);
1251
1252 if (!json_nstr) {
1253 return FALSE;
1254 }
1255
1256 int ret = nstr_to_file(json_nstr, (char*)json_filename);
1257 free_nstr(&json_nstr);
1258
1259 if (ret != TRUE) {
1260 n_log(LOG_ERR, "failed to write JSON file %s", json_filename);
1261 return FALSE;
1262 }
1263
1264 n_log(LOG_INFO, "wrote JSON file %s", json_filename);
1265 return TRUE;
1266}
1267
1268/* ========================================================================== */
1269/* N_STR in-memory convenience functions */
1270/* ========================================================================== */
1271
1273 __n_assert(schema_nstr, return NULL);
1274 __n_assert(schema_nstr->data, return NULL);
1275
1276 return avro_schema_parse(schema_nstr->data);
1277}
1278
1279N_STR* avro_nstr_json_to_avro(const N_STR* schema_nstr, const N_STR* json_nstr) {
1280 __n_assert(schema_nstr, return NULL);
1281 __n_assert(schema_nstr->data, return NULL);
1282 __n_assert(json_nstr, return NULL);
1283 __n_assert(json_nstr->data, return NULL);
1284
1285 AVRO_SCHEMA* schema = avro_schema_parse(schema_nstr->data);
1286 __n_assert(schema, return NULL);
1287
1288 cJSON* json = cJSON_Parse(json_nstr->data);
1289 if (!json) {
1290 n_log(LOG_ERR, "failed to parse JSON: %s", _str(cJSON_GetErrorPtr()));
1291 avro_schema_free(&schema);
1292 return NULL;
1293 }
1294
1295 /* if json is a single object, wrap it in an array */
1296 cJSON* records = json;
1297 int wrapped = 0;
1298 if (cJSON_IsObject(json)) {
1299 records = cJSON_CreateArray();
1300 cJSON_AddItemToArray(records, cJSON_Duplicate(json, 1));
1301 wrapped = 1;
1302 }
1303
1304 N_STR* avro_data = avro_encode_container(schema, records);
1305
1306 if (wrapped) {
1307 cJSON_Delete(records);
1308 }
1309 cJSON_Delete(json);
1310 avro_schema_free(&schema);
1311
1312 return avro_data;
1313}
1314
1315N_STR* avro_nstr_avro_to_json(const N_STR* schema_nstr, const N_STR* avro_nstr) {
1316 __n_assert(schema_nstr, return NULL);
1317 __n_assert(schema_nstr->data, return NULL);
1318 __n_assert(avro_nstr, return NULL);
1319 __n_assert(avro_nstr->data, return NULL);
1320
1321 AVRO_SCHEMA* schema = avro_schema_parse(schema_nstr->data);
1322 __n_assert(schema, return NULL);
1323
1324 cJSON* records = avro_decode_container(schema, avro_nstr);
1325 avro_schema_free(&schema);
1326
1327 if (!records) {
1328 n_log(LOG_ERR, "failed to decode Avro container");
1329 return NULL;
1330 }
1331
1332 char* json_output = cJSON_Print(records);
1333 cJSON_Delete(records);
1334
1335 if (!json_output) {
1336 n_log(LOG_ERR, "failed to serialize JSON");
1337 return NULL;
1338 }
1339
1340 N_STR* result = char_to_nstr(json_output);
1341 free(json_output);
1342
1343 return result;
1344}
char * key
size_t nb_symbols
number of enum symbols
Definition n_avro.h:106
char * name
field name
Definition n_avro.h:78
AVRO_SCHEMA ** union_branches
union branch schemas
Definition n_avro.h:100
char * name
name (for record, enum, fixed)
Definition n_avro.h:88
AVRO_SCHEMA * items
item schema (for array)
Definition n_avro.h:96
AVRO_SCHEMA * values
value schema (for map)
Definition n_avro.h:98
AVRO_FIELD * fields
namespace (for record, enum)
Definition n_avro.h:90
size_t pos
current position
Definition n_avro.h:118
size_t fixed_size
fixed size
Definition n_avro.h:108
AVRO_SCHEMA * schema
field schema
Definition n_avro.h:80
AVRO_TYPE type
schema type
Definition n_avro.h:86
const unsigned char * data
data buffer
Definition n_avro.h:114
size_t nb_branches
number of union branches
Definition n_avro.h:102
char ** symbols
enum symbols
Definition n_avro.h:104
size_t nb_fields
number of fields (for record)
Definition n_avro.h:94
size_t size
total size
Definition n_avro.h:116
N_STR * avro_nstr_json_to_avro(const N_STR *schema_nstr, const N_STR *json_nstr)
Convert JSON N_STR to Avro N_STR using schema N_STR (all in-memory)
Definition n_avro.c:1279
#define AVRO_SYNC_LEN
Avro sync marker length.
Definition n_avro.h:52
AVRO_TYPE
Avro schema type enumeration.
Definition n_avro.h:55
#define AVRO_MAGIC_LEN
Avro magic length.
Definition n_avro.h:50
int avro_file_to_json(const char *avro_filename, const char *schema_filename, const char *json_filename)
Read an Avro object container file and produce a JSON file using a schema file.
Definition n_avro.c:1210
N_STR * avro_encode_container(const AVRO_SCHEMA *schema, const cJSON *records)
Encode a cJSON array of records into Avro container format N_STR.
Definition n_avro.c:942
AVRO_SCHEMA * avro_schema_from_cjson(const cJSON *json)
Parse an Avro schema from a cJSON object.
Definition n_avro.c:73
int avro_encode_datum(N_STR **dest, const AVRO_SCHEMA *schema, const cJSON *json)
Encode a cJSON value as Avro binary according to schema.
Definition n_avro.c:475
AVRO_SCHEMA * avro_schema_parse_nstr(const N_STR *schema_nstr)
Parse schema from N_STR.
Definition n_avro.c:1272
AVRO_SCHEMA * avro_schema_parse(const char *json_str)
Parse an Avro schema from a JSON string.
Definition n_avro.c:211
int avro_decode_long(AVRO_READER *reader, int64_t *value)
Decode a zig-zag varint from reader into a 64-bit signed integer.
Definition n_avro.c:394
cJSON * avro_decode_container(const AVRO_SCHEMA *schema, const N_STR *avro_data)
Decode an Avro container format N_STR into a cJSON array of records.
Definition n_avro.c:1025
cJSON * avro_decode_datum(AVRO_READER *reader, const AVRO_SCHEMA *schema)
Decode an Avro binary datum into cJSON according to schema.
Definition n_avro.c:717
N_STR * avro_nstr_avro_to_json(const N_STR *schema_nstr, const N_STR *avro_nstr)
Convert Avro N_STR to JSON N_STR using schema N_STR (all in-memory)
Definition n_avro.c:1315
char * avro_schema_to_json(const AVRO_SCHEMA *schema)
Convert an Avro schema back to JSON string (caller must free)
Definition n_avro.c:258
void avro_schema_free(AVRO_SCHEMA **schema_ptr)
Free an Avro schema.
Definition n_avro.c:222
int avro_encode_long(N_STR **dest, int64_t value)
Encode a 64-bit signed integer as zig-zag varint into N_STR.
Definition n_avro.c:376
#define AVRO_MAGIC
Avro object container file magic bytes.
Definition n_avro.h:48
int avro_json_to_file(const char *avro_filename, const char *schema_filename, const char *json_filename)
Write an Avro object container file from a JSON file and schema file.
Definition n_avro.c:1146
@ AVRO_ENUM
Definition n_avro.h:65
@ AVRO_MAP
Definition n_avro.h:67
@ AVRO_FIXED
Definition n_avro.h:69
@ AVRO_UNION
Definition n_avro.h:68
@ AVRO_ARRAY
Definition n_avro.h:66
@ AVRO_FLOAT
Definition n_avro.h:60
@ AVRO_DOUBLE
Definition n_avro.h:61
@ AVRO_BOOLEAN
Definition n_avro.h:57
@ AVRO_INT
Definition n_avro.h:58
@ AVRO_STRING
Definition n_avro.h:63
@ AVRO_LONG
Definition n_avro.h:59
@ AVRO_NULL
Definition n_avro.h:56
@ AVRO_BYTES
Definition n_avro.h:62
@ AVRO_RECORD
Definition n_avro.h:64
Avro schema field (for records)
Definition n_avro.h:76
Avro read cursor for decoding.
Definition n_avro.h:112
Avro schema definition.
Definition n_avro.h:84
#define FreeNoLog(__ptr)
Free Handler without log.
Definition n_common.h:271
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:203
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:278
#define _str(__PTR)
define true
Definition n_common.h:192
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:262
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:88
#define LOG_ERR
error conditions
Definition n_log.h:75
#define LOG_WARNING
warning conditions
Definition n_log.h:77
#define LOG_INFO
informational
Definition n_log.h:81
size_t written
size of the written data inside the string
Definition n_str.h:66
char * data
the string
Definition n_str.h:62
size_t NSTRBYTE
N_STR base unit.
Definition n_str.h:57
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
Definition n_str.h:201
#define local_strdup(__src_)
Do tar(1) matching rules, which ignore a trailing slash?
Definition n_str.h:77
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
Definition n_str.c:416
N_STR * nstrcat_ex(N_STR **dest, void *src, NSTRBYTE size, int resize_flag)
Append data into N_STR using internal N_STR size and cursor position.
Definition n_str.c:1081
N_STR * char_to_nstr(const char *src)
Convert a char into a N_STR, short version.
Definition n_str.c:254
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
Definition n_str.h:115
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
Definition n_str.c:286
A box including a string and his lenght.
Definition n_str.h:60
static int avro_encode_bytes_raw(N_STR **dest, const unsigned char *data, size_t len)
helper: encode raw bytes with length prefix
Definition n_avro.c:425
static AVRO_TYPE avro_type_from_string(const char *type_str)
helper to get AVRO_TYPE from a type name string
Definition n_avro.c:37
static int avro_decode_bytes_raw(AVRO_READER *reader, unsigned char **out, size_t *out_len)
helper: read raw bytes with length prefix
Definition n_avro.c:695
static const char * avro_type_to_string(AVRO_TYPE type)
helper to get type name from AVRO_TYPE
Definition n_avro.c:50
static int avro_find_union_branch(const AVRO_SCHEMA *schema, const cJSON *json)
helper: find which union branch matches a cJSON value
Definition n_avro.c:435
Avro binary format encoding/decoding with JSON conversion.