prospector.go 4.15 KB
Newer Older
1
package main
2 3 4 5

import (
  "time"
  "path/filepath"
Jordan Sissel's avatar
Jordan Sissel committed
6
  "encoding/json"
7
  "os"
8
  "log"
9 10
)

11
func Prospect(fileconfig FileConfig, output chan *FileEvent) {
12
  fileinfo := make(map[string]os.FileInfo)
Jordan Sissel's avatar
Jordan Sissel committed
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

  // Handle any "-" (stdin) paths
  for i, path := range fileconfig.Paths {
    if path == "-" {
      harvester := Harvester{Path: path, Fields: fileconfig.Fields}
      go harvester.Harvest(output)

      // Remove it from the file list
      fileconfig.Paths = append(fileconfig.Paths[:i], fileconfig.Paths[i+1:]...)
    }
  }

  // Use the registrar db to reopen any files at their last positions
  resume_tracking(fileconfig, fileinfo, output)

28
  for {
29 30
    for _, path := range fileconfig.Paths {
      prospector_scan(path, fileconfig.Fields, fileinfo, output)
31
    }
32

33 34 35 36
    // Defer next scan for a bit.
    time.Sleep(10 * time.Second) // Make this tunable
  }
} /* Prospect */
37

Jordan Sissel's avatar
Jordan Sissel committed
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, output chan *FileEvent) {
  // Start up with any registrar data.
  history, err := os.Open(".lumberjack")
  if err == nil {
    historical_state := make(map[string]*FileState)
    log.Printf("Loading registrar data\n")
    decoder := json.NewDecoder(history)
    decoder.Decode(&historical_state)
    history.Close()

    for path, state := range historical_state {
      // if the file is the same inode/device as we last saw,
      // start a harvester on it at the last known position
      info, err := os.Stat(path)
      if err != nil { continue }

54
      if is_file_same(path, info, state) {
Jordan Sissel's avatar
Jordan Sissel committed
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
        // same file, seek to last known position
        fileinfo[path] = info

        for _, pathglob := range fileconfig.Paths {
          match, _ := filepath.Match(pathglob, path)
          if match {
            harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset }
            go harvester.Harvest(output)
            break
          }
        }
      }
    }
  }
}

71 72
func prospector_scan(path string, fields map[string]string, 
                     fileinfo map[string]os.FileInfo,
73
                     output chan *FileEvent) {
74
  //log.Printf("Prospecting %s\n", path)
75

76 77 78 79 80 81
  // Evaluate the path as a wildcards/shell glob
  matches, err := filepath.Glob(path)
  if err != nil {
    log.Printf("glob(%s) failed: %v\n", path, err)
    return
  }
82

83 84 85 86
  // If the glob matches nothing, use the path itself as a literal.
  if len(matches) == 0 && path == "-" {
    matches = append(matches, path)
  }
87

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
  // Check any matched files to see if we need to start a harvester
  for _, file := range matches {
    // Stat the file, following any symlinks.
    info, err := os.Stat(file)
    // TODO(sissel): check err
    if err != nil {
      log.Printf("stat(%s) failed: %s\n", file, err)
      continue
    }

    if info.IsDir() {
      log.Printf("Skipping directory: %s\n", file)
      continue
    }

    // Check the current info against fileinfo[file]
    lastinfo, is_known := fileinfo[file]
    // Track the stat data for this file for later comparison to check for
    // rotation/etc
    fileinfo[file] = info

    // Conditions for starting a new harvester:
    // - file path hasn't been seen before
    // - the file's inode or device changed
    if !is_known { 
      // TODO(sissel): Skip files with modification dates older than N
      // TODO(sissel): Make the 'ignore if older than N' tunable
      if time.Since(info.ModTime()) > 24*time.Hour {
        log.Printf("Skipping old file: %s\n", file)
117
      } else if is_file_renamed(file, info, fileinfo) {
118
        // Check to see if this file was simply renamed (known inode+dev)
119 120 121
      } else {
        // Most likely a new file. Harvest it!
        log.Printf("Launching harvester on new file: %s\n", file)
122
        harvester := Harvester{Path: file, Fields: fields}
123 124
        go harvester.Harvest(output)
      }
125 126 127 128 129 130
    } else if !is_fileinfo_same(lastinfo, info) {
      log.Printf("Launching harvester on rotated file: %s\n", file)
      // TODO(sissel): log 'file rotated' or osmething
      // Start a harvester on the path; a new file appeared with the same name.
      harvester := Harvester{Path: file, Fields: fields}
      go harvester.Harvest(output)
131 132 133
    }
  } // for each file matched by the glob
}