From 9e4aedbdc106da379a68e8cc9350eca8098ae3be Mon Sep 17 00:00:00 2001 From: Qian Wang Date: Sat, 27 Oct 2018 23:35:46 +0100 Subject: [PATCH] Stream closing is now ordered --- internal/multiplex/frame.go | 8 ++--- internal/multiplex/frameSorter.go | 12 ++++++- internal/multiplex/session.go | 6 +--- internal/multiplex/stream.go | 56 ++++++++++++++++++++++--------- internal/multiplex/switchboard.go | 17 ++-------- internal/util/obfs.go | 14 ++++---- 6 files changed, 66 insertions(+), 47 deletions(-) diff --git a/internal/multiplex/frame.go b/internal/multiplex/frame.go index 32b5b2a..62a94e1 100644 --- a/internal/multiplex/frame.go +++ b/internal/multiplex/frame.go @@ -3,8 +3,8 @@ package multiplex import () type Frame struct { - StreamID uint32 - Seq uint32 - ClosingStreamID uint32 - Payload []byte + StreamID uint32 + Seq uint32 + Closing uint32 + Payload []byte } diff --git a/internal/multiplex/frameSorter.go b/internal/multiplex/frameSorter.go index d858dc3..395293a 100644 --- a/internal/multiplex/frameSorter.go +++ b/internal/multiplex/frameSorter.go @@ -68,6 +68,11 @@ func (s *Stream) recvNewFrame() { } if len(s.sh) == 0 && f.Seq == s.nextRecvSeq { + if f.Closing == 1 { + s.passiveClose() + return + } + s.sortedBufCh <- f.Payload s.nextRecvSeq += 1 @@ -107,8 +112,13 @@ func (s *Stream) recvNewFrame() { // Keep popping from the heap until empty or to the point that the wanted seq was not received for len(s.sh) > 0 && s.sh[0].seq == s.nextRecvSeq { + frame := heap.Pop(&s.sh).(*frameNode).frame - payload := heap.Pop(&s.sh).(*frameNode).frame.Payload + if frame.Closing == 1 { + s.passiveClose() + return + } + payload := frame.Payload s.sortedBufCh <- payload s.nextRecvSeq += 1 diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index af7adb7..e8c3216 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -28,6 +28,7 @@ type Session struct { // This is supposed to read one TLS message, the same as GoQuiet's ReadTillDrain obfsedReader func(net.Conn, []byte) (int, error) + // atomic nextStreamID uint32 streamsM sync.RWMutex @@ -38,10 +39,6 @@ type Session struct { // For accepting new streams acceptCh chan *Stream - // Once a stream.Close is called, it sends its streamID to this channel - // to be read by another stream to send the streamID to notify the remote - // that this stream is closed - closeQCh chan uint32 closingM sync.Mutex die chan struct{} @@ -58,7 +55,6 @@ func MakeSession(id int, conn net.Conn, obfs func(*Frame) []byte, deobfs func([] nextStreamID: 1, streams: make(map[uint32]*Stream), acceptCh: make(chan *Stream, acceptBacklog), - closeQCh: make(chan uint32, closeBacklog), die: make(chan struct{}), } sesh.sb = makeSwitchboard(conn, sesh) diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 5479fed..04b55db 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -3,6 +3,8 @@ package multiplex import ( "errors" "log" + "math" + prand "math/rand" "sync" "sync/atomic" ) @@ -26,6 +28,7 @@ type Stream struct { // sortedBufCh are order-sorted data ready to be read raw sortedBufCh chan []byte + // atomic nextSendSeq uint32 closingM sync.Mutex @@ -81,18 +84,11 @@ func (stream *Stream) Write(in []byte) (n int, err error) { default: } - var closingID uint32 - - select { - case closingID = <-stream.session.closeQCh: - default: - } - f := &Frame{ - StreamID: stream.id, - Seq: stream.nextSendSeq, - ClosingStreamID: closingID, - Payload: in, + StreamID: stream.id, + Seq: atomic.LoadUint32(&stream.nextSendSeq), + Closing: 0, + Payload: in, } atomic.AddUint32(&stream.nextSendSeq, 1) @@ -104,8 +100,8 @@ func (stream *Stream) Write(in []byte) (n int, err error) { } -func (stream *Stream) Close() error { - log.Printf("ID: %v closing\n", stream.id) +// only close locally. Used when the stream close is notified by the remote +func (stream *Stream) passiveClose() error { // Lock here because closing a closed channel causes panic stream.closingM.Lock() @@ -113,10 +109,41 @@ func (stream *Stream) Close() error { if stream.closing { return errRepeatStreamClosing } + log.Printf("ID: %v passiveclosing\n", stream.id) stream.closing = true close(stream.die) stream.session.delStream(stream.id) - stream.session.closeQCh <- stream.id + return nil +} + +// active close. Close locally and tell the remote that this stream is being closed +func (stream *Stream) Close() error { + + // Lock here because closing a closed channel causes panic + stream.closingM.Lock() + defer stream.closingM.Unlock() + if stream.closing { + return errRepeatStreamClosing + } + log.Printf("ID: %v closing\n", stream.id) + stream.closing = true + close(stream.die) + + prand.Seed(int64(stream.id)) + padLen := int(math.Floor(prand.Float64()*200 + 300)) + log.Println(padLen) + pad := make([]byte, padLen) + prand.Read(pad) + f := &Frame{ + StreamID: stream.id, + Seq: atomic.LoadUint32(&stream.nextSendSeq), + Closing: 1, + Payload: pad, + } + tlsRecord := stream.session.obfs(f) + stream.session.sb.dispatCh <- tlsRecord + + stream.session.delStream(stream.id) return nil } @@ -133,6 +160,5 @@ func (stream *Stream) closeNoDelMap() error { } stream.closing = true close(stream.die) - stream.session.closeQCh <- stream.id return nil } diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index a1c7f99..caa2bb6 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -145,7 +145,6 @@ func (sb *switchboard) dispatch() { // it is responsible to act in response to the deobfsed header // i.e. should a new stream be added? which existing stream should be closed? func (sb *switchboard) deplex(ce *connEnclave) { - var highestStream uint32 buf := make([]byte, 20480) for { i, err := sb.session.obfsedReader(ce.remoteConn, buf) @@ -156,22 +155,10 @@ func (sb *switchboard) deplex(ce *connEnclave) { return } frame := sb.session.deobfs(buf[:i]) - if closing := sb.session.getStream(frame.ClosingStreamID); closing != nil { - log.Printf("HeaderClosing: %v\n", frame.ClosingStreamID) - closing.Close() - } - var stream *Stream - // If we want to open a new stream, we need to make sure that the newStreamID is indeed new - // i.e. it is not a stream that existed before but has been closed - // we don't allow streamID reuse. - // So here we do a check that the new stream has a higher ID than the highest ID we have got - if stream = sb.session.getStream(frame.StreamID); highestStream < frame.StreamID && stream == nil { + if stream = sb.session.getStream(frame.StreamID); stream == nil { stream = sb.session.addStream(frame.StreamID) - highestStream = frame.StreamID - } - if stream != nil { - stream.newFrameCh <- frame } + stream.newFrameCh <- frame } } diff --git a/internal/util/obfs.go b/internal/util/obfs.go index f2d1851..c0ee450 100644 --- a/internal/util/obfs.go +++ b/internal/util/obfs.go @@ -25,11 +25,11 @@ func genXorKeys(SID []byte, data []byte) (i uint32, ii uint32, iii uint32) { func MakeObfs(key []byte) func(*mux.Frame) []byte { obfs := func(f *mux.Frame) []byte { obfsedHeader := make([]byte, 12) - // header: [StreamID 4 bytes][Seq 4 bytes][ClosingStreamID 4 bytes] + // header: [StreamID 4 bytes][Seq 4 bytes][Closing 4 bytes] i, ii, iii := genXorKeys(key, f.Payload[0:18]) binary.BigEndian.PutUint32(obfsedHeader[0:4], f.StreamID^i) binary.BigEndian.PutUint32(obfsedHeader[4:8], f.Seq^ii) - binary.BigEndian.PutUint32(obfsedHeader[8:12], f.ClosingStreamID^iii) + binary.BigEndian.PutUint32(obfsedHeader[8:12], f.Closing^iii) // Composing final obfsed message // We don't use util.AddRecordLayer here to avoid unnecessary malloc @@ -52,14 +52,14 @@ func MakeDeobfs(key []byte) func([]byte) *mux.Frame { i, ii, iii := genXorKeys(key, peeled[12:30]) streamID := binary.BigEndian.Uint32(peeled[0:4]) ^ i seq := binary.BigEndian.Uint32(peeled[4:8]) ^ ii - closingStreamID := binary.BigEndian.Uint32(peeled[8:12]) ^ iii + closing := binary.BigEndian.Uint32(peeled[8:12]) ^ iii payload := make([]byte, len(peeled)-12) copy(payload, peeled[12:]) ret := &mux.Frame{ - StreamID: streamID, - Seq: seq, - ClosingStreamID: closingStreamID, - Payload: payload, + StreamID: streamID, + Seq: seq, + Closing: closing, + Payload: payload, } return ret }