Commit 5ca6f8f9 authored by Jordan Sissel's avatar Jordan Sissel

- fix some memory growth problems

  (one big one remains, will debug shortly)
parent b25708eb
#include <string.h> /* for strsep, etc */
#include <string.h> /* for strerror(3) */
#define _BSD_SOURCE
#include <string.h> /* for strsep, strerror, etc */
#include <errno.h> /* for errno */
#include <fcntl.h> /* for open(2) */
#include <unistd.h> /* for close, etc */
......@@ -18,6 +18,11 @@ extern const char * HOSTNAME; /* lumberjack.c */
static struct timespec min_sleep = { 0, 10000000 }; /* 10ms */
static struct timespec max_sleep = { 15, 0 }; /* 15 */
/* A free function that simply calls free(3) for zmq_msg */
static inline void free2(void *data, void __attribute__((__unused__)) *hint) {
free(data);
} /* free2 */
void *harvest(void *arg) {
struct harvest_config *config = arg;
int fd;
......@@ -40,6 +45,9 @@ void *harvest(void *arg) {
void *socket = zmq_socket(config->zmq, ZMQ_PUSH);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int hwm = 500;
zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
json_t *event = json_object();
/* HOSTNAME is set globally by lumberjack.c */
json_object_set_new(event, "host", json_string(hostname));
......@@ -62,7 +70,7 @@ void *harvest(void *arg) {
for (;;) {
/* TODO(sissel): is truncation handled? */
/* TODO(sissel): what about log rotation? */
bytes = read(fd, buf + offset, BUFFERSIZE - offset);
bytes = read(fd, buf + offset, BUFFERSIZE - offset - 1);
if (bytes < 0) {
/* error */
break;
......@@ -79,7 +87,7 @@ void *harvest(void *arg) {
if (septok == NULL) {
/* last token found, no terminator though */
offset = start - line;
strcpy(buf + offset, buf);
memmove(buf + offset, buf, strlen(buf + offset));
} else {
/* emit line as an event */
zmq_msg_t message;
......@@ -96,16 +104,16 @@ void *harvest(void *arg) {
/* Purge the 'line' from the event object so it'll be freed */
json_object_del(event, "line");
json_decref(line_obj);
free(serialized);
zmq_msg_init_data(&message, serialized, strlen(serialized), free2, NULL);
/* if I uncomment this, memory grows unbouned. */
//rc = zmq_send(socket, &message, 0);
//insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
/* Ship it off to zeromq */
//zmq_msg_init_data(&message, line, septok - start - 1, NULL, NULL);
zmq_msg_init_data(&message, serialized, strlen(serialized) + 1, NULL, NULL);
rc = zmq_send(socket, &message, 0);
insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
zmq_msg_close(&message);
}
};
} /* for each token */
/* Find newlines, emit an event */
/* Event contents:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment