harvester.c 6.28 KB
Newer Older
1 2
#define _BSD_SOURCE
#include <string.h> /* for strsep, strerror, etc */
3 4 5
#include <errno.h> /* for errno */
#include <fcntl.h> /* for open(2) */
#include <unistd.h> /* for close, etc */
6
#include <arpa/inet.h> /* for ntohl */
7
#include <stdio.h> /* printf and friends */
8
#include "zmq.h" /* zeromq messaging library */
9 10
#include "str.h" /* dynamic string library */
#include "proto.h" /* lumberjack wire format serialization */
Jordan Sissel's avatar
Jordan Sissel committed
11
#include <sys/stat.h>
12
#include "jemalloc/jemalloc.h"
13

14 15
#include "harvester.h"
#include "backoff.h"
16
#include "insist.h"
17
#include "sleepdefs.h"
Jordan Sissel's avatar
Jordan Sissel committed
18
#include "flog.h"
19
#include "zmq_compat.h"
20

21
#ifdef __MACH__
22 23 24
/* OS X is dumb, or I am dumb, or we are both dumb. I don't know anymore,
 * but I need to declare these explicitly even though they are defined
 * in string.h, unistd.h respectively */
25 26 27 28
extern char *strsep(char **stringp, const char *delim);
extern int gethostname(char *name, size_t namelen);
#endif

29 30 31
#define EMITTER_SOCKET "inproc://emitter"
#define BUFFERSIZE 16384

32
/*
 * zmq_msg free callback for messages whose payload is owned by a
 * 'struct str'. harvest() registers it with zmq_msg_init_data(), passing the
 * packed 'struct str' itself as the hint. The data pointer refers into that
 * same string, so only the hint needs releasing.
 */
static inline void my_str_free(void __attribute__((__unused__)) *data, void *hint) {
  struct str *owner = hint;
  str_free(owner);
} /* my_str_free */
41

Jordan Sissel's avatar
Jordan Sissel committed
42 43
static void track_rotation(int *fd, const char *path);

44
void *harvest(void *arg) {
45
  struct harvest_config *config = arg;
46
  int fd;
47
  int rc;
48 49
  char hostname[200];
  size_t hostname_len, path_len;
Jordan Sissel's avatar
Jordan Sissel committed
50 51 52
  
  /* Make this so we only call it once. */
  gethostname(hostname, sizeof(hostname));
53
  hostname_len = strlen(hostname);
54

Jordan Sissel's avatar
Jordan Sissel committed
55 56 57 58 59
  if (strcmp(config->path, "-") == 0) {
    /* path is '-', use stdin */
    fd = 0;
  } else {
    fd = open(config->path, O_RDONLY);
60
    insist(fd >= 0, "open(%s) failed: %s", config->path, strerror(errno));
Jordan Sissel's avatar
Jordan Sissel committed
61
    /* Start at the end of the file */
62 63 64
    off_t seek_ret = lseek(fd, 0, SEEK_END);
    insist(seek_ret >= 0, "lseek(%s, 0, SEEK_END) failed: %s",
           config->path, strerror(errno));
Jordan Sissel's avatar
Jordan Sissel committed
65
  }
66 67
  path_len = strlen(config->path);

68 69 70 71 72 73 74 75 76 77 78 79
  struct kv *event = calloc(3 + config->fields_len, sizeof(struct kv));

  /* will fill the 'line' value in later for each line read */
  event[0].key = "line"; event[0].key_len = 4;
  event[0].value = NULL; event[0].value_len = 0;
  event[1].key = "file"; event[1].key_len = 4;
  event[1].value = config->path; event[1].value_len = path_len;
  event[2].key = "host"; event[2].key_len = 4;
  event[2].value = hostname; event[2].value_len = hostname_len;
  for (size_t i = 0; i < config->fields_len; i++) {
    memcpy(&event[i + 3], &config->fields[i], sizeof(struct kv));
  }
80

81 82
  char *buf;
  ssize_t bytes;
83
  buf = calloc(BUFFERSIZE, sizeof(char));
84

85 86 87
  void *socket = zmq_socket(config->zmq, ZMQ_PUSH);
  insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));

Jordan Sissel's avatar
Jordan Sissel committed
88
  int64_t hwm = 100;
89 90
  //zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
  zmq_compat_set_sendhwm(socket, hwm);
91

Jordan Sissel's avatar
Jordan Sissel committed
92
  /* Wait for the zmq endpoint to be up (wait for connect to succeed) */
Jordan Sissel's avatar
Jordan Sissel committed
93 94
  struct backoff sleeper;
  backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);
95
  for (;;) {
96 97 98 99
    rc = zmq_connect(socket, config->zmq_endpoint);
    if (rc != 0 && errno == ECONNREFUSED) {
      backoff(&sleeper);
      continue; /* retry */
Jordan Sissel's avatar
Jordan Sissel committed
100
    }
101 102 103 104 105 106 107
    insist(rc == 0, "zmq_connect(%s) failed: %s", config->zmq_endpoint,
           zmq_strerror(errno));
    break;
  }

  int offset = 0;
  for (;;) {
Jordan Sissel's avatar
Jordan Sissel committed
108 109
    flog_if_slow(stdout, 0.250, {
      bytes = read(fd, buf + offset, BUFFERSIZE - offset - 1);
110 111
    }, "read of %d bytes (got %d bytes) on '%s'",
    BUFFERSIZE - offset - 1, bytes, config->path);
Jordan Sissel's avatar
Jordan Sissel committed
112

113
    offset += bytes;
114
    if (bytes < 0) {
115
      /* error, maybe indicate a failure of some kind. */
Jordan Sissel's avatar
Jordan Sissel committed
116 117
      printf("read(%d '%s') failed: %s\n", fd,
             config->path, strerror(errno));
118 119 120
      break;
    } else if (bytes == 0) {
      backoff(&sleeper);
Jordan Sissel's avatar
Jordan Sissel committed
121 122 123 124
      if (strcmp(config->path, "-") == 0) {
        /* stdin gave EOF, close out. */
        break;
      }
Jordan Sissel's avatar
Jordan Sissel committed
125
      track_rotation(&fd, config->path);
126
    } else {
Jordan Sissel's avatar
Jordan Sissel committed
127
      /* Data read, handle it! */
128
      backoff_clear(&sleeper);
Jordan Sissel's avatar
Jordan Sissel committed
129 130 131 132 133 134 135
      /* For each line, emit. Save the remainder */
      char *line;
      char *septok = buf;
      char *start = NULL;
      while (start = septok, (line = strsep(&septok, "\n")) != NULL) {
        if (septok == NULL) {
          /* last token found, no terminator though */
136 137
          offset = offset - (line - buf);
          memmove(buf, line, strlen(line));
Jordan Sissel's avatar
Jordan Sissel committed
138 139
        } else {
          /* emit line as an event */
Jordan Sissel's avatar
Jordan Sissel committed
140 141
          /* 'septok' points at the start of the next token, so subtract one. */
          size_t line_len = septok - start - 1;
142
          struct str *serialized;
143

144
          /* Set the line */
145 146
          event[0].value = line;
          event[0].value_len = line_len;
147

148
          /* pack using lumberjack data payload */
149
          serialized = lumberjack_kv_pack(event, 3 + config->fields_len);
150 151

          zmq_msg_t event;
Jordan Sissel's avatar
Jordan Sissel committed
152 153
          zmq_msg_init_data(&event, str_data(serialized), str_length(serialized),
                            my_str_free, serialized);
Jordan Sissel's avatar
Jordan Sissel committed
154
          flog_if_slow(stdout, 0.250, {
155 156
            //rc = zmq_send(socket, &event, 0);
            rc = zmq_compat_sendmsg(socket, &event, 0);
Jordan Sissel's avatar
Jordan Sissel committed
157
          }, "zmq_send (harvesting file '%s')", config->path);
158 159
          insist(rc == 0, "zmq_send(event) failed: %s", zmq_strerror(rc));
          zmq_msg_close(&event);
Jordan Sissel's avatar
Jordan Sissel committed
160
        }
161
      } /* for each token */
162
    }
163 164 165
  } /* loop forever, reading from a file */

  free(arg); /* allocated by the main method, up to us to free */
166 167 168 169 170
  close(fd);

  return NULL;
} /* harvest */

Jordan Sissel's avatar
Jordan Sissel committed
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
/*
 * Called after read() reaches EOF on 'path', to notice log rotation or
 * truncation.
 *
 * fd:   in/out. The descriptor currently being read. On rotation it is
 *       closed and *fd is replaced by a descriptor for the new file.
 * path: the path being followed.
 *
 * - Rotation: 'path' now names a different file, because the device or
 *   inode differs from the open descriptor. The new file is opened, which
 *   positions it at its start, and it replaces *fd.
 * - Truncation: the same file is now shorter than our read position, so it
 *   was truncated in place. The descriptor is rewound to offset 0.
 *   Comparing sizes from stat(path) and fstat(fd) cannot detect this,
 *   because both report the same inode and therefore the same size.
 *
 * Any failure (fstat/stat/open) leaves *fd unchanged; the caller simply
 * tries again on its next EOF.
 */
static void track_rotation(int *fd, const char *path) {
  struct stat pathstat, fdstat;

  if (fstat(*fd, &fdstat) == -1) {
    /* Cannot inspect our own descriptor; fdstat would be garbage. */
    return;
  }
  if (stat(path, &pathstat) == -1) {
    /* error stat'ing the file path, restart loop and try again */
    return;
  }

  if (pathstat.st_dev != fdstat.st_dev || pathstat.st_ino != fdstat.st_ino) {
    /* device or inode number changed, this file was renamed or rotated. */
    int newfd = open(path, O_RDONLY);
    if (newfd == -1) {
      /* Error opening file, restart loop and try again. */
      return;
    }
    close(*fd);
    /* start reading the new file! */
    *fd = newfd;
    return;
  }

  /* Same file: if our read position is past its end, it was truncated. */
  off_t pos = lseek(*fd, 0, SEEK_CUR);
  if (pos != (off_t)-1 && pos > fdstat.st_size) {
    /* the file was truncated, jump back to the beginning */
    lseek(*fd, 0, SEEK_SET);
  }
} /* track_rotation */