summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJunegunn Choi <junegunn.c@gmail.com>2017-08-16 03:24:23 +0900
committerJunegunn Choi <junegunn.c@gmail.com>2017-08-16 03:33:48 +0900
commit487c8fe88f4cfcc55850b8aef73665b1d09b8fe0 (patch)
treed21e3b4a4fdcb2a6d6980d1f47fb3ede5bd5c557
parent0d171ba1d81886c6f9caf61867129e6daa268cd6 (diff)
Make Reader event notification asynchronous
Instead of notifying the event coordinator (EventBox) whenever a new line is arrived, start a background goroutine that periodically does the task. Atomic.StoreInt32 is much cheaper than mutex synchronization that happens during EventBox update.
-rw-r--r--src/constants.go7
-rw-r--r--src/core.go8
-rw-r--r--src/reader.go39
-rw-r--r--src/reader_test.go28
4 files changed, 64 insertions, 18 deletions
diff --git a/src/constants.go b/src/constants.go
index 10df1e7e..0f6a32c9 100644
--- a/src/constants.go
+++ b/src/constants.go
@@ -16,7 +16,10 @@ const (
coordinatorDelayStep time.Duration = 10 * time.Millisecond
// Reader
- readerBufferSize = 64 * 1024
+ readerBufferSize = 64 * 1024
+ readerPollIntervalMin = 10 * time.Millisecond
+ readerPollIntervalStep = 5 * time.Millisecond
+ readerPollIntervalMax = 50 * time.Millisecond
// Terminal
initialDelay = 20 * time.Millisecond
@@ -68,7 +71,7 @@ const (
EvtSearchProgress
EvtSearchFin
EvtHeader
- EvtClose
+ EvtReady
)
const (
diff --git a/src/core.go b/src/core.go
index 968d407b..61f14f91 100644
--- a/src/core.go
+++ b/src/core.go
@@ -115,9 +115,9 @@ func Run(opts *Options, revision string) {
// Reader
streamingFilter := opts.Filter != nil && !sort && !opts.Tac && !opts.Sync
if !streamingFilter {
- reader := Reader{func(data []byte) bool {
+ reader := NewReader(func(data []byte) bool {
return chunkList.Push(data)
- }, eventBox, opts.ReadZero}
+ }, eventBox, opts.ReadZero)
go reader.ReadSource()
}
@@ -150,7 +150,7 @@ func Run(opts *Options, revision string) {
found := false
if streamingFilter {
slab := util.MakeSlab(slab16Size, slab32Size)
- reader := Reader{
+ reader := NewReader(
func(runes []byte) bool {
item := Item{}
if chunkList.trans(&item, runes, 0) {
@@ -160,7 +160,7 @@ func Run(opts *Options, revision string) {
}
}
return false
- }, eventBox, opts.ReadZero}
+ }, eventBox, opts.ReadZero)
reader.ReadSource()
} else {
eventBox.Unwatch(EvtReadNew)
diff --git a/src/reader.go b/src/reader.go
index 1572e5de..401b8f0f 100644
--- a/src/reader.go
+++ b/src/reader.go
@@ -4,6 +4,8 @@ import (
"bufio"
"io"
"os"
+ "sync/atomic"
+ "time"
"github.com/junegunn/fzf/src/util"
)
@@ -13,10 +15,43 @@ type Reader struct {
pusher func([]byte) bool
eventBox *util.EventBox
delimNil bool
+ event int32
+}
+
+// NewReader returns new Reader object
+func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, delimNil bool) *Reader {
+ return &Reader{pusher, eventBox, delimNil, int32(EvtReady)}
+}
+
+func (r *Reader) startEventPoller() {
+ go func() {
+ ptr := &r.event
+ pollInterval := readerPollIntervalMin
+ for {
+ if atomic.CompareAndSwapInt32(ptr, int32(EvtReadNew), int32(EvtReady)) {
+ r.eventBox.Set(EvtReadNew, true)
+ pollInterval = readerPollIntervalMin
+ } else if atomic.LoadInt32(ptr) == int32(EvtReadFin) {
+ return
+ } else {
+ pollInterval += readerPollIntervalStep
+ if pollInterval > readerPollIntervalMax {
+ pollInterval = readerPollIntervalMax
+ }
+ }
+ time.Sleep(pollInterval)
+ }
+ }()
+}
+
+func (r *Reader) fin(success bool) {
+ atomic.StoreInt32(&r.event, int32(EvtReadFin))
+ r.eventBox.Set(EvtReadFin, success)
}
// ReadSource reads data from the default command or from standard input
func (r *Reader) ReadSource() {
+ r.startEventPoller()
var success bool
if util.IsTty() {
cmd := os.Getenv("FZF_DEFAULT_COMMAND")
@@ -27,7 +62,7 @@ func (r *Reader) ReadSource() {
} else {
success = r.readFromStdin()
}
- r.eventBox.Set(EvtReadFin, success)
+ r.fin(success)
}
func (r *Reader) feed(src io.Reader) {
@@ -51,7 +86,7 @@ func (r *Reader) feed(src io.Reader) {
}
}
if r.pusher(bytea) {
- r.eventBox.Set(EvtReadNew, true)
+ atomic.StoreInt32(&r.event, int32(EvtReadNew))
}
}
if err != nil {
diff --git a/src/reader_test.go b/src/reader_test.go
index d5c218cb..82ca6b7b 100644
--- a/src/reader_test.go
+++ b/src/reader_test.go
@@ -2,6 +2,7 @@ package fzf
import (
"testing"
+ "time"
"github.com/junegunn/fzf/src/util"
)
@@ -11,7 +12,10 @@ func TestReadFromCommand(t *testing.T) {
eb := util.NewEventBox()
reader := Reader{
pusher: func(s []byte) bool { strs = append(strs, string(s)); return true },
- eventBox: eb}
+ eventBox: eb,
+ event: int32(EvtReady)}
+
+ reader.startEventPoller()
// Check EventBox
if eb.Peek(EvtReadNew) {
@@ -19,21 +23,16 @@ func TestReadFromCommand(t *testing.T) {
}
// Normal command
- reader.readFromCommand(`echo abc && echo def`)
+ reader.fin(reader.readFromCommand(`echo abc && echo def`))
if len(strs) != 2 || strs[0] != "abc" || strs[1] != "def" {
t.Errorf("%s", strs)
}
// Check EventBox again
- if !eb.Peek(EvtReadNew) {
- t.Error("EvtReadNew should be set yet")
- }
+ eb.WaitFor(EvtReadFin)
// Wait should return immediately
eb.Wait(func(events *util.Events) {
- if _, found := (*events)[EvtReadNew]; !found {
- t.Errorf("%s", events)
- }
events.Clear()
})
@@ -42,8 +41,14 @@ func TestReadFromCommand(t *testing.T) {
t.Error("EvtReadNew should not be set yet")
}
+ // Make sure that event poller is finished
+ time.Sleep(readerPollIntervalMax)
+
+ // Restart event poller
+ reader.startEventPoller()
+
// Failing command
- reader.readFromCommand(`no-such-command`)
+ reader.fin(reader.readFromCommand(`no-such-command`))
strs = []string{}
if len(strs) > 0 {
t.Errorf("%s", strs)
@@ -51,6 +56,9 @@ func TestReadFromCommand(t *testing.T) {
// Check EventBox again
if eb.Peek(EvtReadNew) {
- t.Error("Command failed. EvtReadNew should be set")
+ t.Error("Command failed. EvtReadNew should not be set")
+ }
+ if !eb.Peek(EvtReadFin) {
+ t.Error("EvtReadFin should be set")
}
}