publisher.go 5.86 KB
Newer Older
1
package liblumberjack
Jordan Sissel's avatar
Jordan Sissel committed
2 3

import (
4
  "bytes"
Jordan Sissel's avatar
Jordan Sissel committed
5
  "encoding/json"
6
  zmq "github.com/alecthomas/gozmq"
7
  "log"
8 9 10 11 12
  "math/big"
  "syscall"
  "time"
  "compress/zlib"
  "crypto/rand"
13
  "sodium"
Jordan Sissel's avatar
Jordan Sissel committed
14 15
)

16 17
var context *zmq.Context

18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
func init() {
  context, _ = zmq.NewContext()
}

// Forever Faithful Socket
type FFS struct {
  Endpoints []string // set of endpoints available to ship to

  // Socket type; zmq.REQ, etc
  SocketType zmq.SocketType

  // Various timeout values
  SendTimeout time.Duration
  RecvTimeout time.Duration

33 34 35
  endpoint  string      // the current endpoint in use
  socket    *zmq.Socket // the current zmq socket
  connected bool        // are we connected?
36 37 38 39 40 41 42
}

func (s *FFS) Send(data []byte, flags zmq.SendRecvOption) (err error) {
  for {
    s.ensure_connect()

    pi := zmq.PollItems{zmq.PollItem{Socket: s.socket, Events: zmq.POLLOUT}}
43
    count, err := zmq.Poll(pi, s.SendTimeout)
44 45
    if count == 0 {
      // not ready in time, fail the socket and try again.
46
      log.Printf("%s: timed out waiting to Send(): %s\n", s.endpoint, err)
47
      s.fail_socket()
48 49 50 51 52
    } else {
      //log.Printf("%s: sending %d payload\n", s.endpoint, len(data))
      err = s.socket.Send(data, flags)
      if err != nil {
        log.Printf("%s: Failed to Send() %d byte message: %s\n",
53
          s.endpoint, len(data), err)
54 55 56 57 58 59 60 61 62 63 64 65 66 67
        s.fail_socket()
      } else {
        // Success!
        break
      }
    }
  }
  return
}

func (s *FFS) Recv(flags zmq.SendRecvOption) (data []byte, err error) {
  s.ensure_connect()

  pi := zmq.PollItems{zmq.PollItem{Socket: s.socket, Events: zmq.POLLIN}}
68
  count, err := zmq.Poll(pi, s.RecvTimeout)
69 70 71 72 73 74
  if count == 0 {
    // not ready in time, fail the socket and try again.
    s.fail_socket()

    err = syscall.ETIMEDOUT
    log.Printf("%s: timed out waiting to Recv(): %s\n",
75
      s.endpoint, err)
76 77 78 79 80
    return nil, err
  } else {
    data, err = s.socket.Recv(flags)
    if err != nil {
      log.Printf("%s: Failed to Recv() %d byte message: %s\n",
81
        s.endpoint, len(data), err)
82 83 84 85 86 87 88 89 90 91 92
      s.fail_socket()
      return nil, err
    } else {
      // Success!
    }
  }
  return
}

func (s *FFS) Close() (err error) {
  err = s.socket.Close()
93 94 95
  if err != nil {
    return
  }
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116

  s.socket = nil
  s.connected = false
  return nil
}

func (s *FFS) ensure_connect() {
  if s.connected {
    return
  }

  if s.SendTimeout == 0 {
    s.SendTimeout = 1 * time.Second
  }
  if s.RecvTimeout == 0 {
    s.RecvTimeout = 1 * time.Second
  }

  if s.SocketType == 0 {
    log.Panicf("No socket type set on zmq socket")
  }
117
  if s.socket != nil {
118 119 120 121 122
    s.socket.Close()
    s.socket = nil
  }

  var err error
123
  s.socket, err = context.NewSocket(s.SocketType)
124 125 126 127 128 129 130 131
  if err != nil {
    log.Panicf("zmq.NewSocket(%d) failed: %s\n", s.SocketType, err)
  }

  //s.socket.SetSockOptUInt64(zmq.HWM, 1)
  //s.socket.SetSockOptInt(zmq.RCVTIMEO, int(s.RecvTimeout.Nanoseconds() / 1000000))
  //s.socket.SetSockOptInt(zmq.SNDTIMEO, int(s.SendTimeout.Nanoseconds() / 1000000))

132 133 134
  // Abort anything in-flight on a socket that's closed.
  s.socket.SetSockOptInt(zmq.LINGER, 0)

135 136 137 138
  for !s.connected {
    var max *big.Int = big.NewInt(int64(len(s.Endpoints)))
    i, _ := rand.Int(rand.Reader, max)
    s.endpoint = s.Endpoints[i.Int64()]
Jordan Sissel's avatar
Jordan Sissel committed
139
    log.Printf("Connecting to %s\n", s.endpoint)
140 141 142 143 144 145 146 147 148 149 150 151 152
    err := s.socket.Connect(s.endpoint)
    if err != nil {
      log.Printf("%s: Error connecting: %s\n", s.endpoint, err)
      time.Sleep(500 * time.Millisecond)
      continue
    }

    // No error, we're connected.
    s.connected = true
  }
}

func (s *FFS) fail_socket() {
153 154 155
  if !s.connected {
    return
  }
156 157 158
  s.Close()
}

159
func Publish(input chan []*FileEvent,
160
             registrar chan []*FileEvent,
161 162 163 164
             server_list []string,
             public_key [sodium.PUBLICKEYBYTES]byte,
             secret_key [sodium.SECRETKEYBYTES]byte,
             server_timeout time.Duration) {
165
  var buffer bytes.Buffer
166
  session := sodium.NewSession(public_key, secret_key)
167 168

  socket := FFS{
169 170
    Endpoints:   server_list,
    SocketType:  zmq.REQ,
171 172 173 174 175 176
    RecvTimeout: server_timeout,
    SendTimeout: server_timeout,
  }
  //defer socket.Close()

  for events := range input {
Jordan Sissel's avatar
Jordan Sissel committed
177
    // got a bunch of events, ship them out.
178
    //log.Printf("Publisher received %d events\n", len(events))
179

180
    data, _ := json.Marshal(events)
181
    // TODO(sissel): check error
182 183

    // Compress it
184 185
    // A new zlib writer  is used for every payload of events so that any
    // individual payload can be decompressed alone.
186 187 188
    // TODO(sissel): Make compression level tunable
    compressor, _ := zlib.NewWriterLevel(&buffer, 3)
    buffer.Truncate(0)
189
    _, err := compressor.Write(data)
190 191
    err = compressor.Flush()
    compressor.Close()
192

193 194 195 196 197 198 199
    //log.Printf("compressed %d bytes\n", buffer.Len())
    // TODO(sissel): check err
    // TODO(sissel): implement security/encryption/etc

    // Send full payload over zeromq REQ/REP
    // TODO(sissel): check error
    //buffer.Write(data)
200 201
    ciphertext, nonce := session.Box(buffer.Bytes())

202 203 204 205
    //log.Printf("plaintext: %d\n", len(data))
    //log.Printf("compressed: %d\n", buffer.Len())
    //log.Printf("ciphertext: %d %v\n", len(ciphertext), ciphertext[:20])
    //log.Printf("nonce: %d\n", len(nonce))
206 207

    // TODO(sissel): figure out encoding for ciphertext + nonce
208
    // TODO(sissel): figure out encoding for ciphertext + nonce
209 210 211 212

    // Loop forever trying to send.
    // This will cause reconnects/etc on failures automatically
    for {
213
      err = socket.Send(nonce, zmq.SNDMORE)
214 215 216 217
      if err != nil {
        continue // send failed, retry!
      }
      err = socket.Send(ciphertext, 0)
218 219 220 221
      if err != nil {
        continue // send failed, retry!
      }

222
      data, err = socket.Recv(0)
223
      // TODO(sissel): Figure out acknowledgement protocol? If any?
224
      if err == nil {
225
        break // success!
226 227
      }
    }
Jordan Sissel's avatar
Jordan Sissel committed
228

229
    // Tell the registrar that we've successfully sent these events
230
    //registrar <- events
231
  } /* for each event payload */
Jordan Sissel's avatar
Jordan Sissel committed
232
} // Publish