diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index 397fc4e..8cf5759 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -64,7 +64,13 @@ func (d *datagramBuffer) Write(f Frame) error { } d.rwCond.Wait() } - // TODO: deal with closing frame here + + if f.Closing == 1 { + d.closed = true + d.rwCond.Broadcast() + return nil + } + data := make([]byte, len(f.Payload)) copy(data, f.Payload) d.buf = append(d.buf, data) diff --git a/internal/multiplex/datagramBuffer_test.go b/internal/multiplex/datagramBuffer_test.go index 0154ec8..f64088d 100644 --- a/internal/multiplex/datagramBuffer_test.go +++ b/internal/multiplex/datagramBuffer_test.go @@ -7,48 +7,66 @@ import ( ) func TestDatagramBuffer_RW(t *testing.T) { - pipe := NewDatagramBuffer() b := []byte{0x01, 0x02, 0x03} - err := pipe.Write(Frame{Payload: b}) - if err != nil { - t.Error( - "For", "simple write", - "expecting", "nil error", - "got", err, - ) - return - } + t.Run("simple write", func(t *testing.T) { + pipe := NewDatagramBuffer() + err := pipe.Write(Frame{Payload: b}) + if err != nil { + t.Error( + "expecting", "nil error", + "got", err, + ) + return + } + }) - b2 := make([]byte, len(b)) - n, err := pipe.Read(b2) - if n != len(b) { - t.Error( - "For", "number of bytes read", - "expecting", len(b), - "got", n, - ) - return - } - if err != nil { - t.Error( - "For", "simple read", - "expecting", "nil error", - "got", err, - ) - return - } - if !bytes.Equal(b, b2) { - t.Error( - "For", "simple read", - "expecting", b, - "got", b2, - ) - } - if len(pipe.buf) != 0 { - t.Error("buf len is not 0 after finished reading") - return - } + t.Run("simple read", func(t *testing.T) { + pipe := NewDatagramBuffer() + _ = pipe.Write(Frame{Payload: b}) + b2 := make([]byte, len(b)) + n, err := pipe.Read(b2) + if n != len(b) { + t.Error( + "For", "number of bytes read", + "expecting", len(b), + "got", n, + ) + return + } + if err != nil { + t.Error( + "expecting", "nil error", + "got", err, + ) + return + } + if !bytes.Equal(b, b2) { + t.Error( + "expecting", b, + "got", b2, + ) + } + if len(pipe.buf) != 0 { + t.Error("buf len is not 0 after finished reading") + return + } + }) + + t.Run("writing closing frame", func(t *testing.T) { + pipe := NewDatagramBuffer() + err := pipe.Write(Frame{Closing: 1}) + if err != nil { + t.Error( + "expecting", "nil error", + "got", err, + ) + return + } + if !pipe.closed { + t.Error("expecting closed pipe, not closed") + } + }) } func TestDatagramBuffer_BlockingRead(t *testing.T) {