diff --git a/internal/multiplex/bufferedPipe.go b/internal/multiplex/bufferedPipe.go index 452b86f..6ab9f49 100644 --- a/internal/multiplex/bufferedPipe.go +++ b/internal/multiplex/bufferedPipe.go @@ -17,7 +17,9 @@ var ErrTimeout = errors.New("deadline exceeded") // The point of a bufferedPipe is that Read() will block until data is available type bufferedPipe struct { - buf *bytes.Buffer + // only alloc when on first Read or Write + buf *bytes.Buffer + closed uint32 rwCond *sync.Cond rDeadline time.Time @@ -25,7 +27,6 @@ type bufferedPipe struct { func NewBufferedPipe() *bufferedPipe { p := &bufferedPipe{ - buf: new(bytes.Buffer), rwCond: sync.NewCond(&sync.Mutex{}), } return p @@ -34,6 +35,9 @@ func NewBufferedPipe() *bufferedPipe { func (p *bufferedPipe) Read(target []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() + if p.buf == nil { + p.buf = new(bytes.Buffer) + } for { if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { return 0, io.EOF @@ -59,6 +63,9 @@ func (p *bufferedPipe) Read(target []byte) (int, error) { func (p *bufferedPipe) Write(input []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() + if p.buf == nil { + p.buf = new(bytes.Buffer) + } for { if atomic.LoadUint32(&p.closed) == 1 { return 0, io.ErrClosedPipe diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 7fecfab..ecb2dbf 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -164,6 +164,8 @@ func (sesh *Session) closeStream(s *Stream, active bool) error { Closing: C_STREAM, Payload: pad, } + + s.allocIdempot.Do(func() { s.obfsBuf = make([]byte, s.session.SendBufferSize) }) i, err := s.session.Obfs(f, s.obfsBuf) if err != nil { return err diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index f3eadef..dbccae0 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -28,7 +28,9 @@ type Stream struct { // atomic closed uint32 - obfsBuf []byte + // only alloc when writing to the stream + allocIdempot sync.Once + obfsBuf []byte // we assign each stream a fixed underlying TCP connection to utilise order guarantee provided by TCP itself // so that frameSorter should have few to none ooo frames to deal with @@ -49,7 +51,6 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { id: id, session: sesh, recvBuf: recvBuf, - obfsBuf: make([]byte, sesh.SendBufferSize), assignedConnId: assignedConnId, } @@ -90,12 +91,15 @@ func (s *Stream) Write(in []byte) (n int, err error) { // in the middle of the execution of Write. This may cause the closing frame // to be sent before the data frame and cause loss of packet. //log.Tracef("attempting to write %v bytes to stream %v",len(in),s.id) + // todo: forbid concurrent write s.writingM.RLock() defer s.writingM.RUnlock() if s.isClosed() { return 0, ErrBrokenStream } + s.allocIdempot.Do(func() { s.obfsBuf = make([]byte, s.session.SendBufferSize) }) + for n < len(in) { var framePayload []byte if len(in)-n <= s.session.maxStreamUnitWrite {