Commit 90ab0fcd authored by Jordan Sissel's avatar Jordan Sissel

- emit lines

parent 96d22200
#define _XOPEN_SOURCE 500 /* for useconds_t */
#define _BSD_SOURCE
#include <string.h> /* for strsep, etc */
#include "harvester.h"
#include <string.h> /* for strerror(3) */
#include <errno.h> /* for errno */
......@@ -44,7 +45,6 @@ void *harvest(void *arg) {
backoff_clear(&sleeper);
int offset = 0;
zmq_msg_t message;
for (;;) {
bytes = read(fd, buf + offset, BUFFERSIZE - offset);
if (bytes < 0) {
......@@ -55,6 +55,26 @@ void *harvest(void *arg) {
} else {
backoff_clear(&sleeper);
/* For each line, emit. Save the remainder */
char *line;
char *septok = buf;
char *start = NULL;
while (start = septok, (line = strsep(&septok, "\n")) != NULL) {
if (septok == NULL) {
/* last token found, no terminator though */
offset = start - line;
strcpy(buf + offset, buf);
} else {
/* emit line as an event */
zmq_msg_t message;
zmq_msg_init_data(&message, line, septok - start - 1, NULL, NULL);
rc = zmq_send(socket, &message, 0);
insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
zmq_msg_close(&message);
}
};
/* Find newlines, emit an event */
/* Event contents:
* - file
......@@ -64,10 +84,6 @@ void *harvest(void *arg) {
* Pick a serialization? msgpack?
* host+file+message
*/
zmq_msg_init_data(&message, buf, bytes + offset, NULL, NULL);
rc = zmq_send(socket, &message, 0);
insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
zmq_msg_close(&message);
}
} /* loop forever, reading from a file */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment