// spooler.go

package main
2 3 4 5 6

import (
  "time"
)

7 8 9 10
func Spool(input chan *FileEvent,
  output chan []*FileEvent,
  max_size uint64,
  idle_timeout time.Duration) {
11 12 13
  // heartbeat periodically. If the last flush was longer than
  // 'idle_timeout' time ago, then we'll force a flush to prevent us from
  // holding on to spooled events for too long.
14

15
  ticker := time.NewTicker(idle_timeout / 2)
16 17 18

  // slice for spooling into
  // TODO(sissel): use container.Ring?
19 20
  spool := make([]*FileEvent, max_size)

21 22
  // Current write position in the spool
  var spool_i int = 0
23

24
  next_flush_time := time.Now().Add(idle_timeout)
25 26
  for {
    select {
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
    case event := <-input:
      //append(spool, event)
      spool[spool_i] = event
      spool_i++

      // Flush if full
      if spool_i == cap(spool) {
        //spoolcopy := make([]*FileEvent, max_size)
        var spoolcopy []*FileEvent
        //fmt.Println(spool[0])
        spoolcopy = append(spoolcopy, spool[:]...)
        output <- spoolcopy
        next_flush_time = time.Now().Add(idle_timeout)

        spool_i = 0
      }
    case <-ticker.C:
      //fmt.Println("tick")
      if now := time.Now(); now.After(next_flush_time) {
        // if current time is after the next_flush_time, flush!
        //fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
        //now.Sub(next_flush_time))

        // Flush what we have, if anything
        if spool_i > 0 {
52
          var spoolcopy []*FileEvent
53
          spoolcopy = append(spoolcopy, spool[0:spool_i]...)
54
          output <- spoolcopy
55
          next_flush_time = now.Add(idle_timeout)
56 57
          spool_i = 0
        }
58
      } /* if 'now' is after 'next_flush_time' */
59 60 61
      /* case ... */
    } /* select */
  } /* for */
62
} /* spool */