From c5f1aad984cbd03c9a8fd63ce1534e09ef2ee81a Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Fri, 12 Apr 2013 17:44:42 -0700 Subject: [PATCH] - purge C stuff; it's in a separate branch now --- c/Makefile | 152 -------- c/PROTOCOL.md | 112 ------ c/backoff.c | 61 ---- c/backoff.h | 22 -- c/clock_gettime.h | 26 -- c/emitter.c | 135 ------- c/emitter.h | 16 - c/flog.c | 48 --- c/flog.h | 21 -- c/harvester.c | 195 ---------- c/harvester.h | 15 - c/lumberjack.c | 266 -------------- c/proto.c | 888 ---------------------------------------------- c/proto.h | 84 ----- c/ring.c | 68 ---- c/ring.h | 29 -- c/sleepdefs.h | 5 - c/str.c | 73 ---- c/str.h | 31 -- c/strlist.c | 53 --- c/strlist.h | 15 - c/test_ring.c | 41 --- c/timespec.c | 52 --- c/timespec.h | 13 - c/unixsock.c | 29 -- c/zmq_compat.h | 18 - 26 files changed, 2468 deletions(-) delete mode 100644 c/Makefile delete mode 100644 c/PROTOCOL.md delete mode 100644 c/backoff.c delete mode 100644 c/backoff.h delete mode 100644 c/clock_gettime.h delete mode 100644 c/emitter.c delete mode 100644 c/emitter.h delete mode 100644 c/flog.c delete mode 100644 c/flog.h delete mode 100644 c/harvester.c delete mode 100644 c/harvester.h delete mode 100644 c/lumberjack.c delete mode 100644 c/proto.c delete mode 100644 c/proto.h delete mode 100644 c/ring.c delete mode 100644 c/ring.h delete mode 100644 c/sleepdefs.h delete mode 100644 c/str.c delete mode 100644 c/str.h delete mode 100644 c/strlist.c delete mode 100644 c/strlist.h delete mode 100644 c/test_ring.c delete mode 100644 c/timespec.c delete mode 100644 c/timespec.h delete mode 100644 c/unixsock.c delete mode 100644 c/zmq_compat.h diff --git a/c/Makefile b/c/Makefile deleted file mode 100644 index 8193f84..0000000 --- a/c/Makefile +++ /dev/null @@ -1,152 +0,0 @@ -VERSION=0.0.30 - -# By default, all dependencies (zeromq, etc) will be downloaded and installed -# locally. You can change this if you are deploying your own. -VENDOR?=zeromq jemalloc openssl zlib - -# Where to install to. -PREFIX?=/opt/lumberjack - -FETCH=sh fetch.sh - -CFLAGS+=-D_POSIX_C_SOURCE=199309 -std=c99 -Wall -Wextra -Werror -pipe -CFLAGS+=-g -CFLAGS+=-Wno-unused-function -LDFLAGS+=-pthread -LIBS=-lzmq -ljemalloc -lssl -lcrypto -luuid -lz - -MAKE?=make - -CFLAGS+=-Ibuild/include -LDFLAGS+=-Lbuild/lib -Wl,-rpath,'$$ORIGIN/../lib' - -default: build-all -build-all: build/bin/lumberjack build/bin/lumberjack.sh -include Makefile.ext - -ifeq ($(UNAME),Linux) -# clock_gettime is in librt on linux. -LIBS+=-lrt -endif - -clean: - -@rm -fr lumberjack unixsock *.o build - -vendor-clean: - -$(MAKE) -C vendor/msgpack/ clean - -$(MAKE) -C vendor/jansson/ clean - -$(MAKE) -C vendor/jemalloc/ clean - -$(MAKE) -C vendor/libuuid/ clean - -$(MAKE) -C vendor/zeromq/ clean - -$(MAKE) -C vendor/zlib/ clean - -$(MAKE) -C vendor/apr/ clean - -rpm deb: | build-all - fpm -s dir -t $@ -n lumberjack -v $(VERSION) --prefix /opt/lumberjack \ - --exclude '*.a' --exclude 'lib/pkgconfig/zlib.pc' -C build \ - --description "a log shipping tool" \ - --url "https://github.com/jordansissel/lumberjack" \ - bin/lumberjack bin/lumberjack.sh lib - -#install: build/bin/lumberjack build/lib/libzmq.$(LIBEXT) -# install -d -m 755 build/bin/* $(PREFIX)/bin/lumberjack -# install -d build/lib/* $(PREFIX)/lib - -flog.o: flog.c flog.h -strlist.o: strlist.h -emitter.o: strlist.h -backoff.o: backoff.c backoff.h -harvester.o: harvester.c harvester.h proto.h str.h sleepdefs.h flog.h -emitter.o: emitter.c emitter.h ring.h sleepdefs.h flog.h -lumberjack.o: lumberjack.c backoff.h harvester.h emitter.h flog.h -str.o: str.c str.h -proto.o: proto.c proto.h str.h sleepdefs.h flog.h -ring.o: ring.c ring.h - -harvester.o: build/include/insist.h -lumberjack.o: build/include/insist.h - -# Vendor'd dependencies -# If VENDOR contains 'zeromq' download and build it. -ifeq ($(filter zeromq,$(VENDOR)),zeromq) -emitter.o: build/include/zmq.h -harvester.o: build/include/zmq.h -lumberjack.o: build/include/zmq.h -build/bin/lumberjack: | build/bin build/lib/libzmq.$(LIBEXT) -endif # zeromq - -ifeq ($(filter jemalloc,$(VENDOR)),jemalloc) -harvester.o lumberjack.o ring.o str.o: build/include/jemalloc/jemalloc.h -build/bin/lumberjack: | build/lib/libjemalloc.$(LIBEXT) -endif # jemalloc - -ifeq ($(filter openssl,$(VENDOR)),openssl) -proto.o: build/include/openssl/ssl.h -lumberjack.o: build/include/openssl/ssl.h -build/bin/lumberjack: | build/lib/libssl.$(LIBEXT) -build/bin/lumberjack: | build/lib/libcrypto.$(LIBEXT) -endif # openssl - -ifeq ($(filter zlib,$(VENDOR)),zlib) -proto.o: build/include/zlib.h -build/bin/lumberjack: | build/lib/libz.$(LIBEXT) -endif # zlib - -.PHONY: test -test: | build/test/test_ring - build/test/test_ring - -# Tests -test_ring.o: ring.h build/include/jemalloc/jemalloc.h build/include/insist.h -build/test/test_ring: test_ring.o ring.o | build/test - $(CC) $(LDFLAGS) -o $@ $^ -ljemalloc - -build/bin/lumberjack.sh: lumberjack.sh | build/bin - install -m 755 $^ $@ - -build/bin/lumberjack: | build/bin -build/bin/lumberjack: lumberjack.o backoff.o harvester.o emitter.o str.o proto.o ring.o strlist.o flog.o - $(CC) $(LDFLAGS) -o $@ $^ $(LIBS) - @echo " => Build complete: $@" - @echo " => Run '$(MAKE) rpm' to build an rpm (or deb or tarball)" - -build/include/insist.h: | build/include - PATH=$$PWD:$$PATH fetch.sh -o $@ https://raw.github.com/jordansissel/experiments/master/c/better-assert/insist.h - -build/include/zmq.h build/lib/libzmq.$(LIBEXT): | build - @echo " => Building zeromq" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/zeromq/ install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/msgpack.h build/lib/libmsgpack.$(LIBEXT): | build - @echo " => Building msgpack" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/msgpack/ install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/jemalloc/jemalloc.h build/lib/libjemalloc.$(LIBEXT): | build - @echo " => Building jemalloc" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/jemalloc/ install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/sodium/sodium.h build/lib/libsodium.$(LIBEXT): | build - @echo " => Building libsodium" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/libsodium/ install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/lz4.h build/lib/liblz4.$(LIBEXT): | build - @echo " => Building lz4" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/lz4/ install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/zlib.h build/lib/libz.$(LIBEXT): | build - @echo " => Building zlib" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/zlib/ install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/openssl/ssl.h build/lib/libssl.$(LIBEXT) build/lib/libcrypto.$(LIBEXT): | build - @echo " => Building openssl" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/openssl install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build/include/apr-1/apr.h build/lib/libapr-1.$(LIBEXT): | build - @echo " => Building apr" - PATH=$$PWD:$$PATH $(MAKE) -C vendor/apr install PREFIX=$$PWD/build DEBUG=$(DEBUG) - -build: - mkdir $@ - -build/include build/bin build/test: | build - mkdir $@ diff --git a/c/PROTOCOL.md b/c/PROTOCOL.md deleted file mode 100644 index 65df309..0000000 --- a/c/PROTOCOL.md +++ /dev/null @@ -1,112 +0,0 @@ -# Protocol - -The needs that lead to this protocol are: - -* Encryption amd Authentication to protect -* Compression should be used to reduce bandwidth -* Round-trip latency should not damage throughput -* Application-level message acknowledgement - -## Implementation Considerations - -# Lumberjack Protocol v1 - -## Behavior - -Sequence and ack behavior (including sliding window, etc) is similar to TCP, -but instead of bytes, messages are the base unit. - -A writer with a window size of 50 events can send up to 50 unacked events -before blocking. A reader can acknowledge the 'last event' received to -support bulk acknowledgements. - -Reliable, ordered byte transport is ensured by using TCP (or TLS on top), and -this protocol aims to provide reliable, application-level, message transport. - -## Encryption and Authentication - -Currently this is to be handled by TLS. - -## Wire Format - -### Layering - -This entire protocol is built to be layered on top of TCP or TLS. - -### Framing - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +---------------+---------------+-------------------------------+ - | version(1) | frame type | payload ... | - +---------------------------------------------------------------+ - | payload continued... | - +---------------------------------------------------------------+ - -### 'data' frame type - -* SENT FROM WRITER ONLY -* frame type value: ASCII 'D' aka byte value 0x44 - -data is a map of string:string pairs. This is analogous to a Hash in Ruby, a -JSON map, etc, but only strings are supported at this time. - -Payload: - -* 32bit unsigned sequence number -* 32bit 'pair' count (how many key/value sequences follow) -* 32bit unsigned key length followed by that many bytes for the key -* 32bit unsigned value length followed by that many bytes for the value -* repeat key/value 'count' times. - -Sequence number roll-over: If you receive a sequence number less than the -previous value, this signals that the sequence number has rolled over. - -### 'ack' frame type - -* SENT FROM READER ONLY -* frame type value: ASCII 'A' aka byte value 0x41 - -Payload: - -* 32bit unsigned sequence number. - -Bulk acks are supported. If you receive data frames in sequence order -1,2,3,4,5,6, you can send an ack for '6' and the writer will take this to -mean you are acknowledging all data frames before and including '6'. - -### 'window size' frame type - -* SENT FROM WRITER ONLY -* frame type value: ASCII 'W' aka byte value 0x57 - -Payload: - -* 32bit unsigned window size value in units of whole data frames. - -This frame is used to tell the reader the maximum number of unacknowledged -data frames the writer will send before blocking for acks. - -### 'compressed' frame type - -* SENT FROM WRITER ONLY -* frame type value: ASCII 'C' aka byte value 0x43 - -Payload: - -* 32bit unsigned payload length -* 'length' bytes of compressed payload - -This frame type allows you to compress many frames into a single compressed -envelope and is useful for efficiently compressing many small data frames. - -The compressed payload MUST contain full frames only, not partial frames. -The uncompressed payload MUST be a valid frame stream by itself. As an example, -you could have 3 data frames compressed into a single 'compressed' frame type: -1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v} - when uncompressed, you should process -the uncompressed payload as you would reading uncompressed frames from the -network. - -TODO(sissel): It's likely this model is suboptimal, instead choose to -use whole-stream compression z_stream in zlib (Zlib::ZStream in ruby) might be -preferable. diff --git a/c/backoff.c b/c/backoff.c deleted file mode 100644 index bde38b7..0000000 --- a/c/backoff.c +++ /dev/null @@ -1,61 +0,0 @@ -//#define _POSIX_C_SOURCE 199309L /* for struct timespec */ -#include -#include "backoff.h" - -#define MAX_TV_NSEC 999999999L - -static inline void timespec_double(struct timespec *t) { - /* Exponential backoff */ - t->tv_sec <<= 1; - t->tv_nsec <<= 1; - if (t->tv_nsec > MAX_TV_NSEC) { - /* handle carry/overflow of tv_nsec */ - t->tv_nsec -= (MAX_TV_NSEC + 1); - t->tv_sec += 1; - } -} /* timespec_double */ - -static inline long timespec_compare(struct timespec *a, struct timespec *b) { - time_t val; - val = a->tv_sec - b->tv_sec; - if (val != 0) { - return val; - } - return a->tv_nsec - b->tv_nsec; -} /* timespec_compare */ - -static inline void timespec_copy(struct timespec *source, struct timespec *dest) { - /* TODO(sissel): Could use memcpy here instead... */ - dest->tv_sec = source->tv_sec; - dest->tv_nsec = source->tv_nsec; -} /* timespec_copy */ - -inline void backoff_clear(struct backoff *b) { - timespec_copy(&b->min, &b->sleep); -} /* backoff_clear */ - -inline void backoff_init(struct backoff *b, struct timespec *min, - struct timespec *max) { - timespec_copy(min, &b->min); - timespec_copy(max, &b->max); - backoff_clear(b); -} /* backoff_init */ - -inline void backoff(struct backoff *b) { - //printf("Sleeping %ld.%09ld\n", b->sleep.tv_sec, b->sleep.tv_nsec); - nanosleep(&b->sleep, NULL); - - /* Exponential backoff */ - timespec_double(&b->sleep); - //printf("Candidate vs max: %ld.%09ld vs %ld.%09ld: %ld\n", - //b->sleep.tv_sec, b->sleep.tv_nsec, - //b->max.tv_sec, b->max.tv_nsec, - //timespec_compare(&b->sleep, &b->max)); - //printf("tv_sec: %ld\n", b->sleep.tv_sec - b->max.tv_sec); - //printf("tv_nsec: %ld\n", b->sleep.tv_nsec - b->max.tv_nsec); - - /* Cap at 'max' if sleep time exceeds it */ - if (timespec_compare(&b->sleep, &b->max) > 0) { - timespec_copy(&b->max, &b->sleep); - } -} /* backoff_sleep */ diff --git a/c/backoff.h b/c/backoff.h deleted file mode 100644 index cec82cd..0000000 --- a/c/backoff.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef _BACKOFF_H_ -#define _BACKOFF_H_ - -#include - -struct backoff { - struct timespec max; - struct timespec min; - struct timespec sleep; -}; - -/* Initialize a backoff struct with a max value */ -void backoff_init(struct backoff *b, struct timespec *min, struct timespec *max); - -/* Execute a backoff. This will sleep for a time. - * The next backoff() call will sleep twice as long (or the max value, - * whichever is smaller) */ -void backoff(struct backoff *b); - -/* Reset the next backoff() call to sleep the minimum (1ms) */ -void backoff_clear(struct backoff *b); -#endif /* _BACKOFF_H_ */ diff --git a/c/clock_gettime.h b/c/clock_gettime.h deleted file mode 100644 index ff09d85..0000000 --- a/c/clock_gettime.h +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef _CLOCK_GETTIME_H_ -#define _CLOCK_GETTIME_H_ -#include /* struct timespec, clock_gettime */ - -// copied mostly from https://gist.github.com/1087739 -/* OS X doesn't have clock_gettime, sigh. */ -#ifdef __MACH__ -#include -#include - -typedef int clockid_t; -#define CLOCK_MONOTONIC 1 -static long clock_gettime(clockid_t __attribute__((unused)) which_clock, struct timespec *tp) { - clock_serv_t cclock; - mach_timespec_t mts; - host_get_clock_service(mach_host_self(), REALTIME_CLOCK, &cclock); - clock_get_time(cclock, &mts); - mach_port_deallocate(mach_task_self(), cclock); - tp->tv_sec = mts.tv_sec; - tp->tv_nsec = mts.tv_nsec; - return 0; /* success, according to clock_gettime(3) */ -} -#endif -// end gist copy - -#endif /* _CLOCK_GETTIME_H_ */ diff --git a/c/emitter.c b/c/emitter.c deleted file mode 100644 index cc9d64d..0000000 --- a/c/emitter.c +++ /dev/null @@ -1,135 +0,0 @@ -#include -#include /* C99 for int64_t */ -#include -#include -#include "zmq.h" -#include "ring.h" -#include "emitter.h" -#include "insist.h" -#include "proto.h" -#include "backoff.h" -#include "clock_gettime.h" -#include "flog.h" - -#include - -#include "zmq_compat.h" -#include "sleepdefs.h" - -void *emitter(void *arg) { - int rc; - struct emitter_config *config = arg; - insist(config->zmq != NULL, "zmq is NULL"); - insist(config->zmq_endpoint != NULL, "zmq_endpoint is NULL"); - insist(config->ssl_ca_path != NULL, "ssl_ca_path is NULL"); - insist(config->window_size > 0, "window_size (%d) must be positive", - (int)config->window_size); - insist(config->host != NULL, "host is NULL"); - insist(config->port > 0, "port (%hd) must be positive", config->port); - - void *socket = zmq_socket(config->zmq, ZMQ_PULL); - insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno)); - int64_t hwm = 100; - //zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm)); - zmq_compat_set_recvhwm(socket, hwm); - rc = zmq_bind(socket, config->zmq_endpoint); - insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint, - zmq_strerror(errno)); - - struct timespec start; - clock_gettime(CLOCK_MONOTONIC, &start); - //long count = 0; - - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - - struct lumberjack *lumberjack; - lumberjack = lumberjack_new(config->host, config->port, config->window_size); - insist(lumberjack != NULL, "lumberjack_new failed"); - lumberjack->ring_size = config->window_size; - - if (config->ssl_ca_path != NULL) { - rc = lumberjack_set_ssl_ca(lumberjack, config->ssl_ca_path); - insist(rc == 0, "lumberjack_set_ssl_ca failed, is '%s' a valid ssl cert?", - config->ssl_ca_path); - } - - unsigned long count = 0; - unsigned long bytes = 0; - unsigned long report_interval = config->window_size * 4; - - zmq_pollitem_t items[1]; - - items[0].socket = socket; - items[0].events = ZMQ_POLLIN; - - int can_flush = 0; - for (;;) { - /* Receive an event from a harvester and put it in the queue */ - zmq_msg_t message; - - rc = zmq_msg_init(&message); - insist(rc == 0, "zmq_msg_init failed"); - rc = zmq_poll(items, 1, 1000000 /* microseconds */); - - if (rc == 0) { - /* poll timeout. We're idle, so let's flush and back-off. */ - if (can_flush) { - /* only flush if there's something to flush... */ - flog(stdout, "flushing since nothing came in over zmq"); - /* We flush here to keep slow feeders closer to real-time */ - rc = lumberjack_flush(lumberjack); - can_flush = 0; - if (rc != 0) { - /* write failure, reconnect (which will resend) and such */ - lumberjack_disconnect(lumberjack); - lumberjack_ensure_connected(lumberjack); - } - } - backoff(&sleeper); - - /* Restart the loop - checking to see if there's any messages */ - continue; - } - - /* poll successful, read a message */ - //rc = zmq_recv(socket, &message, 0); - rc = zmq_compat_recvmsg(socket, &message, 0); - insist(rc == 0 /*|| errno == EAGAIN */, - "zmq_recv(%s) failed (returned %d): %s", - config->zmq_endpoint, rc, zmq_strerror(errno)); - - /* Clear the backoff timer since we received a message successfully */ - backoff_clear(&sleeper); - - /* Write the data over lumberjack. This will handle any - * connection/reconnection/ack issues */ - lumberjack_send_data(lumberjack, zmq_msg_data(&message), - zmq_msg_size(&message)); - /* Since we sent data, let it be known that we can flush if idle */ - can_flush = 1; - /* Stats for debugging */ - count++; - bytes += zmq_msg_size(&message); - - zmq_msg_close(&message); - - if (count == report_interval) { - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - double s = (start.tv_sec + 0.0) + (start.tv_nsec / 1000000000.0); - double n = (now.tv_sec + 0.0) + (now.tv_nsec / 1000000000.0); - flog(stdout, "Rate: %f (bytes: %f)", (count + 0.0) / (n - s), (bytes + 0.0) / (n - s)); - struct rusage rusage; - rc = getrusage(RUSAGE_SELF, &rusage); - insist(rc == 0, "getrusage failed: %s", strerror(errno)); - flog(stdout, "cpu user/system: %d.%06d / %d.%06dn", - (int)rusage.ru_utime.tv_sec, (int)rusage.ru_utime.tv_usec, - (int)rusage.ru_stime.tv_sec, (int)rusage.ru_stime.tv_usec); - clock_gettime(CLOCK_MONOTONIC, &start); - bytes = 0; - count = 0; - } - } /* forever */ -} /* emitter */ - diff --git a/c/emitter.h b/c/emitter.h deleted file mode 100644 index 066a0da..0000000 --- a/c/emitter.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef _EMITTER_H_ -#define _EMITTER_H_ - -struct emitter_config { - void *zmq; /* zmq context */ - char *zmq_endpoint; /* inproc://whatever */ - char *ssl_ca_path; /* path to trusted ssl ca, can be a directory or a file */ - - size_t window_size; /* the window size */ - - char *host; - unsigned short port; -}; - -void *emitter(void *arg); -#endif /* _EMITTER_H_ */ diff --git a/c/flog.c b/c/flog.c deleted file mode 100644 index e116532..0000000 --- a/c/flog.c +++ /dev/null @@ -1,48 +0,0 @@ -#include /* for FILE, sprintf, fprintf, etc */ -#include /* for struct tm, localtime_r */ -#include /* for gettimeofday */ -#include /* for va_start, va_end */ - -void flog(FILE *stream, const char *format, ...) { - va_list args; - struct timeval tv; - struct tm tm; - char timestamp[] = "YYYY-MM-ddTHH:mm:ss.SSS+0000"; - gettimeofday(&tv, NULL); - - /* convert to time to 'struct tm' for use with strftime */ - localtime_r(&tv.tv_sec, &tm); - - /* format the time */ - strftime(timestamp, sizeof(timestamp), "%Y-%m-%dT%H:%M:%S.000%z", &tm); - - /* add in milliseconds, since strftime() can't do that */ - /* '20' is the string offset of the millisecond value in our timestamp */ - /* we have to include 'timestamp + 23' to keep the timezone value */ - sprintf(timestamp + 20, "%03ld%s", tv.tv_usec / 1000, timestamp + 23); - - /* print the timestamp */ - fprintf(stream, "%.28s ", timestamp); /* 28 is the length of the timestamp */ - - /* print the log message */ - va_start(args, format); - vfprintf(stream, format, args); - va_end(args); - - /* print a newline */ - fprintf(stream, "\n"); -} /* flog */ - -double duration(struct timeval *start) { - struct timeval tv; - gettimeofday(&tv, NULL); /* what time is it now? */ - - tv.tv_sec -= start->tv_sec; - tv.tv_usec -= start->tv_usec; - - if (tv.tv_usec < 0) { - tv.tv_sec -= 1; - tv.tv_usec += 1000000L; - } - return tv.tv_sec + ((double)tv.tv_usec / 1000000.0); -} /* duration */ diff --git a/c/flog.h b/c/flog.h deleted file mode 100644 index 41daa60..0000000 --- a/c/flog.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef _FLOG_H_ -#define _FLOG_H_ -#include /* for FILE */ -#include /* for struct timeval */ -void flog(FILE *stream, const char *format, ...); -double duration(struct timeval *start); - -#define flog_if_slow(stream, max_duration, block, format, args...) \ -{ \ - struct timeval __start; \ - gettimeofday(&__start, NULL); \ - { \ - block \ - } \ - double __duration = duration(&__start); \ - if (__duration >= max_duration) { \ - flog(stream, "slow operation (%.3f seconds): " format , __duration, args); \ - } \ -} - -#endif /* _FLOG_H_ */ diff --git a/c/harvester.c b/c/harvester.c deleted file mode 100644 index 3f6b651..0000000 --- a/c/harvester.c +++ /dev/null @@ -1,195 +0,0 @@ -#define _BSD_SOURCE -#include /* for strsep, strerror, etc */ -#include /* for errno */ -#include /* for open(2) */ -#include /* for close, etc */ -#include /* for ntohl */ -#include /* printf and friends */ -#include "zmq.h" /* zeromq messaging library */ -#include "str.h" /* dynamic string library */ -#include "proto.h" /* lumberjack wire format serialization */ -#include -#include "jemalloc/jemalloc.h" - -#include "harvester.h" -#include "backoff.h" -#include "insist.h" -#include "sleepdefs.h" -#include "flog.h" -#include "zmq_compat.h" - -#ifdef __MACH__ -/* OS X is dumb, or I am dumb, or we are both dumb. I don't know anymore, - * but I need to declare these explicitly even though they are defined - * in string.h, unistd.h respectively */ -extern char *strsep(char **stringp, const char *delim); -extern int gethostname(char *name, size_t namelen); -#endif - -#define EMITTER_SOCKET "inproc://emitter" -#define BUFFERSIZE 16384 - -/* A free function that simply calls free(3) for zmq_msg */ -//static inline void free2(void *data, void __attribute__((__unused__)) *hint) { - //free(data); -//} /* free2 */ - -/* A free function for zmq_msg's with 'struct str' objects */ -static inline void my_str_free(void __attribute__((__unused__)) *data, void *hint) { - str_free((struct str *)hint); -} /* my_str_free */ - -static void track_rotation(int *fd, const char *path); - -void *harvest(void *arg) { - struct harvest_config *config = arg; - int fd; - int rc; - char hostname[200]; - size_t hostname_len, path_len; - - /* Make this so we only call it once. */ - gethostname(hostname, sizeof(hostname)); - hostname_len = strlen(hostname); - - if (strcmp(config->path, "-") == 0) { - /* path is '-', use stdin */ - fd = 0; - } else { - fd = open(config->path, O_RDONLY); - insist(fd >= 0, "open(%s) failed: %s", config->path, strerror(errno)); - /* Start at the end of the file */ - off_t seek_ret = lseek(fd, 0, SEEK_END); - insist(seek_ret >= 0, "lseek(%s, 0, SEEK_END) failed: %s", - config->path, strerror(errno)); - } - path_len = strlen(config->path); - - struct kv *event = calloc(3 + config->fields_len, sizeof(struct kv)); - - /* will fill the 'line' value in later for each line read */ - event[0].key = "line"; event[0].key_len = 4; - event[0].value = NULL; event[0].value_len = 0; - event[1].key = "file"; event[1].key_len = 4; - event[1].value = config->path; event[1].value_len = path_len; - event[2].key = "host"; event[2].key_len = 4; - event[2].value = hostname; event[2].value_len = hostname_len; - for (size_t i = 0; i < config->fields_len; i++) { - memcpy(&event[i + 3], &config->fields[i], sizeof(struct kv)); - } - - char *buf; - ssize_t bytes; - buf = calloc(BUFFERSIZE, sizeof(char)); - - void *socket = zmq_socket(config->zmq, ZMQ_PUSH); - insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno)); - - int64_t hwm = 100; - //zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm)); - zmq_compat_set_sendhwm(socket, hwm); - - /* Wait for the zmq endpoint to be up (wait for connect to succeed) */ - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - for (;;) { - rc = zmq_connect(socket, config->zmq_endpoint); - if (rc != 0 && errno == ECONNREFUSED) { - backoff(&sleeper); - continue; /* retry */ - } - insist(rc == 0, "zmq_connect(%s) failed: %s", config->zmq_endpoint, - zmq_strerror(errno)); - break; - } - - int offset = 0; - for (;;) { - flog_if_slow(stdout, 0.250, { - bytes = read(fd, buf + offset, BUFFERSIZE - offset - 1); - }, "read of %d bytes (got %d bytes) on '%s'", - BUFFERSIZE - offset - 1, bytes, config->path); - - offset += bytes; - if (bytes < 0) { - /* error, maybe indicate a failure of some kind. */ - printf("read(%d '%s') failed: %s\n", fd, - config->path, strerror(errno)); - break; - } else if (bytes == 0) { - backoff(&sleeper); - if (strcmp(config->path, "-") == 0) { - /* stdin gave EOF, close out. */ - break; - } - track_rotation(&fd, config->path); - } else { - /* Data read, handle it! */ - backoff_clear(&sleeper); - /* For each line, emit. Save the remainder */ - char *line; - char *septok = buf; - char *start = NULL; - while (start = septok, (line = strsep(&septok, "\n")) != NULL) { - if (septok == NULL) { - /* last token found, no terminator though */ - offset = offset - (line - buf); - memmove(buf, line, strlen(line)); - } else { - /* emit line as an event */ - /* 'septok' points at the start of the next token, so subtract one. */ - size_t line_len = septok - start - 1; - struct str *serialized; - - /* Set the line */ - event[0].value = line; - event[0].value_len = line_len; - - /* pack using lumberjack data payload */ - serialized = lumberjack_kv_pack(event, 3 + config->fields_len); - - zmq_msg_t event; - zmq_msg_init_data(&event, str_data(serialized), str_length(serialized), - my_str_free, serialized); - flog_if_slow(stdout, 0.250, { - //rc = zmq_send(socket, &event, 0); - rc = zmq_compat_sendmsg(socket, &event, 0); - }, "zmq_send (harvesting file '%s')", config->path); - insist(rc == 0, "zmq_send(event) failed: %s", zmq_strerror(rc)); - zmq_msg_close(&event); - } - } /* for each token */ - } - } /* loop forever, reading from a file */ - - free(arg); /* allocated by the main method, up to us to free */ - close(fd); - - return NULL; -} /* harvest */ - -void track_rotation(int *fd, const char *path) { - struct stat pathstat, fdstat; - int rc; - fstat(*fd, &fdstat); - rc = stat(path, &pathstat); - if (rc == -1) { - /* error stat'ing the file path, restart loop and try again */ - return; - } - - if (pathstat.st_dev != fdstat.st_dev || pathstat.st_ino != fdstat.st_ino) { - /* device or inode number changed, this file was renamed or rotated. */ - rc = open(path, O_RDONLY); - if (rc == -1) { - /* Error opening file, restart loop and try again. */ - return; - } - close(*fd); - /* start reading the new file! */ - *fd = rc; - } else if (pathstat.st_size < fdstat.st_size) { - /* the file was truncated, jump back to the beginning */ - lseek(*fd, 0, SEEK_SET); - } -} /* track_rotation */ diff --git a/c/harvester.h b/c/harvester.h deleted file mode 100644 index 7ea656d..0000000 --- a/c/harvester.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef _HARVESTER_H_ -#define _HARVESTER_H_ - -struct harvest_config { - char *path; /* the path to harvest */ - - void *zmq; /* zmq context */ - char *zmq_endpoint; /* inproc://whatever */ - - struct kv *fields; /* any extra fields to add to each event */ - size_t fields_len; /* number of fields */ -}; - -void *harvest(void *arg); -#endif /* _HARVESTER_H_ */ diff --git a/c/lumberjack.c b/c/lumberjack.c deleted file mode 100644 index 95f5b1a..0000000 --- a/c/lumberjack.c +++ /dev/null @@ -1,266 +0,0 @@ -#define _BSD_SOURCE /* to get gethostname() under linux/gcc */ -#include -#include /* for setrlimit */ -#include -#include "insist.h" -#include -#include /* for gethostname */ -#include "zmq.h" -#include "harvester.h" -#include "emitter.h" -#include "jemalloc/jemalloc.h" -#include -#include -#include -#include -#include "proto.h" -#include "flog.h" - -#define ZMQ_EMITTER_ENDPOINT "inproc://emitter" - -typedef enum { - opt_help = 'h', - opt_version = 'v', - opt_field, - opt_ssl_ca_path, - opt_host, - opt_port, - opt_window_size, -} optlist_t; - -struct option_doc { - const char *name; - int has_arg; - int val; - const char *documentation; -}; - -static struct option_doc options[] = { - { "help", no_argument, opt_help, "show this help" }, - { "version", no_argument, opt_version, "show the version of lumberjack" }, - - /* Support arbitrary fields in the events, like: - * ./lumberjack --field host=$(hostname) --field role=frontend .../access.log - * - * This will allow you to send any arbitrary data along with every event. - * { "host": "foo", "file": "/path/to/file.log", "message": "hello ..." } - */ - { "field", required_argument, opt_field, - "Add a custom key-value mapping to every line emitted" }, - - /* ssl cert and key, optional */ - //{ "ssl-certificate", required_argument, NULL, opt_ssl_certificate }, - //{ "ssl-key", required_argument, NULL, opt_ssl_key }, - /* TODO(sissel): How to provide key passphrase/credentials? */ - - /* What cert authority to trust. This can be the path to a single self-signed - * certificate if you choose. */ - { "ssl-ca-path", required_argument, opt_ssl_ca_path, - "Set the trusted cert/ca path for lumberjack's ssl client. " \ - "Can be a file or a directory." }, - { "host", required_argument, opt_host, - "The hostname to send lumberjack messages to. You can specify multiple " \ - "by separating hosts with a comma." }, - { "port", required_argument, opt_port, - "The port to connect on the lumberjack server" }, - { "window-size", required_argument, opt_window_size, - "The maximum number of outstanding messages to send before we will " \ - "wait for an acknowledgement" }, - { NULL, 0, 0, NULL }, -}; - -void usage(const char *prog) { - printf("Usage: %s [options] /path/to/file [/path/to/file2 ...]\n", prog); - - for (int i = 0; options[i].name != NULL; i++) { - printf(" --%s%s %.*s %s\n", options[i].name, - options[i].has_arg ? " VALUE" : "", - (int)(20 - strlen(options[i].name) - (options[i].has_arg ? 6 : 0)), - " ", - options[i].documentation); - } -} /* usage */ - -void set_resource_limits(int file_count) { - struct rlimit limits; - int rc; - - rc = nice(1); /* ask for less priority in the scheduler */ - insist(rc != -1, "nice(1) failed: %s", strerror(errno)); - - /* Only set resource limits if not running under valgrind. - * If we set limits under valgrind, it crashes due to exceeding said limits - */ - - if ((getenv("LD_PRELOAD") != NULL) \ - && (strstr(getenv("LD_PRELOAD"), "/vgpreload_") != NULL)) { - flog(stdout, "Valgrind detected, skipping self-resource limitations"); - return; - } - - /* Set open file limit - * 3 'open files' per log file watched: - * - one for the file itself - * - two for the socketpair in zeromq - * */ - limits.rlim_cur = limits.rlim_max = (file_count * 3 ) + 100; - flog(stdout, "Watching %d files, setting open file limit to %ld", - file_count, limits.rlim_max); - rc = setrlimit(RLIMIT_NOFILE, &limits); - insist(rc != -1, "setrlimit(RLIMIT_NOFILE, ... %d) failed: %s", - (int)limits.rlim_max, strerror(errno)); - - /* I'd like to set RLIMIT_NPROC, but that setting applies to the entire user - * for all processes, not just subprocesses or threads belonging to this - * process. */ - //limits.rlim_cur = limits.rlim_max = file_count + 10; - //rc = setrlimit(RLIMIT_NPROC, &limits); - //insist(rc != -1, "setrlimit(RLIMIT_NPROC, ... %d) failed: %s\n", - //(int)limits.rlim_max, strerror(errno)); - - /* Set resident memory limit */ - /* Allow 1mb per file opened */ - int bytes = (1<<20) * file_count; - /* RLIMIT_RSS uses 'pages' as the unit, convert bytes to pages. */ - limits.rlim_cur = limits.rlim_max = (int)(bytes / sysconf(_SC_PAGESIZE)); - flog(stdout, "Watching %d files, setting memory usage limit to %d bytes", - file_count, bytes); - rc = setrlimit(RLIMIT_RSS, &limits); - insist(rc != -1, "setrlimit(RLIMIT_RSS, %d pages (%d bytes)) failed: %s", - (int)limits.rlim_max, bytes, strerror(errno)); -} /* set_resource_limits */ - -int main(int argc, char **argv) { - int c, i; - struct emitter_config emitter_config; - struct option *getopt_options = NULL; - - struct kv *extra_fields = NULL; - size_t extra_fields_len = 0; - - /* defaults */ - memset(&emitter_config, 0, sizeof(struct emitter_config)); - emitter_config.port = 5001; - emitter_config.window_size = 4096; - - /* convert the 'option_doc' array into a 'struct option' array - * for use with getopt_long_only */ - for (i = 0; options[i].name != NULL; i++) { - getopt_options = realloc(getopt_options, (i+1) * sizeof(struct option)); - getopt_options[i].name = options[i].name; - getopt_options[i].has_arg = options[i].has_arg; - getopt_options[i].flag = NULL; - getopt_options[i].val = options[i].val; - } - - /* Add one last item for the list terminator NULL */ - getopt_options = realloc(getopt_options, (i+1) * sizeof(struct option)); - getopt_options[i].name = NULL; - - char *tmp; - while (i = -1, c = getopt_long_only(argc, argv, "+hv", getopt_options, &i), c != -1) { - /* TODO(sissel): handle args */ - switch (c) { - case opt_ssl_ca_path: - emitter_config.ssl_ca_path = strdup(optarg); - break; - case opt_version: - printf("version unknown. Could be awesome.\n"); - break; - case opt_help: - usage(argv[0]); - return 0; - case opt_host: - emitter_config.host = strdup(optarg); - break; - case opt_port: - emitter_config.port = (unsigned short)atoi(optarg); - break; - case opt_window_size: - emitter_config.window_size = (size_t)atoi(optarg); - printf("size: %d\n", (int)emitter_config.window_size); - break; - case opt_field: - tmp = strchr(optarg, '='); - if (tmp == NULL) { - printf("Invalid --field setting, expected 'foo=bar' form, " \ - "didn't see '=' in '%s'", optarg); - usage(argv[0]); - exit(1); - } - extra_fields_len += 1; - extra_fields = realloc(extra_fields, extra_fields_len * sizeof(struct kv)); - *tmp = '\0'; // turn '=' into null terminator - tmp++; /* skip to first char of value */ - extra_fields[extra_fields_len - 1].key = strdup(optarg); - extra_fields[extra_fields_len - 1].key_len = strlen(optarg); - extra_fields[extra_fields_len - 1].value = strdup(tmp); - extra_fields[extra_fields_len - 1].value_len = strlen(tmp); - break; - default: - insist(i == -1, "Flag (--%s%s%s) known, but someone forgot to " \ - "implement handling of it! This is certainly a bug.", - options[i].name, - options[i].has_arg ? " " : "", - options[i].has_arg ? optarg : ""); - - usage(argv[0]); - return 1; - } - } - free(getopt_options); - - if (emitter_config.host == NULL) { - printf("Missing --host flag\n"); - usage(argv[0]); - return 1; - } - - if (emitter_config.port == 0) { - printf("Missing --port flag\n"); - usage(argv[0]); - return 1; - } - - argc -= optind; - argv += optind; - - /* I'll handle write failures; no signals please */ - signal(SIGPIPE, SIG_IGN); - - insist(argc > 0, "No arguments given. What log files do you want shipped?"); - - /* Set resource (memory, open file, etc) limits based on the - * number of files being watched. */ - set_resource_limits(argc); - - pthread_t *harvesters = calloc(argc, sizeof(pthread_t)); - /* no I/O threads needed since we use inproc:// only */ - void *zmq = zmq_init(0 /* IO threads */); - - /* Start harvesters for each path given */ - for (i = 0; i < argc; i++) { - struct harvest_config *harvester = calloc(1, sizeof(struct harvest_config)); - harvester->zmq = zmq; - harvester->zmq_endpoint = ZMQ_EMITTER_ENDPOINT; - harvester->path = argv[i]; - harvester->fields = extra_fields; - harvester->fields_len = extra_fields_len; - pthread_create(&harvesters[i], NULL, harvest, harvester); - } - - pthread_t emitter_thread; - emitter_config.zmq = zmq; - emitter_config.zmq_endpoint = ZMQ_EMITTER_ENDPOINT; - pthread_create(&emitter_thread, NULL, emitter, &emitter_config); - - for (i = 0; i < argc; i++) { - pthread_join(harvesters[i], NULL); - } - - flog(stdout, "All harvesters completed. Exiting."); - free(harvesters); - - /* TODO(sissel): Tell emitter to flush and exit */ - return 1; -} /* main */ diff --git a/c/proto.c b/c/proto.c deleted file mode 100644 index 63e5ea8..0000000 --- a/c/proto.c +++ /dev/null @@ -1,888 +0,0 @@ -#define _BSD_SOURCE /* for hstrerror */ -#include -#include -#include -#include -#include -#include -#include "proto.h" -#include /* for writev */ -#include "str.h" -#include -#include -#include -#include -#include -#include - -#include "zlib.h" -#include "backoff.h" -#include "insist.h" -#include "openssl/bio.h" -#include "openssl/ssl.h" -#include "openssl/err.h" -#include "openssl/rand.h" - -#include "strlist.h" -#include "sleepdefs.h" -#include "flog.h" - -#define LUMBERJACK_POLL_READ 0x01 -#define LUMBERJACK_POLL_WRITE 0x02 - -static void lumberjack_init(void); -static int lumberjack_tcp_connect(struct lumberjack *lumberjack); -static int lumberjack_ssl_handshake(struct lumberjack *lumberjack); -static int lumberjack_connected(struct lumberjack *lumberjack); -static int lumberjack_wait_for_ack(struct lumberjack *lumberjack); -static int lumberjack_retransmit_all(struct lumberjack *lumberjack); -static int lumberjack_write_window_size(struct lumberjack *lumberjack); -static int lumberjack_poll(struct lumberjack *lumberjack, - time_t seconds, int flags); - -static int lumberjack_init_done = 0; - -static unsigned int rand_uint32() { - unsigned char bytes[4]; - /* TODO(sissel): Check error code? -1 means we failed... */ - RAND_pseudo_bytes(bytes, 4); - return *(unsigned int *)(bytes); -} /* rand_uint32 */ - -static void lumberjack_init(void) { - if (lumberjack_init_done) { - return; - } - - /* ssl init */ - CRYPTO_malloc_init(); - SSL_library_init(); - SSL_load_error_strings(); - ERR_load_BIO_strings(); - OpenSSL_add_all_algorithms(); - lumberjack_init_done = 1; -} /* lumberjack_init */ - -struct lumberjack *lumberjack_new(const char *host, unsigned short port, size_t window_size) { - struct lumberjack *lumberjack; - lumberjack_init(); /* global one-time init */ - - lumberjack = malloc(sizeof(struct lumberjack)); - lumberjack->host = host; - lumberjack->port = port; - lumberjack->fd = -1; - lumberjack->sequence = 0; //rand(); - lumberjack->ssl = NULL; - lumberjack->connected = 0; - - /* I tried with 128, 256, 512, 1024, 2048, and 16384, - * in a local network, an window size of 1024 seemed to have the best - * performance (equal to 2048 and 16384) for the least memory cost. */ - if (window_size < 1024) { - flog(stdout, "Window size less than 1024 (%d) isn't shown to have " \ - "speed-performance benefits", window_size); - } - - lumberjack->ring_size = window_size; - lumberjack->ring = ring_new_size(lumberjack->ring_size); - - /* Create this once. */ - lumberjack->ssl_context = SSL_CTX_new(SSLv23_client_method()); - SSL_CTX_set_verify(lumberjack->ssl_context, SSL_VERIFY_PEER, NULL); - - lumberjack->io_buffer = str_new_size(16384); /* TODO(sissel): tunable */ - - /* zlib provides compressBound() to give a 'worst case' compressed - * payload size on a input payload of a given size. */ - lumberjack->compression_buffer = str_new_size(compressBound(16384)); - return lumberjack; -} /* lumberjack_new */ - -int lumberjack_set_ssl_ca(struct lumberjack *lumberjack, const char *path) { - insist(lumberjack != NULL, "lumberjack is NULL"); - insist(path != NULL, "path is NULL"); - insist(!lumberjack_connected(lumberjack), - "You cannot call lumberjack_set_ssl_ca while connected."); - - int rc; - /* Check whether 'path' is a directory or not. */ - struct stat path_stat; - rc = stat(path, &path_stat); - if (rc == -1) { - /* Failed to stat the file */ - flog(stdout, "lumberjack_set_ssl_ca: stat(%s) failed: %s", - path, strerror(errno)); - return -1; - } - - if (S_ISDIR(path_stat.st_mode)) { - rc = SSL_CTX_load_verify_locations(lumberjack->ssl_context, NULL, path); - } else { - /* assume a file */ - rc = SSL_CTX_load_verify_locations(lumberjack->ssl_context, path, NULL); - } - - if (rc == 0) { - ERR_print_errors_fp(stdout); - return -1; - } - - return 0; -} /* lumberjack_set_ssl_ca */ - -int lumberjack_connect(struct lumberjack *lumberjack) { - /* TODO(sissel): support ipv6, if anyone ever uses that in production ;) */ - insist(lumberjack != NULL, "lumberjack must not be NULL"); - - int rc; - rc = lumberjack_tcp_connect(lumberjack); - if (rc < 0) { - return -1; - } - - rc = lumberjack_ssl_handshake(lumberjack); - if (rc < 0) { - flog(stdout, "ssl handshake failed"); - lumberjack_disconnect(lumberjack); - return -1; - } - - /* If we get here, tcp connect + ssl handshake has succeeded */ - lumberjack->connected = 1; - - /* Always truncate the output buffer on a new connection. - * This prevents accidental buffer leaks from an old/dead connection - * into this new one. - * - * The symptom of such a leak is that, upon a new connection, - * the oldest non-ack'd event is not the first event seen by the receiver. - */ - str_truncate(lumberjack->io_buffer); - - /* Send our window size */ - rc = lumberjack_write_window_size(lumberjack); - if (rc < 0) { - flog(stdout, "lumberjack_write_window_size failed: %d", rc); - lumberjack_disconnect(lumberjack); - return -1; - } - - /* Retransmit anything currently in the ring (unacknowledged data frames) - * This is a no-op if there's nothing in the ring. */ - rc = lumberjack_retransmit_all(lumberjack); - if (rc < 0) { - flog(stdout, "lumberjack_retransmit_all failed"); - /* Retransmit failed, which means a write failed during retransmit, - * disconnect and claim a connection failure. */ - lumberjack_disconnect(lumberjack); - return -1; - } - - insist(lumberjack->fd > 0, - "lumberjack->fd must be > 0 after a connect, was %d", lumberjack->fd); - insist(lumberjack->ssl != NULL, - "lumberjack->ssl must not be NULL after a connect"); - return 0; -} /* lumberjack_connect */ - -int lumberjack_ensure_connected(struct lumberjack *lumberjack) { - int rc; - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - - while (!lumberjack_connected(lumberjack)) { - backoff(&sleeper); - rc = lumberjack_connect(lumberjack); - if (rc != 0) { - flog(stdout, "Connection attempt to %s:%hd failed: %s", - lumberjack->host, lumberjack->port, strerror(errno)); - } else { - /* we're connected! */ - backoff_clear(&sleeper); - } - } - insist(lumberjack->fd > 0, - "lumberjack->fd must be > 0 after a connect, was %d", lumberjack->fd); - insist(lumberjack->ssl != NULL, - "lumberjack->ssl must not be NULL after a connect"); - return 0; -} /* lumberjack_connect_block */ - -static struct hostent *lumberjack_choose_address(const char *host) { - strlist_t *hostlist; - split(&hostlist, host, ","); - insist(hostlist->nitems > 0, "host string must not be empty"); - - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - - struct hostent *hostinfo = NULL; - while (hostinfo == NULL) { - int item = rand_uint32() % hostlist->nitems; - char *chosen = hostlist->items[item]; - flog_if_slow(stdout, 0.200, { - hostinfo = gethostbyname(chosen); - }, "gethostbyname('%s')", chosen); - if (hostinfo == NULL) { - flog(stdout, "gethostbyname(%s) failed: %s", chosen, - hstrerror(h_errno)); - backoff(&sleeper); - } else { - if (hostlist->nitems > 1) { - /* only log that we've selected a random host if there's multiple hosts - * in the list. */ - flog(stdout, "Random host selection: %s from %s", chosen, host); - } - } - } - strlist_free(hostlist); - return hostinfo; -} /* lumberjack_choose_address */ - -/* Connect to a host:port. If 'host' resolves to multiple addresses, one is - * picked at random. */ -static int lumberjack_tcp_connect(struct lumberjack *lumberjack) { - insist(lumberjack->fd < 0, "already connected (fd %d > 0)", lumberjack->fd); - insist(lumberjack->host != NULL, "lumberjack host must not be NULL"); - insist(lumberjack->port > 0, "lumberjack port must be > 9 (is %hd)", lumberjack->port); - insist(lumberjack != NULL, "lumberjack must not be NULL"); - int rc; - int fd; - - struct hostent *hostinfo = lumberjack_choose_address(lumberjack->host); - - /* 'struct hostent' has the list of addresses resolved in 'h_addr_list' - * It's a null-terminated list, so count how many are there. */ - unsigned int addr_count; - for (addr_count = 0; hostinfo->h_addr_list[addr_count] != NULL; addr_count++); - /* hostnames can resolve to multiple addresses, pick one at random. */ - char *address = hostinfo->h_addr_list[rand_uint32() % addr_count]; - - flog(stdout, "Connecting to %s(%s):%hu", hostinfo->h_name, - inet_ntoa(*(struct in_addr *)address), lumberjack->port); - fd = socket(PF_INET, SOCK_STREAM, 0); - insist(fd >= 0, "socket() failed: %s", strerror(errno)); - - struct sockaddr_in sockaddr; - sockaddr.sin_family = PF_INET; - sockaddr.sin_port = htons(lumberjack->port); - memcpy(&sockaddr.sin_addr, address, hostinfo->h_length); - - flog_if_slow(stdout, 0.250, { - rc = connect(fd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); - }, "connect to %s:%hu", inet_ntoa(*(struct in_addr *)address), lumberjack->port); - if (rc < 0) { - close(fd); - return -1; - } - - flog(stdout, "Connected successfully to %s(%s):%hu", hostinfo->h_name, - inet_ntoa(*(struct in_addr *)address), lumberjack->port); - - lumberjack->fd = fd; - - return 0; -} /* lumberjack_tcp_connect */ - -static int lumberjack_ssl_handshake(struct lumberjack *lumberjack) { - insist(lumberjack != NULL, "lumberjack must not be NULL"); - insist(lumberjack->ssl == NULL, "ssl already established, cannot handshake"); - - int rc; - BIO *bio = BIO_new_socket(lumberjack->fd, 0 /* don't close on free */); - if (bio == NULL) { - ERR_print_errors_fp(stdout); - insist(bio != NULL, "BIO_new_socket failed"); - } - - lumberjack->ssl = SSL_new(lumberjack->ssl_context); - insist(lumberjack->ssl != NULL, "SSL_new must not return NULL"); - - SSL_set_connect_state(lumberjack->ssl); /* we're a client */ - SSL_set_mode(lumberjack->ssl, SSL_MODE_AUTO_RETRY); /* retry writes/reads that would block */ - SSL_set_bio(lumberjack->ssl, bio, bio); - - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - for (rc = SSL_connect(lumberjack->ssl); rc < 0; rc = SSL_connect(lumberjack->ssl)) { - /* loop until ssl handshake succeeds */ - switch(SSL_get_error(lumberjack->ssl, rc)) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - /* TODO(sissel): Instead of sleeping, we should call select() for - * whichever case (read or write) is wanted */ - backoff(&sleeper); - continue; /* retry */ - default: - /* Some other SSL error */ - flog(stdout, "SSL_connect error vv"); - ERR_print_errors_fp(stdout); - flog(stdout, "SSL_connect error ^^"); - return -1; - } - } - ERR_print_errors_fp(stdout); - - /* TODO(sissel): Verify peer certificate */ - return 0; -} /* lumberjack_ssl_handshake */ - -int lumberjack_write(struct lumberjack *lumberjack, struct str *payload) { - insist(lumberjack != NULL, "lumberjack must not be NULL"); - insist(payload != NULL, "payload must not be NULL"); - insist(lumberjack->ssl != NULL, "lumberjack->ssl must not be NULL"); - - /* writing is an error if you are not connected */ - if (!lumberjack_connected(lumberjack)) { - return -1; - } - - /* TODO(sissel): For compression - * - append payload to the buffer - * - if the buffer is longer than BUFFER_FLUSH_SIZE - * - lumberjack_flush() - * - else continue, do not write on the wire. - */ - str_append_str(lumberjack->io_buffer, payload); - - //uint32_t seq; - //memcpy(&seq, str_data(frame) + 2, sizeof(uint32_t)); - //seq = ntohl(seq); - //flog(stdout, "io_buffer: %.*s", str_length(lumberjack->io_buffer), str_data(lumberjack->io_buffer)); - - if (str_length(lumberjack->io_buffer) > 16384) { - //flog(stdout, "io_buffer large enough (%d), flushing", - //str_length(lumberjack->io_buffer)); - return lumberjack_flush(lumberjack); - } - return 0; -} /* lumberjack_write */ - -int lumberjack_flush(struct lumberjack *lumberjack) { - ssize_t bytes; - size_t length = str_length(lumberjack->io_buffer); - /* Zlib */ - int rc; - - if (length == 0) { - return 0; /* nothing to do */ - } - - if (!lumberjack_connected(lumberjack)) { - return -1; - } - - uLongf compressed_length = (uLongf) lumberjack->compression_buffer->data_size; - /* compress2 is provided by zlib */ - rc = compress2((Bytef *)str_data(lumberjack->compression_buffer), - &compressed_length, - (Bytef *)str_data(lumberjack->io_buffer), length, 1); - insist(rc == Z_OK, "compress2(..., %lu, ..., %zd) failed; returned %d", - compressed_length, length, rc); - - str_truncate(lumberjack->io_buffer); - //flog(stdout, "lumberjack_flush: flushing %d bytes (compressed to %d bytes)", - //(int)length, (int)compressed_length); - - /* TODO(sissel): Handle timeouts on any writes. */ - /* Write the 'compressed block' frame header */ - struct str *header = str_new_size(6); - str_append_char(header, LUMBERJACK_VERSION_1); - str_append_char(header, LUMBERJACK_COMPRESSED_BLOCK_FRAME); - str_append_uint32(header, compressed_length); - - time_t timeout = 30; - rc = lumberjack_poll(lumberjack, timeout, LUMBERJACK_POLL_WRITE); - if ((rc & LUMBERJACK_POLL_WRITE) == 0) { - /* socket was not writable after the given timeout. Fail it. */ - flog(stdout, "Waited %d seconds for a writable socket. Giving up.", timeout); - lumberjack_disconnect(lumberjack); - return -1; - } - - flog_if_slow(stdout, 1.0, { - bytes = SSL_write(lumberjack->ssl, str_data(header), str_length(header)); - }, "SSL_write (lumberjack compressed header)", NULL); - str_free(header); - - if (bytes < 0) { - /* error occurred while writing. */ - rc = SSL_get_error(lumberjack->ssl, bytes); - if (rc == SSL_ERROR_SYSCALL) { - flog(stdout, "SSL_write failed: %s", strerror(errno)); - } else { - flog(stdout, "SSL_write returned %d (code: %d), something is wrong", - bytes, rc); - flog(stdout, "SSL_write error vv"); - ERR_print_errors_fp(stdout); - flog(stdout, "SSL_write error ^^"); - } - lumberjack_disconnect(lumberjack); - return -1; - } - - /* write the compressed payload */ - ssize_t remaining = compressed_length; - size_t offset = 0; - - struct timeval start; - gettimeofday(&start, NULL); - double elapsed; - - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - - while (remaining > 0) { - time_t timeout = 30; - rc = lumberjack_poll(lumberjack, timeout, LUMBERJACK_POLL_WRITE); - if ((rc & LUMBERJACK_POLL_WRITE) == 0) { - /* socket was not writable after the given timeout. Fail it. */ - flog(stdout, "Waited %d seconds for a writable socket. Giving up.", timeout); - lumberjack_disconnect(lumberjack); - return -1; - } - - bytes = SSL_write(lumberjack->ssl, - str_data(lumberjack->compression_buffer) + offset, - remaining); - - if (bytes < 0) { - elapsed = duration(&start); - if (elapsed > 30) { - flog(stdout, "SSL_write took too long (%.3f seconds), assuming " \ - "dead/busy server and disconnecting.", elapsed); - lumberjack_disconnect(lumberjack); - return -1; - } - - rc = SSL_get_error(lumberjack->ssl, bytes); - if (rc == SSL_ERROR_WANT_READ) { - /* TODO(sissel): instead of backing off, select for read, then retry - * the write. */ - backoff(&sleeper); - } else if (rc == SSL_ERROR_WANT_WRITE) { - /* TODO(sissel): instead of backing off, select for write, then retry - * the write. */ - backoff(&sleeper); - } - } - - remaining -= bytes; - offset += bytes; - } - return 0; -} /* lumberjack_flush */ - -void lumberjack_disconnect(struct lumberjack *lumberjack) { - flog(stdout, "Disconnect requested"); - if (lumberjack->ssl) { - SSL_shutdown(lumberjack->ssl); - SSL_free(lumberjack->ssl); - lumberjack->ssl = NULL; - } - if (lumberjack->fd >= 0) { - close(lumberjack->fd); - lumberjack->fd = -1; - } - - lumberjack->connected = 0; - insist(!lumberjack_connected(lumberjack), - "lumberjack_connected() must not return true after a disconnect"); -} /* lumberjack_disconnect */ - -int lumberjack_connected(struct lumberjack *lumberjack) { - return lumberjack->connected; -} /* lumberjack_connected */ - -static int lumberjack_read_ack(struct lumberjack *lumberjack, - uint32_t *sequence_ret) { - if (!lumberjack_connected(lumberjack)) { - flog(stdout, "NOT CONNECTED"); - return -1; - } - - /* This is a subpar implementation... reading 6 bytes at a time, etc, - * but the idea is that you can do bulk acks, so data-to-ack ratio should be - * high */ - char buf[6]; - ssize_t bytes; - size_t remaining = 6; /* version + frame type + 32bit sequence value */ - size_t offset = 0; - - int rc; - while (remaining > 0) { - /* Allow a few seconds for a read timeout. If it occurs, fail this read. - * This timeout should cause a disconnect and reconnect to a new server. - * The idea is to prevent one receiving server from becoming overloaded. */ - time_t timeout = 30; - rc = lumberjack_poll(lumberjack, timeout, LUMBERJACK_POLL_READ); - if ((rc & LUMBERJACK_POLL_READ) == 0) { - /* socket was not writable after the given timeout. Fail it. */ - flog(stdout, "Waited %d seconds for a readable socket. Giving up.", timeout); - errno = ETIMEDOUT; - return -1; - } - - flog_if_slow(stdout, 1.0, { - bytes = SSL_read(lumberjack->ssl, buf + offset, remaining); - }, "SSL_read (tried to read %d bytes)", remaining); - if (bytes == 0) { - /* EOF or some other similar error */ - errno = EPIPE; /* close enough to fake EOF? */ - return -1; - } else if (bytes < 0) { - rc = SSL_get_error(lumberjack->ssl, bytes /* error code */); - flog(stdout, "SSL_read error vv"); - ERR_print_errors_fp(stdout); - flog(stdout, "SSL_read error ^^"); - return -1; - } - offset += bytes; - remaining -= bytes; - } - - if ((buf[0] != LUMBERJACK_VERSION_1) || (buf[1] != LUMBERJACK_ACK_FRAME)) { - return -1; /* invalid version or frame type */ - } - - /* bytes 2-6 are the sequence number in network byte-order */ - memcpy(sequence_ret, buf + 2, sizeof(uint32_t)); - *sequence_ret = ntohl(*sequence_ret); - - return 0; -} /* lumberjack_read_ack */ - -int lumberjack_send_data(struct lumberjack *lumberjack, const char *payload, - size_t payload_len) { - /* TODO(sissel): support a 'free' function to free the payload when it's done */ - //void (*free_func)(void *payload, void *hint())) - struct str *frame = str_new_size(sizeof(uint32_t) + payload_len); - int rc; - - lumberjack->sequence++; - /* TODO(sissel): How to handle sequence value overflow (MAX_INT -> 0) */ - - str_append_char(frame, LUMBERJACK_VERSION_1); - str_append_char(frame, LUMBERJACK_DATA_FRAME); - str_append_uint32(frame, lumberjack->sequence); - str_append(frame, payload, payload_len); - - lumberjack_ensure_connected(lumberjack); - /* if the ring is currently full, we need to wait for acks. */ - while (ring_is_full(lumberjack->ring)) { - /* flush any writes waiting for buffering/compression */ - flog(stdout, "ring buffer is full (%d items), flushing.", - ring_count(lumberjack->ring)); - flog_if_slow(stdout, 0.500, { - lumberjack_flush(lumberjack); - }, "flush complete", NULL); - - /* read at least one ACK */ - flog_if_slow(stdout, 0.500, { - rc = lumberjack_wait_for_ack(lumberjack); - if (rc != 0) { - flog(stdout, "lumberjack_wait_for_ack failed %d", rc); - } - }, "wait for ack (current sequence: %u)", lumberjack->sequence); - } - - /* Send this data frame on the wire */ - rc = lumberjack_write(lumberjack, frame); - if (rc != 0) { - /* write failure, reconnect (which will resend) and such */ - flog(stdout, "lumberjack_write failed: %d", rc); - lumberjack_disconnect(lumberjack); - lumberjack_ensure_connected(lumberjack); - } - - /* Push this into the ring buffer, indicating it needs to be acknowledged */ - rc = ring_push(lumberjack->ring, frame); - insist(rc == RING_OK, "ring_push failed (returned %d, expected RING_OK(%d)", - rc, RING_OK); - - return 0; /* SUCCESS */ -} /* lumberjack_send_data */ - -static int lumberjack_wait_for_ack(struct lumberjack *lumberjack) { - uint32_t ack; - int rc; - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - - flog(stdout, "lumberjack_wait_for_ack: waiting for ack"); - - /* Wait for an ack */ - while ((rc = lumberjack_read_ack(lumberjack, &ack)) < 0) { - /* read error. */ - flog(stdout, "lumberjack_read_ack failed: %s", strerror(errno)); - lumberjack_disconnect(lumberjack); - backoff(&sleeper); - lumberjack_ensure_connected(lumberjack); - } - - flog(stdout, "lumberjack_wait_for_ack: received ack: %u", ack); - - /* TODO(sissel): Verify this is even a sane ack */ - - /* Acknowledge anything in the ring that has a sequence number <= this ack */ - /* Clear anything in the ring with a sequence less than the one just acked */ - for (int i = 0, count = ring_count(lumberjack->ring); i < count; i++) { - struct str *frame; - uint32_t cur_seq; - - /* look at, but don't remove, the first item in the ring */ - ring_peek(lumberjack->ring, 0, (void **)&frame); - - /* this is a silly way, but since the ring only stores strings right now */ - memcpy(&cur_seq, str_data(frame) + 2, sizeof(uint32_t)); - cur_seq = ntohl(cur_seq); - - if (cur_seq <= ack) { - //printf("bulk ack: %d\n", cur_seq); - ring_pop(lumberjack->ring, NULL); /* destroy this item */ - str_free(frame); - } else { - /* found a sequence number > this ack, - * we're done purging acknowledgements */ - } - } - - return 0; -} /* lumberjack_wait_for_ack */ - -static int lumberjack_retransmit_all(struct lumberjack *lumberjack) { - int rc; - /* New connection, anything in the ring buffer is assumed to be - * un-acknowledged. Send it. */ - for (int i = 0, count = ring_count(lumberjack->ring); i < count; i++) { - struct str *frame; - rc = ring_peek(lumberjack->ring, i, (void **)&frame); - insist(rc == RING_OK, "ring_peek(%d) failed unexpectedly: %d\n", i, rc); - rc = lumberjack_write(lumberjack, frame); - - if (rc != 0) { - /* write failure, fail. */ - flog(stdout, "write failure"); - return -1; - } - } /* for each item in the ring */ - - return 0; -} /* lumberjack_retransmit_all */ - -struct str* lumberjack_kv_pack(struct kv *kv_list, size_t kv_count) { - struct str *payload; - - /* I experimented with different values here. - * - * As this as input: - * char log[] = "Aug 3 17:01:05 sandwich ReportCrash[38216]: Removing excessive log: file://localhost/Users/jsissel/Library/Logs/DiagnosticReports/a.out_2012-08-01-164517_sandwich.crash"; - * char file[] = "/var/log/system.log"; - * char hostname[] = "sandwich"; - * struct kv map[] = { - * { "line", 4, log, strlen(log) }, - * { "file", 4, file, strlen(file) }, - * { "host", 4, hostname, strlen(hostname) } - * }; - * - * Looping doing this: - * p = _kv_pack(map, 3); - * str_free(p); - * - * Relative time spent (on 10,000,000 iterations): - * - 768 bytes - 1.65 - * - 1008 bytes - 1.65 - * - 1009 bytes - 1.24 - * - 1010 bytes - 1.24 - * - 1024 bytes - 1.24 - * - * Platform tested was OS X 10.7 with XCode's clang/cc - * % cc -O4 ... - * - * Given that, I pick 1024 (nice round number) for the initial string size - * for the payload. - */ - payload = str_new_size(1024); - - str_append_uint32(payload, kv_count); - for (size_t i = 0; i < kv_count; i++) { - str_append_uint32(payload, kv_list[i].key_len); - str_append(payload, kv_list[i].key, kv_list[i].key_len); - str_append_uint32(payload, kv_list[i].value_len); - str_append(payload, kv_list[i].value, kv_list[i].value_len); - } - - return payload; -} /* lumberjack_kv_pack */ - -int lumberjack_write_window_size(struct lumberjack *lumberjack) { - uint32_t size = lumberjack->ring_size; - char data[6]; - - flog(stdout, "Declaring window size of %u", size); - - data[0] = LUMBERJACK_VERSION_1; - data[1] = LUMBERJACK_WINDOW_SIZE_FRAME; - - /* network byte order! */ - size = htonl(size); - memcpy(data + 2, &size, sizeof(uint32_t)); - - struct str payload = { - .data_len = 6, - .data_size = 6, - .data = data - }; - - int rc; - rc = lumberjack_write(lumberjack, &payload); - if (rc != 0) { - /* write failure, fail. */ - flog(stdout, "write failure while writing the window size"); - return -1; - } - return 0; -} /* lumberjack_write_window_size */ - -int lumberjack_poll(struct lumberjack *lumberjack, time_t seconds, int flags) { - int ssl_fd = SSL_get_rfd(lumberjack->ssl); - fd_set read_fds, write_fds; - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); - - if (flags & LUMBERJACK_POLL_READ) { - FD_SET(ssl_fd, &read_fds); - } - if (flags & LUMBERJACK_POLL_WRITE) { - FD_SET(ssl_fd, &write_fds); - } - - struct timeval timeout = { seconds, 0 }; - - int rc = select(ssl_fd + 1 /* 'max fd in the list' */, - &read_fds, &write_fds, NULL, &timeout); - if (rc == 0) { - /* timeout, fail the read */ - errno = ETIMEDOUT; - return 0; - } - - int retval = 0; - if (FD_ISSET(ssl_fd, &read_fds)) { - retval |= LUMBERJACK_POLL_READ; - } - if (FD_ISSET(ssl_fd, &write_fds)) { - retval |= LUMBERJACK_POLL_WRITE; - } - return retval; -} /* lumberjack_poll */ - -int lumberjack_ssl_write(const struct lumberjack *lumberjack, - const void *buffer, const int length, - const double timeout_seconds) { - // Attempt to write 'length' bytes from buffer - // If we can't write the full amount after 'timeout_seconds' passes, - // return ETIMEDOUT - int rc; - struct timeval start; - gettimeofday(&start, NULL); - double elapsed; - int offset = 0; - int remaining = length; - - struct backoff sleeper; - backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP); - - //flog(stdout, "Writing [%d] %.*s", remaining, remaining, buffer + offset); - while (remaining > 0) { - rc = SSL_write(lumberjack->ssl, buffer + offset, remaining); - - if (rc < 0) { - elapsed = duration(&start); - if (elapsed > timeout_seconds) { - flog(stdout, "SSL_write took too long (%.3f seconds), assuming " \ - "dead/busy server and disconnecting.", elapsed); - errno = ETIMEDOUT; - return -1; - } - - int ssl_error = SSL_get_error(lumberjack->ssl, rc); - if (ssl_error == SSL_ERROR_WANT_READ) { - /* TODO(sissel): instead of backing off, select for read, then retry - * the write. */ - backoff(&sleeper); - } else if (ssl_error == SSL_ERROR_WANT_WRITE) { - /* TODO(sissel): instead of backing off, select for write, then retry - * the write. */ - backoff(&sleeper); - } else { - // Some other ssl error - flog(stdout, "SSL_write failed: %d (ssl error: %d)", rc, ssl_error); - return rc; // - } - } else { - remaining -= rc; - offset += rc; - } - } /* while remaining > 0 */ - - /* If we get here, success! */ - return 0; -} /* lumberjack_ssl_write */ - -int lumberjack_ssl_read(const struct lumberjack *lumberjack, void *buffer, - const int length, const double timeout_seconds) { - // Attempt to read 'length' bytes from buffer - // If we can't read the full amount after 'timeout_seconds' passes, - // return ETIMEDOUT - int rc; - struct timeval start; - gettimeofday(&start, NULL); - double elapsed; - int offset = 0; - int remaining = length; - - struct backoff sleeper; - static struct timespec max_sleep = { 0, 250000000 }; /* 250ms */ - backoff_init(&sleeper, &MIN_SLEEP, &max_sleep); - - while (remaining > 0) { - rc = SSL_read(lumberjack->ssl, buffer + offset, remaining); - //flog(stdout, "SSL_read read %d ; returned %d", remaining, rc); - - if (rc > 0) { - // Healthy read. - remaining -= rc; - offset += rc; - } else if (rc == 0) { - errno = EPIPE; // close enough to calling EOF as an error code - return -1; - } else if (rc < 0) { - elapsed = duration(&start); - if (elapsed > timeout_seconds) { - flog(stdout, "SSL_read took too long (%.3f seconds), assuming " \ - "dead/busy server and disconnecting.", elapsed); - errno = ETIMEDOUT; - return -1; - } - - int ssl_error = SSL_get_error(lumberjack->ssl, rc); - if (ssl_error == SSL_ERROR_WANT_READ) { - /* TODO(sissel): instead of backing off, select for read, then retry - * the write. */ - backoff(&sleeper); - } else if (ssl_error == SSL_ERROR_WANT_WRITE) { - /* TODO(sissel): instead of backing off, select for write, then retry - * the write. */ - backoff(&sleeper); - } else { - // Some other ssl error - flog(stdout, "SSL_read failed: %d (ssl error: %d)", rc, ssl_error); - return rc; // - } - } - } /* while remaining > 0 */ - - /* If we get here, success! */ - return 0; -} /* lumberjack_ssl_read */ diff --git a/c/proto.h b/c/proto.h deleted file mode 100644 index 1522182..0000000 --- a/c/proto.h +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef _PROTO_H_ -#define _PROTO_H_ -#include -#include -#include -#include -#include "openssl/ssl.h" -#include "ring.h" -#include "str.h" - -struct kv { - char *key; - size_t key_len; - char *value; - size_t value_len; -}; /* struct kv */ - -struct lumberjack { - const char *host; - unsigned short port; - - /* internal state you don't need to access normally */ - int connected; /* are we connected? */ - uint32_t sequence; /* the current data frame sequence number */ - int fd; /* the socket conection (used by ssl) */ - SSL *ssl; /* the ssl connection */ - SSL_CTX *ssl_context; /* ssl context */ - - size_t ring_size; /* the size of the ring */ - struct ring *ring; /* the ring buffer of things needing acknowledgement */ - - struct str *io_buffer; - struct str *compression_buffer; -}; - -#define LUMBERJACK_VERSION_1 '1' -#define LUMBERJACK_DATA_FRAME 'D' -#define LUMBERJACK_ACK_FRAME 'A' -#define LUMBERJACK_WINDOW_SIZE_FRAME 'W' -#define LUMBERJACK_COMPRESSED_BLOCK_FRAME 'C' - -/* Create a new lumberjack client. - * - * - host is a hostname or IP address. - * - port is the port to connect to. - * - window_size is how many events to send before waiting for an ack. - * - * If the hostname resolves to multiple addresses, one address is picked at - * random each time a connection is made. - */ -struct lumberjack *lumberjack_new(const char *host, unsigned short port, size_t window_size); - -/* Tell lumberjack about an SSL cert/ca it should trust - * - * - path is a string; can be a path to a file or directory. - */ -int lumberjack_set_ssl_ca(struct lumberjack *lumberjack, const char *path); - -/** PUBLIC API */ -/* Send a data frame with a given payload and length */ -int lumberjack_send_data(struct lumberjack *lumberjack, const char *payload, - size_t payload_len); - //void (*free_func)(void *payload, void *hint())); - -int lumberjack_flush(struct lumberjack *lumberjack); -/* TODO(sissel): permit inspection of currently-unacknowledged events? */ - -//int lumberjack_send_kv(struct *kv map); - -/* blocks until all messages in the ring have been acknowledged */ -void lumberjack_disconnect(struct lumberjack *lumberjack); -int lumberjack_ensure_connected(struct lumberjack *lumberjack); - -/* Pack a key-value list according to the lumberjack protocol */ -struct str *lumberjack_kv_pack(struct kv *kv_list, size_t kv_count); - -//struct str *lumberjack_encode_data(uint32_t sequence, const char *payload, size_t payload_len); -//int lumberjack_connect(struct lumberjack *lumberjack); -//int lumberjack_connected(struct lumberjack *lumberjack); -//void lumberjack_disconnect(struct lumberjack *lumberjack); -//int lumberjack_write(struct lumberjack *lumberjack, struct str *payload); -//int lumberjack_read_ack(struct lumberjack *lumberjack, uint32_t *sequence_ret); - -#endif /* _PROTO_H_ */ diff --git a/c/ring.c b/c/ring.c deleted file mode 100644 index 16b1577..0000000 --- a/c/ring.c +++ /dev/null @@ -1,68 +0,0 @@ -#include "ring.h" -#include "insist.h" -#include - -struct ring *ring_new_size(uint32_t size) { - insist((size & (size - 1)) == 0, - "size must be a power of two, %d is not.", size); - - struct ring *r = malloc(sizeof(struct ring)); - r->writer = 0; - r->reader = 0; - r->count = 0; - r->size = size; - r->buffer = malloc(r->size * sizeof(void *)); - return r; -} /* ring_new_size */ - - -inline int ring_is_empty(struct ring *ring) { - return ring->count == 0; -} /* ring_is_empty */ - -inline int ring_is_full(struct ring *ring) { - return ring->count == ring->size; -} /* ring_is_full */ - -inline int ring_peek(struct ring *ring, uint32_t i, void **object_ret) { - if (i >= ring->count) { - return RING_INDEX_OUT_OF_BOUNDS; - } - /* item 0 is the next one after the reader - * we mask with 'size - 1' as a way of wrapping the value since we enforce - * power-of-two-ness */ - *object_ret = ring->buffer[(ring->reader + i) & (ring->size - 1)]; - return RING_OK; -} /* ring_peek */ - -inline int ring_pop(struct ring *ring, void **object_ret) { - int rc; - if (object_ret != NULL) { - /* Only store it if object_ret is not NULL */ - rc = ring_peek(ring, 0, object_ret); - if (rc != RING_OK) { - return RING_IS_EMPTY; - } - } - - /* increment reader position and wrap write if necessary */ - ring->reader = (ring->reader + 1) & (ring->size - 1); - ring->count--; - return RING_OK; -} /* ring_pop */ - -inline int ring_push(struct ring *ring, void *object) { - if (ring_is_full(ring)) { - return RING_IS_FULL; - } - - ring->buffer[ring->writer] = object; - /* increment write position and wrap write if necessary */ - ring->writer = (ring->writer + 1) & (ring->size - 1); - ring->count++; - return RING_OK; -} /* ring_push */ - -inline uint32_t ring_count(struct ring *ring) { - return ring->count; -} /* ring count */ diff --git a/c/ring.h b/c/ring.h deleted file mode 100644 index be1c846..0000000 --- a/c/ring.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef _RING_H_ -#define _RING_H_ -#include - -struct ring { - uint32_t writer; /* write position */ - uint32_t reader; /* read position */ - uint32_t size; /* maximum number of items */ - uint32_t count; /* current count of items */ - void **buffer; /* array of pointers to whatever objects we're storing */ -}; - -#define RING_OK 0x00 -#define RING_IS_EMPTY 0x01 -#define RING_IS_FULL 0x02 -#define RING_INDEX_OUT_OF_BOUNDS 0x03 - -struct ring *ring_new_size(uint32_t count); - -int ring_is_empty(struct ring *ring); -int ring_is_full(struct ring *ring); - -uint32_t ring_count(struct ring *ring); - -int ring_pop(struct ring *ring, void **object_ret); -int ring_peek(struct ring *ring, uint32_t index, void **object_ret); -int ring_push(struct ring *ring, void *object); - -#endif /* _RING_H_ */ diff --git a/c/sleepdefs.h b/c/sleepdefs.h deleted file mode 100644 index fb4e927..0000000 --- a/c/sleepdefs.h +++ /dev/null @@ -1,5 +0,0 @@ -#ifndef _SLEEPDEFS_H_ -#define _SLEEPDEFS_H_ -static struct timespec MIN_SLEEP = { 0, 10000000 }; /* 10ms */ -static struct timespec MAX_SLEEP = { 5, 0 }; -#endif /* _SLEEPDEFS_H_ */ diff --git a/c/str.c b/c/str.c deleted file mode 100644 index f29c9ba..0000000 --- a/c/str.c +++ /dev/null @@ -1,73 +0,0 @@ -#include /* for htonl */ -#include /* for memmove, etc */ -#include /* for calloc, realloc, etc */ -#include "str.h" -#include - -inline struct str *str_new_size(size_t size) { - struct str *str; - str = malloc(sizeof(struct str)); - str->data_size = size; - str->data_len = 0; - str->data = malloc(str->data_size * sizeof(char)); - /* We could save ourselves a malloc call by storing the str struct and its - * data in the same allocation. Needs benchmarking. */ - // Example: str->data = (char *)(str + sizeof(struct str)); - return str; -} /* str */ - -inline struct str *str_new(void) { - return str_new_size(20); /* default small size */ -} /* str */ - -inline void str_free(struct str *str) { - free(str->data); - free(str); -} /* str */ - -inline void str_grow(struct str *str) { - str->data_size <<= 1; /* double the data size */ - str->data = realloc(str->data, str->data_size); -} /* str */ - -inline size_t str_length(struct str *str) { - return str->data_len; -} /* str_length */ - -inline size_t str_size(struct str *str) { - return str->data_size; -} /* str_size */ - -inline char *str_data(struct str *str) { - return str->data; -} /* str_data */ - -inline void str_append(struct str *str, const char *data, size_t length) { - /* Grow the string if the new length will be longer than the current - * allocation */ - while (str->data_size < (str->data_len + length)) { - str_grow(str); - } - - memmove(str->data + str->data_len, data, length); - str->data_len += length; -} /* str_append */ - -/* Append an unsigned 32bit unsigned integer to the str written - * in network byte order */ -inline void str_append_uint32(struct str *str, uint32_t value) { - value = htonl(value); /* use network byte ordering */ - str_append(str, (char *)&value, sizeof(value)); -} /* str_append_uint32 */ - -inline void str_append_char(struct str *str, char value) { - str_append(str, &value, sizeof(value)); -} /* str_append_char */ - -inline void str_truncate(struct str *str) { - str->data_len = 0; -} /* str_zero */ - -inline void str_append_str(struct str *dst, struct str *src) { - str_append(dst, str_data(src), str_length(src)); -} /* str_append_str */ diff --git a/c/str.h b/c/str.h deleted file mode 100644 index d021802..0000000 --- a/c/str.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef _STR_H_ -#define _STR_H_ -#include -#include - -struct str { - size_t data_len; - size_t data_size; - char *data; -}; /* struct str */ - -/* Make a new str. */ -struct str *str_new(void); -struct str *str_new_size(size_t size); - -/* Free a str */ -void str_free(struct str *str); - -/* grow a string; doubles the storage size */ -void str_grow(struct str *str); - -size_t str_length(struct str *str); -size_t str_size(struct str *str); -char *str_data(struct str *str); -void str_append(struct str *str, const char *data, size_t length); -void str_append_str(struct str *dst_str, struct str *src_str); -void str_append_uint32(struct str *str, uint32_t value); -void str_append_char(struct str *str, char value); -void str_truncate(struct str *str); - -#endif /* _STR_H_ */ diff --git a/c/strlist.c b/c/strlist.c deleted file mode 100644 index e0a84ab..0000000 --- a/c/strlist.c +++ /dev/null @@ -1,53 +0,0 @@ -#include "strlist.h" -#define _BSD_SOURCE /* for stddup in glibc */ -#include -#include - -strlist_t* strlist_new() { - strlist_t *list; - list = malloc(sizeof(strlist_t)); - - list->max_items = 10; - list->nitems = 0; - list->items = malloc(list->max_items * sizeof(char*)); - - return list; -} - -void strlist_free(strlist_t *list) { - int i; - for (i = 0; i < list->nitems; i++) - free(list->items[i]); - free(list->items); - free(list); -} - -void strlist_append(strlist_t *list, const char *str) { - list->items[list->nitems] = strdup(str); - - list->nitems++; - if (list->nitems == list->max_items) { - list->max_items *= 2; - list->items = realloc(list->items, list->max_items * sizeof(char *)); - } -} - -void split(strlist_t **tokens, const char *buf, const char *sep) { - char *strptr = NULL; - char *tokctx; - char *dupbuf = NULL; - char *tok; - - dupbuf = strdup(buf); - strptr = dupbuf; - - *tokens = strlist_new(); - - //printf("Split: '%s' on '%s'\n", buf, sep); - while ((tok = strtok_r(strptr, sep, &tokctx)) != NULL) { - strptr = NULL; - strlist_append(*tokens, tok); - } - free(dupbuf); -} - diff --git a/c/strlist.h b/c/strlist.h deleted file mode 100644 index fc76b4f..0000000 --- a/c/strlist.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef _STRLIST_H_ -#define _STRLIST_H_ -typedef struct strlist { - char **items; - int nitems; - int max_items; -} strlist_t; - -strlist_t* strlist_new(); -void strlist_free(strlist_t *list); -void strlist_append(strlist_t *list, const char *str); - -void split(strlist_t **tokens, const char *buf, const char *sep); - -#endif /* _STRLIST_H_ */ diff --git a/c/test_ring.c b/c/test_ring.c deleted file mode 100644 index b03eeaf..0000000 --- a/c/test_ring.c +++ /dev/null @@ -1,41 +0,0 @@ -#include "ring.h" -#include -#include "insist.h" - -int main(void) { - struct ring *ring; - ring = ring_new_size(4); - - char *val; - insist(ring_is_empty(ring), "A new ring must be empty"); - insist(ring_push(ring, "Hello world 1") == RING_OK, "Pushing 1 into a 4-slot ring must be OK"); - ring_peek(ring, 0, (void **)&val); - insist(strcmp(val, "Hello world 1") == 0, "ring_peek(0) failed"); - insist(ring_push(ring, "Hello world 2") == RING_OK, "Pushing 2 into a 4-slot ring must be OK"); - ring_peek(ring, 1, (void **)&val); - insist(strcmp(val, "Hello world 2") == 0, "ring_peek(1) failed"); - insist(ring_push(ring, "Hello world 3") == RING_OK, "Pushing 3 into a 4-slot ring must be OK"); - ring_peek(ring, 2, (void **)&val); - insist(strcmp(val, "Hello world 3") == 0, "ring_peek(2) failed"); - insist(ring_push(ring, "Hello world 4") == RING_OK, "Pushing 4 into a 4-slot ring must be OK"); - ring_peek(ring, 3, (void **)&val); - insist(strcmp(val, "Hello world 4") == 0, "ring_peek(3) failed"); - insist(ring_push(ring, "Hello world 5") == RING_IS_FULL, "Pushing 5 into a 4-slot ring must fail "); - insist(ring_is_full(ring), "The ring must be full at this point"); - insist(!ring_is_empty(ring), "Ring must not be empty at this point"); - - insist(ring_pop(ring, (void **)&val) == RING_OK, "Popping from a full ring must succeed"); - insist(strcmp(val, "Hello world 1") == 0, "Got the wrong string?"); - insist(ring_pop(ring, (void **)&val) == RING_OK, "Popping on a non-empty ring must succeed"); - insist(strcmp(val, "Hello world 2") == 0, "Got the wrong string?"); - insist(ring_pop(ring, (void **)&val) == RING_OK, "Popping on a non-empty ring must succeed"); - insist(strcmp(val, "Hello world 3") == 0, "Got the wrong string?"); - insist(ring_pop(ring, (void **)&val) == RING_OK, "Popping on a non-empty ring must succeed"); - insist(strcmp(val, "Hello world 4") == 0, "Got the wrong string?"); - insist(ring_pop(ring, (void **)&val) == RING_IS_EMPTY, "Pop on an empty ring must fail"); - insist(ring_is_empty(ring), "Ring must be empty at this point"); - insist(!ring_is_full(ring), "Ring must not be full at this point"); - - printf("%s OK\n", __FILE__); - return 0; -} /* main */ diff --git a/c/timespec.c b/c/timespec.c deleted file mode 100644 index 5074b0e..0000000 --- a/c/timespec.c +++ /dev/null @@ -1,52 +0,0 @@ -#include "timespec.h" -const struct timespec TIME_ZERO = { 0, 0 }; - -inline void timespec_double(struct timespec *t) { - /* Exponential backoff */ - t->tv_sec <<= 1; - t->tv_nsec <<= 1; - if (t->tv_nsec > MAX_TV_NSEC) { - /* handle carry/overflow of tv_nsec */ - t->tv_nsec -= (MAX_TV_NSEC + 1); - t->tv_sec += 1; - } -} /* timespec_double */ - -inline long timespec_compare(struct timespec *a, struct timespec *b) { - time_t val; - val = a->tv_sec - b->tv_sec; - if (val != 0) { - return val; - } - return a->tv_nsec - b->tv_nsec; -} /* timespec_compare */ - -inline void timespec_copy(struct timespec *source, struct timespec *dest) { - /* TODO(sissel): Could use memcpy here instead... */ - dest->tv_sec = source->tv_sec; - dest->tv_nsec = source->tv_nsec; -} /* timespec_copy */ - -inline void timespec_subtract(struct timespec *a, struct timespec *b, - struct timespec *result) { - result->tv_nsec = a->tv_nsec - b->tv_nsec; - result->tv_sec = a->tv_sec - b->tv_sec; - - if (result->tv_nsec < 0) { - /* Handle carry */ - result->tv_nsec += 1000000000L; - result->tv_sec -= 1; - } -} - -inline void timespec_add(struct timespec *a, struct timespec *b, - struct timespec *result) { - result->tv_nsec = a->tv_nsec + b->tv_nsec; - result->tv_sec = a->tv_sec + b->tv_sec; - - if (result->tv_nsec > MAX_TV_NSEC) { - /* Handle carry */ - result->tv_nsec -= 1000000000L; - result->tv_sec += 1; - } -} diff --git a/c/timespec.h b/c/timespec.h deleted file mode 100644 index fc79617..0000000 --- a/c/timespec.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef _TIMESPEC_H_ -#define _TIMESPEC_H_ -#include -#define MAX_TV_NSEC 999999999L -void timespec_double(struct timespec *t); -long timespec_compare(struct timespec *a, struct timespec *b); -void timespec_copy(struct timespec *source, struct timespec *dest); -void timespec_subtract(struct timespec *a, struct timespec *b, - struct timespec *result); -void timespec_add(struct timespec *a, struct timespec *b, - struct timespec *result); - -#endif /* _TIMESPEC_H_ */ diff --git a/c/unixsock.c b/c/unixsock.c deleted file mode 100644 index 0ff321c..0000000 --- a/c/unixsock.c +++ /dev/null @@ -1,29 +0,0 @@ -#include /* for socket(2) */ -#include /* for struct sockaddr_un */ -#include /* for errno */ -#include /* for strerror(3) */ -#include /* github/jordansissel/insist */ -#include /* for unlink(2) */ - -int main() { - int r; - int sock; - sock = socket(PF_LOCAL, SOCK_DGRAM, 0); - insist(sock != -1, "socket() failed: %s", strerror(errno)); - - struct sockaddr_un addr; - addr.sun_family = PF_LOCAL; - strcpy(addr.sun_path, "/tmp/fancylog"); - //unlink(addr.sun_path); - r = bind(sock, (struct sockaddr *)&addr, - sizeof(addr.sun_family) + strlen(addr.sun_path) + 1); - insist(r == 0, "bind(%s) failed: %s", addr.sun_path, strerror(errno)); - - char buffer[65536]; - for (;;) { - ssize_t bytes; - bytes = recvfrom(sock, buffer, 65536, 0, NULL, NULL); - insist(bytes > 0, "recvfrom() returned %d: %s", (int)bytes, strerror(errno)); - printf("Received: %.*s\n", (int)bytes, buffer); - } -} /* main */ diff --git a/c/zmq_compat.h b/c/zmq_compat.h deleted file mode 100644 index 093d7e3..0000000 --- a/c/zmq_compat.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef _ZMQ_COMPAT_H_ -#define _ZMQ_COMPAT_H_ - -# if ZMQ_VERSION_MAJOR == 2 /* zeromq 2 */ -# define zmq_compat_set_sendhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm)) -# define zmq_compat_set_recvhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm)) -# define zmq_compat_recvmsg(socket, message, flags) zmq_recv(socket, message, flags) -# define zmq_compat_sendmsg(socket, message, flags) zmq_send(socket, message, flags) -# elif ZMQ_VERSION_MAJOR == 3 /* zeromq 3 */ -# define zmq_compat_set_sendhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_SNDHWM, &hwm, sizeof(hwm)) -# define zmq_compat_set_recvhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_RCVHWM, &hwm, sizeof(hwm)) -# define zmq_compat_recvmsg(socket, message, flags) zmq_recvmsg(socket, message, flags) -# define zmq_compat_sendmsg(socket, message, flags) zmq_sendmsg(socket, message, flags) -# else -# error "Unsupported zeromq version " ## ZMQ_VERSION_MAJOR -# endif - -#endif /* _ZMQ_COMPAT_H_ */ -- GitLab