emitter.c 4.46 KB
Newer Older
1
#include <errno.h>
2
#include <stdint.h> /* C99 for int64_t */
3
#include <string.h>
Jordan Sissel's avatar
Jordan Sissel committed
4
#include <unistd.h>
5
#include "zmq.h"
6 7 8 9 10 11
#include "ring.h"
#include "emitter.h"
#include "insist.h"
#include "proto.h"
#include "backoff.h"
#include "clock_gettime.h"
12
#include "flog.h"
13

14 15
#include <sys/resource.h>

16
#include "zmq_compat.h"
17
#include "sleepdefs.h"
18

19 20
void *emitter(void *arg) {
  int rc;
Jordan Sissel's avatar
Jordan Sissel committed
21 22 23 24 25 26 27 28
  struct emitter_config *config = arg;
  insist(config->zmq != NULL, "zmq is NULL");
  insist(config->zmq_endpoint != NULL, "zmq_endpoint is NULL");
  insist(config->ssl_ca_path != NULL, "ssl_ca_path is NULL");
  insist(config->window_size > 0, "window_size (%d) must be positive",
         (int)config->window_size);
  insist(config->host != NULL, "host is NULL");
  insist(config->port > 0, "port (%hd) must be positive", config->port);
29 30 31

  void *socket = zmq_socket(config->zmq, ZMQ_PULL);
  insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
Jordan Sissel's avatar
Jordan Sissel committed
32
  int64_t hwm = 100;
33 34
  //zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
  zmq_compat_set_recvhwm(socket, hwm);
35 36 37 38
  rc = zmq_bind(socket, config->zmq_endpoint);
  insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint,
         zmq_strerror(errno));

Jordan Sissel's avatar
Jordan Sissel committed
39 40
  struct timespec start;
  clock_gettime(CLOCK_MONOTONIC, &start);
41 42 43 44 45 46
  //long count = 0;

  struct backoff sleeper;
  backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);

  struct lumberjack *lumberjack;
Jordan Sissel's avatar
Jordan Sissel committed
47
  lumberjack = lumberjack_new(config->host, config->port, config->window_size);
48
  insist(lumberjack != NULL, "lumberjack_new failed");
Jordan Sissel's avatar
Jordan Sissel committed
49
  lumberjack->ring_size = config->window_size;
Jordan Sissel's avatar
Jordan Sissel committed
50

Jordan Sissel's avatar
Jordan Sissel committed
51 52
  if (config->ssl_ca_path != NULL) {
    rc = lumberjack_set_ssl_ca(lumberjack, config->ssl_ca_path);
Jordan Sissel's avatar
Jordan Sissel committed
53 54
    insist(rc == 0, "lumberjack_set_ssl_ca failed, is '%s' a valid ssl cert?",
           config->ssl_ca_path);
Jordan Sissel's avatar
Jordan Sissel committed
55 56
  }

57 58 59
  unsigned long count = 0;
  unsigned long bytes = 0;
  unsigned long report_interval = config->window_size * 4;
60 61 62 63 64 65

  zmq_pollitem_t items[1];

  items[0].socket = socket;
  items[0].events = ZMQ_POLLIN;

66
  int can_flush = 0;
Jordan Sissel's avatar
Jordan Sissel committed
67
  for (;;) {
68 69 70 71 72
    /* Receive an event from a harvester and put it in the queue */
    zmq_msg_t message;

    rc = zmq_msg_init(&message);
    insist(rc == 0, "zmq_msg_init failed");
73
    rc = zmq_poll(items, 1, 1000000 /* microseconds */);
74

75 76
    if (rc == 0) {
      /* poll timeout. We're idle, so let's flush and back-off. */
77 78 79 80 81 82 83 84 85 86 87
      if (can_flush) {
        /* only flush if there's something to flush... */
        flog(stdout, "flushing since nothing came in over zmq");
        /* We flush here to keep slow feeders closer to real-time */
        rc = lumberjack_flush(lumberjack);
        can_flush = 0;
        if (rc != 0) {
          /* write failure, reconnect (which will resend) and such */
          lumberjack_disconnect(lumberjack);
          lumberjack_ensure_connected(lumberjack);
        }
88 89
      }
      backoff(&sleeper);
90 91

      /* Restart the loop - checking to see if there's any messages */
92 93
      continue;
    } 
Jordan Sissel's avatar
Jordan Sissel committed
94

95
    /* poll successful, read a message */
96 97
    //rc = zmq_recv(socket, &message, 0);
    rc = zmq_compat_recvmsg(socket, &message, 0);
98 99 100 101 102
    insist(rc == 0 /*|| errno == EAGAIN */,
           "zmq_recv(%s) failed (returned %d): %s",
           config->zmq_endpoint, rc, zmq_strerror(errno));

    /* Clear the backoff timer since we received a message successfully */
103
    backoff_clear(&sleeper);
104 105 106 107 108

    /* Write the data over lumberjack. This will handle any
     * connection/reconnection/ack issues */
    lumberjack_send_data(lumberjack, zmq_msg_data(&message),
                         zmq_msg_size(&message));
109 110
    /* Since we sent data, let it be known that we can flush if idle */
    can_flush = 1;
111 112
    /* Stats for debugging */
    count++;
Jordan Sissel's avatar
Jordan Sissel committed
113 114 115 116
    bytes += zmq_msg_size(&message);

    zmq_msg_close(&message);

117
    if (count == report_interval) {
118 119 120 121
      struct timespec now;
      clock_gettime(CLOCK_MONOTONIC, &now);
      double s = (start.tv_sec + 0.0) + (start.tv_nsec / 1000000000.0);
      double n = (now.tv_sec + 0.0) + (now.tv_nsec / 1000000000.0);
122
      flog(stdout, "Rate: %f (bytes: %f)", (count + 0.0) / (n - s), (bytes + 0.0) / (n - s));
123 124
      struct rusage rusage;
      rc = getrusage(RUSAGE_SELF, &rusage);
125 126
      insist(rc == 0, "getrusage failed: %s", strerror(errno));
      flog(stdout, "cpu user/system: %d.%06d / %d.%06dn",
Jordan Sissel's avatar
Jordan Sissel committed
127 128
           (int)rusage.ru_utime.tv_sec, (int)rusage.ru_utime.tv_usec,
           (int)rusage.ru_stime.tv_sec, (int)rusage.ru_stime.tv_usec);
129
      clock_gettime(CLOCK_MONOTONIC, &start);
Jordan Sissel's avatar
Jordan Sissel committed
130
      bytes = 0;
131
      count = 0;
132 133 134 135
    }
  } /* forever */
} /* emitter */