diff --git a/internal/multiplex/bufferedPipe.go b/internal/multiplex/bufferedPipe.go index 12da757..d42930a 100644 --- a/internal/multiplex/bufferedPipe.go +++ b/internal/multiplex/bufferedPipe.go @@ -6,6 +6,7 @@ import ( "bytes" "io" "sync" + "sync/atomic" ) const BUF_SIZE_LIMIT = 1 << 20 * 500 @@ -13,7 +14,7 @@ const BUF_SIZE_LIMIT = 1 << 20 * 500 // The point of a bufferedPipe is that Read() will block until data is available type bufferedPipe struct { buf *bytes.Buffer - closed bool + closed uint32 rwCond *sync.Cond } @@ -29,7 +30,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() for { - if p.closed && p.buf.Len() == 0 { + if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { return 0, io.EOF } @@ -48,7 +49,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() for { - if p.closed { + if atomic.LoadUint32(&p.closed) == 1 { return 0, io.ErrClosedPipe } if p.buf.Len() <= BUF_SIZE_LIMIT { @@ -64,10 +65,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { } func (p *bufferedPipe) Close() error { - p.rwCond.L.Lock() - defer p.rwCond.L.Unlock() - - p.closed = true + atomic.StoreUint32(&p.closed, 1) p.rwCond.Broadcast() return nil } diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index 8cf5759..b37d99f 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -6,6 +6,7 @@ import ( "errors" "io" "sync" + "sync/atomic" ) const DATAGRAM_NUMBER_LIMIT = 1024 @@ -15,7 +16,7 @@ const DATAGRAM_NUMBER_LIMIT = 1024 // it won't get chopped up into individual bytes type datagramBuffer struct { buf [][]byte - closed bool + closed uint32 rwCond *sync.Cond } @@ -31,7 +32,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() for { - if d.closed && len(d.buf) == 0 { + if atomic.LoadUint32(&d.closed) == 1 && len(d.buf) == 0 { return 0, io.EOF } @@ -55,7 +56,7 @@ func (d *datagramBuffer) Write(f Frame) error { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() for { - if d.closed { + if atomic.LoadUint32(&d.closed) == 1 { return io.ErrClosedPipe } if len(d.buf) <= DATAGRAM_NUMBER_LIMIT { @@ -66,7 +67,7 @@ func (d *datagramBuffer) Write(f Frame) error { } if f.Closing == 1 { - d.closed = true + atomic.StoreUint32(&d.closed, 1) d.rwCond.Broadcast() return nil } @@ -80,10 +81,7 @@ func (d *datagramBuffer) Write(f Frame) error { } func (d *datagramBuffer) Close() error { - d.rwCond.L.Lock() - defer d.rwCond.L.Unlock() - - d.closed = true + atomic.StoreUint32(&d.closed, 1) d.rwCond.Broadcast() return nil } diff --git a/internal/multiplex/datagramBuffer_test.go b/internal/multiplex/datagramBuffer_test.go index f64088d..3a7396d 100644 --- a/internal/multiplex/datagramBuffer_test.go +++ b/internal/multiplex/datagramBuffer_test.go @@ -2,6 +2,7 @@ package multiplex import ( "bytes" + "sync/atomic" "testing" "time" ) @@ -63,7 +64,7 @@ func TestDatagramBuffer_RW(t *testing.T) { ) return } - if !pipe.closed { + if atomic.LoadUint32(&pipe.closed) != 1 { t.Error("expecting closed pipe, not closed") } })