Commit 4a8ef7c1 authored by Jordan Sissel's avatar Jordan Sissel

more work in progress on harvester management

parent 8ffeb3d0
......@@ -2,8 +2,12 @@ package lumberjack
import (
"os" // for File and friends
"time"
"fmt"
"bytes"
"io"
"bufio"
proto "code.google.com/p/goprotobuf/proto"
"time"
)
type Harvester struct {
......@@ -18,14 +22,88 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
// TODO(sissel): Handle rotation
// TODO(sissel): Sleep when there's nothing to do
fmt.Printf("Starting harvester: %s\n", h.Path)
file := h.open()
// TODO(sissel): Ask the registrar for the start position?
var line uint64 = 0 // Ask registrar about the line number
// get current offset in file
offset, _ := file.Seek(0, os.SEEK_CUR)
// TODO(sissel): Make the buffer size tunable at start-time
reader := bufio.NewReaderSize(file, 16<<10) // 16kb buffer by default
for {
dummy := &FileEvent{
Source: proto.String("/var/log/example"),
Offset: proto.Uint64(0),
Line: proto.Uint64(0),
Text: proto.String("alskdjf laskdjf laskdfj laskdjf laskdjf lasdkfj asldkfj hello world!"),
text, err := h.readline(reader)
if err != nil {
return
}
line++
event := &FileEvent{
Source: proto.String(h.Path),
Offset: proto.Uint64(uint64(offset)),
Line: proto.Uint64(line),
Text: text,
}
offset += int64(len(*event.Text))
output <- event
} /* forever */
}
func (h *Harvester) open() *os.File {
var file *os.File
for {
var err error
file, err = os.Open(h.Path)
if err != nil {
// retry on failure.
fmt.Printf("Failed opening %s: %s\n", h.Path, err)
time.Sleep(5 * time.Second)
} else {
break
}
output <- dummy
time.Sleep(10 * time.Millisecond)
}
// TODO(sissel): In the future, use the registrary to determine where to seek.
file.Seek(0, os.SEEK_END)
return file
}
func (h *Harvester) readline(reader *bufio.Reader) (*string, error) {
var buffer bytes.Buffer
for {
segment, is_partial, err := reader.ReadLine()
if err != nil {
// TODO(sissel): Handle the error, check io.EOF?
// TODO(sissel): if eof and line_complete is false, don't check rotation unless a very long time has passed
if err == io.EOF {
time.Sleep(1 * time.Second)
continue
} else {
fmt.Println(err)
return nil, err // TODO(sissel): don't do this
}
// TODO(sissel): At EOF, check rotation
// TODO(sissel): if nothing to do, sleep
}
// TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
buffer.Write(segment)
if !is_partial {
str := new(string)
*str = buffer.String()
return str, nil
}
} /* forever read chunks */
return nil, nil
}
package lumberjack
import (
"time"
"path/filepath"
"fmt"
)
func Prospect(paths []string, output chan *FileEvent) {
// For each path
// - evaluate glob
// - for any new file paths, start a harvester
active := make(map[string]Harvester)
for {
for _, path := range paths {
matches, err := filepath.Glob(path)
if err != nil {
fmt.Print("glob(%s) failed: %v\n", path, err)
continue
}
for _, file := range matches {
// Skip already-watched files
if _, already_exists := active[file]; already_exists { continue }
harvester := Harvester{Path: file}
active[file] = harvester
go harvester.Harvest(output)
} // for each file matched by the glob
} // for each path in the paths
time.Sleep(10 * time.Second)
} // forever
} /* Prospect */
......@@ -2,7 +2,7 @@ package lumberjack
import (
"time"
"fmt"
//"fmt"
)
func Spooler(input chan *FileEvent,
......@@ -36,11 +36,11 @@ func Spooler(input chan *FileEvent,
spool_i = 0
}
case <- ticker.C:
fmt.Println("tick")
//fmt.Println("tick")
if now := time.Now(); now.After(next_flush_time) {
// if current time is after the next_flush_time, flush!
fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
now.Sub(next_flush_time))
//fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
//now.Sub(next_flush_time))
// Flush what we have, if anything
if spool_i > 0 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment