From 545164a300824fd41ac3eb782e110c15db285934 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Wed, 8 Apr 2020 15:07:35 +0100 Subject: [PATCH] Make send/receive buffer sizes configurable --- cmd/ck-server/ck-server.go | 4 ++-- internal/client/connector.go | 2 +- internal/multiplex/session.go | 21 ++++++++++++++++----- internal/multiplex/stream.go | 2 +- internal/multiplex/switchboard.go | 9 +++++---- internal/server/activeuser.go | 2 +- 6 files changed, 26 insertions(+), 14 deletions(-) diff --git a/cmd/ck-server/ck-server.go b/cmd/ck-server/ck-server.go index b607eb7..52134cc 100644 --- a/cmd/ck-server/ck-server.go +++ b/cmd/ck-server/ck-server.go @@ -90,7 +90,7 @@ func dispatchConnection(conn net.Conn, sta *server.State) { return } log.Trace("finished handshake") - seshConfig := &mux.SessionConfig{ + seshConfig := mux.SessionConfig{ Obfuscator: obfuscator, Valve: nil, UnitRead: ci.Transport.UnitReadFunc(), @@ -122,7 +122,7 @@ func dispatchConnection(conn net.Conn, sta *server.State) { return } - seshConfig := &mux.SessionConfig{ + seshConfig := mux.SessionConfig{ Obfuscator: obfuscator, Valve: nil, UnitRead: ci.Transport.UnitReadFunc(), diff --git a/internal/client/connector.go b/internal/client/connector.go index 492b835..2153f04 100644 --- a/internal/client/connector.go +++ b/internal/client/connector.go @@ -70,7 +70,7 @@ func MakeSession(connConfig *remoteConnConfig, authInfo *authInfo, isAdmin bool) log.Fatal(err) } - seshConfig := &mux.SessionConfig{ + seshConfig := mux.SessionConfig{ Obfuscator: obfuscator, Valve: nil, UnitRead: connConfig.Transport.UnitReadFunc(), diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 7719925..4cbb847 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -13,7 +13,8 @@ import ( ) const ( - acceptBacklog = 1024 + acceptBacklog = 1024 + defaultSendRecvBufSize = 20480 ) var ErrBrokenSession = errors.New("broken session") @@ -30,12 +31,15 @@ type SessionConfig struct { UnitRead func(net.Conn, []byte) (int, error) Unordered bool + + SendBufferSize int + ReceiveBufferSize int } type Session struct { id uint32 - *SessionConfig + SessionConfig // atomic nextStreamID uint32 @@ -58,7 +62,7 @@ type Session struct { terminalMsg atomic.Value } -func MakeSession(id uint32, config *SessionConfig) *Session { +func MakeSession(id uint32, config SessionConfig) *Session { sesh := &Session{ id: id, SessionConfig: config, @@ -70,9 +74,16 @@ func MakeSession(id uint32, config *SessionConfig) *Session { if config.Valve == nil { config.Valve = UNLIMITED_VALVE } + if config.SendBufferSize <= 0 { + config.SendBufferSize = defaultSendRecvBufSize + } + if config.ReceiveBufferSize <= 0 { + config.ReceiveBufferSize = defaultSendRecvBufSize + } - sbConfig := &switchboardConfig{ - Valve: config.Valve, + sbConfig := switchboardConfig{ + Valve: config.Valve, + recvBufferSize: config.ReceiveBufferSize, } if config.Unordered { log.Debug("Connection is unordered") diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 4d1bf39..66f7036 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -49,7 +49,7 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { id: id, session: sesh, recvBuf: recvBuf, - obfsBuf: make([]byte, 17000), //TODO don't leave this hardcoded + obfsBuf: make([]byte, sesh.SendBufferSize), assignedConnId: assignedConnId, } diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 071f604..f75cf71 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -16,14 +16,15 @@ const ( type switchboardConfig struct { Valve - strategy switchboardStrategy + strategy switchboardStrategy + recvBufferSize int } // switchboard is responsible for keeping the reference of TCP connections between client and server type switchboard struct { session *Session - *switchboardConfig + switchboardConfig conns sync.Map nextConnId uint32 @@ -31,7 +32,7 @@ type switchboard struct { broken uint32 } -func makeSwitchboard(sesh *Session, config *switchboardConfig) *switchboard { +func makeSwitchboard(sesh *Session, config switchboardConfig) *switchboard { // rates are uint64 because in the usermanager we want the bandwidth to be atomically // operated (so that the bandwidth can change on the fly). sb := &switchboard{ @@ -153,7 +154,7 @@ func (sb *switchboard) closeAll() { // deplex function costantly reads from a TCP connection func (sb *switchboard) deplex(connId uint32, conn net.Conn) { defer conn.Close() - buf := make([]byte, 20480) + buf := make([]byte, sb.recvBufferSize) for { n, err := sb.session.UnitRead(conn, buf) sb.rxWait(n) diff --git a/internal/server/activeuser.go b/internal/server/activeuser.go index 11a5bd1..2bf212c 100644 --- a/internal/server/activeuser.go +++ b/internal/server/activeuser.go @@ -39,7 +39,7 @@ func (u *ActiveUser) CloseSession(sessionID uint32, reason string) { // GetSession returns the reference to an existing session, or if one such session doesn't exist, it queries // the UserManager for the authorisation for a new session. If a new session is allowed, it creates this new session // and returns its reference -func (u *ActiveUser) GetSession(sessionID uint32, config *mux.SessionConfig) (sesh *mux.Session, existing bool, err error) { +func (u *ActiveUser) GetSession(sessionID uint32, config mux.SessionConfig) (sesh *mux.Session, existing bool, err error) { u.sessionsM.Lock() defer u.sessionsM.Unlock() if sesh = u.sessions[sessionID]; sesh != nil {