Commit 41e4a4d4 authored by Thomas Zahari's avatar Thomas Zahari

Fixing some more issues (offset calculation, removing CR,...)

Fixed: file offset still calculated wrong. File was detected as
truncated again :-(
Fixed: loosing the beginning of a partial line
Fixed: A CR is also removed from the EOL

Known issues:
If the last line was logged with CRLF as EOF then the file offset is
not correct calculated (missing one byte for the CR) in the Registry
parent 2c7d9150
......@@ -42,7 +42,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
var read_timeout = 10 * time.Second
last_read_time := time.Now()
for {
text, err := h.readline(reader, read_timeout)
text, bytesread, err := h.readline(reader, read_timeout)
if err != nil {
if err == io.EOF {
......@@ -78,7 +78,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
Fields: &h.Fields,
fileinfo: &info,
}
offset += int64(len(*text))
offset += int64(bytesread)
output <- event // ship the new event downstream
} /* forever */
......@@ -116,10 +116,16 @@ func (h *Harvester) open() *os.File {
return h.file
}
func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) {
func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, int, error) {
var buffer bytes.Buffer
var is_partial bool = true
var is_cr_present bool = false
var bufferSize int = 0;
start_time := time.Now()
// Store current offset for seeking back on timeout if the line is not complete
offset, _ := h.file.Seek(0, os.SEEK_CUR)
for {
segment, err := reader.ReadBytes('\n')
......@@ -127,9 +133,22 @@ func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*
if segment[len(segment)-1] == '\n' {
// Found a complete line
is_partial = false
// Check if also a CR present
if len(segment) > 1 && segment[len(segment)-2] == '\r' {
is_cr_present = true;
}
}
}
if segment != nil && len(segment) > 0 {
// TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
writelen,_ := buffer.Write(segment)
bufferSize += writelen;
}
if err != nil {
if err == io.EOF && is_partial {
time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff
......@@ -137,25 +156,32 @@ func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*
// Give up waiting for data after a certain amount of time.
// If we time out, return the error (eof)
if time.Since(start_time) > eof_timeout {
return nil, err
// If we read a partial line then we seek back otherwise we miss this part
if len(segment) > 0 || bufferSize > 0 {
h.file.Seek(offset, os.SEEK_SET)
}
return nil, 0, err
}
continue
} else {
log.Println(err)
return nil, err // TODO(sissel): don't do this?
return nil, 0, err // TODO(sissel): don't do this?
}
}
// TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
buffer.Write(segment)
if !is_partial {
// If we got a full line, return the whole line without the newline.
// If we got a full line, return the whole line without the EOL chars (CRLF or LF)
str := new(string)
*str = buffer.String()[:len(segment)-1]
return str, nil
if !is_cr_present {
*str = buffer.String()[:bufferSize-1]
} else {
*str = buffer.String()[:bufferSize-2]
}
// bufferSize returns the str length with the EOL chars (LF or CRLF)
return str, bufferSize, nil
}
} /* forever read chunks */
return nil, nil
return nil, 0, nil
}
......@@ -22,6 +22,7 @@ func Registrar(input chan []*FileEvent) {
Source: event.Source,
// take the offset + length of the line + newline char and
// save it as the new starting offset.
// This issues a problem, if the EOL is a CRLF! Then on start it read the LF again and generates a event with an empty line
Offset: event.Offset + int64(len(*event.Text)) + 1,
Inode: ino,
Device: dev,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment