// logstash-forwarder.go

package main

import (
  "flag"
  "log"
  "os"
  "runtime/pprof"
  "time"
)

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")
13
var idle_timeout = flag.Duration("idle-flush-time", 5*time.Second, "Maximum time to wait for a full spool before flushing anyway")
14
var config_file = flag.String("config", "", "The config file to load")
Jordan Sissel's avatar
Jordan Sissel committed
15
var use_syslog = flag.Bool("log-to-syslog", false, "Log to syslog instead of stdout")
16
var from_beginning = flag.Bool("from-beginning", false, "Read new files from the beginning, instead of the end")
17 18 19 20 21 22 23

func main() {
  flag.Parse()

  if *cpuprofile != "" {
    f, err := os.Create(*cpuprofile)
    if err != nil {
24
      log.Fatal(err)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
    }
    pprof.StartCPUProfile(f)
    go func() {
      time.Sleep(60 * time.Second)
      pprof.StopCPUProfile()
      panic("done")
    }()
  }

  config, err := LoadConfig(*config_file)
  if err != nil {
    return
  }

  event_chan := make(chan *FileEvent, 16)
  publisher_chan := make(chan []*FileEvent, 1)
  registrar_chan := make(chan []*FileEvent, 1)

  if len(config.Files) == 0 {
    log.Fatalf("No paths given. What files do you want me to watch?\n")
  }

  // The basic model of execution:
  // - prospector: finds files in paths/globs to harvest, starts harvesters
  // - harvester: reads a file, sends events to the spooler
  // - spooler: buffers events until ready to flush to the publisher
  // - publisher: writes to the network, notifies registrar
  // - registrar: records positions of files read
  // Finally, prospector uses the registrar information, on restart, to
  // determine where in each file to resume a harvester.
55

Jordan Sissel's avatar
Jordan Sissel committed
56 57
  log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
  if *use_syslog {
58
    configureSyslog()
Jordan Sissel's avatar
Jordan Sissel committed
59
  }
60 61 62 63 64 65 66 67 68

  // Prospect the globs/paths given on the command line and launch harvesters
  for _, fileconfig := range config.Files {
    go Prospect(fileconfig, event_chan)
  }

  // Harvesters dump events into the spooler.
  go Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)

69
  go Publishv1(publisher_chan, registrar_chan, &config.Network)
70

71
  // registrar records last acknowledged positions in all files.
72 73
  Registrar(registrar_chan)
} /* main */