From b9f2aa4ed07f5cac8e39da05a68fed5fd818926c Mon Sep 17 00:00:00 2001 From: Qian Wang Date: Tue, 9 Oct 2018 21:53:55 +0100 Subject: [PATCH] Barely working --- cmd/ck-client/ck-client.go | 30 +++++++++---- cmd/ck-server/ck-server.go | 7 +-- config/ckclient.json | 9 ++-- internal/client/state.go | 1 + internal/multiplex/frameSorter.go | 72 +++++++++++++++++-------------- internal/multiplex/session.go | 13 ++++-- internal/multiplex/stream.go | 17 +++++--- internal/multiplex/switchboard.go | 59 +++++++++++++++---------- internal/util/obfs.go | 36 +++++++--------- 9 files changed, 144 insertions(+), 100 deletions(-) diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index 67f37c3..b81fd1d 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -31,7 +31,7 @@ func pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) { } // This establishes a connection with ckserver and performs a handshake -func makeRemoteConn(sta *client.State) net.Conn { +func makeRemoteConn(sta *client.State) (net.Conn, error) { d := net.Dialer{Control: protector} @@ -39,12 +39,12 @@ func makeRemoteConn(sta *client.State) net.Conn { remoteConn, err := d.Dial("tcp", sta.SS_REMOTE_HOST+":"+sta.SS_REMOTE_PORT) if err != nil { log.Printf("Connecting to remote: %v\n", err) - return nil + return nil, err } _, err = remoteConn.Write(clientHello) if err != nil { log.Printf("Sending ClientHello: %v\n", err) - return nil + return nil, err } // Three discarded messages: ServerHello, ChangeCipherSpec and Finished @@ -53,7 +53,7 @@ func makeRemoteConn(sta *client.State) net.Conn { _, err = util.ReadTillDrain(remoteConn, discardBuf) if err != nil { log.Printf("Reading discarded message %v: %v\n", c, err) - return nil + return nil, err } } @@ -61,10 +61,10 @@ func makeRemoteConn(sta *client.State) net.Conn { _, err = remoteConn.Write(reply) if err != nil { log.Printf("Sending reply to remote: %v\n", err) - return nil + return nil, err } - return remoteConn + return remoteConn, nil } @@ -134,7 +134,10 @@ func main() { log.Fatal("TicketTimeHint cannot be empty or 0") } - initRemoteConn := makeRemoteConn(sta) + initRemoteConn, err := makeRemoteConn(sta) + if err != nil { + log.Fatalf("Failed to establish connection to remote: %v\n", err) + } obfs := util.MakeObfs(sta.SID) deobfs := util.MakeDeobfs(sta.SID) @@ -143,7 +146,11 @@ func main() { for i := 0; i < sta.NumConn-1; i++ { go func() { - conn := makeRemoteConn(sta) + conn, err := makeRemoteConn(sta) + if err != nil { + log.Printf("Failed to establish new connections to remote: %v\n", err) + return + } sesh.AddConnection(conn) }() } @@ -159,10 +166,17 @@ func main() { continue } go func() { + data := make([]byte, 10240) + i, err := io.ReadAtLeast(ssConn, data, 1) + if err != nil { + ssConn.Close() + return + } stream, err := sesh.OpenStream() if err != nil { ssConn.Close() } + stream.Write(data[:i]) go pipe(ssConn, stream) pipe(stream, ssConn) }() diff --git a/cmd/ck-server/ck-server.go b/cmd/ck-server/ck-server.go index 255af7f..59846bf 100644 --- a/cmd/ck-server/ck-server.go +++ b/cmd/ck-server/ck-server.go @@ -89,11 +89,11 @@ func dispatchConnection(conn net.Conn, sta *server.State) { go func() { var arrSID [32]byte copy(arrSID[:], SID) - sesh := sta.GetSession(arrSID) - if sesh == nil { + var sesh *mux.Session + if sesh = sta.GetSession(arrSID); sesh != nil { sesh.AddConnection(conn) } else { - sesh := mux.MakeSession(0, conn, util.MakeObfs(SID), util.MakeDeobfs(SID), util.ReadTillDrain) + sesh = mux.MakeSession(0, conn, util.MakeObfs(SID), util.MakeDeobfs(SID), util.ReadTillDrain) sta.PutSession(arrSID, sesh) } go func() { @@ -166,6 +166,7 @@ func main() { SS_REMOTE_PORT: remotePort, Now: time.Now, UsedRandom: map[[32]byte]int{}, + Sessions: map[[32]byte]*mux.Session{}, } err := sta.ParseConfig(pluginOpts) if err != nil { diff --git a/config/ckclient.json b/config/ckclient.json index 73af9f5..ee8efe6 100644 --- a/config/ckclient.json +++ b/config/ckclient.json @@ -1,6 +1,7 @@ { - "ServerName":"www.bing.com", - "Key":"UNhY4JhezH9gQYqvDMWrWH9CwlcKiECVqejMrND2VFwEOF8c8XRX8iYVdjKW2BAfym2zppExMPteovDB/Q8phdD53FnH39tQ1daaVLn9+FIGOAdk+UZZ2aOt5jSK638YPg==", - "TicketTimeHint":3600, - "Browser":"chrome" + "ServerName":"www.bing.com", + "Key":"UNhY4JhezH9gQYqvDMWrWH9CwlcKiECVqejMrND2VFwEOF8c8XRX8iYVdjKW2BAfym2zppExMPteovDB/Q8phdD53FnH39tQ1daaVLn9+FIGOAdk+UZZ2aOt5jSK638YPg==", + "TicketTimeHint":3600, + "NumConn":4, + "MaskBrowser":"chrome" } diff --git a/internal/client/state.go b/internal/client/state.go index 0236fdc..1c6a963 100644 --- a/internal/client/state.go +++ b/internal/client/state.go @@ -83,6 +83,7 @@ func (sta *State) ParseConfig(conf string) (err error) { sta.ServerName = preParse.ServerName sta.TicketTimeHint = preParse.TicketTimeHint sta.MaskBrowser = preParse.MaskBrowser + sta.NumConn = preParse.NumConn sid, pub, err := parseKey(preParse.Key) if err != nil { return errors.New("Failed to parse Key: " + err.Error()) diff --git a/internal/multiplex/frameSorter.go b/internal/multiplex/frameSorter.go index 432fdfe..de74021 100644 --- a/internal/multiplex/frameSorter.go +++ b/internal/multiplex/frameSorter.go @@ -53,43 +53,49 @@ func (sh *sorterHeap) Pop() interface{} { return x } -func (s *Stream) recvNewFrame(f *Frame) { - // For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255 - fs := &frameNode{ - f.Seq, - 0, - f, - } - - // TODO: if a malicious client resend a previously sent seq number, what will happen? - if fs.seq < s.nextRecvSeq { - // e.g. we are on rev=0 (wrap has not happened yet) - // and we get the order of recv as 253 254 0 1 - // after 254, nextN should be 255, but 0 is received and 0 < 255 - // now 0 should have a trueSeq of 256 - if !s.wrapMode { - // wrapMode is true when the latest seq is wrapped but nextN is not - s.wrapMode = true +func (s *Stream) recvNewFrame() { + for { + f := <-s.newFrameCh + if f == nil { + continue + } + // For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255 + fs := &frameNode{ + f.Seq, + 0, + f, } - fs.trueSeq = uint64(2<<16*(s.rev+1)) + uint64(fs.seq) + 1 - // +1 because wrapped 0 should have trueSeq of 256 instead of 255 - // when this bit was run on 1, the trueSeq of 1 would become 256 - } else { - fs.trueSeq = uint64(2<<16*s.rev) + uint64(fs.seq) - // when this bit was run on 255, the trueSeq of 255 would be 255 - } - heap.Push(&s.sh, fs) - // Keep popping from the heap until empty or to the point that the wanted seq was not received - for len(s.sh) > 0 && s.sh[0].seq == s.nextRecvSeq { + // TODO: if a malicious client resend a previously sent seq number, what will happen? + if fs.seq < s.nextRecvSeq { + // e.g. we are on rev=0 (wrap has not happened yet) + // and we get the order of recv as 253 254 0 1 + // after 254, nextN should be 255, but 0 is received and 0 < 255 + // now 0 should have a trueSeq of 256 + if !s.wrapMode { + // wrapMode is true when the latest seq is wrapped but nextN is not + s.wrapMode = true + } + fs.trueSeq = uint64(2<<16*(s.rev+1)) + uint64(fs.seq) + 1 + // +1 because wrapped 0 should have trueSeq of 256 instead of 255 + // when this bit was run on 1, the trueSeq of 1 would become 256 + } else { + fs.trueSeq = uint64(2<<16*s.rev) + uint64(fs.seq) + // when this bit was run on 255, the trueSeq of 255 would be 255 + } + heap.Push(&s.sh, fs) - s.sortedBufCh <- heap.Pop(&s.sh).(*frameNode).frame.Payload + // Keep popping from the heap until empty or to the point that the wanted seq was not received + for len(s.sh) > 0 && s.sh[0].seq == s.nextRecvSeq { - s.nextRecvSeq += 1 - if s.nextRecvSeq == 0 { - // when nextN is wrapped, wrapMode becomes false and rev+1 - s.rev += 1 - s.wrapMode = false + s.sortedBufCh <- heap.Pop(&s.sh).(*frameNode).frame.Payload + + s.nextRecvSeq += 1 + if s.nextRecvSeq == 0 { + // when nextN is wrapped, wrapMode becomes false and rev+1 + s.rev += 1 + s.wrapMode = false + } } } diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index af1992c..cb6d8d7 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -1,6 +1,7 @@ package multiplex import ( + "log" "net" "sync" ) @@ -49,13 +50,12 @@ func MakeSession(id int, conn net.Conn, obfs func(*Frame) []byte, deobfs func([] obfs: obfs, deobfs: deobfs, obfsedReader: obfsedReader, - nextStreamID: 0, + nextStreamID: 1, streams: make(map[uint32]*Stream), acceptCh: make(chan *Stream, acceptBacklog), closeQCh: make(chan uint32, closeBacklog), } sesh.sb = makeSwitchboard(conn, sesh) - sesh.sb.run() return sesh } @@ -80,6 +80,7 @@ func (sesh *Session) OpenStream() (*Stream, error) { func (sesh *Session) AcceptStream() (*Stream, error) { stream := <-sesh.acceptCh return stream, nil + } func (sesh *Session) delStream(id uint32) { @@ -101,7 +102,13 @@ func (sesh *Session) getStream(id uint32) *Stream { return sesh.streams[id] } -func (sesh *Session) addStream(id uint32) { +// addStream is used when the remote opened a new stream and we got notified +func (sesh *Session) addStream(id uint32) *Stream { + log.Printf("Adding stream %v", id) stream := makeStream(id, sesh) + sesh.streamsM.Lock() + sesh.streams[id] = stream + sesh.streamsM.Unlock() sesh.acceptCh <- stream + return stream } diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 228d80b..419dd02 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -7,7 +7,7 @@ import ( ) const ( - readBuffer = 10240 + readBuffer = 102400 ) type Stream struct { @@ -25,6 +25,7 @@ type Stream struct { sh sorterHeap wrapMode bool + newFrameCh chan *Frame sortedBufCh chan []byte nextSendSeqM sync.Mutex @@ -36,14 +37,19 @@ type Stream struct { func makeStream(id uint32, sesh *Session) *Stream { stream := &Stream{ - id: id, - session: sesh, + id: id, + session: sesh, + die: make(chan struct{}), + sh: []*frameNode{}, + newFrameCh: make(chan *Frame, 1024), + sortedBufCh: make(chan []byte, readBuffer), } + go stream.recvNewFrame() return stream } func (stream *Stream) Read(buf []byte) (n int, err error) { - if len(buf) == 0 { + if len(buf) != 0 { select { case <-stream.die: return 0, errors.New(errBrokenPipe) @@ -79,8 +85,8 @@ func (stream *Stream) Write(in []byte) (n int, err error) { StreamID: stream.id, Seq: stream.nextSendSeq, ClosingStreamID: closingID, + Payload: in, } - copy(f.Payload, in) stream.nextSendSeqM.Lock() stream.nextSendSeq += 1 @@ -103,7 +109,6 @@ func (stream *Stream) Close() error { stream.closing = true stream.session.delStream(stream.id) close(stream.die) - close(stream.sortedBufCh) stream.session.closeQCh <- stream.id return nil } diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 4afdebc..dfe1799 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -1,13 +1,15 @@ package multiplex import ( + "log" "net" "sort" ) const ( sentNotifyBacklog = 1024 - dispatchBacklog = 10240 + dispatchBacklog = 102400 + newConnBacklog = 8 ) type switchboard struct { @@ -19,6 +21,7 @@ type switchboard struct { sentNotifyCh chan *sentNotifier dispatCh chan []byte newConnCh chan net.Conn + closingCECh chan *connEnclave } // Some data comes from a Stream to be sent through one of the many @@ -51,6 +54,8 @@ func makeSwitchboard(conn net.Conn, sesh *Session) *switchboard { ces: []*connEnclave{}, sentNotifyCh: make(chan *sentNotifier, sentNotifyBacklog), dispatCh: make(chan []byte, dispatchBacklog), + newConnCh: make(chan net.Conn, newConnBacklog), + closingCECh: make(chan *connEnclave, 5), } ce := &connEnclave{ sb: sb, @@ -58,15 +63,12 @@ func makeSwitchboard(conn net.Conn, sesh *Session) *switchboard { sendQueue: 0, } sb.ces = append(sb.ces, ce) + go sb.deplex(ce) + go sb.dispatch() return sb } -func (sb *switchboard) run() { - go sb.startDispatcher() - go sb.startDeplexer() -} - // Everytime after a remoteConn sends something, it constructs this struct // Which is sent back to dispatch() through sentNotifyCh to tell dispatch // how many bytes it has sent @@ -87,7 +89,7 @@ func (ce *connEnclave) send(data []byte) { // Dispatcher sends data coming from a stream to a remote connection // I used channels here because I didn't want to use mutex -func (sb *switchboard) startDispatcher() { +func (sb *switchboard) dispatch() { for { select { // dispatCh receives data from stream.Write @@ -104,25 +106,38 @@ func (sb *switchboard) startDispatcher() { sendQueue: 0, } sb.ces = append(sb.ces, newCe) + go sb.deplex(newCe) sort.Sort(byQ(sb.ces)) + case closing := <-sb.closingCECh: + for i, ce := range sb.ces { + if closing == ce { + sb.ces = append(sb.ces[:i], sb.ces[i+1:]...) + break + } + } + // TODO: when all connections closed } } } -// Deplexer sends data coming from a remote connection to a stream -func (sb *switchboard) startDeplexer() { - for _, ce := range sb.ces { - go func() { - buf := make([]byte, 20480) - for { - sb.session.obfsedReader(ce.remoteConn, buf) - frame := sb.session.deobfs(buf) - if !sb.session.isStream(frame.StreamID) { - sb.session.addStream(frame.StreamID) - } - sb.session.getStream(frame.ClosingStreamID).Close() - sb.session.getStream(frame.StreamID).recvNewFrame(frame) - } - }() +func (sb *switchboard) deplex(ce *connEnclave) { + buf := make([]byte, 20480) + for { + i, err := sb.session.obfsedReader(ce.remoteConn, buf) + if err != nil { + log.Println(err) + go ce.remoteConn.Close() + sb.closingCECh <- ce + return + } + frame := sb.session.deobfs(buf[:i]) + var stream *Stream + if stream = sb.session.getStream(frame.StreamID); stream == nil { + stream = sb.session.addStream(frame.StreamID) + } + if closing := sb.session.getStream(frame.ClosingStreamID); closing != nil { + closing.Close() + } + stream.newFrameCh <- frame } } diff --git a/internal/util/obfs.go b/internal/util/obfs.go index 895e4dc..85abbcf 100644 --- a/internal/util/obfs.go +++ b/internal/util/obfs.go @@ -11,7 +11,7 @@ import ( func encrypt(iv []byte, key []byte, plaintext []byte) []byte { block, _ := aes.NewCipher(key) ciphertext := make([]byte, len(plaintext)) - stream := cipher.NewCFBEncrypter(block, iv) + stream := cipher.NewCTR(block, iv) stream.XORKeyStream(ciphertext, plaintext) return ciphertext } @@ -20,9 +20,8 @@ func decrypt(iv []byte, key []byte, ciphertext []byte) []byte { ret := make([]byte, len(ciphertext)) copy(ret, ciphertext) // Because XORKeyStream is inplace, but we don't want the input to be changed block, _ := aes.NewCipher(key) - stream := cipher.NewCFBDecrypter(block, iv) + stream := cipher.NewCTR(block, iv) stream.XORKeyStream(ret, ret) - // ret is now plaintext return ret } @@ -33,17 +32,13 @@ func MakeObfs(key []byte) func(*mux.Frame) []byte { binary.BigEndian.PutUint32(header[4:8], f.Seq) binary.BigEndian.PutUint32(header[8:12], f.ClosingStreamID) // header: [StreamID 4 bytes][Seq 4 bytes][ClosingStreamID 4 bytes] - plainheader := make([]byte, 16) - copy(plainheader[0:12], header) - copy(plainheader[12:], []byte{0x00, 0x00, 0x00, 0x00}) - // plainheader: [header 12 bytes][0x00,0x00,0x00,0x00] - iv := f.Payload[0:16] - cipherheader := encrypt(iv, key, plainheader) - obfsed := make([]byte, len(f.Payload)+12+4) + iv := CryptoRandBytes(16) + cipherheader := encrypt(iv, key, header) + obfsed := make([]byte, len(f.Payload)+12+16) copy(obfsed[0:16], iv) - copy(obfsed[16:32], cipherheader) - copy(obfsed[32:], f.Payload[16:]) - // obfsed: [iv 16 bytes][cipherheader 16 bytes][payload w/o iv] + copy(obfsed[16:28], cipherheader) + copy(obfsed[28:], f.Payload) + // obfsed: [iv 16 bytes][cipherheader 12 bytes][payload] ret := AddRecordLayer(obfsed, []byte{0x17}, []byte{0x03, 0x03}) return ret } @@ -53,14 +48,13 @@ func MakeObfs(key []byte) func(*mux.Frame) []byte { func MakeDeobfs(key []byte) func([]byte) *mux.Frame { deobfs := func(in []byte) *mux.Frame { peeled := PeelRecordLayer(in) - plainheader := decrypt(peeled[0:16], key, peeled[16:32]) - // plainheader: [header 12 bytes][0x00,0x00,0x00,0x00] - streamID := binary.BigEndian.Uint32(plainheader[0:4]) - seq := binary.BigEndian.Uint32(plainheader[4:8]) - closingStreamID := binary.BigEndian.Uint32(plainheader[8:12]) - payload := make([]byte, len(peeled)-12-4) - copy(payload[0:16], peeled[0:16]) - copy(payload[16:], peeled[32:]) + header := decrypt(peeled[0:16], key, peeled[16:28]) + streamID := binary.BigEndian.Uint32(header[0:4]) + seq := binary.BigEndian.Uint32(header[4:8]) + closingStreamID := binary.BigEndian.Uint32(header[8:12]) + payload := make([]byte, len(peeled)-12-16) + //log.Printf("Payload: %x\n", payload) + copy(payload, peeled[28:]) ret := &mux.Frame{ StreamID: streamID, Seq: seq,