server2.rb 3.54 KB
Newer Older
1 2 3 4
require "ffi-rzmq"
require "zlib"
require "rbnacl"
require "json"
Jordan Sissel's avatar
Jordan Sissel committed
5
require "stud/try"
6 7 8 9 10 11 12 13 14 15 16

module Lumberjack
  # Experimental ZeroMQ-based lumberjack receiver.
  #
  # Each worker thread binds a REP socket to the configured endpoint, reads a
  # two-frame request (nonce, then ciphertext), counts it and replies with an
  # empty acknowledgement. The decrypt / inflate / JSON-parse steps are
  # currently commented out in #run_worker, so the block handed to #run is
  # accepted but never invoked.
  class Server2
    # Options that must be present (non-nil) when constructing a server.
    REQUIRED_OPTIONS = [:my_secret_key, :their_public_key, :endpoint].freeze

    # Create a new Lumberjack server.
    #
    # - options is a hash. Options read by this class are:
    #
    # * :endpoint - (required) the ZeroMQ endpoint to bind to
    # * :my_secret_key - (required) passed to Crypto::PrivateKey.new
    # * :their_public_key - (required) passed to Crypto::PublicKey.new
    # * :workers - number of worker threads to start (default 1)
    # * :worker_endpoint - endpoint used by #setup_proxy for its backend
    #   socket (default: a generated inproc:// url)
    #
    # Raises RuntimeError when any required option is missing.
    def initialize(options={})
      @options = {
        :workers => 1,

        # Generate an inproc url for workers to attach to
        :worker_endpoint => "inproc://#{Time.now.to_f}#{rand}"
      }.merge(options)

      # Validate before creating the ZeroMQ context or the crypto box so a
      # bad configuration fails without allocating anything.
      REQUIRED_OPTIONS.each do |key|
        next unless @options[key].nil?
        raise "You must specify #{key} in Lumberjack::Server2.new(...)"
      end

      @context = ZMQ::Context.new

      # NOTE: the box is built here but nothing in this class uses it while
      # the decrypt step in #run_worker stays commented out.
      @cryptobox = Crypto::Box.new(
        Crypto::PublicKey.new(@options[:their_public_key]),
        Crypto::PrivateKey.new(@options[:my_secret_key]))
    end # def initialize

    # Bind a ROUTER socket on :endpoint and a DEALER socket on
    # :worker_endpoint, then start a thread that hands both to a
    # ZMQ::QUEUE device.
    #
    # Not currently used: the call in #run is commented out.
    #
    # Raises RuntimeError when either bind reports a negative return code.
    def setup_proxy(context)
      # Socket facing clients
      frontend = context.socket(ZMQ::ROUTER)
      if frontend.bind(@options[:endpoint]) < 0
        raise "Unable to bind lumberjack to #{@options[:endpoint]}"
      end

      # Socket facing services
      backend = context.socket(ZMQ::DEALER)
      if backend.bind(@options[:worker_endpoint]) < 0
        raise "Unable to bind lumberjack workers to #{@options[:worker_endpoint]}"
      end

      @proxy_thread = Thread.new do
        ZMQ::Device.new(ZMQ::QUEUE, frontend, backend)
        # Only reached if the device call above returns.
        raise "The lumberjack proxy died."
      end
    end

    # Start :workers threads, each running #run_worker with the given block,
    # and wait for all of them. Because #run_worker loops forever, this only
    # returns (or raises) once the workers have stopped.
    def run(&block)
      #setup_proxy(@context)

      workers = @options[:workers].times.map do |index|
        Thread.new do
          puts "Starting worker #{index}"
          run_worker(@context, &block)
        end
      end

      workers.each(&:join)
    end

    # Serve requests on a REP socket bound to :endpoint until an exception
    # ends the loop.
    #
    # - context is the ZMQ context used to create the socket.
    # - block is accepted for the event callback but is not called while the
    #   decode pipeline below is commented out.
    #
    # The bind is attempted through Stud::try(10.times); a negative return
    # code raises RuntimeError inside that retry block. The socket is closed
    # whenever this method exits.
    #
    # NOTE(review): every worker binds the same :endpoint, so :workers > 1
    # looks like it needs the proxy/connect variant that is commented out
    # here - confirm before raising the worker count.
    def run_worker(context, &block)
      socket = context.socket(ZMQ::REP)
      Stud::try(10.times) do
        #rc = socket.connect(@options[:worker_endpoint])
        rc = socket.bind(@options[:endpoint])
        raise "bind to #{@options[:endpoint]} failed" if rc < 0
      end

      # Receive buffers, tagged binary and refilled on every request.
      nonce = String.new.force_encoding("BINARY")
      ciphertext = String.new.force_encoding("BINARY")
      count = 0
      start = Time.now

      loop do
        # NOTE(review): the return codes of these receives are not checked.
        socket.recv_string(nonce)
        socket.recv_string(ciphertext)

        # Decrypt
        #plaintext = @cryptobox.open(nonce, ciphertext)

        # decompress
        #inflated = Zlib::Inflate.inflate(plaintext)

        # JSON
        #events = JSON.parse(inflated)
        #events.each do |event|
          #yield event
        #end

        # TODO(sissel): yield each event
        #count += events.count
        count += 1

        # Reply to acknowledge.
        # Currently there is no response message to put.
        socket.send_string("")

        next unless count > 100

        # Report throughput for the batch just handled, then start a new one.
        puts :rate => (count / (Time.now - start))
        count = 0
        start = Time.now
      end
    ensure
      # Release the socket when the worker stops for any reason.
      socket.close if socket
    end # def run_worker
  end # class Server2
end # module Lumberjack

# Manual smoke test: when this file is executed directly, start a single
# worker on a local TCP endpoint and discard every event.
if __FILE__ == $0
  # The key files hold raw bytes, so read them in binary mode instead of
  # reading as text and re-tagging the encoding afterwards. The paths are
  # resolved relative to the current working directory.
  server = Lumberjack::Server2.new(
    :workers => 1,
    :endpoint => "tcp://127.0.0.1:12345",
    :their_public_key => File.binread("../../nacl.public"),
    :my_secret_key => File.binread("../../nacl.secret"))

  # An earlier variant pushed events onto a queue from a background thread
  # and measured throughput in a consumer loop here; the worker loop prints
  # its own rate, so the events are simply dropped.
  server.run { |_event| }
end