Commit 2c7d9150 authored by tzahari's avatar tzahari

Bugfix for logging lines without or delayed newlines

I found a solution for the issue
https://github.com/elasticsearch/logstash-forwarder/issues/149
parent 8c050664
...@@ -78,7 +78,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) { ...@@ -78,7 +78,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
Fields: &h.Fields, Fields: &h.Fields,
fileinfo: &info, fileinfo: &info,
} }
offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator offset += int64(len(*text))
output <- event // ship the new event downstream output <- event // ship the new event downstream
} /* forever */ } /* forever */
...@@ -118,12 +118,20 @@ func (h *Harvester) open() *os.File { ...@@ -118,12 +118,20 @@ func (h *Harvester) open() *os.File {
func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) { func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) {
var buffer bytes.Buffer var buffer bytes.Buffer
var is_partial bool = true
start_time := time.Now() start_time := time.Now()
for { for {
segment, is_partial, err := reader.ReadLine() segment, err := reader.ReadBytes('\n')
if segment != nil && len(segment) > 0 {
if segment[len(segment)-1] == '\n' {
// Found a complete line
is_partial = false
}
}
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF && is_partial {
time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff
// Give up waiting for data after a certain amount of time. // Give up waiting for data after a certain amount of time.
...@@ -142,9 +150,9 @@ func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (* ...@@ -142,9 +150,9 @@ func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*
buffer.Write(segment) buffer.Write(segment)
if !is_partial { if !is_partial {
// If we got a full line, return the whole line. // If we got a full line, return the whole line without the newline.
str := new(string) str := new(string)
*str = buffer.String() *str = buffer.String()[:len(segment)-1]
return str, nil return str, nil
} }
} /* forever read chunks */ } /* forever read chunks */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment