diff --git a/internal/multiplex/datagramBufferedPipe.go b/internal/multiplex/datagramBufferedPipe.go index e1a0462..ecdec8f 100644 --- a/internal/multiplex/datagramBufferedPipe.go +++ b/internal/multiplex/datagramBufferedPipe.go @@ -72,6 +72,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() + enforceTimeout := true if d.buf == nil { d.buf = new(bytes.Buffer) } @@ -80,11 +81,12 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { return 0, io.EOF } - hasRDeadline := !d.rDeadline.IsZero() - if hasRDeadline { - if time.Until(d.rDeadline) <= 0 { - return 0, ErrTimeout - } + if !d.rDeadline.IsZero() && enforceTimeout && time.Until(d.rDeadline) <= 0 { + return 0, ErrTimeout + } + + if d.wtTimeout != 0 { + d.rDeadline = time.Now().Add(d.wtTimeout) } if len(d.pLens) > 0 { @@ -96,14 +98,14 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { d.rwCond.Broadcast() return n, er } + enforceTimeout = false d.rwCond.Broadcast() } else { if d.wtTimeout == 0 { - if hasRDeadline { + if !d.rDeadline.IsZero() { d.broadcastAfter(time.Until(d.rDeadline)) } } else { - d.rDeadline = time.Now().Add(d.wtTimeout) d.broadcastAfter(d.wtTimeout) } diff --git a/internal/multiplex/streamBufferedPipe.go b/internal/multiplex/streamBufferedPipe.go index 66dacec..89ddb55 100644 --- a/internal/multiplex/streamBufferedPipe.go +++ b/internal/multiplex/streamBufferedPipe.go @@ -64,6 +64,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() + enforceTimeout := true if p.buf == nil { p.buf = new(bytes.Buffer) } @@ -72,12 +73,14 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { return 0, io.EOF } - hasRDeadline := !p.rDeadline.IsZero() - if hasRDeadline { - if time.Until(p.rDeadline) <= 0 { - return 0, ErrTimeout - } + if !p.rDeadline.IsZero() && enforceTimeout && time.Until(p.rDeadline) <= 0 { + return 0, ErrTimeout } + + if p.wtTimeout != 0 { + p.rDeadline = time.Now().Add(p.wtTimeout) + } + if p.buf.Len() > 0 { written, er := p.buf.WriteTo(w) n += written @@ -85,14 +88,14 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.rwCond.Broadcast() return n, er } + enforceTimeout = false p.rwCond.Broadcast() } else { if p.wtTimeout == 0 { - if hasRDeadline { + if !p.rDeadline.IsZero() { p.broadcastAfter(time.Until(p.rDeadline)) } } else { - p.rDeadline = time.Now().Add(p.wtTimeout) p.broadcastAfter(p.wtTimeout) }