diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index bc8a72e..41c5a98 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -140,17 +140,11 @@ func main() { log.Fatal("TicketTimeHint cannot be empty or 0") } - initRemoteConn, err := makeRemoteConn(sta) - if err != nil { - log.Fatalf("Failed to establish connection to remote: %v\n", err) - } - obfs := util.MakeObfs(sta.SID) deobfs := util.MakeDeobfs(sta.SID) - // TODO: where to put obfs deobfs and rtd? - sesh := mux.MakeSession(0, initRemoteConn, obfs, deobfs, util.ReadTillDrain) + sesh := mux.MakeSession(0, 1e9, 1e9, obfs, deobfs, util.ReadTillDrain) - for i := 0; i < sta.NumConn-1; i++ { + for i := 0; i < sta.NumConn; i++ { go func() { conn, err := makeRemoteConn(sta) if err != nil { diff --git a/cmd/ck-server/ck-server.go b/cmd/ck-server/ck-server.go index 11378da..6ee9be5 100644 --- a/cmd/ck-server/ck-server.go +++ b/cmd/ck-server/ck-server.go @@ -102,12 +102,11 @@ func dispatchConnection(conn net.Conn, sta *server.State) { var arrSID [32]byte copy(arrSID[:], SID) var sesh *mux.Session - if sesh = sta.GetSession(arrSID); sesh != nil { - sesh.AddConnection(conn) - } else { - sesh = mux.MakeSession(0, conn, util.MakeObfs(SID), util.MakeDeobfs(SID), util.ReadTillDrain) + if sesh = sta.GetSession(arrSID); sesh == nil { + sesh = mux.MakeSession(0, 1e9, 1e9, util.MakeObfs(SID), util.MakeDeobfs(SID), util.ReadTillDrain) sta.PutSession(arrSID, sesh) } + sesh.AddConnection(conn) go func() { for { newStream, err := sesh.AcceptStream() diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 23206f3..d70870b 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -10,8 +10,6 @@ import ( const ( // Copied from smux acceptBacklog = 1024 - - closeBacklog = 512 ) var ErrBrokenSession = errors.New("broken session") @@ -45,7 +43,7 @@ type Session struct { } // 1 conn is needed to make a session -func MakeSession(id int, conn net.Conn, obfs func(*Frame) []byte, deobfs func([]byte) *Frame, obfsedReader func(net.Conn, []byte) (int, error)) *Session { +func MakeSession(id int, uprate, downrate float64, obfs func(*Frame) []byte, deobfs func([]byte) *Frame, obfsedReader func(net.Conn, []byte) (int, error)) *Session { sesh := &Session{ id: id, obfs: obfs, @@ -56,7 +54,7 @@ func MakeSession(id int, conn net.Conn, obfs func(*Frame) []byte, deobfs func([] acceptCh: make(chan *Stream, acceptBacklog), die: make(chan struct{}), } - sesh.sb = makeSwitchboard(conn, sesh) + sesh.sb = makeSwitchboard(sesh, uprate, downrate) return sesh } diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 70a2cab..35cd5c9 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -1,22 +1,22 @@ package multiplex import ( + "errors" "log" "net" "sync" "sync/atomic" -) -const ( - sentNotifyBacklog = 1024 - dispatchBacklog = 10240 - newConnBacklog = 8 + "github.com/juju/ratelimit" ) // switchboard is responsible for keeping the reference of TLS connections between client and server type switchboard struct { session *Session + wtb *ratelimit.Bucket + rtb *ratelimit.Bucket + optimum atomic.Value cesM sync.RWMutex ces []*connEnclave @@ -33,24 +33,25 @@ type connEnclave struct { } // It takes at least 1 conn to start a switchboard -func makeSwitchboard(conn net.Conn, sesh *Session) *switchboard { +// TODO: does it really? +func makeSwitchboard(sesh *Session, uprate, downrate float64) *switchboard { sb := &switchboard{ session: sesh, + wtb: ratelimit.NewBucketWithRate(uprate, int64(uprate)), + rtb: ratelimit.NewBucketWithRate(downrate, int64(downrate)), ces: []*connEnclave{}, } - ce := &connEnclave{ - sb: sb, - remoteConn: conn, - sendQueue: 0, - } - sb.ces = append(sb.ces, ce) - go sb.deplex(ce) - return sb } +var errNilOptimum error = errors.New("The optimal connection is nil") + func (sb *switchboard) send(data []byte) (int, error) { ce := sb.optimum.Load().(*connEnclave) + if ce == nil { + return 0, errNilOptimum + } + sb.wtb.Wait(int64(len(data))) atomic.AddUint32(&ce.sendQueue, uint32(len(data))) go sb.updateOptimum() n, err := ce.remoteConn.Write(data) @@ -118,6 +119,7 @@ func (sb *switchboard) deplex(ce *connEnclave) { buf := make([]byte, 20480) for { i, err := sb.session.obfsedReader(ce.remoteConn, buf) + sb.rtb.Wait(int64(i)) if err != nil { log.Println(err) go ce.remoteConn.Close()