Implement UDP stream timeout. (#148)

* Implement UDP stream timeout.

* Correctly delete streams from map after closing

Co-authored-by: Andy Wang <cbeuw.andy@gmail.com>
This commit is contained in:
不确定 2020-12-23 02:43:38 +02:00 committed by GitHub
parent 4d1612774f
commit dc030fbb47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 5 deletions

View File

@ -4,6 +4,7 @@ import (
"github.com/cbeuw/Cloak/internal/common"
"io"
"net"
"sync"
"time"
mux "github.com/cbeuw/Cloak/internal/multiplex"
@ -18,6 +19,7 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration
}
streams := make(map[string]*mux.Stream)
var streamsMutex sync.Mutex
data := make([]byte, 8192)
for {
@ -31,6 +33,7 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration
sesh = newSeshFunc()
}
streamsMutex.Lock()
stream, ok := streams[addr.String()]
if !ok {
if singleplex {
@ -43,10 +46,14 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration
sesh.Close()
}
log.Errorf("Failed to open stream: %v", err)
streamsMutex.Unlock()
continue
}
streams[addr.String()] = stream
streamsMutex.Unlock()
_ = stream.SetReadDeadline(time.Now().Add(streamTimeout))
proxyAddr := addr
go func(stream *mux.Stream, localConn *net.UDPConn) {
buf := make([]byte, 8192)
@ -54,27 +61,36 @@ func RouteUDP(bindFunc func() (*net.UDPConn, error), streamTimeout time.Duration
n, err := stream.Read(buf)
if err != nil {
log.Tracef("copying stream to proxy client: %v", err)
stream.Close()
return
break
}
_ = stream.SetReadDeadline(time.Now().Add(streamTimeout))
_, err = localConn.WriteTo(buf[:n], proxyAddr)
if err != nil {
log.Tracef("copying stream to proxy client: %v", err)
stream.Close()
return
break
}
}
streamsMutex.Lock()
delete(streams, addr.String())
streamsMutex.Unlock()
stream.Close()
return
}(stream, localConn)
} else {
streamsMutex.Unlock()
}
_, err = stream.Write(data[:i])
if err != nil {
log.Tracef("copying proxy client to stream: %v", err)
streamsMutex.Lock()
delete(streams, addr.String())
streamsMutex.Unlock()
stream.Close()
continue
}
_ = stream.SetReadDeadline(time.Now().Add(streamTimeout))
}
}