diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index dcc36c3..210d3a4 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -25,6 +25,13 @@ type Obfuscator struct { SessionKey []byte } +type SwitchboardStrategy int + +const ( + FixedConnMapping SwitchboardStrategy = iota + Uniform +) + type SessionConfig struct { *Obfuscator @@ -32,6 +39,10 @@ type SessionConfig struct { // This is supposed to read one TLS message, the same as GoQuiet's ReadTillDrain UnitRead func(net.Conn, []byte) (int, error) + + Unordered bool + + SwitchboardStrategy SwitchboardStrategy } type Session struct { @@ -71,7 +82,13 @@ func MakeSession(id uint32, config *SessionConfig) *Session { if config.Valve == nil { config.Valve = UNLIMITED_VALVE } - sesh.sb = makeSwitchboard(sesh, config.Valve) + + sbConfig := &switchboardConfig{ + Valve: config.Valve, + unordered: config.Unordered, + strategy: config.SwitchboardStrategy, + } + sesh.sb = makeSwitchboard(sesh, sbConfig) go sesh.timeoutAfter(30 * time.Second) return sesh } diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 2beb08e..c393310 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -57,7 +57,13 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { func (s *Stream) isClosed() bool { return atomic.LoadUint32(&s.closed) == 1 } -func (s *Stream) writeFrame(frame *Frame) { s.sorter.writeNewFrame(frame) } +func (s *Stream) writeFrame(frame *Frame) { + if s.session.Unordered { + s.sortedBuf.Write(frame.Payload) + } else { + s.sorter.writeNewFrame(frame) + } +} func (s *Stream) Read(buf []byte) (n int, err error) { //log.Tracef("attempting to read from stream %v", s.id) diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 095db96..cd8cefb 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -9,11 +9,17 @@ import ( "sync/atomic" ) +type switchboardConfig struct { + Valve + unordered bool + strategy SwitchboardStrategy +} + // switchboard is responsible for keeping the reference of TLS connections between client and server type switchboard struct { session *Session - Valve + *switchboardConfig connsM sync.RWMutex conns map[uint32]net.Conn @@ -22,13 +28,13 @@ type switchboard struct { broken uint32 } -func makeSwitchboard(sesh *Session, valve Valve) *switchboard { +func makeSwitchboard(sesh *Session, config *switchboardConfig) *switchboard { // rates are uint64 because in the usermanager we want the bandwidth to be atomically // operated (so that the bandwidth can change on the fly). sb := &switchboard{ - session: sesh, - Valve: valve, - conns: make(map[uint32]net.Conn), + session: sesh, + switchboardConfig: config, + conns: make(map[uint32]net.Conn), } return sb } diff --git a/internal/multiplex/switchboard_test.go b/internal/multiplex/switchboard_test.go index c5e36f7..4406826 100644 --- a/internal/multiplex/switchboard_test.go +++ b/internal/multiplex/switchboard_test.go @@ -12,7 +12,12 @@ func BenchmarkSwitchboard_Send(b *testing.B) { UnitRead: nil, } sesh := MakeSession(0, seshConfig) - sb := makeSwitchboard(sesh, UNLIMITED_VALVE) + sbConfig := &switchboardConfig{ + Valve: UNLIMITED_VALVE, + unordered: false, + strategy: 0, + } + sb := makeSwitchboard(sesh, sbConfig) hole := newBlackHole() sb.addConn(hole) connId, err := sb.assignRandomConn()