lumberjack.go 4.45 KB
Newer Older
1
2
3
package main

import (
4
  "log"
5
  lumberjack "liblumberjack"
Jordan Sissel's avatar
Jordan Sissel committed
6
  "os"
7
  "time"
8
9
10
  "flag"
  "strings"
  "runtime/pprof"
11
  "sodium"
12
13
)

14
15
16
17
18
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")
var idle_timeout = flag.Duration("idle-flush-time", 5 * time.Second, "Maximum time to wait for a full spool before flushing anyway")
var server_timeout = flag.Duration("server-timeout", 30 * time.Second, "Maximum time to wait for a request to a server before giving up and trying another.")
var servers = flag.String("servers", "", "Server (or comma-separated list of servers) to send events to. Each server can be a 'host' or 'host:port'. If the port is not specified, port 5005 is assumed. One server is chosen of the list at random, and only on failure is another server used.")
19
var their_public_key_path = flag.String("their-public-key", "", "the file containing the NaCl public key for the server you are talking to.")
Jordan Sissel's avatar
Jordan Sissel committed
20
21
var our_secret_key_path = flag.String("my-secret-key", "", "the file containing the NaCl secret key for this process to encrypt with. If none is given, one is generated at runtime.")
//var our_public_key_path = flag.String("my-public-key", "", "the file containing the NaCl public key for this process to encrypt with. If you specify this, you MUST specify -my-private-key.")
22
23
24
25
26
27
28
29
30
31
32

func read_key(path string, key []byte) (err error) {
  file, err := os.Open(path)
  if err != nil {
    return
  }

  // TODO(sissel): check length of read
  _, err = file.Read(key)
  return
}
33
34

func main() {
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  flag.Parse()

  if *cpuprofile != "" {
    f, err := os.Create(*cpuprofile)
    if err != nil {
        log.Fatal(err)
    }
    pprof.StartCPUProfile(f)
    go func() {
      time.Sleep(60 * time.Second)
      pprof.StopCPUProfile()
      panic("done")
    }()
  }

Jordan Sissel's avatar
Jordan Sissel committed
50
51
  if *their_public_key_path == "" {
    log.Fatalf("No -their-public-key flag given")
52
53
  }

54
55
  // Turn 'host' and 'host:port' into 'tcp://host:port'
  if *servers == "" {
56
    log.Fatalf("No servers specified, please provide the -servers setting\n")
57
  }
58

59
60
61
62
63
64
65
66
67
68
69
  server_list := strings.Split(*servers, ",")
  for i, server := range server_list {
    if !strings.Contains(server, ":") {
      server_list[i] = "tcp://" + server + ":5005"
    } else {
      server_list[i] = "tcp://" + server
    }
  }

  log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)

70
  // TODO(sissel): support flags for setting... stuff
71
72
  event_chan := make(chan *lumberjack.FileEvent, 16)
  publisher_chan := make(chan []*lumberjack.FileEvent, 1)
73
  registrar_chan := make(chan []*lumberjack.FileEvent, 1)
Jordan Sissel's avatar
Jordan Sissel committed
74

75
76
77
78
79
80
81
82
  paths := flag.Args()

  if len(paths) == 0 {
    log.Fatalf("No paths given. What files do you want me to watch?\n")
  }

  var public_key [sodium.PUBLICKEYBYTES]byte

Jordan Sissel's avatar
Jordan Sissel committed
83
  err := read_key(*their_public_key_path, public_key[:])
84
85
  if err != nil {
    log.Fatalf("Unable to read public key path (%s): %s\n",
Jordan Sissel's avatar
Jordan Sissel committed
86
               *their_public_key_path, err)
87
88
89
  }

  var secret_key [sodium.SECRETKEYBYTES]byte
Jordan Sissel's avatar
Jordan Sissel committed
90
  if *our_secret_key_path  == "" {
91
92
93
    log.Printf("No secret key given; generating one.")
    _, secret_key = sodium.CryptoBoxKeypair()
  } else {
Jordan Sissel's avatar
Jordan Sissel committed
94
    err := read_key(*our_secret_key_path, secret_key[:])
95
    if err != nil {
Jordan Sissel's avatar
Jordan Sissel committed
96
97
98
      log.Printf("Unable to read secret key (%s): %s\n",
                 *our_secret_key_path, err)
      log.Printf("Generating a key pair now.\n")
99
100
101
102
103
      _, sk := sodium.CryptoBoxKeypair()
      copy(secret_key[:], sk[:])
    }
  }

Jordan Sissel's avatar
Jordan Sissel committed
104
105
106
  // The basic model of execution:
  // - prospector: finds files in paths/globs to harvest, starts harvesters
  // - harvester: reads a file, sends events to the spooler
Jordan Sissel's avatar
Jordan Sissel committed
107
108
  // - spooler: buffers events until ready to flush to the publisher
  // - publisher: writes to the network, notifies registrar
Jordan Sissel's avatar
Jordan Sissel committed
109
110
111
112
  // - registrar: records positions of files read
  // Finally, prospector uses the registrar information, on restart, to
  // determine where in each file to resume a harvester.

Jordan Sissel's avatar
Jordan Sissel committed
113
  // Prospect the globs/paths given on the command line and launch harvesters
114
  go lumberjack.Prospect(paths, event_chan)
Jordan Sissel's avatar
Jordan Sissel committed
115

Jordan Sissel's avatar
Jordan Sissel committed
116
  // Harvesters dump events into the spooler.
117
  go lumberjack.Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)
Jordan Sissel's avatar
Jordan Sissel committed
118

119
  go lumberjack.Publish(publisher_chan, registrar_chan, server_list,
120
                     public_key, secret_key, *server_timeout)
Jordan Sissel's avatar
Jordan Sissel committed
121

122
  // TODO(sissel): registrar db path
Jordan Sissel's avatar
Jordan Sissel committed
123
  // TODO(sissel): registrar records last acknowledged positions in all files.
124
  lumberjack.Registrar(registrar_chan)
125
} /* main */