This commit is contained in:
Andy Wang 2020-05-23 15:44:24 +01:00
parent 2682460d46
commit 369f3c781c
3 changed files with 90 additions and 78 deletions

View File

@ -4,96 +4,74 @@ import (
"github.com/cbeuw/Cloak/internal/common" "github.com/cbeuw/Cloak/internal/common"
"io" "io"
"net" "net"
"sync/atomic"
"time" "time"
mux "github.com/cbeuw/Cloak/internal/multiplex" mux "github.com/cbeuw/Cloak/internal/multiplex"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func RouteUDP(listen func(string, string) (net.PacketConn, error), localConfig LocalConnConfig, newSeshFunc func() *mux.Session) { func RouteUDP(acceptFunc func() (*net.UDPConn, error), streamTimeout time.Duration, newSeshFunc func() *mux.Session) {
var sesh *mux.Session var sesh *mux.Session
start: localConn, err := acceptFunc()
localConn, err := listen("udp", localConfig.LocalAddr)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
var otherEnd atomic.Value
data := make([]byte, 10240) streams := make(map[string]*mux.Stream)
i, oe, err := localConn.ReadFrom(data)
data := make([]byte, 8192)
for {
i, addr, err := localConn.ReadFrom(data)
if err != nil { if err != nil {
log.Errorf("Failed to read first packet from proxy client: %v", err) log.Errorf("Failed to read first packet from proxy client: %v", err)
localConn.Close() localConn, err = acceptFunc()
return if err != nil {
log.Fatal(err)
}
continue
} }
otherEnd.Store(oe)
if sesh == nil || sesh.IsClosed() { if sesh == nil || sesh.IsClosed() {
sesh = newSeshFunc() sesh = newSeshFunc()
} }
log.Debugf("proxy local address %v", otherEnd.Load().(net.Addr).String())
stream, err := sesh.OpenStream() stream, ok := streams[addr.String()]
if !ok {
stream, err = sesh.OpenStream()
if err != nil { if err != nil {
log.Errorf("Failed to open stream: %v", err) log.Errorf("Failed to open stream: %v", err)
localConn.Close() continue
//localConnWrite.Close()
return
} }
_, err = stream.Write(data[:i]) streams[addr.String()] = stream
proxyAddr := addr
go func() {
buf := make([]byte, 8192)
for {
n, err := stream.Read(buf)
if err != nil { if err != nil {
log.Errorf("Failed to write to stream: %v", err) log.Tracef("copying stream to proxy client: %v", err)
localConn.Close()
//localConnWrite.Close()
stream.Close() stream.Close()
return return
} }
// stream to proxy _, err = localConn.WriteTo(buf[:n], proxyAddr)
go func() {
buf := make([]byte, 16380)
for {
i, err := io.ReadAtLeast(stream, buf, 1)
if err != nil { if err != nil {
log.Print(err) log.Tracef("copying stream to proxy client: %v", err)
localConn.Close()
stream.Close() stream.Close()
break return
}
_, err = localConn.WriteTo(buf[:i], otherEnd.Load().(net.Addr))
if err != nil {
log.Print(err)
localConn.Close()
stream.Close()
break
} }
} }
}() }()
}
// proxy to stream _, err = stream.Write(data[:i])
buf := make([]byte, 16380)
if localConfig.Timeout != 0 {
localConn.SetReadDeadline(time.Now().Add(localConfig.Timeout))
}
for {
if localConfig.Timeout != 0 {
localConn.SetReadDeadline(time.Now().Add(localConfig.Timeout))
}
i, oe, err := localConn.ReadFrom(buf)
if err != nil { if err != nil {
localConn.Close() log.Tracef("copying proxy client to stream: %v", err)
delete(streams, addr.String())
stream.Close() stream.Close()
break continue
}
otherEnd.Store(oe)
_, err = stream.Write(buf[:i])
if err != nil {
localConn.Close()
stream.Close()
break
} }
} }
goto start
} }
func RouteTCP(listener net.Listener, streamTimeout time.Duration, newSeshFunc func() *mux.Session) { func RouteTCP(listener net.Listener, streamTimeout time.Duration, newSeshFunc func() *mux.Session) {

View File

@ -98,6 +98,7 @@ func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) {
return n, er return n, er
} }
d.rwCond.Broadcast() d.rwCond.Broadcast()
continue
} }
d.rwCond.Wait() d.rwCond.Wait()
} }

View File

@ -42,7 +42,6 @@ func serveTCPEcho(l net.Listener) {
} }
} }
/*
func serveUDPEcho(listener *connutil.PipeListener) { func serveUDPEcho(listener *connutil.PipeListener) {
for { for {
conn, err := listener.ListenPacket("udp", "") conn, err := listener.ListenPacket("udp", "")
@ -56,7 +55,7 @@ func serveUDPEcho(listener *connutil.PipeListener) {
defer conn.Close() defer conn.Close()
buf := make([]byte, bufSize) buf := make([]byte, bufSize)
for { for {
r,_, err := conn.ReadFrom(buf) r, _, err := conn.ReadFrom(buf)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
@ -75,8 +74,6 @@ func serveUDPEcho(listener *connutil.PipeListener) {
} }
} }
*/
var bypassUID = [16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} var bypassUID = [16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
var publicKey, _ = base64.StdEncoding.DecodeString("7f7TuKrs264VNSgMno8PkDlyhGhVuOSR8JHLE6H4Ljc=") var publicKey, _ = base64.StdEncoding.DecodeString("7f7TuKrs264VNSgMno8PkDlyhGhVuOSR8JHLE6H4Ljc=")
var privateKey, _ = base64.StdEncoding.DecodeString("SMWeC6VuZF8S/id65VuFQFlfa7hTEJBpL6wWhqPP100=") var privateKey, _ = base64.StdEncoding.DecodeString("SMWeC6VuZF8S/id65VuFQFlfa7hTEJBpL6wWhqPP100=")
@ -123,6 +120,18 @@ func basicServerState(ws common.WorldState, db *os.File) *server.State {
return state return state
} }
type mockUDPDialer struct {
addrCh chan *net.UDPAddr
raddr *net.UDPAddr
}
func (m *mockUDPDialer) Dial(network, address string) (net.Conn, error) {
if m.raddr == nil {
m.raddr = <-m.addrCh
}
return net.DialUDP("udp", nil, m.raddr)
}
func establishSession(lcc client.LocalConnConfig, rcc client.RemoteConnConfig, ai client.AuthInfo, serverState *server.State) (common.Dialer, *connutil.PipeListener, common.Dialer, net.Listener, error) { func establishSession(lcc client.LocalConnConfig, rcc client.RemoteConnConfig, ai client.AuthInfo, serverState *server.State) (common.Dialer, *connutil.PipeListener, common.Dialer, net.Listener, error) {
// transport // transport
ckClientDialer, ckServerListener := connutil.DialerListener(10 * 1024) ckClientDialer, ckServerListener := connutil.DialerListener(10 * 1024)
@ -130,10 +139,23 @@ func establishSession(lcc client.LocalConnConfig, rcc client.RemoteConnConfig, a
return client.MakeSession(rcc, ai, ckClientDialer, false) return client.MakeSession(rcc, ai, ckClientDialer, false)
} }
proxyToCkClientD, proxyToCkClientL := connutil.DialerListener(10 * 1024) var proxyToCkClientD common.Dialer
if ai.Unordered { if ai.Unordered {
go client.RouteUDP(proxyToCkClientL.ListenPacket, lcc, clientSeshMaker) addrCh := make(chan *net.UDPAddr, 1)
mDialer := &mockUDPDialer{
addrCh: addrCh,
}
acceptor := func() (*net.UDPConn, error) {
laddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:0")
conn, err := net.ListenUDP("udp", laddr)
addrCh <- conn.LocalAddr().(*net.UDPAddr)
return conn, err
}
go client.RouteUDP(acceptor, lcc.Timeout, clientSeshMaker)
proxyToCkClientD = mDialer
} else { } else {
var proxyToCkClientL *connutil.PipeListener
proxyToCkClientD, proxyToCkClientL = connutil.DialerListener(10 * 1024)
go client.RouteTCP(proxyToCkClientL, lcc.Timeout, clientSeshMaker) go client.RouteTCP(proxyToCkClientL, lcc.Timeout, clientSeshMaker)
} }
@ -180,7 +202,7 @@ func runEchoTest(t *testing.T, conns []net.Conn, maxMsgLen int) {
func TestUDP(t *testing.T) { func TestUDP(t *testing.T) {
var tmpDB, _ = ioutil.TempFile("", "ck_user_info") var tmpDB, _ = ioutil.TempFile("", "ck_user_info")
defer os.Remove(tmpDB.Name()) defer os.Remove(tmpDB.Name())
log.SetLevel(log.FatalLevel) log.SetLevel(log.TraceLevel)
worldState := common.WorldOfTime(time.Unix(10, 0)) worldState := common.WorldOfTime(time.Unix(10, 0))
lcc, rcc, ai := basicClientConfigs(worldState) lcc, rcc, ai := basicClientConfigs(worldState)
@ -224,6 +246,17 @@ func TestUDP(t *testing.T) {
} }
}) })
t.Run("user echo", func(t *testing.T) {
go serveUDPEcho(pxyServerL)
var conn [1]net.Conn
conn[0], err = pxyClientD.Dial("udp", "")
if err != nil {
t.Error(err)
}
runEchoTest(t, conn[:], 1024)
})
} }
func TestTCP(t *testing.T) { func TestTCP(t *testing.T) {