Commit 9a16b2a5 authored by Jordan Sissel's avatar Jordan Sissel

- start a prototype of lumberjack in go + zmq + protobufs

  Same functional goals, different implementation and wire protocol.
parent f09324a4
Harvester reads events from files
Event:
Byte offset of start of event (number)
Line number of event (number)
File origin of event (string)
Message (string)
Work model:
Harvester(s)
-> Buffer (flush when full or after N idle seconds)
-> Compressor
-> Encryptor
-> Emitter
Network model w/ ZMQ:
pushpull events
reqreq for acknowledgements
Types of events:
File Event - represents an event read from a file
- file origin of event
- byte offset of event
- line number of event
- event message (the contents)
Compressed Envelope
- number of items
- type of item
- compressed payload
Encrypted Envelope
- cipher
- payload
# custom paths for zmq
CGO_CFLAGS=-I$HOME/projects/lumberjack/build/include CGO_LDFLAGS=-L$HOME/projects/lumberjack/build/lib go get github.com/alecthomas/gozmq
package main
import (
"fmt"
"lumberjack"
"time"
proto "code.google.com/p/goprotobuf/proto"
)
func emit(events []*lumberjack.FileEvent) {
envelope := &lumberjack.EventEnvelope{Events: events}
data, _ := proto.Marshal(envelope)
fmt.Printf("Flushing !!! %d events: %d encoded\n", len(events), len(data))
} /* emit */
func main() {
// TODO(sissel): support flags for setting... stuff
// TODO(sissel): need a HarvestForeman to manage the harvester
// TODO(sissel): Need an encryptor
// TODO(sissel): Need a compressor
event_stream := make(chan *lumberjack.FileEvent, 32)
h := lumberjack.Harvester{Path: "/var/log/messages"}
go h.Harvest(event_stream)
go h.Harvest(event_stream)
//flusher = func(events []interface{}) {
//fmt.Printf("Flushing %d events\n", len(events))
//}
var window_size uint64 = 1024
timeout := 1 * time.Second
emitter_stream := make(chan *lumberjack.EventEnvelope, 1)
// harvester -> spooler
go lumberjack.Spooler(event_stream, emitter_stream, window_size, timeout)
// spooler -> emitter
for x := range emitter_stream {
// got a bunch of events, ship them out.
fmt.Printf("Spooler gave me %d events\n", len(x.Events))
}
//lumberjack.Emitter(
} /* main */
// Code generated by protoc-gen-go.
// source: event.proto
// DO NOT EDIT!
package lumberjack
import proto "code.google.com/p/goprotobuf/proto"
import json "encoding/json"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type FileEvent struct {
Source *string `protobuf:"bytes,1,req,name=source" json:"source,omitempty"`
Offset *uint64 `protobuf:"varint,2,req,name=offset" json:"offset,omitempty"`
Line *uint64 `protobuf:"varint,3,req,name=line" json:"line,omitempty"`
Text *string `protobuf:"bytes,4,req,name=text" json:"text,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (this *FileEvent) Reset() { *this = FileEvent{} }
func (this *FileEvent) String() string { return proto.CompactTextString(this) }
func (*FileEvent) ProtoMessage() {}
func (this *FileEvent) GetSource() string {
if this != nil && this.Source != nil {
return *this.Source
}
return ""
}
func (this *FileEvent) GetOffset() uint64 {
if this != nil && this.Offset != nil {
return *this.Offset
}
return 0
}
func (this *FileEvent) GetLine() uint64 {
if this != nil && this.Line != nil {
return *this.Line
}
return 0
}
func (this *FileEvent) GetText() string {
if this != nil && this.Text != nil {
return *this.Text
}
return ""
}
type EventEnvelope struct {
Events []*FileEvent `protobuf:"bytes,1,rep,name=events" json:"events,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (this *EventEnvelope) Reset() { *this = EventEnvelope{} }
func (this *EventEnvelope) String() string { return proto.CompactTextString(this) }
func (*EventEnvelope) ProtoMessage() {}
func (this *EventEnvelope) GetEvents() []*FileEvent {
if this != nil {
return this.Events
}
return nil
}
func init() {
}
package lumberjack;
message FileEvent {
// The source of this event (file path, etc)
required string source = 1;
// The byte offset (where in the source this event came from */
required uint64 offset = 2;
// The line offset
required uint64 line = 3;
// The contents of the event
required string text = 4;
}
message EventEnvelope {
repeated FileEvent events = 1;
}
package lumberjack
import (
"os" // for File and friends
"time"
proto "code.google.com/p/goprotobuf/proto"
)
type Harvester struct {
Path string /* the file path to harvest */
file os.File /* the file being watched */
}
func (h *Harvester) Harvest(output chan *FileEvent) {
// TODO(sissel): Read the file
// TODO(sissel): Emit FileEvent for each line to 'output'
// TODO(sissel): Handle rotation
// TODO(sissel): Sleep when there's nothing to do
for {
dummy := &FileEvent{
Source: proto.String("/var/log/example"),
Offset: proto.Uint64(0),
Line: proto.Uint64(0),
Text: proto.String("alskdjf laskdjf laskdfj laskdjf laskdjf lasdkfj asldkfj hello world!"),
}
output <- dummy
time.Sleep(10 * time.Millisecond)
}
}
package lumberjack
import (
"time"
"fmt"
)
func Spooler(input chan *FileEvent,
output chan *EventEnvelope,
max_size uint64,
idle_timeout time.Duration) {
// heartbeat periodically. If the last flush was longer than
// 'idle_timeout' time ago, then we'll force a flush to prevent us from
// holding on to spooled events for too long.
ticker := time.NewTicker(idle_timeout / 2)
spool := make([]*FileEvent, max_size)
var spool_i int = 0
timeout := 1 * time.Second
start := time.Now()
for {
select {
case event := <- input:
spool[spool_i] = event
spool_i++
// Flush if full
if spool_i == len(spool) {
output <- &EventEnvelope{Events: spool[:]}
start = time.Now() // reset 'start' time
spool_i = 0
}
case <- ticker.C:
if duration := time.Since(start); duration > timeout {
/* Timeout occurred */
fmt.Printf("timeout: %d > %d\n", time.Since(start), timeout)
// Flush what we have, if anything
if spool_i > 0 {
start = time.Now()
output <- &EventEnvelope{Events: spool[0:spool_i]}
spool_i = 0
}
} /* if duration > timeout */
/* case ... */
} /* select */
} /* for */
} /* spooler */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment