logstash-forwarder.go 2.26 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
package main

import (
  "log"
  "os"
  "time"
  "flag"
  "runtime/pprof"
)

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")
var idle_timeout = flag.Duration("idle-flush-time", 5 * time.Second, "Maximum time to wait for a full spool before flushing anyway")
var config_file = flag.String("config", "", "The config file to load")
15
var use_syslog = flag.Bool("log-to-syslog", false, "Log to syslog instead of stdout")
16
var from_beginning = flag.Bool("from-beginning", false, "Read new files from the beginning, instead of the end")
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

func main() {
  flag.Parse()

  if *cpuprofile != "" {
    f, err := os.Create(*cpuprofile)
    if err != nil {
        log.Fatal(err)
    }
    pprof.StartCPUProfile(f)
    go func() {
      time.Sleep(60 * time.Second)
      pprof.StopCPUProfile()
      panic("done")
    }()
  }

  config, err := LoadConfig(*config_file)
  if err != nil {
    return
  }

  event_chan := make(chan *FileEvent, 16)
  publisher_chan := make(chan []*FileEvent, 1)
  registrar_chan := make(chan []*FileEvent, 1)

  if len(config.Files) == 0 {
    log.Fatalf("No paths given. What files do you want me to watch?\n")
  }

  // The basic model of execution:
  // - prospector: finds files in paths/globs to harvest, starts harvesters
  // - harvester: reads a file, sends events to the spooler
  // - spooler: buffers events until ready to flush to the publisher
  // - publisher: writes to the network, notifies registrar
  // - registrar: records positions of files read
  // Finally, prospector uses the registrar information, on restart, to
  // determine where in each file to resume a harvester.
55 56 57
  
  log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
  if *use_syslog {
58
    configureSyslog()
59
  }
60 61 62 63 64 65 66 67 68

  // Prospect the globs/paths given on the command line and launch harvesters
  for _, fileconfig := range config.Files {
    go Prospect(fileconfig, event_chan)
  }

  // Harvesters dump events into the spooler.
  go Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)

69
  go Publishv1(publisher_chan, registrar_chan, &config.Network)
70

71
  // registrar records last acknowledged positions in all files.
72 73
  Registrar(registrar_chan)
} /* main */