Commit d06a122f authored by Jordan Sissel's avatar Jordan Sissel

- hwm 100

- some style fixings
- Display rate for debugging
parent 4b9a1fea
#define _BSD_SOURCE
#include "emitter.h"
#include <time.h>
#include <zmq.h>
#include "insist.h"
#include <errno.h>
......@@ -11,16 +12,27 @@ void *emitter(void *arg) {
void *socket = zmq_socket(config->zmq, ZMQ_PULL);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int64_t hwm = 1;
int64_t hwm = 100;
zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
rc = zmq_bind(socket, config->zmq_endpoint);
insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint,
zmq_strerror(errno));
for (;;) {
struct timespec start;
clock_gettime(CLOCK_MONOTONIC, &start);
for (int count = 0; ;count++) {
zmq_msg_t message;
rc = zmq_msg_init(&message);
insist(rc == 0, "zmq_msg_init failed");
if (count == 1000000) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
double s = (start.tv_sec + 0.0) + (start.tv_nsec / 1000000000.0);
double n = (now.tv_sec + 0.0) + (now.tv_nsec / 1000000000.0);
printf("Rate: %f\n", (count + 0.0) / (n - s));
clock_gettime(CLOCK_MONOTONIC, &start);
count = 0;
}
rc = zmq_recv(socket, &message, 0);
insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
config->zmq_endpoint, rc, zmq_strerror(errno));
......
......@@ -45,7 +45,7 @@ void *harvest(void *arg) {
void *socket = zmq_socket(config->zmq, ZMQ_PUSH);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int64_t hwm = 1;
int64_t hwm = 100;
zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
json_t *event = json_object();
......@@ -102,14 +102,13 @@ void *harvest(void *arg) {
serialized = json_dumps(event, 0);
/* Purge the 'line' from the event object so it'll be freed */
json_object_del(event, "line");
json_decref(line_obj);
zmq_msg_init_data(&message, serialized, strlen(serialized), free2, NULL);
/* if I uncomment this, memory grows unbouned. */
//zmq_msg_init_data(&message, line, septok - start, NULL, NULL);
rc = zmq_send(socket, &message, 0);
insist(rc == 0, "zmq_send() failed: %s", zmq_strerror(rc));
json_object_del(event, "line");
json_decref(line_obj);
zmq_msg_close(&message);
}
} /* for each token */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment