# server2.rb
require "ffi-rzmq"
require "zlib"
require "rbnacl"
require "json"
# stud/try provides Stud::try, used to retry the worker's connect call.
require "stud/try"
# Defines Lumberjack::Server2 and, when run directly, starts one on localhost.

module Lumberjack
  # Receives batches of events over ZeroMQ.
  #
  # A ROUTER socket is bound to :endpoint for clients and a DEALER socket is
  # bound to an in-process endpoint for workers; a ZMQ queue device sits
  # between the two. Each worker is a REP socket that reads two frames per
  # request (a nonce, then a ciphertext), decrypts the ciphertext with the
  # configured crypto box, zlib-inflates the result and parses it as JSON.
  class Server2
    # Options that have no default and must be supplied by the caller.
    REQUIRED_OPTIONS = [:my_secret_key, :their_public_key, :endpoint].freeze

    # A throughput line is printed once more than this many events have been
    # counted since the previous report.
    RATE_REPORT_THRESHOLD = 100000

    # Create a new Lumberjack server.
    #
    # - options is a hash. Valid options are:
    #
    # * :endpoint - (required) the ZeroMQ endpoint to bind for clients
    # * :my_secret_key - (required) private key handed to Crypto::PrivateKey
    # * :their_public_key - (required) public key handed to Crypto::PublicKey
    # * :workers - number of worker threads started by #run (default 1)
    # * :worker_endpoint - endpoint shared by the proxy backend and the
    #   workers (default: a generated inproc:// url)
    #
    # Raises RuntimeError when a required option is missing.
    def initialize(options={})
      @options = {
        :workers => 1,

        # Generate an inproc url for workers to attach to
        :worker_endpoint => "inproc://#{Time.now.to_f}#{rand}"
      }.merge(options)

      REQUIRED_OPTIONS.each do |k|
        if @options[k].nil?
          raise "You must specify #{k} in Lumberjack::Server2.new(...)"
        end
      end

      @context = ZMQ::Context.new

      @cryptobox = Crypto::Box.new(
        Crypto::PublicKey.new(@options[:their_public_key]),
        Crypto::PrivateKey.new(@options[:my_secret_key]))
    end # def initialize

    # Bind the client-facing and worker-facing sockets and start the queue
    # device between them on a background thread (kept in @proxy_thread).
    #
    # Raises RuntimeError when either bind reports a negative return code.
    def setup_proxy(context)
      # Socket facing clients
      frontend = context.socket(ZMQ::ROUTER)
      rc = frontend.bind(@options[:endpoint])
      if rc < 0
        raise "Unable to bind lumberjack to #{@options[:endpoint]}"
      end

      # Socket facing services
      backend = context.socket(ZMQ::DEALER)
      rc = backend.bind(@options[:worker_endpoint])
      if rc < 0
        raise "Unable to bind lumberjack workers to #{@options[:worker_endpoint]}"
      end

      @proxy_thread = Thread.new do
        # Presumably this call blocks for as long as the device is healthy;
        # reaching the next line is treated as a failure.
        ZMQ::Device.new(ZMQ::QUEUE, frontend, backend)
        raise "The lumberjack proxy died."
      end
    end

    # Start the proxy and @options[:workers] worker threads, then wait for
    # the workers to finish.
    #
    # The optional block receives every decoded event. It is invoked from the
    # worker threads, so with more than one worker it may run concurrently.
    def run(&block)
      setup_proxy(@context)

      threads = @options[:workers].times.collect do
        Thread.new do
          run_worker(@context, &block)
        end
      end

      threads.each(&:join)
    end

    # Worker loop: connect a REP socket to the worker endpoint and process
    # requests until an error is raised.
    #
    # For each request the decoded events are passed one at a time to the
    # block (when one is given) and an empty reply is sent as acknowledgement.
    #
    # Raises RuntimeError when connecting, receiving or replying reports a
    # negative return code.
    def run_worker(context, &block)
      socket = context.socket(ZMQ::REP)
      Stud::try(10.times) do
        rc = socket.connect(@options[:worker_endpoint])
        if rc < 0
          raise "connect to #{@options[:worker_endpoint]} failed"
        end
      end

      # Receive buffers are reused across requests; recv_string fills them in
      # place.
      ciphertext = String.new.force_encoding("BINARY")
      nonce = String.new.force_encoding("BINARY")
      count = 0
      start = Time.now

      # `while true` rather than `loop`: `loop` would silently swallow a
      # StopIteration raised by the caller's block.
      while true
        # A failed receive leaves the buffer holding the previous request, so
        # stop instead of processing stale data a second time.
        if socket.recv_string(nonce) < 0
          raise "receive of nonce on #{@options[:worker_endpoint]} failed"
        end
        if socket.recv_string(ciphertext) < 0
          raise "receive of ciphertext on #{@options[:worker_endpoint]} failed"
        end

        events = decode_events(nonce, ciphertext)
        events.each(&block) if block
        count += events.count

        # Reply to acknowledge.
        # Currently there is no response message to put.
        if socket.send_string("") < 0
          raise "acknowledge on #{@options[:worker_endpoint]} failed"
        end

        if count > RATE_REPORT_THRESHOLD
          puts :rate => (count / (Time.now - start))
          count = 0
          start = Time.now
        end
      end
    end # def run_worker

    private

    # Decrypt, inflate and JSON-parse one request; returns the parsed value.
    def decode_events(nonce, ciphertext)
      plaintext = @cryptobox.open(nonce, ciphertext)
      inflated = Zlib::Inflate.inflate(plaintext)
      JSON.parse(inflated)
    end
  end # class Server2
end # module Lumberjack

# When this file is executed directly, start a four-worker server on
# localhost using key files read relative to the current working directory.
if __FILE__ == $0
  # binread keeps the key bytes untouched and returns a BINARY-encoded string.
  server = Lumberjack::Server2.new(
    :workers => 4,
    :endpoint => "tcp://127.0.0.1:12345",
    :their_public_key => File.binread("../../nacl.public"),
    :my_secret_key => File.binread("../../nacl.secret"))

  # The block deliberately does nothing; run joins the worker threads, so
  # this call does not return while they are alive.
  server.run { |_event| }
end