diff --git a/README.md b/README.md index 4ab94e6..8d7b840 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ Example: `ServerName` is the domain you want to make your ISP or firewall think you are visiting. -`NumConn` is the amount of underlying TCP connections you want to use. The default of 4 should be appropriate for most people. Setting it too high will hinder the performance. +`NumConn` is the amount of underlying TCP connections you want to use. The default of 4 should be appropriate for most people. Setting it too high will hinder the performance. Setting it to 0 will disable connection multiplexing and each TCP connection will spawn a separate short lived session that will be closed after it is terminated. This makes it behave like GoQuiet. This maybe useful for people with unstable connections. `BrowserSig` is the browser you want to **appear** to be using. It's not relevant to the browser you are actually using. Currently, `chrome` and `firefox` are supported. diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index 6e901f4..2eac843 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -169,18 +169,20 @@ func main() { } } + useSessionPerConnection := remoteConfig.NumConn == 0 + if authInfo.Unordered { acceptor := func() (*net.UDPConn, error) { udpAddr, _ := net.ResolveUDPAddr("udp", localConfig.LocalAddr) return net.ListenUDP("udp", udpAddr) } - client.RouteUDP(acceptor, localConfig.Timeout, seshMaker) + client.RouteUDP(acceptor, localConfig.Timeout, seshMaker, useSessionPerConnection) } else { listener, err := net.Listen("tcp", localConfig.LocalAddr) if err != nil { log.Fatal(err) } - client.RouteTCP(listener, localConfig.Timeout, seshMaker) + client.RouteTCP(listener, localConfig.Timeout, seshMaker, useSessionPerConnection) } } diff --git a/internal/client/connector.go b/internal/client/connector.go index d7a6792..2a7d783 100644 --- a/internal/client/connector.go +++ b/internal/client/connector.go @@ -25,10 +25,16 @@ func MakeSession(connConfig RemoteConnConfig, authInfo AuthInfo, dialer common.D authInfo.SessionId = 0 } - connsCh := make(chan net.Conn, connConfig.NumConn) + numConn := connConfig.NumConn + if numConn <= 0 { + log.Infof("Using session per connection (no multiplexing)") + numConn = 1 + } + + connsCh := make(chan net.Conn, numConn) var _sessionKey atomic.Value var wg sync.WaitGroup - for i := 0; i < connConfig.NumConn; i++ { + for i := 0; i < numConn; i++ { wg.Add(1) go func() { makeconn: @@ -70,7 +76,7 @@ func MakeSession(connConfig RemoteConnConfig, authInfo AuthInfo, dialer common.D } sesh := mux.MakeSession(authInfo.SessionId, seshConfig) - for i := 0; i < connConfig.NumConn; i++ { + for i := 0; i < numConn; i++ { conn := <-connsCh sesh.AddConnection(conn) } diff --git a/internal/client/piper.go b/internal/client/piper.go index 7f4edd5..d744548 100644 --- a/internal/client/piper.go +++ b/internal/client/piper.go @@ -10,14 +10,31 @@ import ( log "github.com/sirupsen/logrus" ) -func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration, newSeshFunc func() *mux.Session) { +type ConnWithReadFromTimeout interface { + net.Conn + SetReadFromTimeout(d time.Duration) +} + +type CloseSessionAfterCloseStream struct { + ConnWithReadFromTimeout + Session *mux.Session +} + +func (s *CloseSessionAfterCloseStream) Close() error { + if err := s.ConnWithReadFromTimeout.Close(); err != nil { + return err + } + return s.Session.Close() +} + +func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration, newSeshFunc func() *mux.Session, useSessionPerConnection bool) { var sesh *mux.Session localConn, err := bindFunc() if err != nil { log.Fatal(err) } - streams := make(map[string]*mux.Stream) + streams := make(map[string]ConnWithReadFromTimeout) data := make([]byte, 8192) for { @@ -27,17 +44,34 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration continue } - if sesh == nil || sesh.IsClosed() { + if !useSessionPerConnection && (sesh == nil || sesh.IsClosed()) { sesh = newSeshFunc() } + var stream ConnWithReadFromTimeout stream, ok := streams[addr.String()] if !ok { - stream, err = sesh.OpenStream() + connectionSession := sesh + if useSessionPerConnection { + connectionSession = newSeshFunc() + } + + stream, err = connectionSession.OpenStream() if err != nil { log.Errorf("Failed to open stream: %v", err) + if useSessionPerConnection { + connectionSession.Close() + } continue } + + if useSessionPerConnection { + stream = &CloseSessionAfterCloseStream{ + ConnWithReadFromTimeout: stream, + Session: connectionSession, + } + } + streams[addr.String()] = stream proxyAddr := addr go func() { @@ -70,7 +104,7 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration } } -func RouteTCP(listener net.Listener, streamTimeout time.Duration, newSeshFunc func() *mux.Session) { +func RouteTCP(listener net.Listener, streamTimeout time.Duration, newSeshFunc func() *mux.Session, useSessionPerConnection bool) { var sesh *mux.Session for { localConn, err := listener.Accept() @@ -78,7 +112,7 @@ func RouteTCP(listener net.Listener, streamTimeout time.Duration, newSeshFunc fu log.Fatal(err) continue } - if sesh == nil || sesh.IsClosed() { + if !useSessionPerConnection && (sesh == nil || sesh.IsClosed()) { sesh = newSeshFunc() } go func() { @@ -89,12 +123,30 @@ func RouteTCP(listener net.Listener, streamTimeout time.Duration, newSeshFunc fu localConn.Close() return } - stream, err := sesh.OpenStream() + + connectionSession := sesh + if useSessionPerConnection { + connectionSession = newSeshFunc() + } + + var stream ConnWithReadFromTimeout + stream, err = connectionSession.OpenStream() if err != nil { log.Errorf("Failed to open stream: %v", err) localConn.Close() + if useSessionPerConnection { + connectionSession.Close() + } return } + + if useSessionPerConnection { + stream = &CloseSessionAfterCloseStream{ + ConnWithReadFromTimeout: stream, + Session: connectionSession, + } + } + _, err = stream.Write(data[:i]) if err != nil { log.Errorf("Failed to write to stream: %v", err) diff --git a/internal/client/state.go b/internal/client/state.go index ce963f5..9a4004f 100644 --- a/internal/client/state.go +++ b/internal/client/state.go @@ -172,8 +172,8 @@ func (raw *RawConfig) SplitConfigs(worldState common.WorldState) (local LocalCon return nullErr("RemotePort") } remote.RemoteAddr = net.JoinHostPort(raw.RemoteHost, raw.RemotePort) - if raw.NumConn == 0 { - return nullErr("NumConn") + if raw.NumConn <= 0 { + raw.NumConn = 0 } remote.NumConn = raw.NumConn diff --git a/internal/test/integration_test.go b/internal/test/integration_test.go index f2b5b8f..30a259e 100644 --- a/internal/test/integration_test.go +++ b/internal/test/integration_test.go @@ -139,6 +139,7 @@ func establishSession(lcc client.LocalConnConfig, rcc client.RemoteConnConfig, a return client.MakeSession(rcc, ai, ckClientDialer, false) } + useSessionPerConnection := rcc.NumConn == 0 var proxyToCkClientD common.Dialer if ai.Unordered { addrCh := make(chan *net.UDPAddr, 1) @@ -151,12 +152,12 @@ func establishSession(lcc client.LocalConnConfig, rcc client.RemoteConnConfig, a addrCh <- conn.LocalAddr().(*net.UDPAddr) return conn, err } - go client.RouteUDP(acceptor, lcc.Timeout, clientSeshMaker) + go client.RouteUDP(acceptor, lcc.Timeout, clientSeshMaker, useSessionPerConnection) proxyToCkClientD = mDialer } else { var proxyToCkClientL *connutil.PipeListener proxyToCkClientD, proxyToCkClientL = connutil.DialerListener(10 * 1024) - go client.RouteTCP(proxyToCkClientL, lcc.Timeout, clientSeshMaker) + go client.RouteTCP(proxyToCkClientL, lcc.Timeout, clientSeshMaker, useSessionPerConnection) } // set up server