diff --git a/lib/lumberjack/server2.rb b/lib/lumberjack/server2.rb index c51e7f6bae78e822b3d2201f3ecfb9db26195a88..37d31e5ac021c026c7773ae351135822a1f4a65b 100644 --- a/lib/lumberjack/server2.rb +++ b/lib/lumberjack/server2.rb @@ -53,10 +53,11 @@ module Lumberjack end def run(&block) - setup_proxy(@context) + #setup_proxy(@context) threads = @options[:workers].times.collect do |i| Thread.new do + puts "Starting worker #{i}" run_worker(@context, &block) end end @@ -67,7 +68,8 @@ module Lumberjack def run_worker(context, &block) socket = context.socket(ZMQ::REP) Stud::try(10.times) do - rc = socket.connect(@options[:worker_endpoint]) + #rc = socket.connect(@options[:worker_endpoint]) + rc = socket.bind(@options[:endpoint]) if rc < 0 raise "connect to #{@options[:worker_endpoint]} failed" end @@ -85,25 +87,26 @@ module Lumberjack socket.recv_string(ciphertext) # Decrypt - plaintext = @cryptobox.open(nonce, ciphertext) + #plaintext = @cryptobox.open(nonce, ciphertext) # decompress - inflated = Zlib::Inflate.inflate(plaintext) + #inflated = Zlib::Inflate.inflate(plaintext) # JSON - events = JSON.parse(inflated) + #events = JSON.parse(inflated) #events.each do |event| #yield event #end # TODO(sissel): yield each event - count += events.count + #count += events.count + count += 1 # Reply to acknowledge. # Currently there is no response message to put. socket.send_string("") - if count > 100000 + if count > 100 puts :rate => (count / (Time.now - start)) count = 0 start = Time.now @@ -115,15 +118,15 @@ end # module Lumberjack if __FILE__ == $0 a = Lumberjack::Server2.new( - :workers => 4, + :workers => 1, :endpoint => "tcp://127.0.0.1:12345", :their_public_key => File.read("../../nacl.public").force_encoding("BINARY"), :my_secret_key => File.read("../../nacl.secret").force_encoding("BINARY")) count = 0 - start = Time.now - require "thread" - q = Queue.new + #start = Time.now + #require "thread" + #q = Queue.new #q = java.util.concurrent.LinkedBlockingQueue.new #Thread.new { a.run { |e| q.put(e) } } a.run { |e| } diff --git a/src/liblumberjack/publisher.go b/src/liblumberjack/publisher.go index 37575adae64e1ed3f987c3aaa3d176fc05c2f073..8c66d1389e604f08f88739e4653009cd70f1d2d2 100644 --- a/src/liblumberjack/publisher.go +++ b/src/liblumberjack/publisher.go @@ -175,7 +175,7 @@ func Publish(input chan []*FileEvent, for events := range input { // got a bunch of events, ship them out. - log.Printf("Publisher received %d events\n", len(events)) + //log.Printf("Publisher received %d events\n", len(events)) data, _ := json.Marshal(events) // TODO(sissel): check error @@ -210,7 +210,7 @@ func Publish(input chan []*FileEvent, // Loop forever trying to send. // This will cause reconnects/etc on failures automatically for { - err = socket.Send(nonce[:], zmq.SNDMORE) + err = socket.Send(nonce, zmq.SNDMORE) if err != nil { continue // send failed, retry! } diff --git a/src/sodium/nonce.go b/src/sodium/nonce.go index 8c329fa19180e9eae84a0123b635f7a9cd3ec686..673d2a8d72ba63e5fca42d54a3934d6f12aba36f 100644 --- a/src/sodium/nonce.go +++ b/src/sodium/nonce.go @@ -3,23 +3,24 @@ import ( "unsafe" ) -func RandomNonceStrategy() (func() [crypto_box_NONCEBYTES]byte) { - return func () (nonce [crypto_box_NONCEBYTES]byte) { +func RandomNonceStrategy() (func() []byte) { + return func () ([]byte) { + var nonce [crypto_box_NONCEBYTES]byte Randombytes(nonce[:]) - return + return nonce[:] } } -func IncrementalNonceStrategy() (func() [crypto_box_NONCEBYTES]byte) { +func IncrementalNonceStrategy() (func() []byte) { var nonce [crypto_box_NONCEBYTES]byte Randombytes(nonce[:]) // TODO(sissel): Make the high-8 bytes of the nonce based on current time to // help avoid collisions? - return func() ([crypto_box_NONCEBYTES]byte) { + return func() ([]byte) { increment(nonce[:], 1) - return nonce + return nonce[:] } } diff --git a/src/sodium/session.go b/src/sodium/session.go index d70806c71c5dcd9b437d46053542892db05cb10a..697da8c7f5e55ebcedd079234fc19da50727736e 100644 --- a/src/sodium/session.go +++ b/src/sodium/session.go @@ -3,7 +3,7 @@ package sodium // #cgo pkg-config: sodium import "C" import "unsafe" -//import "fmt" +import "fmt" type Session struct { // the public key of the agent who is sending you encrypted messages @@ -16,7 +16,7 @@ type Session struct { k [crypto_box_BEFORENMBYTES]byte // The nonce generator. - Nonce func() [crypto_box_NONCEBYTES]byte + Nonce func() []byte } func NewSession(pk [PUBLICKEYBYTES]byte, sk [SECRETKEYBYTES]byte) (s *Session){ @@ -36,11 +36,12 @@ func (s *Session) Precompute() { (*C.uchar)(unsafe.Pointer(&s.Secret[0]))) } -func (s *Session) Box(plaintext []byte) (ciphertext []byte, nonce [crypto_box_NONCEBYTES]byte) { +func (s *Session) Box(plaintext []byte) (ciphertext []byte, nonce []byte) { // XXX: ciphertext needs to be zero-padded at the start for crypto_box_ZEROBYTES // ZEROBYTES + len(plaintext) is ciphertext length ciphertext = make([]byte, crypto_box_ZEROBYTES + len(plaintext)) nonce = s.Nonce() + m := make([]byte, crypto_box_ZEROBYTES + len(plaintext)) copy(m[crypto_box_ZEROBYTES:], plaintext) @@ -52,14 +53,18 @@ func (s *Session) Box(plaintext []byte) (ciphertext []byte, nonce [crypto_box_NO //fmt.Printf("ciphertext: %v\n", ciphertext) //fmt.Printf("ciphertext2: %v\n", ciphertext[crypto_box_BOXZEROBYTES:]) - return ciphertext[crypto_box_BOXZEROBYTES:], nonce + return ciphertext[crypto_box_BOXZEROBYTES:], nonce[:] } -func (s *Session) Open(nonce [crypto_box_NONCEBYTES]byte, ciphertext []byte) ([]byte) { +func (s *Session) Open(nonce []byte, ciphertext []byte) ([]byte) { // This function assumes the verbatim []byte given by Session.Box() is passed m := make([]byte, crypto_box_BOXZEROBYTES + len(ciphertext)) copy(m[crypto_box_BOXZEROBYTES:], ciphertext) plaintext := make([]byte, len(m)) + if len(nonce) != crypto_box_NONCEBYTES { + panic(fmt.Sprintf("Invalid nonce length (%d). Expected %d\n", + len(nonce), crypto_box_NONCEBYTES)) + } C.crypto_box_curve25519xsalsa20poly1305_ref_open_afternm( (*C.uchar)(unsafe.Pointer(&plaintext[0])), diff --git a/src/sodium/sodium_test.go b/src/sodium/sodium_test.go index 001cc0175ee2aab566eb3f7c95d8ee09ecb28076..a1e858cdb0e8f4fc4be12e34d91bb94a4c140090 100644 --- a/src/sodium/sodium_test.go +++ b/src/sodium/sodium_test.go @@ -2,6 +2,7 @@ package sodium import "fmt" import "testing" +import "bytes" func ExampleBox() { pk, sk := CryptoBoxKeypair() @@ -30,7 +31,7 @@ func TestNonceGeneration(t *testing.T) { _, nonce := s.Box([]byte(original)) _, nonce2 := s.Box([]byte(original)) - if nonce == nonce2 { + if bytes.Equal(nonce, nonce2) { t.Fatal("Two Box() calls generated the same nonce") } }