diff --git a/internal/multiplex/datagramBufferedPipe.go b/internal/multiplex/datagramBufferedPipe.go index e1a0462..a7b99e4 100644 --- a/internal/multiplex/datagramBufferedPipe.go +++ b/internal/multiplex/datagramBufferedPipe.go @@ -112,7 +112,7 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { } } -func (d *datagramBufferedPipe) Write(f Frame) (toBeClosed bool, err error) { +func (d *datagramBufferedPipe) Write(f *Frame) (toBeClosed bool, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() if d.buf == nil { diff --git a/internal/multiplex/datagramBufferedPipe_test.go b/internal/multiplex/datagramBufferedPipe_test.go index 4a5d4e2..6b20f76 100644 --- a/internal/multiplex/datagramBufferedPipe_test.go +++ b/internal/multiplex/datagramBufferedPipe_test.go @@ -10,7 +10,7 @@ func TestDatagramBuffer_RW(t *testing.T) { b := []byte{0x01, 0x02, 0x03} t.Run("simple write", func(t *testing.T) { pipe := NewDatagramBufferedPipe() - _, err := pipe.Write(Frame{Payload: b}) + _, err := pipe.Write(&Frame{Payload: b}) if err != nil { t.Error( "expecting", "nil error", @@ -22,7 +22,7 @@ func TestDatagramBuffer_RW(t *testing.T) { t.Run("simple read", func(t *testing.T) { pipe := NewDatagramBufferedPipe() - _, _ = pipe.Write(Frame{Payload: b}) + _, _ = pipe.Write(&Frame{Payload: b}) b2 := make([]byte, len(b)) n, err := pipe.Read(b2) if n != len(b) { @@ -55,7 +55,7 @@ func TestDatagramBuffer_RW(t *testing.T) { t.Run("writing closing frame", func(t *testing.T) { pipe := NewDatagramBufferedPipe() - toBeClosed, err := pipe.Write(Frame{Closing: closingStream}) + toBeClosed, err := pipe.Write(&Frame{Closing: closingStream}) if !toBeClosed { t.Error("should be to be closed") } @@ -77,7 +77,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) { b := []byte{0x01, 0x02, 0x03} go func() { time.Sleep(readBlockTime) - pipe.Write(Frame{Payload: b}) + pipe.Write(&Frame{Payload: b}) }() b2 := make([]byte, len(b)) n, err := pipe.Read(b2) @@ -110,7 +110,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) { func TestDatagramBuffer_CloseThenRead(t *testing.T) { pipe := NewDatagramBufferedPipe() b := []byte{0x01, 0x02, 0x03} - pipe.Write(Frame{Payload: b}) + pipe.Write(&Frame{Payload: b}) b2 := make([]byte, len(b)) pipe.Close() n, err := pipe.Read(b2) diff --git a/internal/multiplex/recvBuffer.go b/internal/multiplex/recvBuffer.go index 0797daf..63f1f6f 100644 --- a/internal/multiplex/recvBuffer.go +++ b/internal/multiplex/recvBuffer.go @@ -15,7 +15,7 @@ type recvBuffer interface { // when the buffer is empty. io.ReadCloser io.WriterTo - Write(Frame) (toBeClosed bool, err error) + Write(*Frame) (toBeClosed bool, err error) SetReadDeadline(time time.Time) // SetWriteToTimeout sets the duration a recvBuffer waits in a WriteTo call when nothing // has been written for a while. After that duration it should return ErrTimeout diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 6a88aa3..92b5cf8 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -236,12 +236,12 @@ func (sesh *Session) recvDataFromRemote(data []byte) error { // this is when the stream existed before but has since been closed. We do nothing return nil } - return existingStreamI.(*Stream).recvFrame(*frame) + return existingStreamI.(*Stream).recvFrame(frame) } else { // new stream sesh.streamCountIncr() sesh.acceptCh <- newStream - return newStream.recvFrame(*frame) + return newStream.recvFrame(frame) } } diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 2245f5b..d827117 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -78,7 +78,7 @@ func (s *Stream) getRecvBuf() recvBuffer { } // receive a readily deobfuscated Frame so its payload can later be Read -func (s *Stream) recvFrame(frame Frame) error { +func (s *Stream) recvFrame(frame *Frame) error { toBeClosed, err := s.getRecvBuf().Write(frame) if toBeClosed { err = s.passiveClose() diff --git a/internal/multiplex/streamBuffer.go b/internal/multiplex/streamBuffer.go index 4adfae2..13cc523 100644 --- a/internal/multiplex/streamBuffer.go +++ b/internal/multiplex/streamBuffer.go @@ -63,7 +63,7 @@ func NewStreamBuffer() *streamBuffer { return sb } -func (sb *streamBuffer) Write(f Frame) (toBeClosed bool, err error) { +func (sb *streamBuffer) Write(f *Frame) (toBeClosed bool, err error) { sb.recvM.Lock() defer sb.recvM.Unlock() // when there'fs no ooo packages in heap and we receive the next package in order @@ -81,10 +81,11 @@ func (sb *streamBuffer) Write(f Frame) (toBeClosed bool, err error) { return false, fmt.Errorf("seq %v is smaller than nextRecvSeq %v", f.Seq, sb.nextRecvSeq) } - heap.Push(&sb.sh, &f) + saved := *f + heap.Push(&sb.sh, &saved) // Keep popping from the heap until empty or to the point that the wanted seq was not received for len(sb.sh) > 0 && sb.sh[0].Seq == sb.nextRecvSeq { - f = *heap.Pop(&sb.sh).(*Frame) + f = heap.Pop(&sb.sh).(*Frame) if f.Closing != closingNothing { return true, nil } else { diff --git a/internal/multiplex/streamBuffer_test.go b/internal/multiplex/streamBuffer_test.go index 67fb3a5..b36bb6a 100644 --- a/internal/multiplex/streamBuffer_test.go +++ b/internal/multiplex/streamBuffer_test.go @@ -20,11 +20,10 @@ func TestRecvNewFrame(t *testing.T) { for _, n := range set { bu64 := make([]byte, 8) binary.BigEndian.PutUint64(bu64, n) - frame := Frame{ + sb.Write(&Frame{ Seq: n, Payload: bu64, - } - sb.Write(frame) + }) } var sortedResult []uint64 @@ -80,7 +79,7 @@ func TestStreamBuffer_RecvThenClose(t *testing.T) { Closing: 0, Payload: testData, } - sb.Write(testFrame) + sb.Write(&testFrame) sb.Close() readBuf := make([]byte, testDataLen)