Lazy allocation

This commit is contained in:
Andy Wang 2020-04-12 00:49:49 +01:00
parent 8a76afabf7
commit 008fd8f0a3
3 changed files with 17 additions and 4 deletions

View File

@ -17,7 +17,9 @@ var ErrTimeout = errors.New("deadline exceeded")
// The point of a bufferedPipe is that Read() will block until data is available // The point of a bufferedPipe is that Read() will block until data is available
type bufferedPipe struct { type bufferedPipe struct {
buf *bytes.Buffer // only alloc when on first Read or Write
buf *bytes.Buffer
closed uint32 closed uint32
rwCond *sync.Cond rwCond *sync.Cond
rDeadline time.Time rDeadline time.Time
@ -25,7 +27,6 @@ type bufferedPipe struct {
func NewBufferedPipe() *bufferedPipe { func NewBufferedPipe() *bufferedPipe {
p := &bufferedPipe{ p := &bufferedPipe{
buf: new(bytes.Buffer),
rwCond: sync.NewCond(&sync.Mutex{}), rwCond: sync.NewCond(&sync.Mutex{}),
} }
return p return p
@ -34,6 +35,9 @@ func NewBufferedPipe() *bufferedPipe {
func (p *bufferedPipe) Read(target []byte) (int, error) { func (p *bufferedPipe) Read(target []byte) (int, error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
if p.buf == nil {
p.buf = new(bytes.Buffer)
}
for { for {
if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 {
return 0, io.EOF return 0, io.EOF
@ -59,6 +63,9 @@ func (p *bufferedPipe) Read(target []byte) (int, error) {
func (p *bufferedPipe) Write(input []byte) (int, error) { func (p *bufferedPipe) Write(input []byte) (int, error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
if p.buf == nil {
p.buf = new(bytes.Buffer)
}
for { for {
if atomic.LoadUint32(&p.closed) == 1 { if atomic.LoadUint32(&p.closed) == 1 {
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe

View File

@ -164,6 +164,8 @@ func (sesh *Session) closeStream(s *Stream, active bool) error {
Closing: C_STREAM, Closing: C_STREAM,
Payload: pad, Payload: pad,
} }
s.allocIdempot.Do(func() { s.obfsBuf = make([]byte, s.session.SendBufferSize) })
i, err := s.session.Obfs(f, s.obfsBuf) i, err := s.session.Obfs(f, s.obfsBuf)
if err != nil { if err != nil {
return err return err

View File

@ -28,7 +28,9 @@ type Stream struct {
// atomic // atomic
closed uint32 closed uint32
obfsBuf []byte // only alloc when writing to the stream
allocIdempot sync.Once
obfsBuf []byte
// we assign each stream a fixed underlying TCP connection to utilise order guarantee provided by TCP itself // we assign each stream a fixed underlying TCP connection to utilise order guarantee provided by TCP itself
// so that frameSorter should have few to none ooo frames to deal with // so that frameSorter should have few to none ooo frames to deal with
@ -49,7 +51,6 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream {
id: id, id: id,
session: sesh, session: sesh,
recvBuf: recvBuf, recvBuf: recvBuf,
obfsBuf: make([]byte, sesh.SendBufferSize),
assignedConnId: assignedConnId, assignedConnId: assignedConnId,
} }
@ -90,12 +91,15 @@ func (s *Stream) Write(in []byte) (n int, err error) {
// in the middle of the execution of Write. This may cause the closing frame // in the middle of the execution of Write. This may cause the closing frame
// to be sent before the data frame and cause loss of packet. // to be sent before the data frame and cause loss of packet.
//log.Tracef("attempting to write %v bytes to stream %v",len(in),s.id) //log.Tracef("attempting to write %v bytes to stream %v",len(in),s.id)
// todo: forbid concurrent write
s.writingM.RLock() s.writingM.RLock()
defer s.writingM.RUnlock() defer s.writingM.RUnlock()
if s.isClosed() { if s.isClosed() {
return 0, ErrBrokenStream return 0, ErrBrokenStream
} }
s.allocIdempot.Do(func() { s.obfsBuf = make([]byte, s.session.SendBufferSize) })
for n < len(in) { for n < len(in) {
var framePayload []byte var framePayload []byte
if len(in)-n <= s.session.maxStreamUnitWrite { if len(in)-n <= s.session.maxStreamUnitWrite {