Commit 1d975fe5 authored by Jordan Sissel's avatar Jordan Sissel

finish implementing lumberjack protocol in lumberjack2 (test works

against logstash 1.1.13)

This has one slight behavioral change from the C version. We now
send a window size change before every spooled payload. This has
the effect of allowing slow publishers to be acknowledged sooner.
parent e9e1d7df
......@@ -3,59 +3,185 @@ package main
import (
"bytes"
"encoding/binary"
//"crypto/tls"
"encoding/pem"
"crypto/tls"
"crypto/x509"
"net"
"io"
"os"
"io/ioutil"
"log"
//"time"
"time"
"compress/zlib"
"strconv"
)
var hostname string
func init() {
}
func connect(config *NetworkConfig) {
log.Printf("publisher init\n")
hostname, _ = os.Hostname()
}
func Publishv1(input chan []*FileEvent,
registrar chan []*FileEvent,
config *NetworkConfig) {
var zbuf, packbuf bytes.Buffer
socket := connect(config)
for events := range input {
var buffer bytes.Buffer
var socket *tls.Conn
var sequence uint32
var err error
for event := range events {
}
socket = connect(config)
defer socket.Close()
// Compress it
// A new zlib writer is used for every payload of events so that any
// individual payload can be decompressed alone.
// TODO(sissel): Make compression level tunable
compressor, _ := zlib.NewWriterLevel(&buffer, 3)
for events := range input {
buffer.Truncate(0)
_, err := compressor.Write(data)
err = compressor.Flush()
compressor, _ := zlib.NewWriterLevel(&buffer, 3)
for _, event := range events {
sequence += 1
writeDataFrame(event, sequence, compressor)
}
compressor.Flush()
compressor.Close()
// Loop forever trying to send.
// This will cause reconnects/etc on failures automatically
for {
err = socket.Send(nonce, zmq.SNDMORE)
if err != nil {
continue // send failed, retry!
}
err = socket.Send(ciphertext, 0)
if err != nil {
continue // send failed, retry!
}
compressed_payload := buffer.Bytes()
// Send buffer until we're successful...
oops := func(err error) {
log.Printf("Socket error, will reconnect: %s\n", err)
time.Sleep(1 * time.Second)
socket.Close()
socket = connect(config)
}
SendPayload: for {
// Abort if our whole request takes longer than the configured
// network timeout.
socket.SetDeadline(time.Now().Add(config.Timeout))
// Set the window size to the length of this payload in events.
_, err = socket.Write([]byte("1W"))
if err != nil { oops(err); continue }
binary.Write(socket, binary.BigEndian, uint32(len(events)))
if err != nil { oops(err); continue }
data, err = socket.Recv(0)
// TODO(sissel): Figure out acknowledgement protocol? If any?
if err == nil {
break // success!
// Write compressed frame
socket.Write([]byte("1C"))
if err != nil { oops(err); continue }
binary.Write(socket, binary.BigEndian, uint32(len(compressed_payload)))
if err != nil { oops(err); continue }
_, err = socket.Write(compressed_payload)
if err != nil { oops(err); continue }
// read ack
response := make([]byte, 0, 6)
ackbytes := 0
for ackbytes != 6 {
n, err := socket.Read(response[len(response):cap(response)])
if err != nil {
log.Printf("Read error looking for ack: %s\n", err)
socket.Close()
socket = connect(config)
continue SendPayload // retry sending on new connection
} else {
ackbytes += n
}
}
// TODO(sissel): verify ack
// Success, stop trying to send the payload.
break
}
// Tell the registrar that we've successfully sent these events
registrar <- events
} /* for each event payload */
} // Publish
func connect(config *NetworkConfig) (socket *tls.Conn) {
var tlsconfig tls.Config
if len(config.SSLCertificate) > 0 && len(config.SSLKey) > 0 {
log.Printf("Loading client ssl certificate: %s and %s\n",
config.SSLCertificate, config.SSLKey)
cert, err := tls.LoadX509KeyPair(config.SSLCertificate, config.SSLKey)
if err != nil {
log.Fatalf("Failed loading client ssl certificate: %s\n", err)
}
tlsconfig.Certificates = []tls.Certificate{cert}
}
if len(config.SSLCA) > 0 {
log.Printf("Setting trusted CA from file: %s\n", config.SSLCA)
tlsconfig.RootCAs = x509.NewCertPool()
pemdata, err := ioutil.ReadFile(config.SSLCA)
if err != nil { log.Fatalf("Failure reading CA certificate: %s\n", err) }
block, _ := pem.Decode(pemdata)
if block == nil {
log.Fatalf("Failed to decode PEM data, is %s a valid cert?\n", config.SSLCA)
}
if block.Type != "CERTIFICATE" {
log.Fatalf("This is not a certificate file: %s\n", config.SSLCA)
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
log.Fatalf("Failed to parse a certificate: %s\n", config.SSLCA)
}
tlsconfig.RootCAs.AddCert(cert)
}
address := "127.0.0.1:3333"
for {
tcpsocket, err := net.DialTimeout("tcp", address, config.Timeout)
if err != nil {
log.Printf("Failure connecting to %s: %s\n", address, err)
time.Sleep(1 * time.Second)
continue
}
socket = tls.Client(tcpsocket, &tlsconfig)
socket.SetDeadline(time.Now().Add(config.Timeout))
err = socket.Handshake()
if err != nil {
log.Printf("Failed to tls handshake with %s %s\n", address, err)
time.Sleep(1 * time.Second)
socket.Close()
continue
}
log.Printf("Connected to %s\n", address)
// connected, let's rock and roll.
return
}
return
}
func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) {
//log.Printf("event: %s\n", *event.Text)
// header, "1D"
output.Write([]byte("1D"))
// sequence number
binary.Write(output, binary.BigEndian, uint32(sequence))
// 'pair' count
binary.Write(output, binary.BigEndian, uint32(len(*event.Fields) + 3))
writeKV("file", *event.Source, output)
writeKV("offset", strconv.FormatInt(event.Offset, 10), output)
writeKV("line", *event.Text, output)
for k, v := range(*event.Fields) {
writeKV(k, v, output)
}
}
func writeKV(key string, value string, output io.Writer) {
//log.Printf("kv: %d/%s %d/%s\n", len(key), key, len(value), value)
binary.Write(output, binary.BigEndian, uint32(len(key)))
output.Write([]byte(key))
binary.Write(output, binary.BigEndian, uint32(len(value)))
output.Write([]byte(value))
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment