diff --git a/internal/multiplex/frameSorter.go b/internal/multiplex/frameSorter.go index 610606e..de5902f 100644 --- a/internal/multiplex/frameSorter.go +++ b/internal/multiplex/frameSorter.go @@ -25,13 +25,13 @@ import ( // Stream.rev counts the amount of time the sequence number gets wrapped type frameNode struct { - seq uint32 - frame *Frame + trueSeq uint64 + frame *Frame } type sorterHeap []*frameNode func (sh sorterHeap) Less(i, j int) bool { - return sh[i].seq < sh[j].seq + return sh[i].trueSeq < sh[j].trueSeq } func (sh sorterHeap) Len() int { return len(sh) @@ -78,13 +78,31 @@ func (s *Stream) recvNewFrame() { } fs := &frameNode{ - f.Seq, - f, + trueSeq: 0, + frame: f, + } + + if f.Seq < s.nextRecvSeq { + // For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255 + // e.g. we are on rev=0 (wrap has not happened yet) + // and we get the order of recv as 253 254 0 1 + // after 254, nextN should be 255, but 0 is received and 0 < 255 + // now 0 should have a trueSeq of 256 + if !s.wrapMode { + // wrapMode is true when the latest seq is wrapped but nextN is not + s.wrapMode = true + } + fs.trueSeq = uint64(1<<16*(s.rev+1)) + uint64(f.Seq) + 1 + // +1 because wrapped 0 should have trueSeq of 256 instead of 255 + // when this bit was run on 1, the trueSeq of 1 would become 256 + } else { + fs.trueSeq = uint64(1<<16*s.rev) + uint64(f.Seq) + // when this bit was run on 255, the trueSeq of 255 would be 255 } heap.Push(&s.sh, fs) // Keep popping from the heap until empty or to the point that the wanted seq was not received - for len(s.sh) > 0 && s.sh[0].seq == s.nextRecvSeq { + for len(s.sh) > 0 && s.sh[0].frame.Seq == s.nextRecvSeq { frame := heap.Pop(&s.sh).(*frameNode).frame s.pushFrame(frame) } @@ -101,6 +119,7 @@ func (s *Stream) pushFrame(f *Frame) { s.sortedBufCh <- f.Payload s.nextRecvSeq += 1 if s.nextRecvSeq == 0 { // getting wrapped - s.Close() + s.rev += 1 + s.wrapMode = false } } diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index ae51975..6c16bc9 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -18,9 +18,9 @@ type Stream struct { // Explanations of the following 4 fields can be found in frameSorter.go nextRecvSeq uint32 - //rev int - sh sorterHeap - //wrapMode bool + rev int + sh sorterHeap + wrapMode bool // New frames are received through newFrameCh by frameSorter newFrameCh chan *Frame