Commit 992c166b authored by Jordan Sissel's avatar Jordan Sissel

- emit json messages over zmq

parent 07d3df22
#define _BSD_SOURCE
#define _XOPEN_SOURCE 500 /* for useconds_t */
#include <string.h> /* for strsep, etc */
#include <string.h> /* for strerror(3) */
#include <errno.h> /* for errno */
#include <fcntl.h> /* for open(2) */
#include <unistd.h> /* for close, etc */
#include <stdio.h> /* printf and friends */
#include <zmq.h>
#include <zmq.h> /* zeromq messaging library */
#include <jansson.h> /* jansson JSON library */
#include "harvester.h"
#include "backoff.h"
#include "insist.h"
extern const char * HOSTNAME; /* lumberjack.c */
#define EMITTER_SOCKET "inproc://emitter"
#define BUFFERSIZE 16384
static struct timespec min_sleep = { 0, 10000000 }; /* 10ms */
static struct timespec max_sleep = { 15, 0 }; /* 15 */
void *harvest(void *arg) {
struct harvest_config *config = arg;
int fd;
int rc;
/* Make this so we only call it once. */
char hostname[200];
gethostname(hostname, sizeof(hostname));
fd = open(config->path, O_RDONLY);
insist(fd >= 0, "open(%s) failed: %s", config->path, strerror(errno));
......@@ -29,11 +35,17 @@ void *harvest(void *arg) {
buf = calloc(BUFFERSIZE, sizeof(char));
struct backoff sleeper;
backoff_init(&sleeper, 10000 /* 10ms */, 15000000 /* 15 seconds */);
backoff_init(&sleeper, &min_sleep, &max_sleep);
void *socket = zmq_socket(config->zmq, ZMQ_PUSH);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
json_t *event = json_object();
/* HOSTNAME is set globally by lumberjack.c */
json_object_set_new(event, "host", json_string(hostname));
json_object_set_new(event, "file", json_string(config->path));
/* Wait for the zmq endpoint to be up (wait for connect to succeed) */
for (;;) {
rc = zmq_connect(socket, config->zmq_endpoint);
if (rc != 0 && errno == ECONNREFUSED) {
......@@ -71,7 +83,23 @@ void *harvest(void *arg) {
} else {
/* emit line as an event */
zmq_msg_t message;
zmq_msg_init_data(&message, line, septok - start - 1, NULL, NULL);
char *serialized;
json_t *line_obj = json_string(line);
/* TODO(sissel): skip if line_obj is null (means line was invalid UTF-8) */
json_object_set(event, "line", line_obj);
/* TODO(sissel): include file offset */
/* serialize to json */
serialized = json_dumps(event, 0);
/* Purge the 'line' from the event object so it'll be freed */
json_object_del(event, "line");
json_decref(line_obj);
/* Ship it off to zeromq */
//zmq_msg_init_data(&message, line, septok - start - 1, NULL, NULL);
zmq_msg_init_data(&message, serialized, strlen(serialized) + 1, NULL, NULL);
rc = zmq_send(socket, &message, 0);
insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
zmq_msg_close(&message);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment