Commit 5b970578 authored by Jordan Sissel's avatar Jordan Sissel

- try reorganizing the code structure

parent cca836f4
......@@ -60,21 +60,16 @@ endif # libsodium
build/bin/lumberjack.sh: lumberjack.sh | build/bin
install -m 755 $^ $@
build/bin/lumberjack: bin/lumberjack | build/bin
cp bin/lumberjack build/bin/lumberjack
build/bin/keygen: bin/keygen | build/bin
cp bin/keygen build/bin/keygen
bin/lumberjack: pkg/linux_amd64/github.com/alecthomas/gozmq.a
bin/lumberjack: | build/lib/pkgconfig/sodium.pc
build/bin/lumberjack: pkg/linux_amd64/github.com/alecthomas/gozmq.a
build/bin/lumberjack: | build/lib/pkgconfig/sodium.pc
PKG_CONFIG_PATH=$$PWD/build/lib/pkgconfig \
go install -ldflags '-r $$ORIGIN/../lib' lumberjack
bin/keygen: | build/lib/pkgconfig/sodium.pc
go build -ldflags '-r $$ORIGIN/../lib' -v -o $@
build/bin/keygen: | build/lib/pkgconfig/sodium.pc
PKG_CONFIG_PATH=$$PWD/build/lib/pkgconfig \
go install -ldflags '-r $$ORIGIN/../lib' keygen
go install -ldflags '-r $$ORIGIN/../lib' -o $@
# Mark these phony; 'go install' takes care of knowing how and when to rebuild.
.PHONY: bin/keygen bin/lumberjack
.PHONY: build/bin/keygen build/bin/lumberjack
build/lib/pkgconfig/sodium.pc: src/sodium/sodium.pc | build/lib/pkgconfig
cp $< $@
......@@ -84,8 +79,6 @@ build/lib/pkgconfig: | build/lib
build/lib: | build
mkdir $@
# gozmq
src/github.com/alecthomas/gozmq/zmq.go:
go get -d github.com/alecthomas/gozmq
......
package main
import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
)
// Config is the top-level structure of the JSON config file.
//
// NOTE: the original tags were written as plain strings ("json:network"),
// which is not the `key:"value"` form encoding/json recognizes, so they
// were silently ignored. Backquoted canonical tags fix that.
type Config struct {
	Network NetworkConfig `json:"network"`
	Files   []FileConfig  `json:"files"`
}
// NetworkConfig describes where events are shipped and the TLS material
// used to do so.
//
// The original tags ("json:servers") were malformed and ignored by
// encoding/json; "ssl certificate" in particular could never match the
// SSLCertificate field by name (the key contains a space), so that
// setting was silently dropped. Proper backquoted tags fix this.
type NetworkConfig struct {
	Servers        []string `json:"servers"`
	SSLCertificate string   `json:"ssl certificate"`
}
// FileConfig describes one group of file paths to watch plus the custom
// fields attached to every event harvested from them.
//
// Tags were previously malformed plain strings ("json:paths") and thus
// ignored; decoding only worked via json's case-insensitive field-name
// fallback. Canonical backquoted tags make the mapping explicit and fix
// marshaling (lowercase keys).
type FileConfig struct {
	Paths  []string          `json:"paths"`
	Fields map[string]string `json:"fields"`
}
// LoadConfig reads and parses the JSON config file at path.
//
// On any failure (open, stat, oversized file, short read, or invalid
// JSON) the problem is logged and a non-nil error is returned alongside
// a zero-value Config.
//
// Fixes over the original: the file is now closed (it leaked before),
// the oversize case actually sets err (it previously returned a nil
// error and an empty config, which callers treated as success), the log
// message prints fi.Size() instead of the whole FileInfo with %d, and
// the read uses io.ReadFull so a short Read cannot truncate the config.
func LoadConfig(path string) (config Config, err error) {
	config_file, err := os.Open(path)
	if err != nil {
		log.Printf("Failed to open config file '%s': %s\n", path, err)
		return
	}
	defer config_file.Close() // don't leak the descriptor

	fi, err := config_file.Stat()
	if err != nil {
		log.Printf("Failed to stat config file '%s': %s\n", path, err)
		return
	}

	// Sanity cap: refuse anything over 10 MiB. A config that large is
	// almost certainly the wrong file.
	if fi.Size() > (10 << 20) {
		err = fmt.Errorf("config file too large? Aborting, just in case. '%s' is %d bytes",
			path, fi.Size())
		log.Printf("%s\n", err)
		return
	}

	buffer := make([]byte, fi.Size())
	// io.ReadFull guarantees the whole file is read; a bare Read may
	// legally return fewer bytes than requested.
	if _, err = io.ReadFull(config_file, buffer); err != nil {
		log.Printf("Failed to read config file '%s': %s\n", path, err)
		return
	}

	if err = json.Unmarshal(buffer, &config); err != nil {
		log.Printf("Failed unmarshalling json: %s\n", err)
		return
	}
	return
}
package main
import (
"testing"
"encoding/json"
)
// FileConfig mirrors a per-file entry of the config file for this test.
//
// NOTE(review): this duplicates the FileConfig declared in config.go.
// Both files appear to be in package main, so two declarations of the
// same type will not compile together — one of them should be removed.
type FileConfig struct {
	Paths []string "json:paths"
	Fields map[string]string "json:fields"
}
// TestJSONLoading checks that a per-file config entry unmarshals into
// FileConfig with its paths and custom fields intact.
func TestJSONLoading(t *testing.T) {
	// Fix: the original declared 'var f File' — an undefined type; the
	// struct under test is FileConfig.
	var f FileConfig
	err := json.Unmarshal([]byte("{ \"paths\": [ \"/var/log/fail2ban.log\" ], \"fields\": { \"type\": \"fail2ban\" } }"), &f)
	if err != nil {
		t.Fatalf("json.Unmarshal failed: %s", err)
	}
	// Fatalf with context instead of bare FailNow so a failure says what
	// went wrong.
	if len(f.Paths) != 1 {
		t.Fatalf("expected 1 path, got %d", len(f.Paths))
	}
	if f.Paths[0] != "/var/log/fail2ban.log" {
		t.Fatalf("unexpected path: %q", f.Paths[0])
	}
	if f.Fields["type"] != "fail2ban" {
		t.Fatalf("unexpected 'type' field: %q", f.Fields["type"])
	}
}
package main
package liblumberjack
package main
import "os"
......@@ -7,6 +7,7 @@ type FileEvent struct {
Offset uint64 `json:"offset,omitempty"`
Line uint64 `json:"line,omitempty"`
Text *string `json:"text,omitempty"`
Fields *map[string]string
fileinfo *os.FileInfo
}
......
package liblumberjack
package main
import (
"os" // for File and friends
......@@ -11,6 +11,7 @@ import (
type Harvester struct {
Path string /* the file path to harvest */
Fields map[string]string
file os.File /* the file being watched */
}
......@@ -69,6 +70,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
Offset: uint64(offset),
Line: line,
Text: text,
Fields: &h.Fields,
fileinfo: &info,
}
offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator
......
......@@ -2,34 +2,17 @@ package main
import (
"log"
lumberjack "liblumberjack"
"os"
"time"
"flag"
"strings"
"runtime/pprof"
"sodium"
)
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")
var idle_timeout = flag.Duration("idle-flush-time", 5 * time.Second, "Maximum time to wait for a full spool before flushing anyway")
var server_timeout = flag.Duration("server-timeout", 30 * time.Second, "Maximum time to wait for a request to a server before giving up and trying another.")
var servers = flag.String("servers", "", "Server (or comma-separated list of servers) to send events to. Each server can be a 'host' or 'host:port'. If the port is not specified, port 5005 is assumed. One server is chosen of the list at random, and only on failure is another server used.")
var their_public_key_path = flag.String("their-public-key", "", "the file containing the NaCl public key for the server you are talking to.")
var our_secret_key_path = flag.String("my-secret-key", "", "the file containing the NaCl secret key for this process to encrypt with. If none is given, one is generated at runtime.")
//var our_public_key_path = flag.String("my-public-key", "", "the file containing the NaCl public key for this process to encrypt with. If you specify this, you MUST specify -my-private-key.")
func read_key(path string, key []byte) (err error) {
file, err := os.Open(path)
if err != nil {
return
}
// TODO(sissel): check length of read
_, err = file.Read(key)
return
}
var config_file = flag.String("config", "", "The config file to load")
func main() {
flag.Parse()
......@@ -47,58 +30,19 @@ func main() {
}()
}
if *their_public_key_path == "" {
log.Fatalf("No -their-public-key flag given")
}
// Turn 'host' and 'host:port' into 'tcp://host:port'
if *servers == "" {
log.Fatalf("No servers specified, please provide the -servers setting\n")
}
server_list := strings.Split(*servers, ",")
for i, server := range server_list {
if !strings.Contains(server, ":") {
server_list[i] = "tcp://" + server + ":5005"
} else {
server_list[i] = "tcp://" + server
}
}
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
// TODO(sissel): support flags for setting... stuff
event_chan := make(chan *lumberjack.FileEvent, 16)
publisher_chan := make(chan []*lumberjack.FileEvent, 1)
registrar_chan := make(chan []*lumberjack.FileEvent, 1)
paths := flag.Args()
if len(paths) == 0 {
log.Fatalf("No paths given. What files do you want me to watch?\n")
}
var public_key [sodium.PUBLICKEYBYTES]byte
err := read_key(*their_public_key_path, public_key[:])
config, err := LoadConfig(*config_file)
if err != nil {
log.Fatalf("Unable to read public key path (%s): %s\n",
*their_public_key_path, err)
return
}
var secret_key [sodium.SECRETKEYBYTES]byte
if *our_secret_key_path == "" {
log.Printf("No secret key given; generating one.")
_, secret_key = sodium.CryptoBoxKeypair()
} else {
err := read_key(*our_secret_key_path, secret_key[:])
if err != nil {
log.Printf("Unable to read secret key (%s): %s\n",
*our_secret_key_path, err)
log.Printf("Generating a key pair now.\n")
_, sk := sodium.CryptoBoxKeypair()
copy(secret_key[:], sk[:])
}
event_chan := make(chan *FileEvent, 16)
publisher_chan := make(chan []*FileEvent, 1)
registrar_chan := make(chan []*FileEvent, 1)
if len(config.Files) == 0 {
log.Fatalf("No paths given. What files do you want me to watch?\n")
}
// The basic model of execution:
......@@ -111,15 +55,17 @@ func main() {
// determine where in each file to resume a harvester.
// Prospect the globs/paths given on the command line and launch harvesters
go lumberjack.Prospect(paths, event_chan)
for _, fileconfig := range config.Files {
go Prospect(fileconfig, event_chan)
}
// Harvesters dump events into the spooler.
go lumberjack.Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)
go Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)
go lumberjack.Publish(publisher_chan, registrar_chan, server_list,
public_key, secret_key, *server_timeout)
//go Publish(publisher_chan, registrar_chan, server_list,
//public_key, secret_key, *server_timeout)
// TODO(sissel): registrar db path
// TODO(sissel): registrar records last acknowledged positions in all files.
lumberjack.Registrar(registrar_chan)
Registrar(registrar_chan)
} /* main */
package liblumberjack
package main
import (
"time"
......@@ -8,23 +8,11 @@ import (
"log"
)
func Prospect(paths []string, output chan *FileEvent) {
// Scan for "-" to do stdin special handling.
for i, path := range paths {
if path == "-" {
harvester := Harvester{Path: path}
go harvester.Harvest(output)
// remove "-" from the paths list
paths = append(paths[0:i], paths[i+1:]...)
break
}
}
func Prospect(fileconfig FileConfig, output chan *FileEvent) {
fileinfo := make(map[string]os.FileInfo)
for {
for _, path := range paths {
prospector_scan(path, fileinfo, output)
for _, path := range fileconfig.Paths {
prospector_scan(path, fileconfig.Fields, fileinfo, output)
}
// Defer next scan for a bit.
......@@ -32,7 +20,8 @@ func Prospect(paths []string, output chan *FileEvent) {
}
} /* Prospect */
func prospector_scan(path string, fileinfo map[string]os.FileInfo,
func prospector_scan(path string, fields map[string]string,
fileinfo map[string]os.FileInfo,
output chan *FileEvent) {
log.Printf("Prospecting %s\n", path)
......@@ -98,7 +87,7 @@ func prospector_scan(path string, fileinfo map[string]os.FileInfo,
if !renamed {
log.Printf("Launching harvester on new file: %s\n", file)
harvester := Harvester{Path: file}
harvester := Harvester{Path: file, Fields: fields}
go harvester.Harvest(output)
}
}
......@@ -112,7 +101,7 @@ func prospector_scan(path string, fileinfo map[string]os.FileInfo,
log.Printf("Launching harvester on rotated file: %s\n", file)
// TODO(sissel): log 'file rotated' or something
// Start a harvester on the path; a new file appeared with the same name.
harvester := Harvester{Path: file}
harvester := Harvester{Path: file, Fields: fields}
go harvester.Harvest(output)
}
}
......
package main
import (
"bytes"
"encoding/binary"
//"crypto/tls"
"log"
//"time"
"compress/zlib"
)
// init is intentionally empty; kept as a placeholder for future
// package-level setup of this publisher implementation.
func init() {
}
// connect is a stub for establishing the network connection described
// by config.
//
// NOTE(review): it currently returns nothing, yet Publishv1 assigns its
// result to 'socket' — this will not compile until connect returns the
// connection/socket type. Work in progress.
func connect(config *NetworkConfig) {
}
// Publishv1 is a work-in-progress rewrite of the old liblumberjack
// Publish loop against the new NetworkConfig-driven configuration:
// consume event batches from input, compress/ship them, and forward
// acknowledged batches to the registrar channel.
//
// NOTE(review): this does not compile yet — 'zbuf'/'packbuf' are
// declared but the body still references the old names 'buffer',
// 'data', 'nonce', 'ciphertext' and 'zmq'; connect() returns nothing
// but is assigned to 'socket'; and 'for event := range events' ranges
// over slice indices, not events. Flagging so the next pass can finish
// the port rather than ship it half-done.
func Publishv1(input chan []*FileEvent,
	registrar chan []*FileEvent,
	config *NetworkConfig) {
	var zbuf, packbuf bytes.Buffer
	socket := connect(config)
	for events := range input {
		for event := range events {
		}
		// Compress it
		// A new zlib writer is used for every payload of events so that any
		// individual payload can be decompressed alone.
		// TODO(sissel): Make compression level tunable
		compressor, _ := zlib.NewWriterLevel(&buffer, 3)
		buffer.Truncate(0)
		_, err := compressor.Write(data)
		err = compressor.Flush()
		compressor.Close()
		// Loop forever trying to send.
		// This will cause reconnects/etc on failures automatically
		for {
			err = socket.Send(nonce, zmq.SNDMORE)
			if err != nil {
				continue // send failed, retry!
			}
			err = socket.Send(ciphertext, 0)
			if err != nil {
				continue // send failed, retry!
			}
			data, err = socket.Recv(0)
			// TODO(sissel): Figure out acknowledgement protocol? If any?
			if err == nil {
				break // success!
			}
		}
		// Tell the registrar that we've successfully sent these events
		registrar <- events
	} /* for each event payload */
} // Publish
package liblumberjack
package main
import (
"log"
......
package liblumberjack
package main
import (
"time"
......
package liblumberjack
import (
"bytes"
"encoding/json"
zmq "github.com/alecthomas/gozmq"
"log"
"math/big"
"syscall"
"time"
"compress/zlib"
"crypto/rand"
"sodium"
)
// context is the process-wide zmq context shared by every FFS socket.
var context *zmq.Context
func init() {
	// NOTE(review): the error from NewContext is discarded; if context
	// creation fails, 'context' stays nil and the first NewSocket call
	// will panic. Consider checking the error here.
	context, _ = zmq.NewContext()
}
// FFS ("Forever Faithful Socket") wraps a zmq socket with automatic
// reconnection: operations that fail or time out drop the socket, and
// the next operation reconnects to a randomly chosen endpoint.
type FFS struct {
	Endpoints []string // set of endpoints available to ship to
	// Socket type; zmq.REQ, etc
	SocketType zmq.SocketType
	// Various timeout values; zero values default to 1s in ensure_connect.
	SendTimeout time.Duration
	RecvTimeout time.Duration
	endpoint string // the current endpoint in use
	socket *zmq.Socket // the current zmq socket
	connected bool // are we connected?
}
func (s *FFS) Send(data []byte, flags zmq.SendRecvOption) (err error) {
for {
s.ensure_connect()
pi := zmq.PollItems{zmq.PollItem{Socket: s.socket, Events: zmq.POLLOUT}}
count, err := zmq.Poll(pi, s.SendTimeout)
if count == 0 {
// not ready in time, fail the socket and try again.
log.Printf("%s: timed out waiting to Send(): %s\n", s.endpoint, err)
s.fail_socket()
} else {
//log.Printf("%s: sending %d payload\n", s.endpoint, len(data))
err = s.socket.Send(data, flags)
if err != nil {
log.Printf("%s: Failed to Send() %d byte message: %s\n",
s.endpoint, len(data), err)
s.fail_socket()
} else {
// Success!
break
}
}
}
return
}
// Recv reads one message from the current socket, waiting at most
// RecvTimeout for data to arrive. On timeout it abandons the socket
// (forcing a reconnect on the next operation) and returns ETIMEDOUT;
// on a receive error it likewise fails the socket and propagates the
// error.
func (s *FFS) Recv(flags zmq.SendRecvOption) (data []byte, err error) {
	s.ensure_connect()

	// Wait for readability before attempting the receive.
	items := zmq.PollItems{zmq.PollItem{Socket: s.socket, Events: zmq.POLLIN}}
	ready, err := zmq.Poll(items, s.RecvTimeout)

	if ready == 0 {
		// Nothing arrived in time: drop this socket and surface a timeout.
		s.fail_socket()
		err = syscall.ETIMEDOUT
		log.Printf("%s: timed out waiting to Recv(): %s\n",
			s.endpoint, err)
		return nil, err
	}

	data, err = s.socket.Recv(flags)
	if err != nil {
		log.Printf("%s: Failed to Recv() %d byte message: %s\n",
			s.endpoint, len(data), err)
		s.fail_socket()
		return nil, err
	}

	// Success.
	return data, nil
}
// Close tears down the underlying zmq socket and marks the connection
// as down. The socket pointer and connected flag are only cleared when
// the close itself succeeds.
func (s *FFS) Close() (err error) {
	if err = s.socket.Close(); err != nil {
		return err
	}
	s.socket = nil
	s.connected = false
	return nil
}
// ensure_connect lazily (re)establishes the zmq connection: it applies
// default timeouts, rebuilds the socket, and keeps trying randomly
// chosen endpoints until one accepts. It is a no-op when already
// connected, and blocks until connected otherwise.
func (s *FFS) ensure_connect() {
	if s.connected {
		return
	}

	// Fall back to one-second timeouts when none were configured.
	if s.SendTimeout == 0 {
		s.SendTimeout = time.Second
	}
	if s.RecvTimeout == 0 {
		s.RecvTimeout = time.Second
	}
	if s.SocketType == 0 {
		log.Panicf("No socket type set on zmq socket")
	}

	// Discard any stale socket before building a fresh one.
	if s.socket != nil {
		s.socket.Close()
		s.socket = nil
	}

	var err error
	s.socket, err = context.NewSocket(s.SocketType)
	if err != nil {
		log.Panicf("zmq.NewSocket(%d) failed: %s\n", s.SocketType, err)
	}

	//s.socket.SetSockOptUInt64(zmq.HWM, 1)
	//s.socket.SetSockOptInt(zmq.RCVTIMEO, int(s.RecvTimeout.Nanoseconds() / 1000000))
	//s.socket.SetSockOptInt(zmq.SNDTIMEO, int(s.SendTimeout.Nanoseconds() / 1000000))

	// Abort anything in-flight on a socket that's closed.
	s.socket.SetSockOptInt(zmq.LINGER, 0)

	// Pick endpoints at random until a Connect() succeeds.
	for !s.connected {
		choice, _ := rand.Int(rand.Reader, big.NewInt(int64(len(s.Endpoints))))
		s.endpoint = s.Endpoints[choice.Int64()]
		log.Printf("Connecting to %s\n", s.endpoint)

		if err := s.socket.Connect(s.endpoint); err != nil {
			log.Printf("%s: Error connecting: %s\n", s.endpoint, err)
			time.Sleep(500 * time.Millisecond)
			continue
		}

		// No error, we're connected.
		s.connected = true
	}
}
// fail_socket abandons the current connection (if any) so the next
// Send/Recv re-establishes it. Calling it while disconnected is a
// harmless no-op.
func (s *FFS) fail_socket() {
	if s.connected {
		s.Close()
	}
}
// Publish consumes batches of events from input, serializes each batch
// as JSON, zlib-compresses it, encrypts it with NaCl (sodium), and
// ships nonce + ciphertext over a zmq REQ socket to one of server_list,
// retrying forever until a reply arrives. Acknowledged batches are then
// forwarded to the registrar channel.
func Publish(input chan []*FileEvent,
	registrar chan []*FileEvent,
	server_list []string,
	public_key [sodium.PUBLICKEYBYTES]byte,
	secret_key [sodium.SECRETKEYBYTES]byte,
	server_timeout time.Duration) {
	var buffer bytes.Buffer
	// One long-lived crypto session; Box() below produces a fresh nonce
	// per payload.
	session := sodium.NewSession(public_key, secret_key)

	// FFS handles reconnect/retry to a random endpoint on failure.
	socket := FFS{
		Endpoints:   server_list,
		SocketType:  zmq.REQ,
		RecvTimeout: server_timeout,
		SendTimeout: server_timeout,
	}
	//defer socket.Close()

	for events := range input {
		// got a bunch of events, ship them out.
		//log.Printf("Publisher received %d events\n", len(events))

		data, _ := json.Marshal(events)
		// TODO(sissel): check error

		// Compress it
		// A new zlib writer is used for every payload of events so that any
		// individual payload can be decompressed alone.
		// TODO(sissel): Make compression level tunable
		compressor, _ := zlib.NewWriterLevel(&buffer, 3)
		// NOTE(review): Truncate runs after NewWriterLevel but before the
		// first Write; this only works because the zlib header is emitted
		// lazily — reordering these lines would corrupt the stream.
		buffer.Truncate(0)
		// NOTE(review): the Write/Flush errors below are assigned but never
		// checked (see the TODO).
		_, err := compressor.Write(data)
		err = compressor.Flush()
		compressor.Close()
		//log.Printf("compressed %d bytes\n", buffer.Len())
		// TODO(sissel): check err
		// TODO(sissel): implement security/encryption/etc

		// Send full payload over zeromq REQ/REP
		// TODO(sissel): check error
		//buffer.Write(data)

		// Encrypt the compressed payload; Box returns (ciphertext, nonce).
		ciphertext, nonce := session.Box(buffer.Bytes())

		//log.Printf("plaintext: %d\n", len(data))
		//log.Printf("compressed: %d\n", buffer.Len())
		//log.Printf("ciphertext: %d %v\n", len(ciphertext), ciphertext[:20])
		//log.Printf("nonce: %d\n", len(nonce))

		// TODO(sissel): figure out encoding for ciphertext + nonce
		// TODO(sissel): figure out encoding for ciphertext + nonce

		// Loop forever trying to send.
		// This will cause reconnects/etc on failures automatically
		for {
			// nonce and ciphertext go as a two-part zmq message.
			err = socket.Send(nonce, zmq.SNDMORE)
			if err != nil {
				continue // send failed, retry!
			}
			err = socket.Send(ciphertext, 0)
			if err != nil {
				continue // send failed, retry!
			}
			// REQ/REP requires a reply; treat any reply as an ack.
			data, err = socket.Recv(0)
			// TODO(sissel): Figure out acknowledgement protocol? If any?
			if err == nil {
				break // success!
			}
		}
		// Tell the registrar that we've successfully sent these events
		registrar <- events
	} /* for each event payload */
} // Publish
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment