Commit 8c050664 authored by Jordan Sissel's avatar Jordan Sissel

Merge pull request #156 from elasticsearch/gofmt

gofmt -tabs=false -tabwidth=2
parents d2ba8895 ae65de48
......@@ -2,27 +2,27 @@ package main
import (
"encoding/json"
"os"
"log"
"os"
"time"
)
type Config struct {
Network NetworkConfig `json:network`
Files []FileConfig `json:files`
Files []FileConfig `json:files`
}
type NetworkConfig struct {
Servers []string `json:servers`
SSLCertificate string `json:"ssl certificate"`
SSLKey string `json:"ssl key"`
SSLCA string `json:"ssl ca"`
Timeout int64 `json:timeout`
timeout time.Duration
}
Servers []string `json:servers`
SSLCertificate string `json:"ssl certificate"`
SSLKey string `json:"ssl key"`
SSLCA string `json:"ssl ca"`
Timeout int64 `json:timeout`
timeout time.Duration
}
type FileConfig struct {
Paths []string `json:paths`
Paths []string `json:paths`
Fields map[string]string `json:fields`
//DeadTime time.Duration `json:"dead time"`
}
......@@ -37,7 +37,7 @@ func LoadConfig(path string) (config Config, err error) {
fi, _ := config_file.Stat()
if fi.Size() > (10 << 20) {
log.Printf("Config file too large? Aborting, just in case. '%s' is %d bytes\n",
path, fi)
path, fi)
return
}
......@@ -58,9 +58,9 @@ func LoadConfig(path string) (config Config, err error) {
config.Network.timeout = time.Duration(config.Network.Timeout) * time.Second
//for _, fileconfig := range config.Files {
//if fileconfig.DeadTime == 0 {
//fileconfig.DeadTime = 24 * time.Hour
//}
//if fileconfig.DeadTime == 0 {
//fileconfig.DeadTime = 24 * time.Hour
//}
//}
return
......
package main
import (
"testing"
"encoding/json"
"testing"
)
type FileConfig struct {
Paths []string "json:paths"
Paths []string "json:paths"
Fields map[string]string "json:fields"
}
......@@ -14,8 +14,16 @@ func TestJSONLoading(t *testing.T) {
var f File
err := json.Unmarshal([]byte("{ \"paths\": [ \"/var/log/fail2ban.log\" ], \"fields\": { \"type\": \"fail2ban\" } }"), &f)
if err != nil { t.Fatalf("json.Unmarshal failed") }
if len(f.Paths) != 1 { t.FailNow() }
if f.Paths[0] != "/var/log/fail2ban.log" { t.FailNow() }
if f.Fields["type"] != "fail2ban" { t.FailNow() }
if err != nil {
t.Fatalf("json.Unmarshal failed")
}
if len(f.Paths) != 1 {
t.FailNow()
}
if f.Paths[0] != "/var/log/fail2ban.log" {
t.FailNow()
}
if f.Fields["type"] != "fail2ban" {
t.FailNow()
}
}
......@@ -4,9 +4,9 @@ import "os"
type FileEvent struct {
Source *string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Line uint64 `json:"line,omitempty"`
Text *string `json:"text,omitempty"`
Offset int64 `json:"offset,omitempty"`
Line uint64 `json:"line,omitempty"`
Text *string `json:"text,omitempty"`
Fields *map[string]string
fileinfo *os.FileInfo
......
......@@ -32,4 +32,3 @@ func is_file_renamed(file string, info os.FileInfo, fileinfo map[string]os.FileI
}
return false
}
......@@ -2,7 +2,7 @@ package main
type FileState struct {
Source *string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device int32 `json:"device,omitempty"`
Offset int64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device int32 `json:"device,omitempty"`
}
......@@ -2,7 +2,7 @@ package main
type FileState struct {
Source *string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device uint64 `json:"device,omitempty"`
Offset int64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device uint64 `json:"device,omitempty"`
}
......@@ -2,7 +2,7 @@ package main
type FileState struct {
Source *string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device uint64 `json:"device,omitempty"`
Offset int64 `json:"offset,omitempty"`
Inode uint64 `json:"inode,omitempty"`
Device uint64 `json:"device,omitempty"`
}
package main
import (
"os" // for File and friends
"log"
"bufio"
"bytes"
"io"
"bufio"
"log"
"os" // for File and friends
"time"
)
type Harvester struct {
Path string /* the file path to harvest */
Path string /* the file path to harvest */
Fields map[string]string
Offset int64
......@@ -71,14 +71,14 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
line++
event := &FileEvent{
Source: &h.Path,
Offset: offset,
Line: line,
Text: text,
Fields: &h.Fields,
Source: &h.Path,
Offset: offset,
Line: line,
Text: text,
Fields: &h.Fields,
fileinfo: &info,
}
offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator
offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator
output <- event // ship the new event downstream
} /* forever */
......@@ -89,7 +89,7 @@ func (h *Harvester) open() *os.File {
if h.Path == "-" {
h.file = os.Stdin
return h.file
}
}
for {
var err error
......
package main
import (
"flag"
"log"
"os"
"time"
"flag"
"runtime/pprof"
"time"
)
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")
var idle_timeout = flag.Duration("idle-flush-time", 5 * time.Second, "Maximum time to wait for a full spool before flushing anyway")
var idle_timeout = flag.Duration("idle-flush-time", 5*time.Second, "Maximum time to wait for a full spool before flushing anyway")
var config_file = flag.String("config", "", "The config file to load")
var use_syslog = flag.Bool("log-to-syslog", false, "Log to syslog instead of stdout")
var from_beginning = flag.Bool("from-beginning", false, "Read new files from the beginning, instead of the end")
......@@ -21,7 +21,7 @@ func main() {
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
log.Fatal(err)
}
pprof.StartCPUProfile(f)
go func() {
......@@ -52,7 +52,7 @@ func main() {
// - registrar: records positions of files read
// Finally, prospector uses the registrar information, on restart, to
// determine where in each file to resume a harvester.
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
if *use_syslog {
configureSyslog()
......
package main
import (
"time"
"path/filepath"
"encoding/json"
"os"
"log"
"os"
"path/filepath"
"time"
)
func Prospect(fileconfig FileConfig, output chan *FileEvent) {
......@@ -49,7 +49,9 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
// if the file is the same inode/device as we last saw,
// start a harvester on it at the last known position
info, err := os.Stat(path)
if err != nil { continue }
if err != nil {
continue
}
if is_file_same(path, info, state) {
// same file, seek to last known position
......@@ -58,7 +60,7 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
for _, pathglob := range fileconfig.Paths {
match, _ := filepath.Match(pathglob, path)
if match {
harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset }
harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset}
go harvester.Harvest(output)
break
}
......@@ -68,9 +70,9 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
}
}
func prospector_scan(path string, fields map[string]string,
fileinfo map[string]os.FileInfo,
output chan *FileEvent) {
func prospector_scan(path string, fields map[string]string,
fileinfo map[string]os.FileInfo,
output chan *FileEvent) {
//log.Printf("Prospecting %s\n", path)
// Evaluate the path as a wildcards/shell glob
......@@ -109,7 +111,7 @@ func prospector_scan(path string, fields map[string]string,
// Conditions for starting a new harvester:
// - file path hasn't been seen before
// - the file's inode or device changed
if !is_known {
if !is_known {
// TODO(sissel): Skip files with modification dates older than N
// TODO(sissel): Make the 'ignore if older than N' tunable
if time.Since(info.ModTime()) > 24*time.Hour {
......
package main
import (
"math/rand"
"bytes"
"encoding/binary"
"encoding/pem"
"compress/zlib"
"crypto/tls"
"crypto/x509"
"net"
"encoding/binary"
"encoding/pem"
"fmt"
"io"
"os"
"io/ioutil"
"log"
"time"
"compress/zlib"
"strconv"
"math/rand"
"net"
"os"
"regexp"
"fmt"
"strconv"
"time"
)
var hostname string
......@@ -29,8 +29,8 @@ func init() {
}
func Publishv1(input chan []*FileEvent,
registrar chan []*FileEvent,
config *NetworkConfig) {
registrar chan []*FileEvent,
config *NetworkConfig) {
var buffer bytes.Buffer
var socket *tls.Conn
var sequence uint32
......@@ -56,7 +56,7 @@ func Publishv1(input chan []*FileEvent,
oops := func(err error) {
// TODO(sissel): Track how frequently we timeout and reconnect. If we're
// timing out too frequently, there's really no point in timing out since
// basically everything is slow or down. We'll want to ratchet up the
// basically everything is slow or down. We'll want to ratchet up the
// timeout value slowly until things improve, then ratchet it down once
// things seem healthy.
log.Printf("Socket error, will reconnect: %s\n", err)
......@@ -65,24 +65,40 @@ func Publishv1(input chan []*FileEvent,
socket = connect(config)
}
SendPayload: for {
SendPayload:
for {
// Abort if our whole request takes longer than the configured
// network timeout.
socket.SetDeadline(time.Now().Add(config.timeout))
// Set the window size to the length of this payload in events.
_, err = socket.Write([]byte("1W"))
if err != nil { oops(err); continue }
if err != nil {
oops(err)
continue
}
binary.Write(socket, binary.BigEndian, uint32(len(events)))
if err != nil { oops(err); continue }
if err != nil {
oops(err)
continue
}
// Write compressed frame
socket.Write([]byte("1C"))
if err != nil { oops(err); continue }
if err != nil {
oops(err)
continue
}
binary.Write(socket, binary.BigEndian, uint32(len(compressed_payload)))
if err != nil { oops(err); continue }
if err != nil {
oops(err)
continue
}
_, err = socket.Write(compressed_payload)
if err != nil { oops(err); continue }
if err != nil {
oops(err)
continue
}
// read ack
response := make([]byte, 0, 6)
......@@ -114,7 +130,7 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
if len(config.SSLCertificate) > 0 && len(config.SSLKey) > 0 {
log.Printf("Loading client ssl certificate: %s and %s\n",
config.SSLCertificate, config.SSLKey)
config.SSLCertificate, config.SSLKey)
cert, err := tls.LoadX509KeyPair(config.SSLCertificate, config.SSLKey)
if err != nil {
log.Fatalf("Failed loading client ssl certificate: %s\n", err)
......@@ -127,7 +143,9 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
tlsconfig.RootCAs = x509.NewCertPool()
pemdata, err := ioutil.ReadFile(config.SSLCA)
if err != nil { log.Fatalf("Failure reading CA certificate: %s\n", err) }
if err != nil {
log.Fatalf("Failure reading CA certificate: %s\n", err)
}
block, _ := pem.Decode(pemdata)
if block == nil {
......@@ -146,7 +164,7 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
for {
// Pick a random server from the list.
hostport := config.Servers[rand.Int() % len(config.Servers)]
hostport := config.Servers[rand.Int()%len(config.Servers)]
submatch := hostport_re.FindSubmatch([]byte(hostport))
if submatch == nil {
log.Fatalf("Invalid host:port given: %s", hostport)
......@@ -156,12 +174,12 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
addresses, err := net.LookupHost(host)
if err != nil {
log.Printf("DNS lookup failure \"%s\": %s\n", host, err);
log.Printf("DNS lookup failure \"%s\": %s\n", host, err)
time.Sleep(1 * time.Second)
continue
}
address := addresses[rand.Int() % len(addresses)]
address := addresses[rand.Int()%len(addresses)]
addressport := fmt.Sprintf("%s:%s", address, port)
log.Printf("Connecting to %s (%s) \n", addressport, host)
......@@ -198,13 +216,13 @@ func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) {
// sequence number
binary.Write(output, binary.BigEndian, uint32(sequence))
// 'pair' count
binary.Write(output, binary.BigEndian, uint32(len(*event.Fields) + 4))
binary.Write(output, binary.BigEndian, uint32(len(*event.Fields)+4))
writeKV("file", *event.Source, output)
writeKV("host", hostname, output)
writeKV("offset", strconv.FormatInt(event.Offset, 10), output)
writeKV("line", *event.Text, output)
for k, v := range(*event.Fields) {
for k, v := range *event.Fields {
writeKV(k, v, output)
}
}
......
......@@ -23,7 +23,7 @@ func Registrar(input chan []*FileEvent) {
// take the offset + length of the line + newline char and
// save it as the new starting offset.
Offset: event.Offset + int64(len(*event.Text)) + 1,
Inode: ino,
Inode: ino,
Device: dev,
}
//log.Printf("State %s: %d\n", *event.Source, event.Offset)
......@@ -34,4 +34,3 @@ func Registrar(input chan []*FileEvent) {
}
}
}
// +build !windows
package main
import (
"encoding/json"
"os"
"log"
"os"
)
func WriteRegistry(state map[string]*FileState, path string) {
......
......@@ -2,8 +2,8 @@ package main
import (
"encoding/json"
"os"
"log"
"os"
)
func WriteRegistry(state map[string]*FileState, path string) {
......
......@@ -4,10 +4,10 @@ import (
"time"
)
func Spool(input chan *FileEvent,
output chan []*FileEvent,
max_size uint64,
idle_timeout time.Duration) {
func Spool(input chan *FileEvent,
output chan []*FileEvent,
max_size uint64,
idle_timeout time.Duration) {
// heartbeat periodically. If the last flush was longer than
// 'idle_timeout' time ago, then we'll force a flush to prevent us from
// holding on to spooled events for too long.
......@@ -24,38 +24,38 @@ func Spool(input chan *FileEvent,
next_flush_time := time.Now().Add(idle_timeout)
for {
select {
case event := <- input:
//append(spool, event)
spool[spool_i] = event
spool_i++
// Flush if full
if spool_i == cap(spool) {
//spoolcopy := make([]*FileEvent, max_size)
case event := <-input:
//append(spool, event)
spool[spool_i] = event
spool_i++
// Flush if full
if spool_i == cap(spool) {
//spoolcopy := make([]*FileEvent, max_size)
var spoolcopy []*FileEvent
//fmt.Println(spool[0])
spoolcopy = append(spoolcopy, spool[:]...)
output <- spoolcopy
next_flush_time = time.Now().Add(idle_timeout)
spool_i = 0
}
case <-ticker.C:
//fmt.Println("tick")
if now := time.Now(); now.After(next_flush_time) {
// if current time is after the next_flush_time, flush!
//fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
//now.Sub(next_flush_time))
// Flush what we have, if anything
if spool_i > 0 {
var spoolcopy []*FileEvent
//fmt.Println(spool[0])
spoolcopy = append(spoolcopy, spool[:]...)
spoolcopy = append(spoolcopy, spool[0:spool_i]...)
output <- spoolcopy
next_flush_time = time.Now().Add(idle_timeout)
next_flush_time = now.Add(idle_timeout)
spool_i = 0
}
case <- ticker.C:
//fmt.Println("tick")
if now := time.Now(); now.After(next_flush_time) {
// if current time is after the next_flush_time, flush!
//fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
//now.Sub(next_flush_time))
// Flush what we have, if anything
if spool_i > 0 {
var spoolcopy []*FileEvent
spoolcopy = append(spoolcopy, spool[0:spool_i]...)
output <- spoolcopy
next_flush_time = now.Add(idle_timeout)
spool_i = 0
}
} /* if 'now' is after 'next_flush_time' */
} /* if 'now' is after 'next_flush_time' */
/* case ... */
} /* select */
} /* for */
......
// +build !windows
package main
import (
"log"
"log/syslog"
)
func configureSyslog() {
writer, err := syslog.New(syslog.LOG_INFO | syslog.LOG_DAEMON, "logstash-forwarder")
writer, err := syslog.New(syslog.LOG_INFO|syslog.LOG_DAEMON, "logstash-forwarder")
if err != nil {
log.Fatalf("Failed to open syslog: %s\n", err)
return
......
......@@ -3,5 +3,5 @@ package main
import "log"
func configureSyslog() {
log.Printf("Logging to syslog not supported on this platform\n");
log.Printf("Logging to syslog not supported on this platform\n")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment