prospector.go 4.19 KB
Newer Older
1
package main
2 3

import (
Jordan Sissel's avatar
Jordan Sissel committed
4
  "encoding/json"
5
  "log"
6 7 8
  "os"
  "path/filepath"
  "time"
9 10
)

11
func Prospect(fileconfig FileConfig, output chan *FileEvent, age_limit time.Duration) {
12
  fileinfo := make(map[string]os.FileInfo)
Jordan Sissel's avatar
Jordan Sissel committed
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

  // Handle any "-" (stdin) paths
  for i, path := range fileconfig.Paths {
    if path == "-" {
      harvester := Harvester{Path: path, Fields: fileconfig.Fields}
      go harvester.Harvest(output)

      // Remove it from the file list
      fileconfig.Paths = append(fileconfig.Paths[:i], fileconfig.Paths[i+1:]...)
    }
  }

  // Use the registrar db to reopen any files at their last positions
  resume_tracking(fileconfig, fileinfo, output)

28
  for {
29
    for _, path := range fileconfig.Paths {
30
      prospector_scan(path, fileconfig.Fields, fileinfo, output, age_limit)
31
    }
32

33 34 35 36
    // Defer next scan for a bit.
    time.Sleep(10 * time.Second) // Make this tunable
  }
} /* Prospect */
37

Jordan Sissel's avatar
Jordan Sissel committed
38 39
func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, output chan *FileEvent) {
  // Start up with any registrar data.
40
  history, err := os.Open(".logstash-forwarder")
Jordan Sissel's avatar
Jordan Sissel committed
41 42 43 44 45 46 47 48 49 50 51
  if err == nil {
    historical_state := make(map[string]*FileState)
    log.Printf("Loading registrar data\n")
    decoder := json.NewDecoder(history)
    decoder.Decode(&historical_state)
    history.Close()

    for path, state := range historical_state {
      // if the file is the same inode/device as we last saw,
      // start a harvester on it at the last known position
      info, err := os.Stat(path)
52 53 54
      if err != nil {
        continue
      }
Jordan Sissel's avatar
Jordan Sissel committed
55

56
      if is_file_same(path, info, state) {
Jordan Sissel's avatar
Jordan Sissel committed
57 58 59 60 61 62
        // same file, seek to last known position
        fileinfo[path] = info

        for _, pathglob := range fileconfig.Paths {
          match, _ := filepath.Match(pathglob, path)
          if match {
63
            harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset}
Jordan Sissel's avatar
Jordan Sissel committed
64 65 66 67 68 69 70 71 72
            go harvester.Harvest(output)
            break
          }
        }
      }
    }
  }
}

73 74
func prospector_scan(path string, fields map[string]string,
  fileinfo map[string]os.FileInfo,
75
  output chan *FileEvent, age_limit time.Duration) {
76
  //log.Printf("Prospecting %s\n", path)
77

78 79 80 81 82 83
  // Evaluate the path as a wildcards/shell glob
  matches, err := filepath.Glob(path)
  if err != nil {
    log.Printf("glob(%s) failed: %v\n", path, err)
    return
  }
84

85 86 87 88
  // If the glob matches nothing, use the path itself as a literal.
  if len(matches) == 0 && path == "-" {
    matches = append(matches, path)
  }
89

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
  // Check any matched files to see if we need to start a harvester
  for _, file := range matches {
    // Stat the file, following any symlinks.
    info, err := os.Stat(file)
    // TODO(sissel): check err
    if err != nil {
      log.Printf("stat(%s) failed: %s\n", file, err)
      continue
    }

    if info.IsDir() {
      log.Printf("Skipping directory: %s\n", file)
      continue
    }

    // Check the current info against fileinfo[file]
    lastinfo, is_known := fileinfo[file]
    // Track the stat data for this file for later comparison to check for
    // rotation/etc
    fileinfo[file] = info

    // Conditions for starting a new harvester:
    // - file path hasn't been seen before
    // - the file's inode or device changed
114
    if !is_known {
115 116
      // TODO(sissel): Skip files with modification dates older than N
      // TODO(sissel): Make the 'ignore if older than N' tunable
117
      if time.Since(info.ModTime()) > 30*24*time.Hour {
118
        log.Printf("Skipping old file: %s\n", file)
119
      } else if is_file_renamed(file, info, fileinfo) {
120
        // Check to see if this file was simply renamed (known inode+dev)
121 122 123
      } else {
        // Most likely a new file. Harvest it!
        log.Printf("Launching harvester on new file: %s\n", file)
124
        harvester := Harvester{Path: file, Fields: fields}
125 126
        go harvester.Harvest(output)
      }
127 128 129 130 131 132
    } else if !is_fileinfo_same(lastinfo, info) {
      log.Printf("Launching harvester on rotated file: %s\n", file)
      // TODO(sissel): log 'file rotated' or osmething
      // Start a harvester on the path; a new file appeared with the same name.
      harvester := Harvester{Path: file, Fields: fields}
      go harvester.Harvest(output)
133 134 135
    }
  } // for each file matched by the glob
}