diff options
Diffstat (limited to 'src/reader.go')
-rw-r--r-- | src/reader.go | 89 |
1 files changed, 72 insertions, 17 deletions
diff --git a/src/reader.go b/src/reader.go index 47102ec1..3ebe3521 100644 --- a/src/reader.go +++ b/src/reader.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "sync" "sync/atomic" "time" @@ -111,31 +112,85 @@ func (r *Reader) ReadSource(root string, opts walkerOpts, ignores []string) { } func (r *Reader) feed(src io.Reader) { + readerSlabSize, ae := strconv.Atoi(os.Getenv("SLAB_KB")) + if ae != nil { + readerSlabSize = 128 * 1024 + } else { + readerSlabSize *= 1024 + } + readerBufferSize, be := strconv.Atoi(os.Getenv("BUF_KB")) + if be != nil { + readerBufferSize = 64 * 1024 + } else { + readerBufferSize *= 1024 + } + + slab := make([]byte, readerSlabSize) + pointer := 0 delim := byte('\n') if r.delimNil { delim = '\000' } reader := bufio.NewReaderSize(src, readerBufferSize) + + // We do not put a slice longer than 10% of the slab to reduce fragmentation + maxBytes := readerBufferSize / 10 + for { - // ReadBytes returns err != nil if and only if the returned data does not - // end in delim. - bytea, err := reader.ReadBytes(delim) - byteaLen := len(bytea) - if byteaLen > 0 { - if err == nil { - // get rid of carriage return if under Windows: - if util.IsWindows() && byteaLen >= 2 && bytea[byteaLen-2] == byte('\r') { - bytea = bytea[:byteaLen-2] - } else { - bytea = bytea[:byteaLen-1] + var frags [][]byte + fragsLen := 0 + for { + bytea, err := reader.ReadSlice(delim) + if err == bufio.ErrBufferFull { + // Could not find the delimiter in the reader buffer. + // Need to collect the fragments and merge them later. + frags = append(frags, bytea) + fragsLen += len(bytea) + } else { + byteaLen := len(bytea) + if err == nil { + // No errors. Found the delimiter. + if util.IsWindows() && byteaLen >= 2 && bytea[byteaLen-2] == byte('\r') { + bytea = bytea[:byteaLen-2] + byteaLen -= 2 + } else { + bytea = bytea[:byteaLen-1] + byteaLen-- + } } + + itemLen := fragsLen + byteaLen + pointer += itemLen + var slice []byte + if itemLen <= maxBytes { // We can use the slab + // Allocate a new slab if it doesn't fit + if pointer > readerSlabSize { + slab = make([]byte, readerSlabSize) + pointer = itemLen + } + slice = slab[pointer-itemLen : pointer] + } else { // We can't use the slab because the item is too large + slice = make([]byte, itemLen) + } + + if len(frags) > 0 { + // Collect the fragments + n := 0 + for _, frag := range frags { + n += copy(slice[n:], frag) + } + copy(slice[n:], bytea) + } else if byteaLen > 0 { + copy(slice, bytea) + } + if (err == nil || itemLen > 0) && r.pusher(slice) { + atomic.StoreInt32(&r.event, int32(EvtReadNew)) + } + if err != nil { + return + } + break } - if r.pusher(bytea) { - atomic.StoreInt32(&r.event, int32(EvtReadNew)) - } - } - if err != nil { - break } } } |