diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index a32b6bf..1a4b8b4 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -169,7 +169,7 @@ func (sesh *Session) closeStream(s *Stream, active bool) error { if atomic.SwapUint32(&s.closed, 1) == 1 { return fmt.Errorf("closing stream %v: %w", s.id, errRepeatStreamClosing) } - _ = s.recvBuf.Close() // recvBuf.Close should not return error + _ = s.getRecvBuf().Close() // recvBuf.Close should not return error if active { // Notify remote that this stream is closed diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index d64628f..2245f5b 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -23,8 +23,9 @@ type Stream struct { session *Session + allocIdempot sync.Once // a buffer (implemented as an asynchronous buffered pipe) to put data we've received from recvFrame but hasn't - // been read by the consumer through Read or WriteTo + // been read by the consumer through Read or WriteTo. Lazily allocated recvBuf recvBuffer writingM sync.Mutex @@ -33,11 +34,9 @@ type Stream struct { // atomic closed uint32 - // lazy allocation for obfsBuf. This is desirable because obfsBuf is only used when data is sent from + // obfuscation happens in obfsBuf. This buffer is lazily allocated as obfsBuf is only used when data is sent from // the stream (through Write or ReadFrom). Some streams never send data so eager allocation will waste // memory - allocIdempot sync.Once - // obfuscation happens in this buffer obfsBuf []byte // When we want order guarantee (i.e. session.Unordered is false), @@ -52,17 +51,9 @@ type Stream struct { } func makeStream(sesh *Session, id uint32) *Stream { - var recvBuf recvBuffer - if sesh.Unordered { - recvBuf = NewDatagramBufferedPipe() - } else { - recvBuf = NewStreamBuffer() - } - stream := &Stream{ id: id, session: sesh, - recvBuf: recvBuf, writingFrame: Frame{ StreamID: id, Seq: 0, @@ -75,9 +66,20 @@ func makeStream(sesh *Session, id uint32) *Stream { func (s *Stream) isClosed() bool { return atomic.LoadUint32(&s.closed) == 1 } +func (s *Stream) getRecvBuf() recvBuffer { + s.allocIdempot.Do(func() { + if s.session.Unordered { + s.recvBuf = NewDatagramBufferedPipe() + } else { + s.recvBuf = NewStreamBuffer() + } + }) + return s.recvBuf +} + // receive a readily deobfuscated Frame so its payload can later be Read func (s *Stream) recvFrame(frame Frame) error { - toBeClosed, err := s.recvBuf.Write(frame) + toBeClosed, err := s.getRecvBuf().Write(frame) if toBeClosed { err = s.passiveClose() if errors.Is(err, errRepeatStreamClosing) { @@ -96,7 +98,7 @@ func (s *Stream) Read(buf []byte) (n int, err error) { return 0, nil } - n, err = s.recvBuf.Read(buf) + n, err = s.getRecvBuf().Read(buf) log.Tracef("%v read from stream %v with err %v", n, s.id, err) if err == io.EOF { return n, ErrBrokenStream @@ -107,7 +109,7 @@ func (s *Stream) Read(buf []byte) (n int, err error) { // WriteTo continuously write data Stream has received into the writer w. func (s *Stream) WriteTo(w io.Writer) (int64, error) { // will keep writing until the underlying buffer is closed - n, err := s.recvBuf.WriteTo(w) + n, err := s.getRecvBuf().WriteTo(w) log.Tracef("%v read from stream %v with err %v", n, s.id, err) if err == io.EOF { return n, ErrBrokenStream @@ -219,8 +221,8 @@ func (s *Stream) Close() error { func (s *Stream) LocalAddr() net.Addr { return s.session.addrs.Load().([]net.Addr)[0] } func (s *Stream) RemoteAddr() net.Addr { return s.session.addrs.Load().([]net.Addr)[1] } -func (s *Stream) SetWriteToTimeout(d time.Duration) { s.recvBuf.SetWriteToTimeout(d) } -func (s *Stream) SetReadDeadline(t time.Time) error { s.recvBuf.SetReadDeadline(t); return nil } +func (s *Stream) SetWriteToTimeout(d time.Duration) { s.getRecvBuf().SetWriteToTimeout(d) } +func (s *Stream) SetReadDeadline(t time.Time) error { s.getRecvBuf().SetReadDeadline(t); return nil } func (s *Stream) SetReadFromTimeout(d time.Duration) { s.readFromTimeout = d } var errNotImplemented = errors.New("Not implemented")