diff --git a/lib/lumberjack/server2.rb b/lib/lumberjack/server2.rb index b982c71cded710a2fb71061d73f967a8c8c93332..c51e7f6bae78e822b3d2201f3ecfb9db26195a88 100644 --- a/lib/lumberjack/server2.rb +++ b/lib/lumberjack/server2.rb @@ -2,6 +2,7 @@ require "ffi-rzmq" require "zlib" require "rbnacl" require "json" +require "stud/try" module Lumberjack class Server2 @@ -13,36 +14,75 @@ module Lumberjack # * :address - the host/address to bind to def initialize(options={}) @options = { - :endpoint => "tcp://0.0.0.0:3333", - :my_secret_key => nil, - :their_public_key => nil, + :workers => 1, + + # Generate an inproc url for workers to attach to + :worker_endpoint => "inproc://#{Time.now.to_f}#{rand}" }.merge(options) - [:my_secret_key, :their_public_key].each do |k| + [:my_secret_key, :their_public_key, :endpoint].each do |k| if @options[k].nil? raise "You must specify #{k} in Lumberjack::Server.new(...)" end end @context = ZMQ::Context.new - @socket = @context.socket(ZMQ::REP) - @socket.bind(@options[:endpoint]) @cryptobox = Crypto::Box.new( Crypto::PublicKey.new(@options[:their_public_key]), Crypto::PrivateKey.new(@options[:my_secret_key])) end # def initialize + def setup_proxy(context) + # Socket facing clients + frontend = context.socket(ZMQ::ROUTER) + rc = frontend.bind(@options[:endpoint]) + if rc < 0 + puts "RC :(" + raise "Unable to bind lumberjack to #{@options[:endpoint]}" + end + + # Socket facing services + backend = context.socket(ZMQ::DEALER) + backend.bind(@options[:worker_endpoint]) + + @proxy_thread = Thread.new do + ZMQ::Device.new(ZMQ::QUEUE, frontend, backend) + raise "The lumberjack proxy died." + end + end + def run(&block) + setup_proxy(@context) + + threads = @options[:workers].times.collect do |i| + Thread.new do + run_worker(@context, &block) + end + end + + threads.each(&:join) + end + + def run_worker(context, &block) + socket = context.socket(ZMQ::REP) + Stud::try(10.times) do + rc = socket.connect(@options[:worker_endpoint]) + if rc < 0 + raise "connect to #{@options[:worker_endpoint]} failed" + end + end + ciphertext = "" ciphertext.force_encoding("BINARY") nonce = "" nonce.force_encoding("BINARY") count = 0 start = Time.now + list = [] while true - @socket.recv_string(nonce) - @socket.recv_string(ciphertext) + socket.recv_string(nonce) + socket.recv_string(ciphertext) # Decrypt plaintext = @cryptobox.open(nonce, ciphertext) @@ -52,14 +92,16 @@ module Lumberjack # JSON events = JSON.parse(inflated) - events.each do |event| - yield event - end + #events.each do |event| + #yield event + #end # TODO(sissel): yield each event count += events.count - @socket.send_string("") - #count += 4096 + + # Reply to acknowledge. + # Currently there is no response message to put. + socket.send_string("") if count > 100000 puts :rate => (count / (Time.now - start)) @@ -67,16 +109,31 @@ module Lumberjack start = Time.now end end - end # def run + end # def run_worker end # class Server2 end # module Lumberjack if __FILE__ == $0 a = Lumberjack::Server2.new( + :workers => 4, + :endpoint => "tcp://127.0.0.1:12345", :their_public_key => File.read("../../nacl.public").force_encoding("BINARY"), :my_secret_key => File.read("../../nacl.secret").force_encoding("BINARY")) - a.run do |e| - p :event => e - end + count = 0 + start = Time.now + require "thread" + q = Queue.new + #q = java.util.concurrent.LinkedBlockingQueue.new + #Thread.new { a.run { |e| q.put(e) } } + a.run { |e| } + + #while q.take + #count += 1 + #if count > 100000 + #puts count / (Time.now - start) + #count = 0 + #start = Time.now + #end + #end end