Make buffer close non-blocking

This commit is contained in:
Andy Wang 2019-11-02 21:02:35 +00:00
parent 97cb19c364
commit 9cab4670f4
3 changed files with 13 additions and 16 deletions

View File

@ -6,6 +6,7 @@ import (
"bytes" "bytes"
"io" "io"
"sync" "sync"
"sync/atomic"
) )
const BUF_SIZE_LIMIT = 1 << 20 * 500 const BUF_SIZE_LIMIT = 1 << 20 * 500
@ -13,7 +14,7 @@ const BUF_SIZE_LIMIT = 1 << 20 * 500
// The point of a bufferedPipe is that Read() will block until data is available // The point of a bufferedPipe is that Read() will block until data is available
type bufferedPipe struct { type bufferedPipe struct {
buf *bytes.Buffer buf *bytes.Buffer
closed bool closed uint32
rwCond *sync.Cond rwCond *sync.Cond
} }
@ -29,7 +30,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
for { for {
if p.closed && p.buf.Len() == 0 { if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 {
return 0, io.EOF return 0, io.EOF
} }
@ -48,7 +49,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
for { for {
if p.closed { if atomic.LoadUint32(&p.closed) == 1 {
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
} }
if p.buf.Len() <= BUF_SIZE_LIMIT { if p.buf.Len() <= BUF_SIZE_LIMIT {
@ -64,10 +65,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
} }
func (p *bufferedPipe) Close() error { func (p *bufferedPipe) Close() error {
p.rwCond.L.Lock() atomic.StoreUint32(&p.closed, 1)
defer p.rwCond.L.Unlock()
p.closed = true
p.rwCond.Broadcast() p.rwCond.Broadcast()
return nil return nil
} }

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"io" "io"
"sync" "sync"
"sync/atomic"
) )
const DATAGRAM_NUMBER_LIMIT = 1024 const DATAGRAM_NUMBER_LIMIT = 1024
@ -15,7 +16,7 @@ const DATAGRAM_NUMBER_LIMIT = 1024
// it won't get chopped up into individual bytes // it won't get chopped up into individual bytes
type datagramBuffer struct { type datagramBuffer struct {
buf [][]byte buf [][]byte
closed bool closed uint32
rwCond *sync.Cond rwCond *sync.Cond
} }
@ -31,7 +32,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) {
d.rwCond.L.Lock() d.rwCond.L.Lock()
defer d.rwCond.L.Unlock() defer d.rwCond.L.Unlock()
for { for {
if d.closed && len(d.buf) == 0 { if atomic.LoadUint32(&d.closed) == 1 && len(d.buf) == 0 {
return 0, io.EOF return 0, io.EOF
} }
@ -55,7 +56,7 @@ func (d *datagramBuffer) Write(f Frame) error {
d.rwCond.L.Lock() d.rwCond.L.Lock()
defer d.rwCond.L.Unlock() defer d.rwCond.L.Unlock()
for { for {
if d.closed { if atomic.LoadUint32(&d.closed) == 1 {
return io.ErrClosedPipe return io.ErrClosedPipe
} }
if len(d.buf) <= DATAGRAM_NUMBER_LIMIT { if len(d.buf) <= DATAGRAM_NUMBER_LIMIT {
@ -66,7 +67,7 @@ func (d *datagramBuffer) Write(f Frame) error {
} }
if f.Closing == 1 { if f.Closing == 1 {
d.closed = true atomic.StoreUint32(&d.closed, 1)
d.rwCond.Broadcast() d.rwCond.Broadcast()
return nil return nil
} }
@ -80,10 +81,7 @@ func (d *datagramBuffer) Write(f Frame) error {
} }
func (d *datagramBuffer) Close() error { func (d *datagramBuffer) Close() error {
d.rwCond.L.Lock() atomic.StoreUint32(&d.closed, 1)
defer d.rwCond.L.Unlock()
d.closed = true
d.rwCond.Broadcast() d.rwCond.Broadcast()
return nil return nil
} }

View File

@ -2,6 +2,7 @@ package multiplex
import ( import (
"bytes" "bytes"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
@ -63,7 +64,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
) )
return return
} }
if !pipe.closed { if atomic.LoadUint32(&pipe.closed) != 1 {
t.Error("expecting closed pipe, not closed") t.Error("expecting closed pipe, not closed")
} }
}) })