diff --git a/internal/client/piper.go b/internal/client/piper.go index 92b92a4..4bb9fc0 100644 --- a/internal/client/piper.go +++ b/internal/client/piper.go @@ -4,6 +4,7 @@ import ( "github.com/cbeuw/Cloak/internal/common" "io" "net" + "sync" "time" mux "github.com/cbeuw/Cloak/internal/multiplex" @@ -18,6 +19,7 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration } streams := make(map[string]*mux.Stream) + var streamsMutex sync.Mutex data := make([]byte, 8192) for { @@ -31,6 +33,7 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration sesh = newSeshFunc() } + streamsMutex.Lock() stream, ok := streams[addr.String()] if !ok { if singleplex { @@ -43,10 +46,14 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration sesh.Close() } log.Errorf("Failed to open stream: %v", err) + streamsMutex.Unlock() continue } - streams[addr.String()] = stream + streamsMutex.Unlock() + + _ = stream.SetReadDeadline(time.Now().Add(streamTimeout)) + proxyAddr := addr go func(stream *mux.Stream, localConn *net.UDPConn) { buf := make([]byte, 8192) @@ -54,27 +61,36 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration n, err := stream.Read(buf) if err != nil { log.Tracef("copying stream to proxy client: %v", err) - stream.Close() - return + break } + _ = stream.SetReadDeadline(time.Now().Add(streamTimeout)) _, err = localConn.WriteTo(buf[:n], proxyAddr) if err != nil { log.Tracef("copying stream to proxy client: %v", err) - stream.Close() - return + break } } + streamsMutex.Lock() + delete(streams, addr.String()) + streamsMutex.Unlock() + stream.Close() + return }(stream, localConn) + } else { + streamsMutex.Unlock() } _, err = stream.Write(data[:i]) if err != nil { log.Tracef("copying proxy client to stream: %v", err) + streamsMutex.Lock() delete(streams, addr.String()) + streamsMutex.Unlock() stream.Close() continue } + _ = stream.SetReadDeadline(time.Now().Add(streamTimeout)) } }