Commit ae65de48 authored by Jordan Sissel

gofmt -tabs=false -tabwidth=2

I am not very happy with the formatting gofmt provides, but as there's a
cult of gophers who seem to love this tool, it makes sense to try to
avoid "why not gofmt!?" discussions and not distract existing Go users
with a differing code style.

Next step is obviously to write logstash-forwarder in !Go just to
avoid formatting wars, right? ;)
parent d2ba8895
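
For context on what the flags in the commit title do: -tabs=false makes gofmt indent with spaces instead of tabs, and -tabwidth=2 uses two spaces per indent level. As the diff below shows, gofmt also sorts grouped imports, aligns struct fields and tags into columns, and splits one-line if statements onto separate lines. A minimal before/after sketch using a struct from this commit (illustrative only; the exact column alignment is whatever gofmt of this era emits):

// Before: hand-spaced fields
type Config struct {
  Network NetworkConfig `json:network`
  Files []FileConfig `json:files`
}

// After gofmt -tabs=false -tabwidth=2: field names, types, and tags aligned
type Config struct {
  Network NetworkConfig `json:network`
  Files   []FileConfig  `json:files`
}

The tree was presumably rewritten in place with something like "gofmt -tabs=false -tabwidth=2 -w ." (-w writes the result back to the source files), producing the diff below.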
@@ -2,27 +2,27 @@ package main

 import (
   "encoding/json"
-  "os"
   "log"
+  "os"
   "time"
 )

 type Config struct {
   Network NetworkConfig `json:network`
-  Files []FileConfig `json:files`
+  Files   []FileConfig  `json:files`
 }

 type NetworkConfig struct {
-  Servers []string `json:servers`
-  SSLCertificate string `json:"ssl certificate"`
-  SSLKey string `json:"ssl key"`
-  SSLCA string `json:"ssl ca"`
-  Timeout int64 `json:timeout`
-  timeout time.Duration
+  Servers        []string `json:servers`
+  SSLCertificate string   `json:"ssl certificate"`
+  SSLKey         string   `json:"ssl key"`
+  SSLCA          string   `json:"ssl ca"`
+  Timeout        int64    `json:timeout`
+  timeout        time.Duration
 }

 type FileConfig struct {
-  Paths []string `json:paths`
+  Paths  []string          `json:paths`
   Fields map[string]string `json:fields`
   //DeadTime time.Duration `json:"dead time"`
 }
@@ -37,7 +37,7 @@ func LoadConfig(path string) (config Config, err error) {
   fi, _ := config_file.Stat()
   if fi.Size() > (10 << 20) {
     log.Printf("Config file too large? Aborting, just in case. '%s' is %d bytes\n",
-                path, fi)
+      path, fi)
     return
   }
@@ -58,9 +58,9 @@ func LoadConfig(path string) (config Config, err error) {
   config.Network.timeout = time.Duration(config.Network.Timeout) * time.Second

   //for _, fileconfig := range config.Files {
-    //if fileconfig.DeadTime == 0 {
-      //fileconfig.DeadTime = 24 * time.Hour
-    //}
+  //if fileconfig.DeadTime == 0 {
+  //fileconfig.DeadTime = 24 * time.Hour
+  //}
   //}

   return

 package main

 import (
-  "testing"
   "encoding/json"
+  "testing"
 )

 type FileConfig struct {
-  Paths []string "json:paths"
+  Paths  []string          "json:paths"
   Fields map[string]string "json:fields"
 }
@@ -14,8 +14,16 @@ func TestJSONLoading(t *testing.T) {
   var f File
   err := json.Unmarshal([]byte("{ \"paths\": [ \"/var/log/fail2ban.log\" ], \"fields\": { \"type\": \"fail2ban\" } }"), &f)
-  if err != nil { t.Fatalf("json.Unmarshal failed") }
-  if len(f.Paths) != 1 { t.FailNow() }
-  if f.Paths[0] != "/var/log/fail2ban.log" { t.FailNow() }
-  if f.Fields["type"] != "fail2ban" { t.FailNow() }
+  if err != nil {
+    t.Fatalf("json.Unmarshal failed")
+  }
+  if len(f.Paths) != 1 {
+    t.FailNow()
+  }
+  if f.Paths[0] != "/var/log/fail2ban.log" {
+    t.FailNow()
+  }
+  if f.Fields["type"] != "fail2ban" {
+    t.FailNow()
+  }
 }

@@ -4,9 +4,9 @@ import "os"

 type FileEvent struct {
   Source *string `json:"source,omitempty"`
-  Offset int64 `json:"offset,omitempty"`
-  Line uint64 `json:"line,omitempty"`
-  Text *string `json:"text,omitempty"`
+  Offset int64   `json:"offset,omitempty"`
+  Line   uint64  `json:"line,omitempty"`
+  Text   *string `json:"text,omitempty"`
   Fields *map[string]string

   fileinfo *os.FileInfo

@@ -32,4 +32,3 @@ func is_file_renamed(file string, info os.FileInfo, fileinfo map[string]os.FileI
   }
   return false
 }
-

@@ -2,7 +2,7 @@ package main

 type FileState struct {
   Source *string `json:"source,omitempty"`
-  Offset int64 `json:"offset,omitempty"`
-  Inode uint64 `json:"inode,omitempty"`
-  Device int32 `json:"device,omitempty"`
+  Offset int64   `json:"offset,omitempty"`
+  Inode  uint64  `json:"inode,omitempty"`
+  Device int32   `json:"device,omitempty"`
 }

@@ -2,7 +2,7 @@ package main

 type FileState struct {
   Source *string `json:"source,omitempty"`
-  Offset int64 `json:"offset,omitempty"`
-  Inode uint64 `json:"inode,omitempty"`
-  Device uint64 `json:"device,omitempty"`
+  Offset int64   `json:"offset,omitempty"`
+  Inode  uint64  `json:"inode,omitempty"`
+  Device uint64  `json:"device,omitempty"`
 }

@@ -2,7 +2,7 @@ package main

 type FileState struct {
   Source *string `json:"source,omitempty"`
-  Offset int64 `json:"offset,omitempty"`
-  Inode uint64 `json:"inode,omitempty"`
-  Device uint64 `json:"device,omitempty"`
+  Offset int64   `json:"offset,omitempty"`
+  Inode  uint64  `json:"inode,omitempty"`
+  Device uint64  `json:"device,omitempty"`
 }

 package main

 import (
-  "os" // for File and friends
-  "log"
+  "bufio"
   "bytes"
   "io"
-  "bufio"
+  "log"
+  "os" // for File and friends
   "time"
 )

 type Harvester struct {
-  Path string /* the file path to harvest */
+  Path   string /* the file path to harvest */
   Fields map[string]string
   Offset int64
@@ -71,14 +71,14 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
     line++
     event := &FileEvent{
-      Source: &h.Path,
-      Offset: offset,
-      Line: line,
-      Text: text,
-      Fields: &h.Fields,
+      Source:   &h.Path,
+      Offset:   offset,
+      Line:     line,
+      Text:     text,
+      Fields:   &h.Fields,
       fileinfo: &info,
     }

-    offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator
+    offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator
     output <- event // ship the new event downstream
   } /* forever */
@@ -89,7 +89,7 @@ func (h *Harvester) open() *os.File {
   if h.Path == "-" {
     h.file = os.Stdin
     return h.file
-  }
+  }

   for {
     var err error

 package main

 import (
+  "flag"
   "log"
   "os"
-  "time"
-  "flag"
   "runtime/pprof"
+  "time"
 )

 var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
 var spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")
-var idle_timeout = flag.Duration("idle-flush-time", 5 * time.Second, "Maximum time to wait for a full spool before flushing anyway")
+var idle_timeout = flag.Duration("idle-flush-time", 5*time.Second, "Maximum time to wait for a full spool before flushing anyway")
 var config_file = flag.String("config", "", "The config file to load")
 var use_syslog = flag.Bool("log-to-syslog", false, "Log to syslog instead of stdout")
 var from_beginning = flag.Bool("from-beginning", false, "Read new files from the beginning, instead of the end")
@@ -21,7 +21,7 @@ func main() {
   if *cpuprofile != "" {
     f, err := os.Create(*cpuprofile)
     if err != nil {
-        log.Fatal(err)
+      log.Fatal(err)
     }
     pprof.StartCPUProfile(f)
     go func() {
@@ -52,7 +52,7 @@ func main() {
   // - registrar: records positions of files read
   // Finally, prospector uses the registrar information, on restart, to
   //   determine where in each file to resume a harvester.
-  log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
+  log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)

   if *use_syslog {
     configureSyslog()

 package main

 import (
-  "time"
-  "path/filepath"
   "encoding/json"
-  "os"
   "log"
+  "os"
+  "path/filepath"
+  "time"
 )

 func Prospect(fileconfig FileConfig, output chan *FileEvent) {
@@ -49,7 +49,9 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
     // if the file is the same inode/device as we last saw,
     // start a harvester on it at the last known position
     info, err := os.Stat(path)
-    if err != nil { continue }
+    if err != nil {
+      continue
+    }

     if is_file_same(path, info, state) {
       // same file, seek to last known position
@@ -58,7 +60,7 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
     for _, pathglob := range fileconfig.Paths {
       match, _ := filepath.Match(pathglob, path)
       if match {
-        harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset }
+        harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset}
         go harvester.Harvest(output)
         break
       }
@@ -68,9 +70,9 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
   }
 }

-func prospector_scan(path string, fields map[string]string,
-                     fileinfo map[string]os.FileInfo,
-                     output chan *FileEvent) {
+func prospector_scan(path string, fields map[string]string,
+  fileinfo map[string]os.FileInfo,
+  output chan *FileEvent) {
   //log.Printf("Prospecting %s\n", path)

   // Evaluate the path as a wildcards/shell glob
@@ -109,7 +111,7 @@ func prospector_scan(path string, fields map[string]string,
     // Conditions for starting a new harvester:
     // - file path hasn't been seen before
     // - the file's inode or device changed
-      if !is_known {
+    if !is_known {
       // TODO(sissel): Skip files with modification dates older than N
       // TODO(sissel): Make the 'ignore if older than N' tunable
       if time.Since(info.ModTime()) > 24*time.Hour {

 package main

 import (
-  "math/rand"
   "bytes"
-  "encoding/binary"
-  "encoding/pem"
+  "compress/zlib"
   "crypto/tls"
   "crypto/x509"
-  "net"
+  "encoding/binary"
+  "encoding/pem"
+  "fmt"
   "io"
-  "os"
   "io/ioutil"
   "log"
-  "time"
-  "compress/zlib"
-  "strconv"
+  "math/rand"
+  "net"
+  "os"
   "regexp"
-  "fmt"
+  "strconv"
+  "time"
 )

 var hostname string
@@ -29,8 +29,8 @@ func init() {
 }

 func Publishv1(input chan []*FileEvent,
-               registrar chan []*FileEvent,
-               config *NetworkConfig) {
+  registrar chan []*FileEvent,
+  config *NetworkConfig) {
   var buffer bytes.Buffer
   var socket *tls.Conn
   var sequence uint32
@@ -56,7 +56,7 @@ func Publishv1(input chan []*FileEvent,
   oops := func(err error) {
     // TODO(sissel): Track how frequently we timeout and reconnect. If we're
     // timing out too frequently, there's really no point in timing out since
-    // basically everything is slow or down. We'll want to ratchet up the
+    // basically everything is slow or down. We'll want to ratchet up the
     // timeout value slowly until things improve, then ratchet it down once
     // things seem healthy.
     log.Printf("Socket error, will reconnect: %s\n", err)
@@ -65,24 +65,40 @@ func Publishv1(input chan []*FileEvent,
     socket = connect(config)
   }

-  SendPayload: for {
+SendPayload:
+  for {
     // Abort if our whole request takes longer than the configured
     // network timeout.
     socket.SetDeadline(time.Now().Add(config.timeout))

     // Set the window size to the length of this payload in events.
     _, err = socket.Write([]byte("1W"))
-    if err != nil { oops(err); continue }
+    if err != nil {
+      oops(err)
+      continue
+    }
     binary.Write(socket, binary.BigEndian, uint32(len(events)))
-    if err != nil { oops(err); continue }
+    if err != nil {
+      oops(err)
+      continue
+    }

     // Write compressed frame
     socket.Write([]byte("1C"))
-    if err != nil { oops(err); continue }
+    if err != nil {
+      oops(err)
+      continue
+    }
     binary.Write(socket, binary.BigEndian, uint32(len(compressed_payload)))
-    if err != nil { oops(err); continue }
+    if err != nil {
+      oops(err)
+      continue
+    }
     _, err = socket.Write(compressed_payload)
-    if err != nil { oops(err); continue }
+    if err != nil {
+      oops(err)
+      continue
+    }

     // read ack
     response := make([]byte, 0, 6)
@@ -114,7 +130,7 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
   if len(config.SSLCertificate) > 0 && len(config.SSLKey) > 0 {
     log.Printf("Loading client ssl certificate: %s and %s\n",
-               config.SSLCertificate, config.SSLKey)
+      config.SSLCertificate, config.SSLKey)
     cert, err := tls.LoadX509KeyPair(config.SSLCertificate, config.SSLKey)
     if err != nil {
       log.Fatalf("Failed loading client ssl certificate: %s\n", err)
@@ -127,7 +143,9 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
     tlsconfig.RootCAs = x509.NewCertPool()

     pemdata, err := ioutil.ReadFile(config.SSLCA)
-    if err != nil { log.Fatalf("Failure reading CA certificate: %s\n", err) }
+    if err != nil {
+      log.Fatalf("Failure reading CA certificate: %s\n", err)
+    }

     block, _ := pem.Decode(pemdata)
     if block == nil {
@@ -146,7 +164,7 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
   for {
     // Pick a random server from the list.
-    hostport := config.Servers[rand.Int() % len(config.Servers)]
+    hostport := config.Servers[rand.Int()%len(config.Servers)]
     submatch := hostport_re.FindSubmatch([]byte(hostport))
     if submatch == nil {
       log.Fatalf("Invalid host:port given: %s", hostport)
@@ -156,12 +174,12 @@ func connect(config *NetworkConfig) (socket *tls.Conn) {
     addresses, err := net.LookupHost(host)

     if err != nil {
-      log.Printf("DNS lookup failure \"%s\": %s\n", host, err);
+      log.Printf("DNS lookup failure \"%s\": %s\n", host, err)
       time.Sleep(1 * time.Second)
       continue
     }

-    address := addresses[rand.Int() % len(addresses)]
+    address := addresses[rand.Int()%len(addresses)]
     addressport := fmt.Sprintf("%s:%s", address, port)

     log.Printf("Connecting to %s (%s) \n", addressport, host)
@@ -198,13 +216,13 @@ func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) {
   // sequence number
   binary.Write(output, binary.BigEndian, uint32(sequence))
   // 'pair' count
-  binary.Write(output, binary.BigEndian, uint32(len(*event.Fields) + 4))
+  binary.Write(output, binary.BigEndian, uint32(len(*event.Fields)+4))

   writeKV("file", *event.Source, output)
   writeKV("host", hostname, output)
   writeKV("offset", strconv.FormatInt(event.Offset, 10), output)
   writeKV("line", *event.Text, output)
-  for k, v := range(*event.Fields) {
+  for k, v := range *event.Fields {
     writeKV(k, v, output)
   }
 }

@@ -23,7 +23,7 @@ func Registrar(input chan []*FileEvent) {
       // take the offset + length of the line + newline char and
       // save it as the new starting offset.
       Offset: event.Offset + int64(len(*event.Text)) + 1,
-      Inode: ino,
+      Inode:  ino,
       Device: dev,
     }
     //log.Printf("State %s: %d\n", *event.Source, event.Offset)
@@ -34,4 +34,3 @@ func Registrar(input chan []*FileEvent) {
     }
   }
 }
-

 // +build !windows

 package main

 import (
   "encoding/json"
-  "os"
   "log"
+  "os"
 )

 func WriteRegistry(state map[string]*FileState, path string) {

@@ -2,8 +2,8 @@ package main

 import (
   "encoding/json"
-  "os"
   "log"
+  "os"
 )

 func WriteRegistry(state map[string]*FileState, path string) {

@@ -4,10 +4,10 @@ import (
   "time"
 )

-func Spool(input chan *FileEvent,
-           output chan []*FileEvent,
-           max_size uint64,
-           idle_timeout time.Duration) {
+func Spool(input chan *FileEvent,
+  output chan []*FileEvent,
+  max_size uint64,
+  idle_timeout time.Duration) {
   // heartbeat periodically. If the last flush was longer than
   // 'idle_timeout' time ago, then we'll force a flush to prevent us from
   // holding on to spooled events for too long.
@@ -24,38 +24,38 @@ func Spool(input chan *FileEvent,
   next_flush_time := time.Now().Add(idle_timeout)
   for {
     select {
-    case event := <- input:
-      //append(spool, event)
-      spool[spool_i] = event
-      spool_i++
-      // Flush if full
-      if spool_i == cap(spool) {
-        //spoolcopy := make([]*FileEvent, max_size)
+    case event := <-input:
+      //append(spool, event)
+      spool[spool_i] = event
+      spool_i++
+      // Flush if full
+      if spool_i == cap(spool) {
+        //spoolcopy := make([]*FileEvent, max_size)
         var spoolcopy []*FileEvent
         //fmt.Println(spool[0])
         spoolcopy = append(spoolcopy, spool[:]...)
         output <- spoolcopy
         next_flush_time = time.Now().Add(idle_timeout)
         spool_i = 0
       }
-    case <- ticker.C:
+    case <-ticker.C:
       //fmt.Println("tick")
       if now := time.Now(); now.After(next_flush_time) {
         // if current time is after the next_flush_time, flush!
         //fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
         //now.Sub(next_flush_time))
         // Flush what we have, if anything
         if spool_i > 0 {
           var spoolcopy []*FileEvent
-          //fmt.Println(spool[0])
-          spoolcopy = append(spoolcopy, spool[:]...)
+          spoolcopy = append(spoolcopy, spool[0:spool_i]...)
           output <- spoolcopy
-          next_flush_time = time.Now().Add(idle_timeout)
+          next_flush_time = now.Add(idle_timeout)
           spool_i = 0
         }
       } /* if 'now' is after 'next_flush_time' */
       /* case ... */
     } /* select */
   } /* for */

 // +build !windows

 package main

 import (
   "log"
   "log/syslog"
 )

 func configureSyslog() {
-  writer, err := syslog.New(syslog.LOG_INFO | syslog.LOG_DAEMON, "logstash-forwarder")
+  writer, err := syslog.New(syslog.LOG_INFO|syslog.LOG_DAEMON, "logstash-forwarder")
   if err != nil {
     log.Fatalf("Failed to open syslog: %s\n", err)
     return

@@ -3,5 +3,5 @@ package main

 import "log"

 func configureSyslog() {
-  log.Printf("Logging to syslog not supported on this platform\n");
+  log.Printf("Logging to syslog not supported on this platform\n")
 }
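
A quick way to keep a tree in this state after a pass like the one above (assumed workflow, not part of this commit): gofmt's -l flag lists files whose formatting differs from gofmt's output, so with the same flags an empty listing means the tree is clean.

gofmt -tabs=false -tabwidth=2 -l .   # list files that would be reformatted
gofmt -tabs=false -tabwidth=2 -w .   # rewrite them in place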