spooler.go 1.69 KB
Newer Older
1
package main
2 3 4 5 6

import (
  "time"
)

Jordan Sissel's avatar
Jordan Sissel committed
7 8 9 10
func Spool(input chan *FileEvent,
  output chan []*FileEvent,
  max_size uint64,
  idle_timeout time.Duration) {
11 12 13
  // heartbeat periodically. If the last flush was longer than
  // 'idle_timeout' time ago, then we'll force a flush to prevent us from
  // holding on to spooled events for too long.
14

15
  ticker := time.NewTicker(idle_timeout / 2)
16 17 18

  // slice for spooling into
  // TODO(sissel): use container.Ring?
19 20
  spool := make([]*FileEvent, max_size)

21 22
  // Current write position in the spool
  var spool_i int = 0
23

24
  next_flush_time := time.Now().Add(idle_timeout)
25 26
  for {
    select {
Jordan Sissel's avatar
Jordan Sissel committed
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
    case event := <-input:
      //append(spool, event)
      spool[spool_i] = event
      spool_i++

      // Flush if full
      if spool_i == cap(spool) {
        //spoolcopy := make([]*FileEvent, max_size)
        var spoolcopy []*FileEvent
        //fmt.Println(spool[0])
        spoolcopy = append(spoolcopy, spool[:]...)
        output <- spoolcopy
        next_flush_time = time.Now().Add(idle_timeout)

        spool_i = 0
      }
    case <-ticker.C:
      //fmt.Println("tick")
      if now := time.Now(); now.After(next_flush_time) {
        // if current time is after the next_flush_time, flush!
        //fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
        //now.Sub(next_flush_time))

        // Flush what we have, if anything
        if spool_i > 0 {
52
          var spoolcopy []*FileEvent
Jordan Sissel's avatar
Jordan Sissel committed
53
          spoolcopy = append(spoolcopy, spool[0:spool_i]...)
54
          output <- spoolcopy
Jordan Sissel's avatar
Jordan Sissel committed
55
          next_flush_time = now.Add(idle_timeout)
56 57
          spool_i = 0
        }
Jordan Sissel's avatar
Jordan Sissel committed
58
      } /* if 'now' is after 'next_flush_time' */
59 60 61
      /* case ... */
    } /* select */
  } /* for */
62
} /* spool */