From 334712c413c4599453e2a3ad28f3fd4bc849f721 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sun, 12 Apr 2020 12:28:00 +0100 Subject: [PATCH] Regression: closing in the middle of a read will block --- internal/multiplex/bufferedPipe.go | 14 ++++++-------- internal/multiplex/datagramBuffer.go | 16 +++++++--------- internal/multiplex/datagramBuffer_test.go | 3 ++- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/internal/multiplex/bufferedPipe.go b/internal/multiplex/bufferedPipe.go index 4d2c4eb..056fd09 100644 --- a/internal/multiplex/bufferedPipe.go +++ b/internal/multiplex/bufferedPipe.go @@ -7,6 +7,7 @@ import ( "errors" "io" "sync" + "sync/atomic" "time" ) @@ -19,7 +20,7 @@ type bufferedPipe struct { // only alloc when on first Read or Write buf *bytes.Buffer - closed bool + closed uint32 rwCond *sync.Cond rDeadline time.Time } @@ -38,7 +39,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) { p.buf = new(bytes.Buffer) } for { - if p.closed && p.buf.Len() == 0 { + if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { return 0, io.EOF } if !p.rDeadline.IsZero() { @@ -66,7 +67,7 @@ func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.buf = new(bytes.Buffer) } for { - if p.closed && p.buf.Len() == 0 { + if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { return 0, io.EOF } if !p.rDeadline.IsZero() { @@ -96,7 +97,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { p.buf = new(bytes.Buffer) } for { - if p.closed { + if atomic.LoadUint32(&p.closed) == 1 { return 0, io.ErrClosedPipe } if p.buf.Len() <= BUF_SIZE_LIMIT { @@ -112,10 +113,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { } func (p *bufferedPipe) Close() error { - p.rwCond.L.Lock() - defer p.rwCond.L.Unlock() - - p.closed = true + atomic.StoreUint32(&p.closed, 1) p.rwCond.Broadcast() return nil } diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index 3b8d784..203b9b9 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -6,6 +6,7 @@ import ( "bytes" "io" "sync" + "sync/atomic" "time" ) @@ -15,7 +16,7 @@ import ( type datagramBuffer struct { pLens []int buf *bytes.Buffer - closed bool + closed uint32 rwCond *sync.Cond rDeadline time.Time } @@ -34,7 +35,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) { d.buf = new(bytes.Buffer) } for { - if d.closed && len(d.pLens) == 0 { + if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 { return 0, io.EOF } @@ -69,7 +70,7 @@ func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) { d.buf = new(bytes.Buffer) } for { - if d.closed && len(d.pLens) == 0 { + if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 { return 0, io.EOF } @@ -103,7 +104,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { d.buf = new(bytes.Buffer) } for { - if d.closed { + if atomic.LoadUint32(&d.closed) == 1 { return true, io.ErrClosedPipe } if d.buf.Len() <= BUF_SIZE_LIMIT { @@ -114,7 +115,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { } if f.Closing != C_NOOP { - d.closed = true + atomic.StoreUint32(&d.closed, 1) d.rwCond.Broadcast() return true, nil } @@ -128,10 +129,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { } func (d *datagramBuffer) Close() error { - d.rwCond.L.Lock() - defer d.rwCond.L.Unlock() - - d.closed = true + atomic.StoreUint32(&d.closed, 1) d.rwCond.Broadcast() return nil } diff --git a/internal/multiplex/datagramBuffer_test.go b/internal/multiplex/datagramBuffer_test.go index 91d8b33..add8e86 100644 --- a/internal/multiplex/datagramBuffer_test.go +++ b/internal/multiplex/datagramBuffer_test.go @@ -2,6 +2,7 @@ package multiplex import ( "bytes" + "sync/atomic" "testing" "time" ) @@ -66,7 +67,7 @@ func TestDatagramBuffer_RW(t *testing.T) { ) return } - if !pipe.closed { + if atomic.LoadUint32(&pipe.closed) != 1 { t.Error("expecting closed pipe, not closed") } })