Regression: closing in the middle of a read will block

This commit is contained in:
Andy Wang 2020-04-12 12:28:00 +01:00
parent 4351815a4a
commit 334712c413
3 changed files with 15 additions and 18 deletions

View File

@ -7,6 +7,7 @@ import (
"errors"
"io"
"sync"
"sync/atomic"
"time"
)
@ -19,7 +20,7 @@ type bufferedPipe struct {
// only alloc when on first Read or Write
buf *bytes.Buffer
closed bool
closed uint32
rwCond *sync.Cond
rDeadline time.Time
}
@ -38,7 +39,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) {
p.buf = new(bytes.Buffer)
}
for {
if p.closed && p.buf.Len() == 0 {
if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 {
return 0, io.EOF
}
if !p.rDeadline.IsZero() {
@ -66,7 +67,7 @@ func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.buf = new(bytes.Buffer)
}
for {
if p.closed && p.buf.Len() == 0 {
if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 {
return 0, io.EOF
}
if !p.rDeadline.IsZero() {
@ -96,7 +97,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
p.buf = new(bytes.Buffer)
}
for {
if p.closed {
if atomic.LoadUint32(&p.closed) == 1 {
return 0, io.ErrClosedPipe
}
if p.buf.Len() <= BUF_SIZE_LIMIT {
@ -112,10 +113,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
}
func (p *bufferedPipe) Close() error {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
p.closed = true
atomic.StoreUint32(&p.closed, 1)
p.rwCond.Broadcast()
return nil
}

View File

@ -6,6 +6,7 @@ import (
"bytes"
"io"
"sync"
"sync/atomic"
"time"
)
@ -15,7 +16,7 @@ import (
type datagramBuffer struct {
pLens []int
buf *bytes.Buffer
closed bool
closed uint32
rwCond *sync.Cond
rDeadline time.Time
}
@ -34,7 +35,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) {
d.buf = new(bytes.Buffer)
}
for {
if d.closed && len(d.pLens) == 0 {
if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 {
return 0, io.EOF
}
@ -69,7 +70,7 @@ func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) {
d.buf = new(bytes.Buffer)
}
for {
if d.closed && len(d.pLens) == 0 {
if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 {
return 0, io.EOF
}
@ -103,7 +104,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
d.buf = new(bytes.Buffer)
}
for {
if d.closed {
if atomic.LoadUint32(&d.closed) == 1 {
return true, io.ErrClosedPipe
}
if d.buf.Len() <= BUF_SIZE_LIMIT {
@ -114,7 +115,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
}
if f.Closing != C_NOOP {
d.closed = true
atomic.StoreUint32(&d.closed, 1)
d.rwCond.Broadcast()
return true, nil
}
@ -128,10 +129,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
}
func (d *datagramBuffer) Close() error {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
d.closed = true
atomic.StoreUint32(&d.closed, 1)
d.rwCond.Broadcast()
return nil
}

View File

@ -2,6 +2,7 @@ package multiplex
import (
"bytes"
"sync/atomic"
"testing"
"time"
)
@ -66,7 +67,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
)
return
}
if !pipe.closed {
if atomic.LoadUint32(&pipe.closed) != 1 {
t.Error("expecting closed pipe, not closed")
}
})