From 64d857c74dea0456fbef36513a2bb9549abda563 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sun, 18 Oct 2020 15:36:40 +0100 Subject: [PATCH] Rename variables for clarity --- internal/multiplex/stream.go | 45 ++++++++++++++++++++----------- internal/multiplex/switchboard.go | 2 +- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index a3bb4ef..bffa9e7 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -13,11 +13,18 @@ import ( var ErrBrokenStream = errors.New("broken stream") +// Stream implements net.Conn. It represents an optionally-ordered, full-duplex, self-contained connection. +// If the session it belongs to runs in ordered mode, it provides ordering guarantee regardless of the underlying +// connection used. +// If the underlying connections the session uses are reliable, Stream is reliable. If they are not, Stream does not +// guarantee reliability. type Stream struct { id uint32 session *Session + // a buffer (implemented as an asynchronous buffered pipe) to put data we've received from recvFrame but hasn't + // been read by the consumer through Read or WriteTo recvBuf recvBuffer nextSendSeq uint64 @@ -34,13 +41,15 @@ type Stream struct { // obfuscation happens in this buffer obfsBuf []byte - // we assign each stream a fixed underlying TCP connection to utilise order guarantee provided by TCP itself - // so that frameSorter should have few to none ooo frames to deal with - // overall the streams in a session should be uniformly distributed across all connections + // When we want order guarantee (i.e. session.Unordered is false), + // we assign each stream a fixed underlying connection. + // If the underlying connections the session uses provide ordering guarantee (most likely TCP), + // recvBuffer (implemented by streamBuffer under ordered mode) will not receive out-of-order packets + // so it won't have to use its priority queue to sort it. // This is not used in unordered connection mode assignedConnId uint32 - rfTimeout time.Duration + readFromTimeout time.Duration } func makeStream(sesh *Session, id uint32) *Stream { @@ -91,6 +100,7 @@ func (s *Stream) Read(buf []byte) (n int, err error) { return } +// WriteTo continuously write data Stream has received into the writer w. func (s *Stream) WriteTo(w io.Writer) (int64, error) { // will keep writing until the underlying buffer is closed n, err := s.recvBuf.WriteTo(w) @@ -101,9 +111,9 @@ func (s *Stream) WriteTo(w io.Writer) (int64, error) { return n, nil } -func (s *Stream) sendFrame(f *Frame, framePayloadOffset int) error { +func (s *Stream) obfuscateAndSend(f *Frame, payloadOffsetInObfsBuf int) error { var cipherTextLen int - cipherTextLen, err := s.session.Obfs(f, s.obfsBuf, framePayloadOffset) + cipherTextLen, err := s.session.Obfs(f, s.obfsBuf, payloadOffsetInObfsBuf) if err != nil { return err } @@ -134,9 +144,12 @@ func (s *Stream) Write(in []byte) (n int, err error) { for n < len(in) { var framePayload []byte if len(in)-n <= s.session.maxStreamUnitWrite { + // if we can fit remaining data of in into one frame framePayload = in[n:] } else { - if s.session.Unordered { // no splitting + // if we have to split + if s.session.Unordered { + // but we are not allowed to err = io.ErrShortBuffer return } @@ -149,7 +162,7 @@ func (s *Stream) Write(in []byte) (n int, err error) { Payload: framePayload, } s.nextSendSeq++ - err = s.sendFrame(f, 0) + err = s.obfuscateAndSend(f, 0) if err != nil { return } @@ -158,16 +171,18 @@ func (s *Stream) Write(in []byte) (n int, err error) { return } +// ReadFrom continuously read data from r and send it off, until either r returns error or nothing has been read +// for readFromTimeout amount of time func (s *Stream) ReadFrom(r io.Reader) (n int64, err error) { if s.obfsBuf == nil { s.obfsBuf = make([]byte, s.session.StreamSendBufferSize) } for { - if s.rfTimeout != 0 { + if s.readFromTimeout != 0 { if rder, ok := r.(net.Conn); !ok { log.Warn("ReadFrom timeout is set but reader doesn't implement SetReadDeadline") } else { - rder.SetReadDeadline(time.Now().Add(s.rfTimeout)) + rder.SetReadDeadline(time.Now().Add(s.readFromTimeout)) } } read, er := r.Read(s.obfsBuf[HEADER_LEN : HEADER_LEN+s.session.maxStreamUnitWrite]) @@ -186,7 +201,7 @@ func (s *Stream) ReadFrom(r io.Reader) (n int64, err error) { Payload: s.obfsBuf[HEADER_LEN : HEADER_LEN+read], } s.nextSendSeq++ - err = s.sendFrame(f, HEADER_LEN) + err = s.obfuscateAndSend(f, HEADER_LEN) s.writingM.Unlock() if err != nil { @@ -211,14 +226,14 @@ func (s *Stream) Close() error { func (s *Stream) LocalAddr() net.Addr { return s.session.addrs.Load().([]net.Addr)[0] } func (s *Stream) RemoteAddr() net.Addr { return s.session.addrs.Load().([]net.Addr)[1] } -// TODO: implement the following func (s *Stream) SetWriteToTimeout(d time.Duration) { s.recvBuf.SetWriteToTimeout(d) } func (s *Stream) SetReadDeadline(t time.Time) error { s.recvBuf.SetReadDeadline(t); return nil } -func (s *Stream) SetReadFromTimeout(d time.Duration) { s.rfTimeout = d } +func (s *Stream) SetReadFromTimeout(d time.Duration) { s.readFromTimeout = d } + +var errNotImplemented = errors.New("Not implemented") // the following functions are purely for implementing net.Conn interface. // they are not used -var errNotImplemented = errors.New("Not implemented") - +// TODO: implement the following func (s *Stream) SetDeadline(t time.Time) error { return errNotImplemented } func (s *Stream) SetWriteDeadline(t time.Time) error { return errNotImplemented } diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 7773084..bc33500 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -64,7 +64,7 @@ func (sb *switchboard) addConn(conn net.Conn) { go sb.deplex(connId, conn) } -// a pointer to connId is passed here so that the switchboard can reassign it +// a pointer to connId is passed here so that the switchboard can reassign it if that connId isn't usable func (sb *switchboard) send(data []byte, connId *uint32) (n int, err error) { writeAndRegUsage := func(conn net.Conn, d []byte) (int, error) { n, err = conn.Write(d)