diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index 6c1e9fe..397fc4e 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -64,6 +64,7 @@ func (d *datagramBuffer) Write(f Frame) error { } d.rwCond.Wait() } + // TODO: deal with closing frame here data := make([]byte, len(f.Payload)) copy(data, f.Payload) d.buf = append(d.buf, data) diff --git a/internal/multiplex/streamBuffer.go b/internal/multiplex/streamBuffer.go index 5182ba7..44cd147 100644 --- a/internal/multiplex/streamBuffer.go +++ b/internal/multiplex/streamBuffer.go @@ -12,7 +12,6 @@ package multiplex import ( "container/heap" - "errors" "fmt" "sync" ) @@ -60,8 +59,6 @@ func NewStreamBuffer() *streamBuffer { return sb } -var ClosingFrameReceived = errors.New("closed by closing frame") - // recvNewFrame is a forever running loop which receives frames unordered, // cache and order them and send them into sortedBufCh func (sb *streamBuffer) Write(f Frame) error { @@ -70,9 +67,8 @@ func (sb *streamBuffer) Write(f Frame) error { // when there'fs no ooo packages in heap and we receive the next package in order if len(sb.sh) == 0 && f.Seq == sb.nextRecvSeq { if f.Closing == 1 { - // empty data indicates closing signal sb.buf.Close() - return ClosingFrameReceived + return nil } else { sb.buf.Write(f.Payload) sb.nextRecvSeq += 1 @@ -91,7 +87,7 @@ func (sb *streamBuffer) Write(f Frame) error { if f.Closing == 1 { // empty data indicates closing signal sb.buf.Close() - return ClosingFrameReceived + return nil } else { sb.buf.Write(f.Payload) sb.nextRecvSeq += 1