publisher1.go 6.34 KB
Newer Older
1
2
3
4
package main

import (
  "bytes"
Jordan Sissel's avatar
Jordan Sissel committed
5
  "compress/zlib"
6
7
  "crypto/tls"
  "crypto/x509"
Jordan Sissel's avatar
Jordan Sissel committed
8
9
  "encoding/binary"
  "encoding/pem"
10
11
  "io"
  "io/ioutil"
12
  "log"
Jordan Sissel's avatar
Jordan Sissel committed
13
14
15
  "math/rand"
  "net"
  "os"
16
  "regexp"
Jordan Sissel's avatar
Jordan Sissel committed
17
18
  "strconv"
  "time"
19
20
)

21
// hostname is the value sent as the "host" field of every data frame. It is
// filled in once by init.
var hostname string

// hostport_re splits a "host:port" server string into its host (submatch 1)
// and numeric port (submatch 2). The host may optionally be wrapped in square
// brackets, e.g. "[::1]:5043", in which case the brackets are not captured.
//
// MustCompile is used so that a broken pattern fails loudly at start-up
// instead of leaving a nil *regexp.Regexp that panics on first use.
var hostport_re = regexp.MustCompile(`^\[?([^]]+)\]?:([0-9]+)$`)
23

24
func init() {
25
26
  log.Printf("publisher init\n")
  hostname, _ = os.Hostname()
Avleen Vig's avatar
Avleen Vig committed
27
  rand.Seed(time.Now().UnixNano())
28
29
30
}

func Publishv1(input chan []*FileEvent,
Jordan Sissel's avatar
Jordan Sissel committed
31
32
  registrar chan []*FileEvent,
  config *NetworkConfig) {
33
34
35
36
  var buffer bytes.Buffer
  var socket *tls.Conn
  var sequence uint32
  var err error
37

38
39
  socket = connect(config)
  defer socket.Close()
40

41
  for events := range input {
42
    buffer.Truncate(0)
43
44
45
46
47
48
49
    compressor, _ := zlib.NewWriterLevel(&buffer, 3)

    for _, event := range events {
      sequence += 1
      writeDataFrame(event, sequence, compressor)
    }
    compressor.Flush()
50
51
    compressor.Close()

52
53
54
55
    compressed_payload := buffer.Bytes()

    // Send buffer until we're successful...
    oops := func(err error) {
Jordan Sissel's avatar
Jordan Sissel committed
56
57
      // TODO(sissel): Track how frequently we timeout and reconnect. If we're
      // timing out too frequently, there's really no point in timing out since
Jordan Sissel's avatar
Jordan Sissel committed
58
      // basically everything is slow or down. We'll want to ratchet up the
Jordan Sissel's avatar
Jordan Sissel committed
59
60
      // timeout value slowly until things improve, then ratchet it down once
      // things seem healthy.
61
62
63
64
65
66
      log.Printf("Socket error, will reconnect: %s\n", err)
      time.Sleep(1 * time.Second)
      socket.Close()
      socket = connect(config)
    }

Jordan Sissel's avatar
Jordan Sissel committed
67
68
  SendPayload:
    for {
69
70
      // Abort if our whole request takes longer than the configured
      // network timeout.
Jordan Sissel's avatar
Jordan Sissel committed
71
      socket.SetDeadline(time.Now().Add(config.timeout))
72
73
74

      // Set the window size to the length of this payload in events.
      _, err = socket.Write([]byte("1W"))
Jordan Sissel's avatar
Jordan Sissel committed
75
76
77
78
      if err != nil {
        oops(err)
        continue
      }
79
      binary.Write(socket, binary.BigEndian, uint32(len(events)))
Jordan Sissel's avatar
Jordan Sissel committed
80
81
82
83
      if err != nil {
        oops(err)
        continue
      }
84

85
86
      // Write compressed frame
      socket.Write([]byte("1C"))
Jordan Sissel's avatar
Jordan Sissel committed
87
88
89
90
      if err != nil {
        oops(err)
        continue
      }
91
      binary.Write(socket, binary.BigEndian, uint32(len(compressed_payload)))
Jordan Sissel's avatar
Jordan Sissel committed
92
93
94
95
      if err != nil {
        oops(err)
        continue
      }
96
      _, err = socket.Write(compressed_payload)
Jordan Sissel's avatar
Jordan Sissel committed
97
98
99
100
      if err != nil {
        oops(err)
        continue
      }
101
102
103
104
105
106
107
108
109
110
111
112
113
114

      // read ack
      response := make([]byte, 0, 6)
      ackbytes := 0
      for ackbytes != 6 {
        n, err := socket.Read(response[len(response):cap(response)])
        if err != nil {
          log.Printf("Read error looking for ack: %s\n", err)
          socket.Close()
          socket = connect(config)
          continue SendPayload // retry sending on new connection
        } else {
          ackbytes += n
        }
115
      }
116
117
118
119

      // TODO(sissel): verify ack
      // Success, stop trying to send the payload.
      break
120
121
122
123
124
125
    }

    // Tell the registrar that we've successfully sent these events
    registrar <- events
  } /* for each event payload */
} // Publish
126
127
128
129
130
131

func connect(config *NetworkConfig) (socket *tls.Conn) {
  var tlsconfig tls.Config

  if len(config.SSLCertificate) > 0 && len(config.SSLKey) > 0 {
    log.Printf("Loading client ssl certificate: %s and %s\n",
Jordan Sissel's avatar
Jordan Sissel committed
132
      config.SSLCertificate, config.SSLKey)
133
134
135
136
137
138
139
140
141
142
143
144
    cert, err := tls.LoadX509KeyPair(config.SSLCertificate, config.SSLKey)
    if err != nil {
      log.Fatalf("Failed loading client ssl certificate: %s\n", err)
    }
    tlsconfig.Certificates = []tls.Certificate{cert}
  }

  if len(config.SSLCA) > 0 {
    log.Printf("Setting trusted CA from file: %s\n", config.SSLCA)
    tlsconfig.RootCAs = x509.NewCertPool()

    pemdata, err := ioutil.ReadFile(config.SSLCA)
Jordan Sissel's avatar
Jordan Sissel committed
145
146
147
    if err != nil {
      log.Fatalf("Failure reading CA certificate: %s\n", err)
    }
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

    block, _ := pem.Decode(pemdata)
    if block == nil {
      log.Fatalf("Failed to decode PEM data, is %s a valid cert?\n", config.SSLCA)
    }
    if block.Type != "CERTIFICATE" {
      log.Fatalf("This is not a certificate file: %s\n", config.SSLCA)
    }

    cert, err := x509.ParseCertificate(block.Bytes)
    if err != nil {
      log.Fatalf("Failed to parse a certificate: %s\n", config.SSLCA)
    }
    tlsconfig.RootCAs.AddCert(cert)
  }

  for {
165
    // Pick a random server from the list.
Jordan Sissel's avatar
Jordan Sissel committed
166
    hostport := config.Servers[rand.Int()%len(config.Servers)]
167
168
169
170
171
172
    submatch := hostport_re.FindSubmatch([]byte(hostport))
    if submatch == nil {
      log.Fatalf("Invalid host:port given: %s", hostport)
    }
    host := string(submatch[1])
    port := string(submatch[2])
173
174
175
    addresses, err := net.LookupHost(host)

    if err != nil {
Jordan Sissel's avatar
Jordan Sissel committed
176
      log.Printf("DNS lookup failure \"%s\": %s\n", host, err)
177
178
179
180
      time.Sleep(1 * time.Second)
      continue
    }

Jordan Sissel's avatar
Jordan Sissel committed
181
    address := addresses[rand.Int()%len(addresses)]
182
    addressport := net.JoinHostPort(address, port)
183

184
    log.Printf("Connecting to %s (%s) \n", addressport, host)
185

186
    tcpsocket, err := net.DialTimeout("tcp", addressport, config.timeout)
187
188
189
190
191
192
193
    if err != nil {
      log.Printf("Failure connecting to %s: %s\n", address, err)
      time.Sleep(1 * time.Second)
      continue
    }

    socket = tls.Client(tcpsocket, &tlsconfig)
Jordan Sissel's avatar
Jordan Sissel committed
194
    socket.SetDeadline(time.Now().Add(config.timeout))
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
    err = socket.Handshake()
    if err != nil {
      log.Printf("Failed to tls handshake with %s %s\n", address, err)
      time.Sleep(1 * time.Second)
      socket.Close()
      continue
    }

    log.Printf("Connected to %s\n", address)

    // connected, let's rock and roll.
    return
  }
  return
}

func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) {
  //log.Printf("event: %s\n", *event.Text)
  // header, "1D"
  output.Write([]byte("1D"))
  // sequence number
  binary.Write(output, binary.BigEndian, uint32(sequence))
  // 'pair' count
Jordan Sissel's avatar
Jordan Sissel committed
218
  binary.Write(output, binary.BigEndian, uint32(len(*event.Fields)+4))
219
220

  writeKV("file", *event.Source, output)
Jordan Sissel's avatar
Jordan Sissel committed
221
  writeKV("host", hostname, output)
222
223
  writeKV("offset", strconv.FormatInt(event.Offset, 10), output)
  writeKV("line", *event.Text, output)
Jordan Sissel's avatar
Jordan Sissel committed
224
  for k, v := range *event.Fields {
225
226
227
228
229
230
231
232
233
234
235
    writeKV(k, v, output)
  }
}

// writeKV writes one key/value pair to output. Key and value are each encoded
// as a big-endian uint32 byte length followed by the raw bytes of the string,
// key first. Write errors on output are not reported.
func writeKV(key string, value string, output io.Writer) {
	var size [4]byte
	for _, field := range [2]string{key, value} {
		// Length prefix, then the payload, as two separate writes.
		binary.BigEndian.PutUint32(size[:], uint32(len(field)))
		output.Write(size[:])
		output.Write([]byte(field))
	}
}