mirror of https://github.com/cbeuw/Cloak
Refactor frame sorter
This commit is contained in:
parent
9fcc328797
commit
d46fa74924
|
|
@ -2,7 +2,9 @@ package multiplex
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
//"log"
|
"io"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The data is multiplexed through several TCP connections, therefore the
|
// The data is multiplexed through several TCP connections, therefore the
|
||||||
|
|
@ -50,73 +52,101 @@ func (sh *sorterHeap) Pop() interface{} {
|
||||||
return x
|
return x
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Stream) writeNewFrame(f *Frame) {
|
type frameSorter struct {
|
||||||
s.newFrameCh <- f
|
nextRecvSeq uint32
|
||||||
|
rev int
|
||||||
|
sh sorterHeap
|
||||||
|
wrapMode bool
|
||||||
|
|
||||||
|
// New frames are received through newFrameCh by frameSorter
|
||||||
|
newFrameCh chan *Frame
|
||||||
|
|
||||||
|
output io.WriteCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFrameSorter(output io.WriteCloser) *frameSorter {
|
||||||
|
fs := &frameSorter{
|
||||||
|
sh: []*frameNode{},
|
||||||
|
newFrameCh: make(chan *Frame, 1024),
|
||||||
|
rev: 0,
|
||||||
|
output: output,
|
||||||
|
}
|
||||||
|
go fs.recvNewFrame()
|
||||||
|
return fs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *frameSorter) writeNewFrame(f *Frame) {
|
||||||
|
fs.newFrameCh <- f
|
||||||
|
}
|
||||||
|
func (fs *frameSorter) Close() error {
|
||||||
|
fs.newFrameCh <- nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// recvNewFrame is a forever running loop which receives frames unordered,
|
// recvNewFrame is a forever running loop which receives frames unordered,
|
||||||
// cache and order them and send them into sortedBufCh
|
// cache and order them and send them into sortedBufCh
|
||||||
func (s *Stream) recvNewFrame() {
|
func (fs *frameSorter) recvNewFrame() {
|
||||||
|
defer log.Tracef("a recvNewFrame has returned gracefully")
|
||||||
for {
|
for {
|
||||||
f := <-s.newFrameCh
|
f := <-fs.newFrameCh
|
||||||
if f == nil {
|
if f == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// when there's no ooo packages in heap and we receive the next package in order
|
// when there'fs no ooo packages in heap and we receive the next package in order
|
||||||
if len(s.sh) == 0 && f.Seq == s.nextRecvSeq {
|
if len(fs.sh) == 0 && f.Seq == fs.nextRecvSeq {
|
||||||
if f.Closing == 1 {
|
if f.Closing == 1 {
|
||||||
// empty data indicates closing signal
|
// empty data indicates closing signal
|
||||||
s.passiveClose()
|
fs.output.Close()
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
s.sortedBuf.Write(f.Payload)
|
fs.output.Write(f.Payload)
|
||||||
s.nextRecvSeq += 1
|
fs.nextRecvSeq += 1
|
||||||
if s.nextRecvSeq == 0 { // getting wrapped
|
if fs.nextRecvSeq == 0 { // getting wrapped
|
||||||
s.rev += 1
|
fs.rev += 1
|
||||||
s.wrapMode = false
|
fs.wrapMode = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
fs := &frameNode{
|
node := &frameNode{
|
||||||
trueSeq: 0,
|
trueSeq: 0,
|
||||||
frame: f,
|
frame: f,
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.Seq < s.nextRecvSeq {
|
if f.Seq < fs.nextRecvSeq {
|
||||||
// For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255
|
// For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255
|
||||||
// e.g. we are on rev=0 (wrap has not happened yet)
|
// e.g. we are on rev=0 (wrap has not happened yet)
|
||||||
// and we get the order of recv as 253 254 0 1
|
// and we get the order of recv as 253 254 0 1
|
||||||
// after 254, nextN should be 255, but 0 is received and 0 < 255
|
// after 254, nextN should be 255, but 0 is received and 0 < 255
|
||||||
// now 0 should have a trueSeq of 256
|
// now 0 should have a trueSeq of 256
|
||||||
if !s.wrapMode {
|
if !fs.wrapMode {
|
||||||
// wrapMode is true when the latest seq is wrapped but nextN is not
|
// wrapMode is true when the latest seq is wrapped but nextN is not
|
||||||
s.wrapMode = true
|
fs.wrapMode = true
|
||||||
}
|
}
|
||||||
fs.trueSeq = uint64(1<<32)*uint64(s.rev+1) + uint64(f.Seq) + 1
|
node.trueSeq = uint64(1<<32)*uint64(fs.rev+1) + uint64(f.Seq) + 1
|
||||||
// +1 because wrapped 0 should have trueSeq of 256 instead of 255
|
// +1 because wrapped 0 should have trueSeq of 256 instead of 255
|
||||||
// when this bit was run on 1, the trueSeq of 1 would become 256
|
// when this bit was run on 1, the trueSeq of 1 would become 256
|
||||||
} else {
|
} else {
|
||||||
fs.trueSeq = uint64(1<<32)*uint64(s.rev) + uint64(f.Seq)
|
node.trueSeq = uint64(1<<32)*uint64(fs.rev) + uint64(f.Seq)
|
||||||
// when this bit was run on 255, the trueSeq of 255 would be 255
|
// when this bit was run on 255, the trueSeq of 255 would be 255
|
||||||
}
|
}
|
||||||
|
|
||||||
heap.Push(&s.sh, fs)
|
heap.Push(&fs.sh, node)
|
||||||
// Keep popping from the heap until empty or to the point that the wanted seq was not received
|
// Keep popping from the heap until empty or to the point that the wanted seq was not received
|
||||||
for len(s.sh) > 0 && s.sh[0].frame.Seq == s.nextRecvSeq {
|
for len(fs.sh) > 0 && fs.sh[0].frame.Seq == fs.nextRecvSeq {
|
||||||
f = heap.Pop(&s.sh).(*frameNode).frame
|
f = heap.Pop(&fs.sh).(*frameNode).frame
|
||||||
if f.Closing == 1 {
|
if f.Closing == 1 {
|
||||||
// empty data indicates closing signal
|
// empty data indicates closing signal
|
||||||
s.passiveClose()
|
fs.output.Close()
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
s.sortedBuf.Write(f.Payload)
|
fs.output.Write(f.Payload)
|
||||||
s.nextRecvSeq += 1
|
fs.nextRecvSeq += 1
|
||||||
if s.nextRecvSeq == 0 { // getting wrapped
|
if fs.nextRecvSeq == 0 { // getting wrapped
|
||||||
s.rev += 1
|
fs.rev += 1
|
||||||
s.wrapMode = false
|
fs.wrapMode = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,32 @@
|
||||||
package multiplex
|
package multiplex
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"time"
|
||||||
|
|
||||||
//"log"
|
//"log"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type BufferReaderWriterCloser struct {
|
||||||
|
*bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BufferReaderWriterCloser) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func TestRecvNewFrame(t *testing.T) {
|
func TestRecvNewFrame(t *testing.T) {
|
||||||
inOrder := []uint64{5, 6, 7, 8, 9, 10, 11}
|
inOrder := []uint64{5, 6, 7, 8, 9, 10, 11}
|
||||||
outOfOrder0 := []uint64{5, 7, 8, 6, 11, 10, 9}
|
outOfOrder0 := []uint64{5, 7, 8, 6, 11, 10, 9}
|
||||||
outOfOrder1 := []uint64{1, 96, 47, 2, 29, 18, 60, 8, 74, 22, 82, 58, 44, 51, 57, 71, 90, 94, 68, 83, 61, 91, 39, 97, 85, 63, 46, 73, 54, 84, 76, 98, 93, 79, 75, 50, 67, 37, 92, 99, 42, 77, 17, 16, 38, 3, 100, 24, 31, 7, 36, 40, 86, 64, 34, 45, 12, 5, 9, 27, 21, 26, 35, 6, 65, 69, 53, 4, 48, 28, 30, 56, 32, 11, 80, 66, 25, 41, 78, 13, 88, 62, 15, 70, 49, 43, 72, 23, 10, 55, 52, 95, 14, 59, 87, 33, 19, 20, 81, 89}
|
outOfOrder1 := []uint64{1, 96, 47, 2, 29, 18, 60, 8, 74, 22, 82, 58, 44, 51, 57, 71, 90, 94, 68, 83, 61, 91, 39, 97, 85, 63, 46, 73, 54, 84, 76, 98, 93, 79, 75, 50, 67, 37, 92, 99, 42, 77, 17, 16, 38, 3, 100, 24, 31, 7, 36, 40, 86, 64, 34, 45, 12, 5, 9, 27, 21, 26, 35, 6, 65, 69, 53, 4, 48, 28, 30, 56, 32, 11, 80, 66, 25, 41, 78, 13, 88, 62, 15, 70, 49, 43, 72, 23, 10, 55, 52, 95, 14, 59, 87, 33, 19, 20, 81, 89}
|
||||||
outOfOrderWrap0 := []uint64{1<<32 - 5, 1<<32 + 3, 1 << 32, 1<<32 - 3, 1<<32 - 4, 1<<32 + 2, 1<<32 - 2, 1<<32 - 1, 1<<32 + 1}
|
outOfOrderWrap0 := []uint64{1<<32 - 5, 1<<32 + 3, 1 << 32, 1<<32 - 3, 1<<32 - 4, 1<<32 + 2, 1<<32 - 2, 1<<32 - 1, 1<<32 + 1}
|
||||||
sets := [][]uint64{inOrder, outOfOrder0, outOfOrder1, outOfOrderWrap0}
|
|
||||||
for _, set := range sets {
|
sortedBuf := &BufferReaderWriterCloser{new(bytes.Buffer)}
|
||||||
stream := makeStream(1, &Session{})
|
test := func(set []uint64, ct *testing.T) {
|
||||||
stream.nextRecvSeq = uint32(set[0])
|
fs := NewFrameSorter(sortedBuf)
|
||||||
|
fs.nextRecvSeq = uint32(set[0])
|
||||||
for _, n := range set {
|
for _, n := range set {
|
||||||
bu64 := make([]byte, 8)
|
bu64 := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint64(bu64, n)
|
binary.BigEndian.PutUint64(bu64, n)
|
||||||
|
|
@ -23,33 +34,49 @@ func TestRecvNewFrame(t *testing.T) {
|
||||||
Seq: uint32(n),
|
Seq: uint32(n),
|
||||||
Payload: bu64,
|
Payload: bu64,
|
||||||
}
|
}
|
||||||
stream.writeNewFrame(frame)
|
fs.writeNewFrame(frame)
|
||||||
}
|
}
|
||||||
|
|
||||||
var testSorted []uint32
|
time.Sleep(100 * time.Microsecond)
|
||||||
|
|
||||||
|
var sortedResult []uint64
|
||||||
for x := 0; x < len(set); x++ {
|
for x := 0; x < len(set); x++ {
|
||||||
oct := make([]byte, 8)
|
oct := make([]byte, 8)
|
||||||
stream.sortedBuf.Read(oct)
|
n, err := sortedBuf.Read(oct)
|
||||||
|
if n != 8 || err != nil {
|
||||||
|
ct.Error("failed to read from sorted Buf", n, err)
|
||||||
|
}
|
||||||
//log.Print(p)
|
//log.Print(p)
|
||||||
testSorted = append(testSorted, uint32(binary.BigEndian.Uint64(oct)))
|
sortedResult = append(sortedResult, binary.BigEndian.Uint64(oct))
|
||||||
}
|
|
||||||
sorted64 := make([]uint64, len(set))
|
|
||||||
copy(sorted64, set)
|
|
||||||
sort.Slice(sorted64, func(i, j int) bool { return sorted64[i] < sorted64[j] })
|
|
||||||
sorted32 := make([]uint32, len(set))
|
|
||||||
for i, _ := range sorted64 {
|
|
||||||
sorted32[i] = uint32(sorted64[i])
|
|
||||||
}
|
}
|
||||||
|
targetSorted := make([]uint64, len(set))
|
||||||
|
copy(targetSorted, set)
|
||||||
|
sort.Slice(targetSorted, func(i, j int) bool { return targetSorted[i] < targetSorted[j] })
|
||||||
|
|
||||||
for i, _ := range sorted32 {
|
for i, _ := range targetSorted {
|
||||||
if sorted32[i] != testSorted[i] {
|
if sortedResult[i] != targetSorted[i] {
|
||||||
t.Error(
|
goto fail
|
||||||
"For", set,
|
|
||||||
"expecting", sorted32,
|
|
||||||
"got", testSorted,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stream.newFrameCh <- nil
|
fs.Close()
|
||||||
|
return
|
||||||
|
fail:
|
||||||
|
ct.Error(
|
||||||
|
"expecting", targetSorted,
|
||||||
|
"got", sortedResult,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Run("in order", func(t *testing.T) {
|
||||||
|
test(inOrder, t)
|
||||||
|
})
|
||||||
|
t.Run("out of order0", func(t *testing.T) {
|
||||||
|
test(outOfOrder0, t)
|
||||||
|
})
|
||||||
|
t.Run("out of order1", func(t *testing.T) {
|
||||||
|
test(outOfOrder1, t)
|
||||||
|
})
|
||||||
|
t.Run("out of order wrap", func(t *testing.T) {
|
||||||
|
test(outOfOrderWrap0, t)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,17 +19,10 @@ type Stream struct {
|
||||||
|
|
||||||
session *Session
|
session *Session
|
||||||
|
|
||||||
// Explanations of the following 4 fields can be found in frameSorter.go
|
|
||||||
nextRecvSeq uint32
|
|
||||||
rev int
|
|
||||||
sh sorterHeap
|
|
||||||
wrapMode bool
|
|
||||||
|
|
||||||
// New frames are received through newFrameCh by frameSorter
|
|
||||||
newFrameCh chan *Frame
|
|
||||||
|
|
||||||
sortedBuf *bufferedPipe
|
sortedBuf *bufferedPipe
|
||||||
|
|
||||||
|
sorter *frameSorter
|
||||||
|
|
||||||
// atomic
|
// atomic
|
||||||
nextSendSeq uint32
|
nextSendSeq uint32
|
||||||
|
|
||||||
|
|
@ -42,15 +35,16 @@ type Stream struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeStream(id uint32, sesh *Session) *Stream {
|
func makeStream(id uint32, sesh *Session) *Stream {
|
||||||
|
buf := NewBufferedPipe()
|
||||||
|
|
||||||
stream := &Stream{
|
stream := &Stream{
|
||||||
id: id,
|
id: id,
|
||||||
session: sesh,
|
session: sesh,
|
||||||
sh: []*frameNode{},
|
sortedBuf: buf,
|
||||||
newFrameCh: make(chan *Frame, 1024),
|
obfsBuf: make([]byte, 17000),
|
||||||
sortedBuf: NewBufferedPipe(),
|
sorter: NewFrameSorter(buf),
|
||||||
obfsBuf: make([]byte, 17000),
|
|
||||||
}
|
}
|
||||||
go stream.recvNewFrame()
|
|
||||||
log.Tracef("stream %v opened", id)
|
log.Tracef("stream %v opened", id)
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
|
@ -108,7 +102,7 @@ func (s *Stream) Write(in []byte) (n int, err error) {
|
||||||
// the necessary steps to mark the stream as closed and to release resources
|
// the necessary steps to mark the stream as closed and to release resources
|
||||||
func (s *Stream) _close() {
|
func (s *Stream) _close() {
|
||||||
atomic.StoreUint32(&s.closed, 1)
|
atomic.StoreUint32(&s.closed, 1)
|
||||||
s.newFrameCh <- nil // this will trigger frameSorter to return
|
s.sorter.Close() // this will trigger frameSorter to return
|
||||||
s.sortedBuf.Close()
|
s.sortedBuf.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -148,7 +148,7 @@ func (sb *switchboard) deplex(ce *connEnclave) {
|
||||||
// (this happens when ss-server and ss-local closes the stream
|
// (this happens when ss-server and ss-local closes the stream
|
||||||
// simutaneously), we don't do anything
|
// simutaneously), we don't do anything
|
||||||
if stream != nil {
|
if stream != nil {
|
||||||
stream.writeNewFrame(frame)
|
stream.sorter.writeNewFrame(frame)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue