Commit b5522a4e authored by Jordan Sissel's avatar Jordan Sissel

Add worker model

parent c5f1aad9
...@@ -2,6 +2,7 @@ require "ffi-rzmq" ...@@ -2,6 +2,7 @@ require "ffi-rzmq"
require "zlib" require "zlib"
require "rbnacl" require "rbnacl"
require "json" require "json"
require "stud/try"
module Lumberjack module Lumberjack
class Server2 class Server2
...@@ -13,36 +14,75 @@ module Lumberjack ...@@ -13,36 +14,75 @@ module Lumberjack
# * :address - the host/address to bind to # * :address - the host/address to bind to
def initialize(options={}) def initialize(options={})
@options = { @options = {
:endpoint => "tcp://0.0.0.0:3333", :workers => 1,
:my_secret_key => nil,
:their_public_key => nil, # Generate an inproc url for workers to attach to
:worker_endpoint => "inproc://#{Time.now.to_f}#{rand}"
}.merge(options) }.merge(options)
[:my_secret_key, :their_public_key].each do |k| [:my_secret_key, :their_public_key, :endpoint].each do |k|
if @options[k].nil? if @options[k].nil?
raise "You must specify #{k} in Lumberjack::Server.new(...)" raise "You must specify #{k} in Lumberjack::Server.new(...)"
end end
end end
@context = ZMQ::Context.new @context = ZMQ::Context.new
@socket = @context.socket(ZMQ::REP)
@socket.bind(@options[:endpoint])
@cryptobox = Crypto::Box.new( @cryptobox = Crypto::Box.new(
Crypto::PublicKey.new(@options[:their_public_key]), Crypto::PublicKey.new(@options[:their_public_key]),
Crypto::PrivateKey.new(@options[:my_secret_key])) Crypto::PrivateKey.new(@options[:my_secret_key]))
end # def initialize end # def initialize
def setup_proxy(context)
# Socket facing clients
frontend = context.socket(ZMQ::ROUTER)
rc = frontend.bind(@options[:endpoint])
if rc < 0
puts "RC :("
raise "Unable to bind lumberjack to #{@options[:endpoint]}"
end
# Socket facing services
backend = context.socket(ZMQ::DEALER)
backend.bind(@options[:worker_endpoint])
@proxy_thread = Thread.new do
ZMQ::Device.new(ZMQ::QUEUE, frontend, backend)
raise "The lumberjack proxy died."
end
end
def run(&block) def run(&block)
setup_proxy(@context)
threads = @options[:workers].times.collect do |i|
Thread.new do
run_worker(@context, &block)
end
end
threads.each(&:join)
end
def run_worker(context, &block)
socket = context.socket(ZMQ::REP)
Stud::try(10.times) do
rc = socket.connect(@options[:worker_endpoint])
if rc < 0
raise "connect to #{@options[:worker_endpoint]} failed"
end
end
ciphertext = "" ciphertext = ""
ciphertext.force_encoding("BINARY") ciphertext.force_encoding("BINARY")
nonce = "" nonce = ""
nonce.force_encoding("BINARY") nonce.force_encoding("BINARY")
count = 0 count = 0
start = Time.now start = Time.now
list = []
while true while true
@socket.recv_string(nonce) socket.recv_string(nonce)
@socket.recv_string(ciphertext) socket.recv_string(ciphertext)
# Decrypt # Decrypt
plaintext = @cryptobox.open(nonce, ciphertext) plaintext = @cryptobox.open(nonce, ciphertext)
...@@ -52,14 +92,16 @@ module Lumberjack ...@@ -52,14 +92,16 @@ module Lumberjack
# JSON # JSON
events = JSON.parse(inflated) events = JSON.parse(inflated)
events.each do |event| #events.each do |event|
yield event #yield event
end #end
# TODO(sissel): yield each event # TODO(sissel): yield each event
count += events.count count += events.count
@socket.send_string("")
#count += 4096 # Reply to acknowledge.
# Currently there is no response message to put.
socket.send_string("")
if count > 100000 if count > 100000
puts :rate => (count / (Time.now - start)) puts :rate => (count / (Time.now - start))
...@@ -67,16 +109,31 @@ module Lumberjack ...@@ -67,16 +109,31 @@ module Lumberjack
start = Time.now start = Time.now
end end
end end
end # def run end # def run_worker
end # class Server2 end # class Server2
end # module Lumberjack end # module Lumberjack
if __FILE__ == $0 if __FILE__ == $0
a = Lumberjack::Server2.new( a = Lumberjack::Server2.new(
:workers => 4,
:endpoint => "tcp://127.0.0.1:12345",
:their_public_key => File.read("../../nacl.public").force_encoding("BINARY"), :their_public_key => File.read("../../nacl.public").force_encoding("BINARY"),
:my_secret_key => File.read("../../nacl.secret").force_encoding("BINARY")) :my_secret_key => File.read("../../nacl.secret").force_encoding("BINARY"))
a.run do |e| count = 0
p :event => e start = Time.now
end require "thread"
q = Queue.new
#q = java.util.concurrent.LinkedBlockingQueue.new
#Thread.new { a.run { |e| q.put(e) } }
a.run { |e| }
#while q.take
#count += 1
#if count > 100000
#puts count / (Time.now - start)
#count = 0
#start = Time.now
#end
#end
end end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment