Fix termination of long downloads after StreamTimeout seconds.

- Even if not broadcasting in a loop, we still need to update the read deadline.
- Don't enforce the timeout after the first data is written.
This commit is contained in:
notsure2 2020-12-06 05:31:50 +02:00
parent 46f4235ccb
commit ee5968afee
2 changed files with 19 additions and 14 deletions

View File

@ -72,6 +72,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.L.Lock() d.rwCond.L.Lock()
defer d.rwCond.L.Unlock() defer d.rwCond.L.Unlock()
enforceTimeout := true
if d.buf == nil { if d.buf == nil {
d.buf = new(bytes.Buffer) d.buf = new(bytes.Buffer)
} }
@ -80,11 +81,12 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
return 0, io.EOF return 0, io.EOF
} }
hasRDeadline := !d.rDeadline.IsZero() if !d.rDeadline.IsZero() && enforceTimeout && time.Until(d.rDeadline) <= 0 {
if hasRDeadline {
if time.Until(d.rDeadline) <= 0 {
return 0, ErrTimeout return 0, ErrTimeout
} }
if d.wtTimeout != 0 {
d.rDeadline = time.Now().Add(d.wtTimeout)
} }
if len(d.pLens) > 0 { if len(d.pLens) > 0 {
@ -96,14 +98,14 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.Broadcast() d.rwCond.Broadcast()
return n, er return n, er
} }
enforceTimeout = false
d.rwCond.Broadcast() d.rwCond.Broadcast()
} else { } else {
if d.wtTimeout == 0 { if d.wtTimeout == 0 {
if hasRDeadline { if !d.rDeadline.IsZero() {
d.broadcastAfter(time.Until(d.rDeadline)) d.broadcastAfter(time.Until(d.rDeadline))
} }
} else { } else {
d.rDeadline = time.Now().Add(d.wtTimeout)
d.broadcastAfter(d.wtTimeout) d.broadcastAfter(d.wtTimeout)
} }

View File

@ -64,6 +64,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
enforceTimeout := true
if p.buf == nil { if p.buf == nil {
p.buf = new(bytes.Buffer) p.buf = new(bytes.Buffer)
} }
@ -72,12 +73,14 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
return 0, io.EOF return 0, io.EOF
} }
hasRDeadline := !p.rDeadline.IsZero() if !p.rDeadline.IsZero() && enforceTimeout && time.Until(p.rDeadline) <= 0 {
if hasRDeadline {
if time.Until(p.rDeadline) <= 0 {
return 0, ErrTimeout return 0, ErrTimeout
} }
if p.wtTimeout != 0 {
p.rDeadline = time.Now().Add(p.wtTimeout)
} }
if p.buf.Len() > 0 { if p.buf.Len() > 0 {
written, er := p.buf.WriteTo(w) written, er := p.buf.WriteTo(w)
n += written n += written
@ -85,14 +88,14 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.Broadcast() p.rwCond.Broadcast()
return n, er return n, er
} }
enforceTimeout = false
p.rwCond.Broadcast() p.rwCond.Broadcast()
} else { } else {
if p.wtTimeout == 0 { if p.wtTimeout == 0 {
if hasRDeadline { if !p.rDeadline.IsZero() {
p.broadcastAfter(time.Until(p.rDeadline)) p.broadcastAfter(time.Until(p.rDeadline))
} }
} else { } else {
p.rDeadline = time.Now().Add(p.wtTimeout)
p.broadcastAfter(p.wtTimeout) p.broadcastAfter(p.wtTimeout)
} }