Only allocate and copy frame object into sorter heap when necessary (out of order frame)

This commit is contained in:
Andy Wang 2020-12-22 19:39:13 +00:00
parent 5a3f63f101
commit ff503b06a8
No known key found for this signature in database
GPG Key ID: 181B49F9F38F3374
7 changed files with 17 additions and 17 deletions

View File

@ -112,7 +112,7 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
} }
} }
func (d *datagramBufferedPipe) Write(f Frame) (toBeClosed bool, err error) { func (d *datagramBufferedPipe) Write(f *Frame) (toBeClosed bool, err error) {
d.rwCond.L.Lock() d.rwCond.L.Lock()
defer d.rwCond.L.Unlock() defer d.rwCond.L.Unlock()
if d.buf == nil { if d.buf == nil {

View File

@ -10,7 +10,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
b := []byte{0x01, 0x02, 0x03} b := []byte{0x01, 0x02, 0x03}
t.Run("simple write", func(t *testing.T) { t.Run("simple write", func(t *testing.T) {
pipe := NewDatagramBufferedPipe() pipe := NewDatagramBufferedPipe()
_, err := pipe.Write(Frame{Payload: b}) _, err := pipe.Write(&Frame{Payload: b})
if err != nil { if err != nil {
t.Error( t.Error(
"expecting", "nil error", "expecting", "nil error",
@ -22,7 +22,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
t.Run("simple read", func(t *testing.T) { t.Run("simple read", func(t *testing.T) {
pipe := NewDatagramBufferedPipe() pipe := NewDatagramBufferedPipe()
_, _ = pipe.Write(Frame{Payload: b}) _, _ = pipe.Write(&Frame{Payload: b})
b2 := make([]byte, len(b)) b2 := make([]byte, len(b))
n, err := pipe.Read(b2) n, err := pipe.Read(b2)
if n != len(b) { if n != len(b) {
@ -55,7 +55,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
t.Run("writing closing frame", func(t *testing.T) { t.Run("writing closing frame", func(t *testing.T) {
pipe := NewDatagramBufferedPipe() pipe := NewDatagramBufferedPipe()
toBeClosed, err := pipe.Write(Frame{Closing: closingStream}) toBeClosed, err := pipe.Write(&Frame{Closing: closingStream})
if !toBeClosed { if !toBeClosed {
t.Error("should be to be closed") t.Error("should be to be closed")
} }
@ -77,7 +77,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) {
b := []byte{0x01, 0x02, 0x03} b := []byte{0x01, 0x02, 0x03}
go func() { go func() {
time.Sleep(readBlockTime) time.Sleep(readBlockTime)
pipe.Write(Frame{Payload: b}) pipe.Write(&Frame{Payload: b})
}() }()
b2 := make([]byte, len(b)) b2 := make([]byte, len(b))
n, err := pipe.Read(b2) n, err := pipe.Read(b2)
@ -110,7 +110,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) {
func TestDatagramBuffer_CloseThenRead(t *testing.T) { func TestDatagramBuffer_CloseThenRead(t *testing.T) {
pipe := NewDatagramBufferedPipe() pipe := NewDatagramBufferedPipe()
b := []byte{0x01, 0x02, 0x03} b := []byte{0x01, 0x02, 0x03}
pipe.Write(Frame{Payload: b}) pipe.Write(&Frame{Payload: b})
b2 := make([]byte, len(b)) b2 := make([]byte, len(b))
pipe.Close() pipe.Close()
n, err := pipe.Read(b2) n, err := pipe.Read(b2)

View File

@ -15,7 +15,7 @@ type recvBuffer interface {
// when the buffer is empty. // when the buffer is empty.
io.ReadCloser io.ReadCloser
io.WriterTo io.WriterTo
Write(Frame) (toBeClosed bool, err error) Write(*Frame) (toBeClosed bool, err error)
SetReadDeadline(time time.Time) SetReadDeadline(time time.Time)
// SetWriteToTimeout sets the duration a recvBuffer waits in a WriteTo call when nothing // SetWriteToTimeout sets the duration a recvBuffer waits in a WriteTo call when nothing
// has been written for a while. After that duration it should return ErrTimeout // has been written for a while. After that duration it should return ErrTimeout

View File

@ -236,12 +236,12 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
// this is when the stream existed before but has since been closed. We do nothing // this is when the stream existed before but has since been closed. We do nothing
return nil return nil
} }
return existingStreamI.(*Stream).recvFrame(*frame) return existingStreamI.(*Stream).recvFrame(frame)
} else { } else {
// new stream // new stream
sesh.streamCountIncr() sesh.streamCountIncr()
sesh.acceptCh <- newStream sesh.acceptCh <- newStream
return newStream.recvFrame(*frame) return newStream.recvFrame(frame)
} }
} }

View File

@ -78,7 +78,7 @@ func (s *Stream) getRecvBuf() recvBuffer {
} }
// receive a readily deobfuscated Frame so its payload can later be Read // receive a readily deobfuscated Frame so its payload can later be Read
func (s *Stream) recvFrame(frame Frame) error { func (s *Stream) recvFrame(frame *Frame) error {
toBeClosed, err := s.getRecvBuf().Write(frame) toBeClosed, err := s.getRecvBuf().Write(frame)
if toBeClosed { if toBeClosed {
err = s.passiveClose() err = s.passiveClose()

View File

@ -63,7 +63,7 @@ func NewStreamBuffer() *streamBuffer {
return sb return sb
} }
func (sb *streamBuffer) Write(f Frame) (toBeClosed bool, err error) { func (sb *streamBuffer) Write(f *Frame) (toBeClosed bool, err error) {
sb.recvM.Lock() sb.recvM.Lock()
defer sb.recvM.Unlock() defer sb.recvM.Unlock()
// when there'fs no ooo packages in heap and we receive the next package in order // when there'fs no ooo packages in heap and we receive the next package in order
@ -81,10 +81,11 @@ func (sb *streamBuffer) Write(f Frame) (toBeClosed bool, err error) {
return false, fmt.Errorf("seq %v is smaller than nextRecvSeq %v", f.Seq, sb.nextRecvSeq) return false, fmt.Errorf("seq %v is smaller than nextRecvSeq %v", f.Seq, sb.nextRecvSeq)
} }
heap.Push(&sb.sh, &f) saved := *f
heap.Push(&sb.sh, &saved)
// Keep popping from the heap until empty or to the point that the wanted seq was not received // Keep popping from the heap until empty or to the point that the wanted seq was not received
for len(sb.sh) > 0 && sb.sh[0].Seq == sb.nextRecvSeq { for len(sb.sh) > 0 && sb.sh[0].Seq == sb.nextRecvSeq {
f = *heap.Pop(&sb.sh).(*Frame) f = heap.Pop(&sb.sh).(*Frame)
if f.Closing != closingNothing { if f.Closing != closingNothing {
return true, nil return true, nil
} else { } else {

View File

@ -20,11 +20,10 @@ func TestRecvNewFrame(t *testing.T) {
for _, n := range set { for _, n := range set {
bu64 := make([]byte, 8) bu64 := make([]byte, 8)
binary.BigEndian.PutUint64(bu64, n) binary.BigEndian.PutUint64(bu64, n)
frame := Frame{ sb.Write(&Frame{
Seq: n, Seq: n,
Payload: bu64, Payload: bu64,
} })
sb.Write(frame)
} }
var sortedResult []uint64 var sortedResult []uint64
@ -80,7 +79,7 @@ func TestStreamBuffer_RecvThenClose(t *testing.T) {
Closing: 0, Closing: 0,
Payload: testData, Payload: testData,
} }
sb.Write(testFrame) sb.Write(&testFrame)
sb.Close() sb.Close()
readBuf := make([]byte, testDataLen) readBuf := make([]byte, testDataLen)