Commit d38a10ce authored by Jordan Sissel's avatar Jordan Sissel

- no reason to have a local 'file' variable

parent 6335027d
......@@ -14,37 +14,28 @@ type Harvester struct {
Fields map[string]string
Offset int64
file os.File /* the file being watched */
file *os.File /* the file being watched */
}
func (h *Harvester) Harvest(output chan *FileEvent) {
// TODO(sissel): Read the file
// TODO(sissel): Emit FileEvent for each line to 'output'
// TODO(sissel): Handle rotation
// TODO(sissel): Sleep when there's nothing to do
// TODO(sissel): Quit if we think the file is dead (file dev/inode changed, no data in X seconds)
if h.Offset > 0 {
log.Printf("Starting harvester at position %d: %s\n", h.Offset, h.Path)
} else {
log.Printf("Starting harvester: %s\n", h.Path)
}
file := h.open()
info, _ := file.Stat() // TODO(sissel): Check error
defer file.Close()
h.open()
info, _ := h.file.Stat() // TODO(sissel): Check error
defer h.file.Close()
//info, _ := file.Stat()
// TODO(sissel): Ask the registrar for the start position?
// TODO(sissel): record the current file inode/device/etc
var line uint64 = 0 // Ask registrar about the line number
// get current offset in file
offset, _ := file.Seek(0, os.SEEK_CUR)
offset, _ := h.file.Seek(0, os.SEEK_CUR)
// TODO(sissel): Make the buffer size tunable at start-time
reader := bufio.NewReaderSize(file, 16<<10) // 16kb buffer by default
reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default
var read_timeout = 10 * time.Second
last_read_time := time.Now()
......@@ -92,16 +83,15 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
}
func (h *Harvester) open() *os.File {
var file *os.File
// Special handling that "-" means to read from standard input
if h.Path == "-" {
return os.Stdin
h.file = os.Stdin
return h.file
}
for {
var err error
file, err = os.Open(h.Path)
h.file, err = os.Open(h.Path)
if err != nil {
// retry on failure.
......@@ -114,12 +104,12 @@ func (h *Harvester) open() *os.File {
// TODO(sissel): Only seek if the file is a file, not a pipe or socket.
if h.Offset > 0 {
file.Seek(h.Offset, os.SEEK_SET)
h.file.Seek(h.Offset, os.SEEK_SET)
} else {
file.Seek(0, os.SEEK_END)
h.file.Seek(0, os.SEEK_END)
}
return file
return h.file
}
func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) {
......@@ -129,7 +119,6 @@ func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*
segment, is_partial, err := reader.ReadLine()
if err != nil {
// TODO(sissel): if eof and line_complete is false, don't check rotation unless a very long time has passed
if err == io.EOF {
time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment