Commit 85db39e7 authored by Jordan Sissel's avatar Jordan Sissel

try fiddling with various serialization methods

parent 902e810b
......@@ -3,6 +3,7 @@
#include <errno.h> /* for errno */
#include <fcntl.h> /* for open(2) */
#include <unistd.h> /* for close, etc */
#include <arpa/inet.h> /* for ntohl */
#include <stdio.h> /* printf and friends */
#include <zmq.h> /* zeromq messaging library */
#include <jansson.h> /* jansson JSON library */
......@@ -11,7 +12,6 @@
#include "backoff.h"
#include "insist.h"
extern const char * HOSTNAME; /* lumberjack.c */
#define EMITTER_SOCKET "inproc://emitter"
#define BUFFERSIZE 16384
......@@ -27,12 +27,15 @@ void *harvest(void *arg) {
struct harvest_config *config = arg;
int fd;
int rc;
char hostname[200];
size_t hostname_len, path_len;
/* Make this so we only call it once. */
char hostname[200];
gethostname(hostname, sizeof(hostname));
hostname_len = strlen(hostname);
fd = open(config->path, O_RDONLY);
path_len = strlen(config->path);
insist(fd >= 0, "open(%s) failed: %s", config->path, strerror(errno));
char *buf;
......@@ -72,7 +75,7 @@ void *harvest(void *arg) {
/* TODO(sissel): what about log rotation? */
bytes = read(fd, buf + offset, BUFFERSIZE - offset - 1);
if (bytes < 0) {
/* error */
/* error, maybe indicate a failure of some kind. */
break;
} else if (bytes == 0) {
backoff(&sleeper);
......@@ -83,6 +86,8 @@ void *harvest(void *arg) {
char *line;
char *septok = buf;
char *start = NULL;
json_t *line_obj = json_string(NULL);
json_object_set(event, "line", line_obj);
while (start = septok, (line = strsep(&septok, "\n")) != NULL) {
if (septok == NULL) {
/* last token found, no terminator though */
......@@ -90,26 +95,55 @@ void *harvest(void *arg) {
memmove(buf + offset, buf, strlen(buf + offset));
} else {
/* emit line as an event */
zmq_msg_t message;
char *serialized;
json_t *line_obj = json_string(line);
size_t serialized_len;
/* TODO(sissel): skip if line_obj is null (means line was invalid UTF-8) */
json_object_set(event, "line", line_obj);
/* TODO(sissel): include file offset */
#ifdef SERIALIZE_JSON
/* serialize to json */
json_string_set(line_obj, line);
serialized = json_dumps(event, 0);
/* Purge the 'line' from the event object so it'll be freed */
zmq_msg_init_data(&message, serialized, strlen(serialized), free2, NULL);
//zmq_msg_init_data(&message, line, septok - start, NULL, NULL);
rc = zmq_send(socket, &message, 0);
insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
json_object_del(event, "line");
json_decref(line_obj);
zmq_msg_close(&message);
serialized_len = strlen(serialized_len);
//json_object_del(line_obj, "line");
//json_decref(line_obj);
#endif
/** SERIALIZING MY WAY */
#ifdef SERIALIZE
int32_t length = 0;
char *pos; /* moving pointer for writing */
serialized_len = sizeof(length) + hostname_len + sizeof(length) + path_len
+ sizeof(length) + (septok-start);
serialized = malloc(serialized_len);
pos = serialized;
/* write length + hostname */
length = ntohl(hostname_len);
memcpy(pos, &length, sizeof(length)); pos += sizeof(length);
memcpy(pos, hostname, hostname_len); pos += hostname_len;
/* write length + file path */
length = ntohl(path_len);
memcpy(pos, &length, sizeof(length)); pos += sizeof(length);
memcpy(pos, config->path, path_len); pos += path_len;
/* write length + line */
length = ntohl(septok - start);
memcpy(pos, &length, sizeof(length)); pos += sizeof(length);
memcpy(pos, line, septok - start); pos += septok - start;
#endif
serialized = line;
serialized_len = septok - start;
zmq_msg_t event;
//zmq_msg_init_data(&event, serialized, serialized_len, free2, NULL);
zmq_msg_init_data(&event, serialized, serialized_len, NULL, NULL);
rc = zmq_send(socket, &event, 0);
insist(rc == 0, "zmq_send(event) failed: %s", zmq_strerror(rc));
zmq_msg_close(&event);
}
} /* for each token */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment