From 239647c5b2e4c6d461a7df1a902f764cb3fce32c Mon Sep 17 00:00:00 2001 From: Qian Wang Date: Sat, 24 Nov 2018 00:55:26 +0000 Subject: [PATCH] Fix a race, some cleanup --- cmd/ck-client/ck-client.go | 3 ++- config/ckserver.json | 3 ++- internal/client/state.go | 2 +- internal/multiplex/session.go | 25 +++++++++++++++++++++ internal/multiplex/switchboard.go | 35 +++++++++++++---------------- internal/server/usermanager/user.go | 8 ++++--- 6 files changed, 50 insertions(+), 26 deletions(-) diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index b8a2f62..23a42ff 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -22,6 +22,7 @@ import ( var version string func pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) { + // TODO: auto reconnect // The maximum size of TLS message will be 16396+12. 12 because of the stream header // 16408 is the max TLS message size on Firefox buf := make([]byte, 16396) @@ -175,7 +176,6 @@ func main() { sesh := mux.MakeSession(0, valve, obfs, deobfs, util.ReadTLS) var wg sync.WaitGroup - // TODO: use sync group for i := 0; i < sta.NumConn; i++ { wg.Add(1) go func() { @@ -190,6 +190,7 @@ func main() { } wg.Wait() + // TODO: ipv6 listener, err := net.Listen("tcp", sta.SS_LOCAL_HOST+":"+sta.SS_LOCAL_PORT) if err != nil { log.Fatal(err) diff --git a/config/ckserver.json b/config/ckserver.json index 9d358c3..e1e0893 100644 --- a/config/ckserver.json +++ b/config/ckserver.json @@ -1,4 +1,5 @@ { "WebServerAddr":"204.79.197.200:443", - "Key":"UGUmcEmxWf0pKxfkZ/8EoP35Ht+wQnqf3L0xYgyQFlQ=" + "Key":"UGUmcEmxWf0pKxfkZ/8EoP35Ht+wQnqf3L0xYgyQFlQ=", + "AdminUID":"ugDmcEmxWf0pKxfkZ/8EoP35Ht+wQnqf3L0xYgyQFlQ=" } diff --git a/internal/client/state.go b/internal/client/state.go index affaa93..b9074ae 100644 --- a/internal/client/state.go +++ b/internal/client/state.go @@ -56,7 +56,7 @@ func InitState(localHost, localPort, remoteHost, remotePort string, nowFunc func // semi-colon separated value. This is for Android plugin options func ssvToJson(ssv string) (ret []byte) { - // TODO: base64 encoded data has =. How to escape? + // FIXME: base64 encoded data has =. How to escape? unescape := func(s string) string { r := strings.Replace(s, "\\\\", "\\", -1) r = strings.Replace(r, "\\=", "=", -1) diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 8da565b..6e29f0d 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -101,6 +101,31 @@ func (sesh *Session) isStream(id uint32) bool { return ok } +// If the stream has been closed and the triggering frame is a closing frame, +// we return nil +func (sesh *Session) getOrAddStream(id uint32, closingFrame bool) *Stream { + // it would have been neater to use defer Unlock(), however it gives + // non-negligable overhead and this function is performance critical + sesh.streamsM.Lock() + stream := sesh.streams[id] + if stream != nil { + sesh.streamsM.Unlock() + return stream + } else { + if closingFrame { + sesh.streamsM.Unlock() + return nil + } else { + stream = makeStream(id, sesh) + sesh.streams[id] = stream + sesh.acceptCh <- stream + log.Printf("Adding stream %v\n", id) + sesh.streamsM.Unlock() + return stream + } + } +} + func (sesh *Session) getStream(id uint32) *Stream { sesh.streamsM.RLock() ret := sesh.streams[id] diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index eca81ff..d0e46aa 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -154,26 +154,21 @@ func (sb *switchboard) deplex(ce *connEnclave) { } frame := sb.session.deobfs(buf[:n]) - var stream *Stream - // FIXME: get-then-put without lock - if stream = sb.session.getStream(frame.StreamID); stream == nil { - if frame.Closing == 1 { - // if the frame is telling us to close a closed stream - // (this happens when ss-server and ss-local closes the stream - // simutaneously), we don't do anything - continue - } - //debug - /* - sb.hM.Lock() - if sb.used[frame.StreamID] { - log.Printf("%v lost!\n", frame.StreamID) - } - sb.used[frame.StreamID] = true - sb.hM.Unlock() - */ - stream = sb.session.addStream(frame.StreamID) + stream := sb.session.getOrAddStream(frame.StreamID, frame.Closing == 1) + // if the frame is telling us to close a closed stream + // (this happens when ss-server and ss-local closes the stream + // simutaneously), we don't do anything + if stream != nil { + stream.writeNewFrame(frame) } - stream.writeNewFrame(frame) + //debug + /* + sb.hM.Lock() + if sb.used[frame.StreamID] { + log.Printf("%v lost!\n", frame.StreamID) + } + sb.used[frame.StreamID] = true + sb.hM.Unlock() + */ } } diff --git a/internal/server/usermanager/user.go b/internal/server/usermanager/user.go index b018c5e..12f774e 100644 --- a/internal/server/usermanager/user.go +++ b/internal/server/usermanager/user.go @@ -69,8 +69,9 @@ func (u *User) updateInfo(uinfo UserInfo) { func (u *User) GetSession(sessionID uint32) *mux.Session { u.sessionsM.RLock() - defer u.sessionsM.RUnlock() - return u.sessions[sessionID] + sesh := u.sessions[sessionID] + u.sessionsM.RUnlock() + return sesh } func (u *User) PutSession(sessionID uint32, sesh *mux.Session) { @@ -93,13 +94,14 @@ func (u *User) DelSession(sessionID uint32) { func (u *User) GetOrCreateSession(sessionID uint32, obfs func(*mux.Frame) []byte, deobfs func([]byte) *mux.Frame, obfsedRead func(net.Conn, []byte) (int, error)) (sesh *mux.Session, existing bool) { // TODO: session cap u.sessionsM.Lock() - defer u.sessionsM.Unlock() if sesh = u.sessions[sessionID]; sesh != nil { + u.sessionsM.Unlock() return sesh, true } else { log.Printf("Creating session %v\n", sessionID) sesh = mux.MakeSession(sessionID, u.valve, obfs, deobfs, obfsedRead) u.sessions[sessionID] = sesh + u.sessionsM.Unlock() return sesh, false } }