// Source file: spooler.go

package main

import (
  "time"
)

func Spool(input chan *FileEvent, 
8
           output chan []*FileEvent,
9 10
           max_size uint64,
           idle_timeout time.Duration) {
11 12 13
  // heartbeat periodically. If the last flush was longer than
  // 'idle_timeout' time ago, then we'll force a flush to prevent us from
  // holding on to spooled events for too long.
14

15
  ticker := time.NewTicker(idle_timeout / 2)
16 17 18

  // slice for spooling into
  // TODO(sissel): use container.Ring?
19 20
  spool := make([]*FileEvent, max_size)

21 22
  // Current write position in the spool
  var spool_i int = 0
23

24
  next_flush_time := time.Now().Add(idle_timeout)
25 26 27
  for {
    select {
      case event := <- input:
28
        //append(spool, event)
29 30 31 32
        spool[spool_i] = event
        spool_i++

        // Flush if full
33 34 35 36 37
        if spool_i == cap(spool) { 
          //spoolcopy := make([]*FileEvent, max_size)
          var spoolcopy []*FileEvent
          //fmt.Println(spool[0])
          spoolcopy = append(spoolcopy, spool[:]...)
38
          output <- spoolcopy
39
          next_flush_time = time.Now().Add(idle_timeout)
40

41 42 43
          spool_i = 0
        }
      case <- ticker.C:
44
        //fmt.Println("tick")
45 46
        if now := time.Now(); now.After(next_flush_time) {
          // if current time is after the next_flush_time, flush! 
47 48
          //fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
                     //now.Sub(next_flush_time))
49 50 51

          // Flush what we have, if anything
          if spool_i > 0 { 
52 53 54
            var spoolcopy []*FileEvent
            spoolcopy = append(spoolcopy, spool[0:spool_i]...)
            output <- spoolcopy
55
            next_flush_time = now.Add(idle_timeout)
56 57
            spool_i = 0
          }
58
        } /* if 'now' is after 'next_flush_time' */
59 60 61
      /* case ... */
    } /* select */
  } /* for */
62
} /* spool */