Commit c5f1aad9 authored by Jordan Sissel's avatar Jordan Sissel

- purge C stuff; it's in a separate branch now

parent 3b0e5e04
VERSION=0.0.30
# By default, all dependencies (zeromq, etc) will be downloaded and installed
# locally. You can change this if you are deploying your own.
VENDOR?=zeromq jemalloc openssl zlib
# Where to install to.
PREFIX?=/opt/lumberjack
FETCH=sh fetch.sh
CFLAGS+=-D_POSIX_C_SOURCE=199309 -std=c99 -Wall -Wextra -Werror -pipe
CFLAGS+=-g
CFLAGS+=-Wno-unused-function
LDFLAGS+=-pthread
LIBS=-lzmq -ljemalloc -lssl -lcrypto -luuid -lz
MAKE?=make
CFLAGS+=-Ibuild/include
LDFLAGS+=-Lbuild/lib -Wl,-rpath,'$$ORIGIN/../lib'
default: build-all
build-all: build/bin/lumberjack build/bin/lumberjack.sh
include Makefile.ext
ifeq ($(UNAME),Linux)
# clock_gettime is in librt on linux.
LIBS+=-lrt
endif
clean:
-@rm -fr lumberjack unixsock *.o build
vendor-clean:
-$(MAKE) -C vendor/msgpack/ clean
-$(MAKE) -C vendor/jansson/ clean
-$(MAKE) -C vendor/jemalloc/ clean
-$(MAKE) -C vendor/libuuid/ clean
-$(MAKE) -C vendor/zeromq/ clean
-$(MAKE) -C vendor/zlib/ clean
-$(MAKE) -C vendor/apr/ clean
rpm deb: | build-all
fpm -s dir -t $@ -n lumberjack -v $(VERSION) --prefix /opt/lumberjack \
--exclude '*.a' --exclude 'lib/pkgconfig/zlib.pc' -C build \
--description "a log shipping tool" \
--url "https://github.com/jordansissel/lumberjack" \
bin/lumberjack bin/lumberjack.sh lib
#install: build/bin/lumberjack build/lib/libzmq.$(LIBEXT)
# install -d -m 755 build/bin/* $(PREFIX)/bin/lumberjack
# install -d build/lib/* $(PREFIX)/lib
flog.o: flog.c flog.h
strlist.o: strlist.h
emitter.o: strlist.h
backoff.o: backoff.c backoff.h
harvester.o: harvester.c harvester.h proto.h str.h sleepdefs.h flog.h
emitter.o: emitter.c emitter.h ring.h sleepdefs.h flog.h
lumberjack.o: lumberjack.c backoff.h harvester.h emitter.h flog.h
str.o: str.c str.h
proto.o: proto.c proto.h str.h sleepdefs.h flog.h
ring.o: ring.c ring.h
harvester.o: build/include/insist.h
lumberjack.o: build/include/insist.h
# Vendor'd dependencies
# If VENDOR contains 'zeromq' download and build it.
ifeq ($(filter zeromq,$(VENDOR)),zeromq)
emitter.o: build/include/zmq.h
harvester.o: build/include/zmq.h
lumberjack.o: build/include/zmq.h
build/bin/lumberjack: | build/bin build/lib/libzmq.$(LIBEXT)
endif # zeromq
ifeq ($(filter jemalloc,$(VENDOR)),jemalloc)
harvester.o lumberjack.o ring.o str.o: build/include/jemalloc/jemalloc.h
build/bin/lumberjack: | build/lib/libjemalloc.$(LIBEXT)
endif # jemalloc
ifeq ($(filter openssl,$(VENDOR)),openssl)
proto.o: build/include/openssl/ssl.h
lumberjack.o: build/include/openssl/ssl.h
build/bin/lumberjack: | build/lib/libssl.$(LIBEXT)
build/bin/lumberjack: | build/lib/libcrypto.$(LIBEXT)
endif # openssl
ifeq ($(filter zlib,$(VENDOR)),zlib)
proto.o: build/include/zlib.h
build/bin/lumberjack: | build/lib/libz.$(LIBEXT)
endif # zlib
.PHONY: test
test: | build/test/test_ring
build/test/test_ring
# Tests
test_ring.o: ring.h build/include/jemalloc/jemalloc.h build/include/insist.h
build/test/test_ring: test_ring.o ring.o | build/test
$(CC) $(LDFLAGS) -o $@ $^ -ljemalloc
build/bin/lumberjack.sh: lumberjack.sh | build/bin
install -m 755 $^ $@
build/bin/lumberjack: | build/bin
build/bin/lumberjack: lumberjack.o backoff.o harvester.o emitter.o str.o proto.o ring.o strlist.o flog.o
$(CC) $(LDFLAGS) -o $@ $^ $(LIBS)
@echo " => Build complete: $@"
@echo " => Run '$(MAKE) rpm' to build an rpm (or deb or tarball)"
build/include/insist.h: | build/include
PATH=$$PWD:$$PATH fetch.sh -o $@ https://raw.github.com/jordansissel/experiments/master/c/better-assert/insist.h
build/include/zmq.h build/lib/libzmq.$(LIBEXT): | build
@echo " => Building zeromq"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/zeromq/ install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/msgpack.h build/lib/libmsgpack.$(LIBEXT): | build
@echo " => Building msgpack"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/msgpack/ install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/jemalloc/jemalloc.h build/lib/libjemalloc.$(LIBEXT): | build
@echo " => Building jemalloc"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/jemalloc/ install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/sodium/sodium.h build/lib/libsodium.$(LIBEXT): | build
@echo " => Building libsodium"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/libsodium/ install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/lz4.h build/lib/liblz4.$(LIBEXT): | build
@echo " => Building lz4"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/lz4/ install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/zlib.h build/lib/libz.$(LIBEXT): | build
@echo " => Building zlib"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/zlib/ install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/openssl/ssl.h build/lib/libssl.$(LIBEXT) build/lib/libcrypto.$(LIBEXT): | build
@echo " => Building openssl"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/openssl install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build/include/apr-1/apr.h build/lib/libapr-1.$(LIBEXT): | build
@echo " => Building apr"
PATH=$$PWD:$$PATH $(MAKE) -C vendor/apr install PREFIX=$$PWD/build DEBUG=$(DEBUG)
build:
mkdir $@
build/include build/bin build/test: | build
mkdir $@
# Protocol
The needs that lead to this protocol are:
* Encryption amd Authentication to protect
* Compression should be used to reduce bandwidth
* Round-trip latency should not damage throughput
* Application-level message acknowledgement
## Implementation Considerations
# Lumberjack Protocol v1
## Behavior
Sequence and ack behavior (including sliding window, etc) is similar to TCP,
but instead of bytes, messages are the base unit.
A writer with a window size of 50 events can send up to 50 unacked events
before blocking. A reader can acknowledge the 'last event' received to
support bulk acknowledgements.
Reliable, ordered byte transport is ensured by using TCP (or TLS on top), and
this protocol aims to provide reliable, application-level, message transport.
## Encryption and Authentication
Currently this is to be handled by TLS.
## Wire Format
### Layering
This entire protocol is built to be layered on top of TCP or TLS.
### Framing
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+---------------+---------------+-------------------------------+
| version(1) | frame type | payload ... |
+---------------------------------------------------------------+
| payload continued... |
+---------------------------------------------------------------+
### 'data' frame type
* SENT FROM WRITER ONLY
* frame type value: ASCII 'D' aka byte value 0x44
data is a map of string:string pairs. This is analogous to a Hash in Ruby, a
JSON map, etc, but only strings are supported at this time.
Payload:
* 32bit unsigned sequence number
* 32bit 'pair' count (how many key/value sequences follow)
* 32bit unsigned key length followed by that many bytes for the key
* 32bit unsigned value length followed by that many bytes for the value
* repeat key/value 'count' times.
Sequence number roll-over: If you receive a sequence number less than the
previous value, this signals that the sequence number has rolled over.
### 'ack' frame type
* SENT FROM READER ONLY
* frame type value: ASCII 'A' aka byte value 0x41
Payload:
* 32bit unsigned sequence number.
Bulk acks are supported. If you receive data frames in sequence order
1,2,3,4,5,6, you can send an ack for '6' and the writer will take this to
mean you are acknowledging all data frames before and including '6'.
### 'window size' frame type
* SENT FROM WRITER ONLY
* frame type value: ASCII 'W' aka byte value 0x57
Payload:
* 32bit unsigned window size value in units of whole data frames.
This frame is used to tell the reader the maximum number of unacknowledged
data frames the writer will send before blocking for acks.
### 'compressed' frame type
* SENT FROM WRITER ONLY
* frame type value: ASCII 'C' aka byte value 0x43
Payload:
* 32bit unsigned payload length
* 'length' bytes of compressed payload
This frame type allows you to compress many frames into a single compressed
envelope and is useful for efficiently compressing many small data frames.
The compressed payload MUST contain full frames only, not partial frames.
The uncompressed payload MUST be a valid frame stream by itself. As an example,
you could have 3 data frames compressed into a single 'compressed' frame type:
1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v} - when uncompressed, you should process
the uncompressed payload as you would reading uncompressed frames from the
network.
TODO(sissel): It's likely this model is suboptimal, instead choose to
use whole-stream compression z_stream in zlib (Zlib::ZStream in ruby) might be
preferable.
//#define _POSIX_C_SOURCE 199309L /* for struct timespec */
#include <stdio.h>
#include "backoff.h"
#define MAX_TV_NSEC 999999999L
static inline void timespec_double(struct timespec *t) {
/* Exponential backoff */
t->tv_sec <<= 1;
t->tv_nsec <<= 1;
if (t->tv_nsec > MAX_TV_NSEC) {
/* handle carry/overflow of tv_nsec */
t->tv_nsec -= (MAX_TV_NSEC + 1);
t->tv_sec += 1;
}
} /* timespec_double */
static inline long timespec_compare(struct timespec *a, struct timespec *b) {
time_t val;
val = a->tv_sec - b->tv_sec;
if (val != 0) {
return val;
}
return a->tv_nsec - b->tv_nsec;
} /* timespec_compare */
static inline void timespec_copy(struct timespec *source, struct timespec *dest) {
/* TODO(sissel): Could use memcpy here instead... */
dest->tv_sec = source->tv_sec;
dest->tv_nsec = source->tv_nsec;
} /* timespec_copy */
inline void backoff_clear(struct backoff *b) {
timespec_copy(&b->min, &b->sleep);
} /* backoff_clear */
inline void backoff_init(struct backoff *b, struct timespec *min,
struct timespec *max) {
timespec_copy(min, &b->min);
timespec_copy(max, &b->max);
backoff_clear(b);
} /* backoff_init */
inline void backoff(struct backoff *b) {
//printf("Sleeping %ld.%09ld\n", b->sleep.tv_sec, b->sleep.tv_nsec);
nanosleep(&b->sleep, NULL);
/* Exponential backoff */
timespec_double(&b->sleep);
//printf("Candidate vs max: %ld.%09ld vs %ld.%09ld: %ld\n",
//b->sleep.tv_sec, b->sleep.tv_nsec,
//b->max.tv_sec, b->max.tv_nsec,
//timespec_compare(&b->sleep, &b->max));
//printf("tv_sec: %ld\n", b->sleep.tv_sec - b->max.tv_sec);
//printf("tv_nsec: %ld\n", b->sleep.tv_nsec - b->max.tv_nsec);
/* Cap at 'max' if sleep time exceeds it */
if (timespec_compare(&b->sleep, &b->max) > 0) {
timespec_copy(&b->max, &b->sleep);
}
} /* backoff_sleep */
#ifndef _BACKOFF_H_
#define _BACKOFF_H_
#include <time.h>
struct backoff {
struct timespec max;
struct timespec min;
struct timespec sleep;
};
/* Initialize a backoff struct with a max value */
void backoff_init(struct backoff *b, struct timespec *min, struct timespec *max);
/* Execute a backoff. This will sleep for a time.
* The next backoff() call will sleep twice as long (or the max value,
* whichever is smaller) */
void backoff(struct backoff *b);
/* Reset the next backoff() call to sleep the minimum (1ms) */
void backoff_clear(struct backoff *b);
#endif /* _BACKOFF_H_ */
#ifndef _CLOCK_GETTIME_H_
#define _CLOCK_GETTIME_H_
#include <time.h> /* struct timespec, clock_gettime */
// copied mostly from https://gist.github.com/1087739
/* OS X doesn't have clock_gettime, sigh. */
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
typedef int clockid_t;
#define CLOCK_MONOTONIC 1
static long clock_gettime(clockid_t __attribute__((unused)) which_clock, struct timespec *tp) {
clock_serv_t cclock;
mach_timespec_t mts;
host_get_clock_service(mach_host_self(), REALTIME_CLOCK, &cclock);
clock_get_time(cclock, &mts);
mach_port_deallocate(mach_task_self(), cclock);
tp->tv_sec = mts.tv_sec;
tp->tv_nsec = mts.tv_nsec;
return 0; /* success, according to clock_gettime(3) */
}
#endif
// end gist copy
#endif /* _CLOCK_GETTIME_H_ */
#include <errno.h>
#include <stdint.h> /* C99 for int64_t */
#include <string.h>
#include <unistd.h>
#include "zmq.h"
#include "ring.h"
#include "emitter.h"
#include "insist.h"
#include "proto.h"
#include "backoff.h"
#include "clock_gettime.h"
#include "flog.h"
#include <sys/resource.h>
#include "zmq_compat.h"
#include "sleepdefs.h"
void *emitter(void *arg) {
int rc;
struct emitter_config *config = arg;
insist(config->zmq != NULL, "zmq is NULL");
insist(config->zmq_endpoint != NULL, "zmq_endpoint is NULL");
insist(config->ssl_ca_path != NULL, "ssl_ca_path is NULL");
insist(config->window_size > 0, "window_size (%d) must be positive",
(int)config->window_size);
insist(config->host != NULL, "host is NULL");
insist(config->port > 0, "port (%hd) must be positive", config->port);
void *socket = zmq_socket(config->zmq, ZMQ_PULL);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int64_t hwm = 100;
//zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
zmq_compat_set_recvhwm(socket, hwm);
rc = zmq_bind(socket, config->zmq_endpoint);
insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint,
zmq_strerror(errno));
struct timespec start;
clock_gettime(CLOCK_MONOTONIC, &start);
//long count = 0;
struct backoff sleeper;
backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);
struct lumberjack *lumberjack;
lumberjack = lumberjack_new(config->host, config->port, config->window_size);
insist(lumberjack != NULL, "lumberjack_new failed");
lumberjack->ring_size = config->window_size;
if (config->ssl_ca_path != NULL) {
rc = lumberjack_set_ssl_ca(lumberjack, config->ssl_ca_path);
insist(rc == 0, "lumberjack_set_ssl_ca failed, is '%s' a valid ssl cert?",
config->ssl_ca_path);
}
unsigned long count = 0;
unsigned long bytes = 0;
unsigned long report_interval = config->window_size * 4;
zmq_pollitem_t items[1];
items[0].socket = socket;
items[0].events = ZMQ_POLLIN;
int can_flush = 0;
for (;;) {
/* Receive an event from a harvester and put it in the queue */
zmq_msg_t message;
rc = zmq_msg_init(&message);
insist(rc == 0, "zmq_msg_init failed");
rc = zmq_poll(items, 1, 1000000 /* microseconds */);
if (rc == 0) {
/* poll timeout. We're idle, so let's flush and back-off. */
if (can_flush) {
/* only flush if there's something to flush... */
flog(stdout, "flushing since nothing came in over zmq");
/* We flush here to keep slow feeders closer to real-time */
rc = lumberjack_flush(lumberjack);
can_flush = 0;
if (rc != 0) {
/* write failure, reconnect (which will resend) and such */
lumberjack_disconnect(lumberjack);
lumberjack_ensure_connected(lumberjack);
}
}
backoff(&sleeper);
/* Restart the loop - checking to see if there's any messages */
continue;
}
/* poll successful, read a message */
//rc = zmq_recv(socket, &message, 0);
rc = zmq_compat_recvmsg(socket, &message, 0);
insist(rc == 0 /*|| errno == EAGAIN */,
"zmq_recv(%s) failed (returned %d): %s",
config->zmq_endpoint, rc, zmq_strerror(errno));
/* Clear the backoff timer since we received a message successfully */
backoff_clear(&sleeper);
/* Write the data over lumberjack. This will handle any
* connection/reconnection/ack issues */
lumberjack_send_data(lumberjack, zmq_msg_data(&message),
zmq_msg_size(&message));
/* Since we sent data, let it be known that we can flush if idle */
can_flush = 1;
/* Stats for debugging */
count++;
bytes += zmq_msg_size(&message);
zmq_msg_close(&message);
if (count == report_interval) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
double s = (start.tv_sec + 0.0) + (start.tv_nsec / 1000000000.0);
double n = (now.tv_sec + 0.0) + (now.tv_nsec / 1000000000.0);
flog(stdout, "Rate: %f (bytes: %f)", (count + 0.0) / (n - s), (bytes + 0.0) / (n - s));
struct rusage rusage;
rc = getrusage(RUSAGE_SELF, &rusage);
insist(rc == 0, "getrusage failed: %s", strerror(errno));
flog(stdout, "cpu user/system: %d.%06d / %d.%06dn",
(int)rusage.ru_utime.tv_sec, (int)rusage.ru_utime.tv_usec,
(int)rusage.ru_stime.tv_sec, (int)rusage.ru_stime.tv_usec);
clock_gettime(CLOCK_MONOTONIC, &start);
bytes = 0;
count = 0;
}
} /* forever */
} /* emitter */
#ifndef _EMITTER_H_
#define _EMITTER_H_
struct emitter_config {
void *zmq; /* zmq context */
char *zmq_endpoint; /* inproc://whatever */
char *ssl_ca_path; /* path to trusted ssl ca, can be a directory or a file */
size_t window_size; /* the window size */
char *host;
unsigned short port;
};
void *emitter(void *arg);
#endif /* _EMITTER_H_ */
#include <stdio.h> /* for FILE, sprintf, fprintf, etc */
#include <time.h> /* for struct tm, localtime_r */
#include <sys/time.h> /* for gettimeofday */
#include <stdarg.h> /* for va_start, va_end */
void flog(FILE *stream, const char *format, ...) {
va_list args;
struct timeval tv;
struct tm tm;
char timestamp[] = "YYYY-MM-ddTHH:mm:ss.SSS+0000";
gettimeofday(&tv, NULL);
/* convert to time to 'struct tm' for use with strftime */
localtime_r(&tv.tv_sec, &tm);
/* format the time */
strftime(timestamp, sizeof(timestamp), "%Y-%m-%dT%H:%M:%S.000%z", &tm);
/* add in milliseconds, since strftime() can't do that */
/* '20' is the string offset of the millisecond value in our timestamp */
/* we have to include 'timestamp + 23' to keep the timezone value */
sprintf(timestamp + 20, "%03ld%s", tv.tv_usec / 1000, timestamp + 23);
/* print the timestamp */
fprintf(stream, "%.28s ", timestamp); /* 28 is the length of the timestamp */
/* print the log message */
va_start(args, format);
vfprintf(stream, format, args);
va_end(args);
/* print a newline */
fprintf(stream, "\n");
} /* flog */
double duration(struct timeval *start) {
struct timeval tv;
gettimeofday(&tv, NULL); /* what time is it now? */
tv.tv_sec -= start->tv_sec;
tv.tv_usec -= start->tv_usec;
if (tv.tv_usec < 0) {
tv.tv_sec -= 1;
tv.tv_usec += 1000000L;
}
return tv.tv_sec + ((double)tv.tv_usec / 1000000.0);
} /* duration */
#ifndef _FLOG_H_
#define _FLOG_H_
#include <stdio.h> /* for FILE */
#include <sys/time.h> /* for struct timeval */
void flog(FILE *stream, const char *format, ...);
double duration(struct timeval *start);
#define flog_if_slow(stream, max_duration, block, format, args...) \
{ \
struct timeval __start; \
gettimeofday(&__start, NULL); \
{ \
block \
} \
double __duration = duration(&__start); \
if (__duration >= max_duration) { \
flog(stream, "slow operation (%.3f seconds): " format , __duration, args); \
} \
}
#endif /* _FLOG_H_ */
#define _BSD_SOURCE
#include <string.h> /* for strsep, strerror, etc */
#include <errno.h> /* for errno */
#include <fcntl.h> /* for open(2) */
#include <unistd.h> /* for close, etc */
#include <arpa/inet.h> /* for ntohl */
#include <stdio.h> /* printf and friends */
#include "zmq.h" /* zeromq messaging library */
#include "str.h" /* dynamic string library */
#include "proto.h" /* lumberjack wire format serialization */
#include <sys/stat.h>
#include "jemalloc/jemalloc.h"
#include "harvester.h"
#include "backoff.h"
#include "insist.h"
#include "sleepdefs.h"
#include "flog.h"
#include "zmq_compat.h"
#ifdef __MACH__
/* OS X is dumb, or I am dumb, or we are both dumb. I don't know anymore,
* but I need to declare these explicitly even though they are defined
* in string.h, unistd.h respectively */
extern char *strsep(char **stringp, const char *delim);
extern int gethostname(char *name, size_t namelen);
#endif
#define EMITTER_SOCKET "inproc://emitter"
#define BUFFERSIZE 16384
/* A free function that simply calls free(3) for zmq_msg */
//static inline void free2(void *data, void __attribute__((__unused__)) *hint) {
//free(data);
//} /* free2 */
/* A free function for zmq_msg's with 'struct str' objects */
static inline void my_str_free(void __attribute__((__unused__)) *data, void *hint) {
str_free((struct str *)hint);
} /* my_str_free */
static void track_rotation(int *fd, const char *path);
void *harvest(void *arg) {
struct harvest_config *config = arg;
int fd;
int rc;
char hostname[200];
size_t hostname_len, path_len;
/* Make this so we only call it once. */
gethostname(hostname, sizeof(hostname));
hostname_len = strlen(hostname);
if (strcmp(config->path, "-") == 0) {
/* path is '-', use stdin */
fd = 0;
} else {
fd = open(config->path, O_RDONLY);
insist(fd >= 0, "open(%s) failed: %s", config->path, strerror(errno));
/* Start at the end of the file */
off_t seek_ret = lseek(fd, 0, SEEK_END);
insist(seek_ret >= 0, "lseek(%s, 0, SEEK_END) failed: %s",
config->path, strerror(errno));
}
path_len = strlen(config->path);
struct kv *event = calloc(3 + config->fields_len, sizeof(struct kv));
/* will fill the 'line' value in later for each line read */
event[0].key = "line"; event[0].key_len = 4;
event[0].value = NULL; event[0].value_len = 0;
event[1].key = "file"; event[1].key_len = 4;
event[1].value = config->path; event[1].value_len = path_len;
event[2].key = "host"; event[2].key_len = 4;
event[2].value = hostname; event[2].value_len = hostname_len;
for (size_t i = 0; i < config->fields_len; i++) {
memcpy(&event[i + 3], &config->fields[i], sizeof(struct kv));
}
char *buf;
ssize_t bytes;
buf = calloc(BUFFERSIZE, sizeof(char));
void *socket = zmq_socket(config->zmq, ZMQ_PUSH);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int64_t hwm = 100;
//zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
zmq_compat_set_sendhwm(socket, hwm);
/* Wait for the zmq endpoint to be up (wait for connect to succeed) */
struct backoff sleeper;
backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);
for (;;) {
rc = zmq_connect(socket, config->zmq_endpoint);
if (rc != 0 && errno == ECONNREFUSED) {
backoff(&sleeper);
continue; /* retry */
}
insist(rc == 0, "zmq_connect(%s) failed: %s", config->zmq_endpoint,
zmq_strerror(errno));
break;
}
int offset = 0;
for (;;) {
flog_if_slow(stdout, 0.250, {
bytes = read(fd, buf + offset, BUFFERSIZE - offset - 1);
}, "read of %d bytes (got %d bytes) on '%s'",
BUFFERSIZE - offset - 1, bytes, config->path);
offset += bytes;
if (bytes < 0) {
/* error, maybe indicate a failure of some kind. */
printf("read(%d '%s') failed: %s\n", fd,
config->path, strerror(errno));
break;
} else if (bytes == 0) {
backoff(&sleeper);
if (strcmp(config->path, "-") == 0) {
/* stdin gave EOF, close out. */
break;
}
track_rotation(&fd, config->path);
} else {
/* Data read, handle it! */
backoff_clear(&sleeper);
/* For each line, emit. Save the remainder */
char *line;