From a26a2e84a4dfbd5e9122198ec3902de3cc831d32 Mon Sep 17 00:00:00 2001 From: notsure2 Date: Wed, 23 Dec 2020 02:32:13 +0200 Subject: [PATCH] Use a mutex instead of sync.Map in RouteUDP. --- internal/client/piper.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/internal/client/piper.go b/internal/client/piper.go index c867db5..811a8f5 100644 --- a/internal/client/piper.go +++ b/internal/client/piper.go @@ -18,7 +18,8 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration log.Fatal(err) } - var streams sync.Map + streams := make(map[string]*mux.Stream) + var streamsMutex sync.Mutex data := make([]byte, 8192) for { @@ -32,15 +33,14 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration sesh = newSeshFunc() } - var stream *mux.Stream - streamObj, ok := streams.Load(addr.String()) + streamsMutex.Lock() + stream, ok := streams[addr.String()] if !ok { if singleplex { sesh = newSeshFunc() } stream, err = sesh.OpenStream() - streamObj = stream if err != nil { if singleplex { sesh.Close() @@ -50,7 +50,9 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration } _ = stream.SetReadDeadline(time.Now().Add(streamTimeout)) - streams.Store(addr.String(), stream) + streams[addr.String()] = stream + streamsMutex.Unlock() + proxyAddr := addr go func(stream *mux.Stream, localConn *net.UDPConn) { buf := make([]byte, 8192) @@ -58,7 +60,9 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration n, err := stream.Read(buf) if err != nil { log.Tracef("copying stream to proxy client: %v", err) - streams.Delete(addr.String()) + streamsMutex.Lock() + delete(streams, addr.String()) + streamsMutex.Unlock() stream.Close() return } @@ -67,19 +71,24 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration _, err = localConn.WriteTo(buf[:n], proxyAddr) if err != nil { log.Tracef("copying stream to proxy client: %v", err) - streams.Delete(addr.String()) + streamsMutex.Lock() + delete(streams, addr.String()) + streamsMutex.Unlock() stream.Close() return } } }(stream, localConn) + } else { + streamsMutex.Unlock() } - stream = streamObj.(*mux.Stream) _, err = stream.Write(data[:i]) if err != nil { log.Tracef("copying proxy client to stream: %v", err) - streams.Delete(addr.String()) + streamsMutex.Lock() + delete(streams, addr.String()) + streamsMutex.Unlock() stream.Close() continue }