Commit c5c781af authored by Jordan Sissel's avatar Jordan Sissel

- add the file stat to the FileEvent

- prep for registrar support
parent 2666e2ee
package liblumberjack
import "os"
type FileEvent struct {
Source *string `json:"source,omitempty"`
Offset uint64 `json:"offset,omitempty"`
Line uint64 `json:"line,omitempty"`
Text *string `json:"text,omitempty"`
fileinfo *os.FileInfo
}
......@@ -25,6 +25,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
log.Printf("Starting harvester: %s\n", h.Path)
file := h.open()
info, _ := file.Stat() // TODO(sissel): Check error
defer file.Close()
//info, _ := file.Stat()
......@@ -68,6 +69,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
Offset: uint64(offset),
Line: line,
Text: text,
fileinfo: &info,
}
offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator
......
......@@ -43,8 +43,7 @@ func (s *FFS) Send(data []byte, flags zmq.SendRecvOption) (err error) {
count, err := zmq.Poll(pi, s.SendTimeout)
if count == 0 {
// not ready in time, fail the socket and try again.
log.Printf("%s: timed out waiting to Send(): %s\n",
s.endpoint, err)
log.Printf("%s: timed out waiting to Send(): %s\n", s.endpoint, err)
s.fail_socket()
} else {
//log.Printf("%s: sending %d payload\n", s.endpoint, len(data))
......@@ -157,6 +156,7 @@ func (s *FFS) fail_socket() {
}
func Publish(input chan []*FileEvent,
registrar chan []*FileEvent,
server_list []string,
public_key [sodium.PUBLICKEYBYTES]byte,
secret_key [sodium.SECRETKEYBYTES]byte,
......@@ -176,17 +176,16 @@ func Publish(input chan []*FileEvent,
// got a bunch of events, ship them out.
log.Printf("Publisher received %d events\n", len(events))
data, err := json.Marshal(events)
data, _ := json.Marshal(events)
// TODO(sissel): check error
_ = err
// Compress it
// A new compressor is used for every payload of events so
// that any individual payload can be decompressed alone.
// A new zlib writer is used for every payload of events so that any
// individual payload can be decompressed alone.
// TODO(sissel): Make compression level tunable
compressor, _ := zlib.NewWriterLevel(&buffer, 3)
buffer.Truncate(0)
_, err = compressor.Write(data)
_, err := compressor.Write(data)
err = compressor.Flush()
compressor.Close()
......@@ -205,30 +204,24 @@ func Publish(input chan []*FileEvent,
log.Printf("nonce: %d\n", len(nonce))
// TODO(sissel): figure out encoding for ciphertext + nonce
// TODO(sissel): Figure out the protocol for envelopes
// - compression envelope:
// - what compression system was used
// - any parameters?
// - payload
// - encryption envelope:
// - what encryption system was used
// - any public parameters necessary to decrypt (public key, nonce)
// - payload
// TODO(sissel): figure out encoding for ciphertext + nonce
// Loop forever trying to send.
// This will cause reconnects/etc on failures automatically
for {
err = socket.Send(buffer.Bytes(), 0)
if err != nil {
continue // send failed, retry!
}
data, err = socket.Recv(0)
// TODO(sissel): Figure out acknowledgement protocol? If any?
if err == nil {
// success!
break
break // success!
}
}
// TODO(sissel): Check data value of reply?
// TODO(sissel): retry on failure or timeout
// TODO(sissel): notify registrar of success
// Tell the registrar that we've successfully sent these events
registrar <- events
} /* for each event payload */
} // Publish
......@@ -70,6 +70,7 @@ func main() {
// TODO(sissel): support flags for setting... stuff
event_chan := make(chan *lumberjack.FileEvent, 16)
publisher_chan := make(chan []*lumberjack.FileEvent, 1)
registrar_chan := make(chan []*lumberjack.FileEvent, 1)
paths := flag.Args()
......@@ -115,9 +116,10 @@ func main() {
// Harvesters dump events into the spooler.
go lumberjack.Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)
lumberjack.Publish(publisher_chan, server_list, public_key, secret_key,
*server_timeout)
lumberjack.Publish(publisher_chan, registrar_chan, server_list,
public_key, secret_key, *server_timeout)
// TODO(sissel): publisher should send state to the registrar
// TODO(sissel): registrar db path
// TODO(sissel): registrar records last acknowledged positions in all files.
//lumberjack.Registrar(registrar_chan)
} /* main */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment