Commit a5ada677 authored by Jordan Sissel's avatar Jordan Sissel

use msgpack

parent 6d8b0937
......@@ -2,9 +2,12 @@ VERSION=0.0.1
CFLAGS+=-Ibuild/include
CFLAGS+=-D_POSIX_C_SOURCE=199309 -std=c99 -Wall -Wextra -Werror -pipe -g
# msgpack fails to compile without this.
CFLAGS+=-Wno-unused-function
LDFLAGS+=-pthread
LDFLAGS+=-Lbuild/lib -Wl,-rpath,'$$ORIGIN/../lib'
LIBS=-lzmq -ljansson
LIBS=-lzmq -lmsgpack
#-ljansson
PREFIX?=/opt/lumberjack
......@@ -13,6 +16,7 @@ include Makefile.ext
clean:
-@rm -fr lumberjack unixsock *.o build
-@make -C vendor/msgpack/ clean
-@make -C vendor/jansson/ clean
-@make -C vendor/zeromq/ clean
......@@ -28,14 +32,14 @@ rpm deb:
backoff.c: backoff.h
harvester.c: harvester.h
emitter.c: emitter.h
lumberjack.c: build/include/insist.h build/include/zmq.h build/include/jansson.h
lumberjack.c: build/include/insist.h build/include/zmq.h build/include/msgpack.h
lumberjack.c: backoff.h harvester.h emitter.h
build/bin/pushpull: | build/lib/libzmq.$(LIBEXT) build/lib/libjansson.$(LIBEXT) build/bin
build/bin/pushpull: | build/lib/libzmq.$(LIBEXT) build/lib/libmsgpack.$(LIBEXT) build/bin
build/bin/pushpull: pushpull.o
$(CC) $(LDFLAGS) -o $@ $^ $(LIBS)
build/bin/lumberjack: | build/bin build/lib/libzmq.$(LIBEXT) build/lib/libjansson.$(LIBEXT)
build/bin/lumberjack: | build/bin build/lib/libzmq.$(LIBEXT) build/lib/libmsgpack.$(LIBEXT)
build/bin/lumberjack: lumberjack.o backoff.o harvester.o emitter.o
$(CC) $(LDFLAGS) -o $@ $^ $(LIBS)
@echo " => Build complete: $@"
......@@ -51,8 +55,8 @@ build/include/zmq.h build/lib/libzmq.$(LIBEXT): | build
#build/include/msgpack.h build/lib/libmsgpack.$(LIBEXT): | build
# $(MAKE) -C vendor/msgpack/ install PREFIX=$$PWD/build
build/include/jansson.h build/lib/libjansson.$(LIBEXT): | build
$(MAKE) -C vendor/jansson/ install PREFIX=$$PWD/build
build/include/msgpack.h build/lib/libmsgpack.$(LIBEXT): | build
$(MAKE) -C vendor/msgpack/ install PREFIX=$$PWD/build
build:
mkdir $@
......
......@@ -57,8 +57,8 @@ void *emitter(void *arg) {
rc = zmq_recv(socket, &message, 0);
insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
config->zmq_endpoint, rc, zmq_strerror(errno));
//printf("received: %.*s\n", (int)zmq_msg_size(&message),
//(char *)zmq_msg_data(&message));
printf("received: %.*s\n", (int)zmq_msg_size(&message),
(char *)zmq_msg_data(&message));
/* TODO(sissel): ship this out to a remote server */
zmq_msg_close(&message);
......
......@@ -6,7 +6,7 @@
#include <arpa/inet.h> /* for ntohl */
#include <stdio.h> /* printf and friends */
#include <zmq.h> /* zeromq messaging library */
#include <jansson.h> /* jansson JSON library */
#include <msgpack.h> /* msgpack serialization library */
#include "harvester.h"
#include "backoff.h"
......@@ -57,11 +57,6 @@ void *harvest(void *arg) {
int64_t hwm = 100;
zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
json_t *event = json_object();
/* HOSTNAME is set globally by lumberjack.c */
json_object_set_new(event, "host", json_string(hostname));
json_object_set_new(event, "file", json_string(config->path));
/* Wait for the zmq endpoint to be up (wait for connect to succeed) */
for (;;) {
rc = zmq_connect(socket, config->zmq_endpoint);
......@@ -92,8 +87,6 @@ void *harvest(void *arg) {
char *line;
char *septok = buf;
char *start = NULL;
json_t *line_obj = json_string(NULL);
json_object_set(event, "line", line_obj);
while (start = septok, (line = strsep(&septok, "\n")) != NULL) {
if (septok == NULL) {
/* last token found, no terminator though */
......@@ -101,51 +94,23 @@ void *harvest(void *arg) {
memmove(buf + offset, buf, strlen(buf + offset));
} else {
/* emit line as an event */
char *serialized;
size_t serialized_len;
/* TODO(sissel): skip if line_obj is null (means line was invalid UTF-8) */
/* TODO(sissel): include file offset */
#ifdef SERIALIZE_JSON
/* serialize to json */
json_string_set(line_obj, line);
serialized = json_dumps(event, 0);
serialized_len = strlen(serialized_len);
//json_object_del(line_obj, "line");
//json_decref(line_obj);
#endif
size_t line_len = septok - start;
/** SERIALIZING MY WAY */
#ifdef SERIALIZE
int32_t length = 0;
char *pos; /* moving pointer for writing */
serialized_len = sizeof(length) + hostname_len + sizeof(length) + path_len
+ sizeof(length) + (septok-start);
serialized = malloc(serialized_len);
pos = serialized;
/* write length + hostname */
length = ntohl(hostname_len);
memcpy(pos, &length, sizeof(length)); pos += sizeof(length);
memcpy(pos, hostname, hostname_len); pos += hostname_len;
/* write length + file path */
length = ntohl(path_len);
memcpy(pos, &length, sizeof(length)); pos += sizeof(length);
memcpy(pos, config->path, path_len); pos += path_len;
/* write length + line */
length = ntohl(septok - start);
memcpy(pos, &length, sizeof(length)); pos += sizeof(length);
memcpy(pos, line, septok - start); pos += septok - start;
#endif
msgpack_sbuffer *sbuffer = msgpack_sbuffer_new();
msgpack_packer *packer = msgpack_packer_new(sbuffer, msgpack_sbuffer_write);
msgpack_pack_int(packer, 1); /* version */
serialized = line;
serialized_len = septok - start;
msgpack_pack_map(packer, 3); /* host, file, message */
msgpack_pack_raw(packer, hostname_len);
msgpack_pack_raw_body(packer, hostname, hostname_len);
msgpack_pack_raw(packer, path_len);
msgpack_pack_raw_body(packer, config->path, path_len);
msgpack_pack_raw(packer, line_len);
msgpack_pack_raw_body(packer, line, line_len);
zmq_msg_t event;
//zmq_msg_init_data(&event, serialized, serialized_len, free2, NULL);
zmq_msg_init_data(&event, serialized, serialized_len, NULL, NULL);
zmq_msg_init_data(&event, sbuffer->data, sbuffer->size, NULL, NULL);
rc = zmq_send(socket, &event, 0);
insist(rc == 0, "zmq_send(event) failed: %s", zmq_strerror(rc));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment