harvester.go 4.54 KB
Newer Older
1
package main
2 3

import (
4
  "bufio"
5 6
  "bytes"
  "io"
7 8
  "log"
  "os" // for File and friends
9
  "time"
10 11 12
)

// Harvester tails a single file, or standard input when Path is "-", and
// turns every complete line into an event.
type Harvester struct {
	// Path is the file to harvest.
	Path string
	// Fields is attached (by pointer) to every event this harvester emits.
	Fields map[string]string
	// Offset is the byte position to resume reading from; it is only used
	// when greater than zero.
	Offset int64

	// file is the handle currently being read.
	file *os.File
}

func (h *Harvester) Harvest(output chan *FileEvent) {
21 22 23 24 25
  if h.Offset > 0 {
    log.Printf("Starting harvester at position %d: %s\n", h.Offset, h.Path)
  } else {
    log.Printf("Starting harvester: %s\n", h.Path)
  }
26

27 28 29
  h.open()
  info, _ := h.file.Stat() // TODO(sissel): Check error
  defer h.file.Close()
Jordan Sissel's avatar
Jordan Sissel committed
30
  //info, _ := file.Stat()
31

32 33 34
  var line uint64 = 0 // Ask registrar about the line number

  // get current offset in file
35
  offset, _ := h.file.Seek(0, os.SEEK_CUR)
36

37 38
  log.Printf("Current file offset: %d\n", offset)

39
  // TODO(sissel): Make the buffer size tunable at start-time
40
  reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default
Thomas Zahari's avatar
Thomas Zahari committed
41
  buffer := new(bytes.Buffer)
42

43
  var read_timeout = 10 * time.Second
Jordan Sissel's avatar
Jordan Sissel committed
44
  last_read_time := time.Now()
45
  for {
Thomas Zahari's avatar
Thomas Zahari committed
46
    text, bytesread, err := h.readline(reader, buffer, read_timeout)
47 48

    if err != nil {
49 50
      if err == io.EOF {
        // timed out waiting for data, got eof.
Jordan Sissel's avatar
Jordan Sissel committed
51 52 53 54 55 56 57 58 59 60
        // Check to see if the file was truncated
        info, _ := h.file.Stat()
        if info.Size() < offset {
          log.Printf("File truncated, seeking to beginning: %s\n", h.Path)
          h.file.Seek(0, os.SEEK_SET)
          offset = 0
        } else if age := time.Since(last_read_time); age > (24 * time.Hour) {
          // if last_read_time was more than 24 hours ago, this file is probably
          // dead. Stop watching it.
          // TODO(sissel): Make this time configurable
61
          // This file is idle for more than 24 hours. Give up and stop harvesting.
Jordan Sissel's avatar
Jordan Sissel committed
62
          log.Printf("Stopping harvest of %s; last change was %d seconds ago\n", h.Path, age.Seconds())
63 64 65 66
          return
        }
        continue
      } else {
Jordan Sissel's avatar
Jordan Sissel committed
67
        log.Printf("Unexpected state reading from %s; error: %s\n", h.Path, err)
68 69
        return
      }
70
    }
71
    last_read_time = time.Now()
72 73 74

    line++
    event := &FileEvent{
75 76 77 78 79
      Source:   &h.Path,
      Offset:   offset,
      Line:     line,
      Text:     text,
      Fields:   &h.Fields,
80
      fileinfo: &info,
81
    }
82
    offset += int64(bytesread)
83

84
    output <- event // ship the new event downstream
85 86 87 88
  } /* forever */
}

func (h *Harvester) open() *os.File {
89 90
  // Special handling that "-" means to read from standard input
  if h.Path == "-" {
91 92
    h.file = os.Stdin
    return h.file
93
  }
94

95 96
  for {
    var err error
97
    h.file, err = os.Open(h.Path)
98 99 100

    if err != nil {
      // retry on failure.
Jordan Sissel's avatar
Jordan Sissel committed
101
      log.Printf("Failed opening %s: %s\n", h.Path, err)
102 103 104
      time.Sleep(5 * time.Second)
    } else {
      break
105 106
    }
  }
107

108
  // TODO(sissel): Only seek if the file is a file, not a pipe or socket.
109
  if *from_beginning {
110
    h.file.Seek(0, os.SEEK_SET)
111 112
  } else  if h.Offset > 0 {
    h.file.Seek(h.Offset, os.SEEK_SET)
113
  } else {
114
    h.file.Seek(0, os.SEEK_END)
115
  }
116

117
  return h.file
118 119
}

Thomas Zahari's avatar
Thomas Zahari committed
120
func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_timeout time.Duration) (*string, int, error) {
121
  var is_partial bool = true
Thomas Zahari's avatar
Thomas Zahari committed
122
  var newline_length int = 1
Jordan Sissel's avatar
Jordan Sissel committed
123
  start_time := time.Now()
124

125
  for {
126 127 128 129 130 131
    segment, err := reader.ReadBytes('\n')

    if segment != nil && len(segment) > 0 {
      if segment[len(segment)-1] == '\n' {
        // Found a complete line
        is_partial = false
132 133 134

        // Check if also a CR present
        if len(segment) > 1 && segment[len(segment)-2] == '\r' {
Thomas Zahari's avatar
Thomas Zahari committed
135
          newline_length++
136
        }
137
      }
138 139

      // TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
Thomas Zahari's avatar
Thomas Zahari committed
140
      buffer.Write(segment)
141 142
    }

143
    if err != nil {
144
      if err == io.EOF && is_partial {
145 146 147 148 149
        time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff

        // Give up waiting for data after a certain amount of time.
        // If we time out, return the error (eof)
        if time.Since(start_time) > eof_timeout {
150
          return nil, 0, err
151
        }
152 153
        continue
      } else {
Jordan Sissel's avatar
Jordan Sissel committed
154
        log.Println(err)
155
        return nil, 0, err // TODO(sissel): don't do this?
156 157 158
      }
    }

Thomas Zahari's avatar
Thomas Zahari committed
159
    // If we got a full line, return the whole line without the EOL chars (CRLF or LF)
160
    if !is_partial {
Thomas Zahari's avatar
Thomas Zahari committed
161 162
      // Get the str length with the EOL chars (LF or CRLF)
      bufferSize := buffer.Len()
163
      str := new(string)
Thomas Zahari's avatar
Thomas Zahari committed
164 165 166
      *str = buffer.String()[:bufferSize - newline_length]
      // Reset the buffer for the next line
      buffer.Reset()
167
      return str, bufferSize, nil
168 169 170
    }
  } /* forever read chunks */

171
  return nil, 0, nil
172
}