package main

import (
	"bufio"
	"bytes"
	"io"
	"log"
	"os" // for File and friends
	"time"
)

// Harvester reads lines from a single file (or standard input) and carries
// the state needed to do so.
type Harvester struct {
	// Path is the file path to harvest. The value "-" selects standard input.
	Path string
	// Fields is attached, by reference, to every event produced for this file.
	Fields map[string]string
	// Offset is the byte position to start reading from. A value of zero or
	// less means there is no saved position.
	Offset int64

	// file is the file being watched. It is assigned when the file is opened.
	file *os.File
}

func (h *Harvester) Harvest(output chan *FileEvent) {
21 22 23 24 25
  if h.Offset > 0 {
    log.Printf("Starting harvester at position %d: %s\n", h.Offset, h.Path)
  } else {
    log.Printf("Starting harvester: %s\n", h.Path)
  }
26

27 28 29
  h.open()
  info, _ := h.file.Stat() // TODO(sissel): Check error
  defer h.file.Close()
Jordan Sissel's avatar
Jordan Sissel committed
30
  //info, _ := file.Stat()
31

32 33 34
  var line uint64 = 0 // Ask registrar about the line number

  // get current offset in file
35
  offset, _ := h.file.Seek(0, os.SEEK_CUR)
36

37 38
  log.Printf("Current file offset: %d\n", offset)

39
  // TODO(sissel): Make the buffer size tunable at start-time
40
  reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default
41

42
  var read_timeout = 10 * time.Second
Jordan Sissel's avatar
Jordan Sissel committed
43
  last_read_time := time.Now()
44
  for {
45
    text, err := h.readline(reader, read_timeout)
46 47

    if err != nil {
48 49
      if err == io.EOF {
        // timed out waiting for data, got eof.
Jordan Sissel's avatar
Jordan Sissel committed
50 51 52 53 54 55 56 57 58 59
        // Check to see if the file was truncated
        info, _ := h.file.Stat()
        if info.Size() < offset {
          log.Printf("File truncated, seeking to beginning: %s\n", h.Path)
          h.file.Seek(0, os.SEEK_SET)
          offset = 0
        } else if age := time.Since(last_read_time); age > (24 * time.Hour) {
          // if last_read_time was more than 24 hours ago, this file is probably
          // dead. Stop watching it.
          // TODO(sissel): Make this time configurable
60
          // This file is idle for more than 24 hours. Give up and stop harvesting.
Jordan Sissel's avatar
Jordan Sissel committed
61
          log.Printf("Stopping harvest of %s; last change was %d seconds ago\n", h.Path, age.Seconds())
62 63 64 65
          return
        }
        continue
      } else {
Jordan Sissel's avatar
Jordan Sissel committed
66
        log.Printf("Unexpected state reading from %s; error: %s\n", h.Path, err)
67 68
        return
      }
69
    }
70
    last_read_time = time.Now()
71 72 73

    line++
    event := &FileEvent{
74
      Source: &h.Path,
75
      Offset: offset,
76
      Line: line,
77
      Text: text,
78
      Fields: &h.Fields,
79
      fileinfo: &info,
80
    }
81
    offset += int64(len(*event.Text)) + 1  // +1 because of the line terminator
82

83
    output <- event // ship the new event downstream
84 85 86 87
  } /* forever */
}

func (h *Harvester) open() *os.File {
88 89
  // Special handling that "-" means to read from standard input
  if h.Path == "-" {
90 91
    h.file = os.Stdin
    return h.file
92 93
  } 

94 95
  for {
    var err error
96
    h.file, err = os.Open(h.Path)
97 98 99

    if err != nil {
      // retry on failure.
Jordan Sissel's avatar
Jordan Sissel committed
100
      log.Printf("Failed opening %s: %s\n", h.Path, err)
101 102 103
      time.Sleep(5 * time.Second)
    } else {
      break
104 105
    }
  }
106

107
  // TODO(sissel): Only seek if the file is a file, not a pipe or socket.
108
  if h.Offset > 0 {
109
    h.file.Seek(h.Offset, os.SEEK_SET)
110 111
  } else if *from_beginning {
    h.file.Seek(0, os.SEEK_SET)
112
  } else {
113
    h.file.Seek(0, os.SEEK_END)
114
  }
115

116
  return h.file
117 118
}

119
func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) {
120
  var buffer bytes.Buffer
Jordan Sissel's avatar
Jordan Sissel committed
121
  start_time := time.Now()
122 123
  for {
    segment, is_partial, err := reader.ReadLine()
124

125 126
    if err != nil {
      if err == io.EOF {
127 128 129 130 131 132 133
        time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff

        // Give up waiting for data after a certain amount of time.
        // If we time out, return the error (eof)
        if time.Since(start_time) > eof_timeout {
          return nil, err
        }
134 135
        continue
      } else {
Jordan Sissel's avatar
Jordan Sissel committed
136
        log.Println(err)
137
        return nil, err // TODO(sissel): don't do this?
138 139 140 141 142 143 144
      }
    }

    // TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
    buffer.Write(segment)

    if !is_partial {
145
      // If we got a full line, return the whole line.
146 147 148 149 150 151 152
      str := new(string)
      *str = buffer.String()
      return str, nil
    }
  } /* forever read chunks */

  return nil, nil
153
}