Commit cca836f4 authored by Jordan Sissel's avatar Jordan Sissel

- get cracking on the registrar

parent c59902ac
...@@ -10,3 +10,10 @@ type FileEvent struct { ...@@ -10,3 +10,10 @@ type FileEvent struct {
fileinfo *os.FileInfo fileinfo *os.FileInfo
} }
type FileState struct {
Source *string `json:"source,omitempty"`
Offset uint64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device uint64 `json:"device,omitempty"`
}
...@@ -227,6 +227,6 @@ func Publish(input chan []*FileEvent, ...@@ -227,6 +227,6 @@ func Publish(input chan []*FileEvent,
} }
// Tell the registrar that we've successfully sent these events // Tell the registrar that we've successfully sent these events
//registrar <- events registrar <- events
} /* for each event payload */ } /* for each event payload */
} // Publish } // Publish
package liblumberjack
import (
"log"
"os"
"syscall"
"encoding/json"
)
func Registrar(input chan []*FileEvent) {
for events := range input {
state := make(map[string]*FileState)
log.Printf("Registrar received %d events\n", len(events))
// Take the last event found for each file source
for _, event := range events {
if *event.Source == "-" {
continue
}
// have to dereference the FileInfo here because os.FileInfo is an
// interface, not a struct, so Go doesn't have smarts to call the Sys()
// method on a pointer to os.FileInfo. :(
fstat := (*(event.fileinfo)).Sys().(*syscall.Stat_t)
state[*event.Source] = &FileState{
Source: event.Source,
Offset: event.Offset + uint64(len(*event.Text)),
Inode: fstat.Ino,
Device: fstat.Dev,
}
}
if len(state) > 0 {
write(state)
os.Rename(".lumberjack.new", ".lumberjack")
}
}
}
func write(state map[string]*FileState) {
log.Printf("Saving registrar state.\n")
// Open tmp file, write, flush, rename
file, err := os.Create(".lumberjack.new")
if err != nil {
log.Printf("Failed to open .lumberjack.new for writing: %s\n", err)
return
}
encoder := json.NewEncoder(file)
encoder.Encode(state)
file.Close()
}
...@@ -116,10 +116,10 @@ func main() { ...@@ -116,10 +116,10 @@ func main() {
// Harvesters dump events into the spooler. // Harvesters dump events into the spooler.
go lumberjack.Spool(event_chan, publisher_chan, *spool_size, *idle_timeout) go lumberjack.Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)
lumberjack.Publish(publisher_chan, registrar_chan, server_list, go lumberjack.Publish(publisher_chan, registrar_chan, server_list,
public_key, secret_key, *server_timeout) public_key, secret_key, *server_timeout)
// TODO(sissel): registrar db path // TODO(sissel): registrar db path
// TODO(sissel): registrar records last acknowledged positions in all files. // TODO(sissel): registrar records last acknowledged positions in all files.
//lumberjack.Registrar(registrar_chan) lumberjack.Registrar(registrar_chan)
} /* main */ } /* main */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment