Make send/receive buffer sizes configurable

This commit is contained in:
Andy Wang 2020-04-08 15:07:35 +01:00
parent 1de5045003
commit 545164a300
6 changed files with 26 additions and 14 deletions

View File

@ -90,7 +90,7 @@ func dispatchConnection(conn net.Conn, sta *server.State) {
return return
} }
log.Trace("finished handshake") log.Trace("finished handshake")
seshConfig := &mux.SessionConfig{ seshConfig := mux.SessionConfig{
Obfuscator: obfuscator, Obfuscator: obfuscator,
Valve: nil, Valve: nil,
UnitRead: ci.Transport.UnitReadFunc(), UnitRead: ci.Transport.UnitReadFunc(),
@ -122,7 +122,7 @@ func dispatchConnection(conn net.Conn, sta *server.State) {
return return
} }
seshConfig := &mux.SessionConfig{ seshConfig := mux.SessionConfig{
Obfuscator: obfuscator, Obfuscator: obfuscator,
Valve: nil, Valve: nil,
UnitRead: ci.Transport.UnitReadFunc(), UnitRead: ci.Transport.UnitReadFunc(),

View File

@ -70,7 +70,7 @@ func MakeSession(connConfig *remoteConnConfig, authInfo *authInfo, isAdmin bool)
log.Fatal(err) log.Fatal(err)
} }
seshConfig := &mux.SessionConfig{ seshConfig := mux.SessionConfig{
Obfuscator: obfuscator, Obfuscator: obfuscator,
Valve: nil, Valve: nil,
UnitRead: connConfig.Transport.UnitReadFunc(), UnitRead: connConfig.Transport.UnitReadFunc(),

View File

@ -13,7 +13,8 @@ import (
) )
const ( const (
acceptBacklog = 1024 acceptBacklog = 1024
defaultSendRecvBufSize = 20480
) )
var ErrBrokenSession = errors.New("broken session") var ErrBrokenSession = errors.New("broken session")
@ -30,12 +31,15 @@ type SessionConfig struct {
UnitRead func(net.Conn, []byte) (int, error) UnitRead func(net.Conn, []byte) (int, error)
Unordered bool Unordered bool
SendBufferSize int
ReceiveBufferSize int
} }
type Session struct { type Session struct {
id uint32 id uint32
*SessionConfig SessionConfig
// atomic // atomic
nextStreamID uint32 nextStreamID uint32
@ -58,7 +62,7 @@ type Session struct {
terminalMsg atomic.Value terminalMsg atomic.Value
} }
func MakeSession(id uint32, config *SessionConfig) *Session { func MakeSession(id uint32, config SessionConfig) *Session {
sesh := &Session{ sesh := &Session{
id: id, id: id,
SessionConfig: config, SessionConfig: config,
@ -70,9 +74,16 @@ func MakeSession(id uint32, config *SessionConfig) *Session {
if config.Valve == nil { if config.Valve == nil {
config.Valve = UNLIMITED_VALVE config.Valve = UNLIMITED_VALVE
} }
if config.SendBufferSize <= 0 {
config.SendBufferSize = defaultSendRecvBufSize
}
if config.ReceiveBufferSize <= 0 {
config.ReceiveBufferSize = defaultSendRecvBufSize
}
sbConfig := &switchboardConfig{ sbConfig := switchboardConfig{
Valve: config.Valve, Valve: config.Valve,
recvBufferSize: config.ReceiveBufferSize,
} }
if config.Unordered { if config.Unordered {
log.Debug("Connection is unordered") log.Debug("Connection is unordered")

View File

@ -49,7 +49,7 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream {
id: id, id: id,
session: sesh, session: sesh,
recvBuf: recvBuf, recvBuf: recvBuf,
obfsBuf: make([]byte, 17000), //TODO don't leave this hardcoded obfsBuf: make([]byte, sesh.SendBufferSize),
assignedConnId: assignedConnId, assignedConnId: assignedConnId,
} }

View File

@ -16,14 +16,15 @@ const (
type switchboardConfig struct { type switchboardConfig struct {
Valve Valve
strategy switchboardStrategy strategy switchboardStrategy
recvBufferSize int
} }
// switchboard is responsible for keeping the reference of TCP connections between client and server // switchboard is responsible for keeping the reference of TCP connections between client and server
type switchboard struct { type switchboard struct {
session *Session session *Session
*switchboardConfig switchboardConfig
conns sync.Map conns sync.Map
nextConnId uint32 nextConnId uint32
@ -31,7 +32,7 @@ type switchboard struct {
broken uint32 broken uint32
} }
func makeSwitchboard(sesh *Session, config *switchboardConfig) *switchboard { func makeSwitchboard(sesh *Session, config switchboardConfig) *switchboard {
// rates are uint64 because in the usermanager we want the bandwidth to be atomically // rates are uint64 because in the usermanager we want the bandwidth to be atomically
// operated (so that the bandwidth can change on the fly). // operated (so that the bandwidth can change on the fly).
sb := &switchboard{ sb := &switchboard{
@ -153,7 +154,7 @@ func (sb *switchboard) closeAll() {
// deplex function costantly reads from a TCP connection // deplex function costantly reads from a TCP connection
func (sb *switchboard) deplex(connId uint32, conn net.Conn) { func (sb *switchboard) deplex(connId uint32, conn net.Conn) {
defer conn.Close() defer conn.Close()
buf := make([]byte, 20480) buf := make([]byte, sb.recvBufferSize)
for { for {
n, err := sb.session.UnitRead(conn, buf) n, err := sb.session.UnitRead(conn, buf)
sb.rxWait(n) sb.rxWait(n)

View File

@ -39,7 +39,7 @@ func (u *ActiveUser) CloseSession(sessionID uint32, reason string) {
// GetSession returns the reference to an existing session, or if one such session doesn't exist, it queries // GetSession returns the reference to an existing session, or if one such session doesn't exist, it queries
// the UserManager for the authorisation for a new session. If a new session is allowed, it creates this new session // the UserManager for the authorisation for a new session. If a new session is allowed, it creates this new session
// and returns its reference // and returns its reference
func (u *ActiveUser) GetSession(sessionID uint32, config *mux.SessionConfig) (sesh *mux.Session, existing bool, err error) { func (u *ActiveUser) GetSession(sessionID uint32, config mux.SessionConfig) (sesh *mux.Session, existing bool, err error) {
u.sessionsM.Lock() u.sessionsM.Lock()
defer u.sessionsM.Unlock() defer u.sessionsM.Unlock()
if sesh = u.sessions[sessionID]; sesh != nil { if sesh = u.sessions[sessionID]; sesh != nil {