summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@nym.se>2014-07-03 13:37:20 +0200
committerJakob Borg <jakob@nym.se>2014-07-03 13:37:20 +0200
commit4a6b43bcaec3f6deaf823e96746f725ef3e77a6b (patch)
treef404d9113e65fe1623f8203298d31ab32be1e4a7
parent2f5a822ca4c04847676fe9168e4d26decc6c1970 (diff)
Clean up protocol locking and closingv0.8.18
-rw-r--r--protocol/protocol.go74
1 files changed, 27 insertions, 47 deletions
diff --git a/protocol/protocol.go b/protocol/protocol.go
index 5642206953..9977737d3d 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -81,15 +81,14 @@ type rawConnection struct {
reader io.ReadCloser
cr *countingReader
xr *xdr.Reader
- writer io.WriteCloser
- cw *countingWriter
- wb *bufio.Writer
- xw *xdr.Writer
- wmut sync.Mutex
+ writer io.WriteCloser
+ cw *countingWriter
+ wb *bufio.Writer
+ xw *xdr.Writer
- awaiting []chan asyncResult
- imut sync.Mutex
+ awaiting []chan asyncResult
+ awaitingMut sync.Mutex
idxSent map[string]map[string]uint64
idxMut sync.Mutex // ensures serialization of Index calls
@@ -97,6 +96,7 @@ type rawConnection struct {
nextID chan int
outbox chan []encodable
closed chan struct{}
+ once sync.Once
}
type asyncResult struct {
@@ -192,13 +192,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
return nil, ErrClosed
}
- c.imut.Lock()
+ c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
- rc := make(chan asyncResult)
+ rc := make(chan asyncResult, 1)
c.awaiting[id] = rc
- c.imut.Unlock()
+ c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypeRequest},
RequestMessage{repo, name, uint64(offset), uint32(size)})
@@ -227,9 +227,9 @@ func (c *rawConnection) ping() bool {
}
rc := make(chan asyncResult, 1)
- c.imut.Lock()
+ c.awaitingMut.Lock()
c.awaiting[id] = rc
- c.imut.Unlock()
+ c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypePing})
if !ok {
@@ -388,32 +388,25 @@ func (c *rawConnection) handleResponse(hdr header) error {
return err
}
- go func(hdr header, err error) {
- c.imut.Lock()
- rc := c.awaiting[hdr.msgID]
+ c.awaitingMut.Lock()
+ if rc := c.awaiting[hdr.msgID]; rc != nil {
c.awaiting[hdr.msgID] = nil
- c.imut.Unlock()
-
- if rc != nil {
- rc <- asyncResult{data, err}
- close(rc)
- }
- }(hdr, c.xr.Error())
+ rc <- asyncResult{data, nil}
+ close(rc)
+ }
+ c.awaitingMut.Unlock()
return nil
}
func (c *rawConnection) handlePong(hdr header) {
- c.imut.Lock()
+ c.awaitingMut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil {
- go func() {
- rc <- asyncResult{}
- close(rc)
- }()
-
c.awaiting[hdr.msgID] = nil
+ rc <- asyncResult{}
+ close(rc)
}
- c.imut.Unlock()
+ c.awaitingMut.Unlock()
}
func (c *rawConnection) handleClusterConfig() error {
@@ -458,17 +451,14 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
func (c *rawConnection) writerLoop() {
var err error
for es := range c.outbox {
- c.wmut.Lock()
for _, e := range es {
e.encodeXDR(c.xw)
}
if err = c.flush(); err != nil {
- c.wmut.Unlock()
c.close(err)
return
}
- c.wmut.Unlock()
}
}
@@ -493,29 +483,20 @@ func (c *rawConnection) flush() error {
}
func (c *rawConnection) close(err error) {
- c.imut.Lock()
- c.wmut.Lock()
- defer c.imut.Unlock()
- defer c.wmut.Unlock()
-
- select {
- case <-c.closed:
- return
- default:
+ c.once.Do(func() {
close(c.closed)
+ c.awaitingMut.Lock()
for i, ch := range c.awaiting {
if ch != nil {
close(ch)
c.awaiting[i] = nil
}
}
-
- c.writer.Close()
- c.reader.Close()
+ c.awaitingMut.Unlock()
go c.receiver.Close(c.id, err)
- }
+ })
}
func (c *rawConnection) idGenerator() {
@@ -577,8 +558,7 @@ func (c *rawConnection) pingerLoop() {
func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
- c.send(header{0, msgID, messageTypeResponse},
- encodableBytes(data))
+ c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
}
type Statistics struct {