Merge branch 'fix-termination-of-long-downloads' into notsure2

This commit is contained in:
notsure2 2020-12-12 02:39:56 +02:00
commit 273e5a7750
3 changed files with 20 additions and 33 deletions

View File

@ -21,14 +21,12 @@ type datagramBufferedPipe struct {
wtTimeout time.Duration wtTimeout time.Duration
rDeadline time.Time rDeadline time.Time
timeoutTimer *time.Timer timeoutTimer *time.Timer
enforceReadDeadline bool
} }
func NewDatagramBufferedPipe() *datagramBufferedPipe { func NewDatagramBufferedPipe() *datagramBufferedPipe {
d := &datagramBufferedPipe{ d := &datagramBufferedPipe{
rwCond: sync.NewCond(&sync.Mutex{}), rwCond: sync.NewCond(&sync.Mutex{}),
enforceReadDeadline: true,
} }
return d return d
} }
@ -44,7 +42,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
return 0, io.EOF return 0, io.EOF
} }
hasRDeadline := !d.rDeadline.IsZero() && d.enforceReadDeadline hasRDeadline := !d.rDeadline.IsZero()
if hasRDeadline { if hasRDeadline {
if time.Until(d.rDeadline) <= 0 { if time.Until(d.rDeadline) <= 0 {
return 0, ErrTimeout return 0, ErrTimeout
@ -67,7 +65,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
d.pLens = d.pLens[1:] d.pLens = d.pLens[1:]
d.buf.Read(target[:dataLen]) d.buf.Read(target[:dataLen])
// err will always be nil because we have already verified that buf.Len() != 0 // err will always be nil because we have already verified that buf.Len() != 0
d.enforceReadDeadline = false
d.rwCond.Broadcast() d.rwCond.Broadcast()
return dataLen, nil return dataLen, nil
} }
@ -75,7 +72,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.L.Lock() d.rwCond.L.Lock()
defer d.rwCond.L.Unlock() defer d.rwCond.L.Unlock()
enforceTimeout := true
if d.buf == nil { if d.buf == nil {
d.buf = new(bytes.Buffer) d.buf = new(bytes.Buffer)
} }
@ -84,12 +80,11 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
return 0, io.EOF return 0, io.EOF
} }
if !d.rDeadline.IsZero() && enforceTimeout && time.Until(d.rDeadline) <= 0 { hasRDeadline := !d.rDeadline.IsZero()
return 0, ErrTimeout if hasRDeadline {
} if time.Until(d.rDeadline) <= 0 {
return 0, ErrTimeout
if d.wtTimeout != 0 { }
d.rDeadline = time.Now().Add(d.wtTimeout)
} }
if len(d.pLens) > 0 { if len(d.pLens) > 0 {
@ -101,14 +96,14 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.Broadcast() d.rwCond.Broadcast()
return n, er return n, er
} }
enforceTimeout = false
d.rwCond.Broadcast() d.rwCond.Broadcast()
} else { } else {
if d.wtTimeout == 0 { if d.wtTimeout == 0 {
if !d.rDeadline.IsZero() { if hasRDeadline {
d.broadcastAfter(time.Until(d.rDeadline)) d.broadcastAfter(time.Until(d.rDeadline))
} }
} else if enforceTimeout { } else {
d.rDeadline = time.Now().Add(d.wtTimeout)
d.broadcastAfter(d.wtTimeout) d.broadcastAfter(d.wtTimeout)
} }

View File

@ -19,14 +19,12 @@ type streamBufferedPipe struct {
rDeadline time.Time rDeadline time.Time
wtTimeout time.Duration wtTimeout time.Duration
timeoutTimer *time.Timer timeoutTimer *time.Timer
enforceReadDeadline bool
} }
func NewStreamBufferedPipe() *streamBufferedPipe { func NewStreamBufferedPipe() *streamBufferedPipe {
p := &streamBufferedPipe{ p := &streamBufferedPipe{
rwCond: sync.NewCond(&sync.Mutex{}), rwCond: sync.NewCond(&sync.Mutex{}),
enforceReadDeadline: true,
} }
return p return p
} }
@ -42,7 +40,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
return 0, io.EOF return 0, io.EOF
} }
hasRDeadline := !p.rDeadline.IsZero() && p.enforceReadDeadline hasRDeadline := !p.rDeadline.IsZero()
if hasRDeadline { if hasRDeadline {
if time.Until(p.rDeadline) <= 0 { if time.Until(p.rDeadline) <= 0 {
return 0, ErrTimeout return 0, ErrTimeout
@ -59,7 +57,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
} }
n, err := p.buf.Read(target) n, err := p.buf.Read(target)
// err will always be nil because we have already verified that buf.Len() != 0 // err will always be nil because we have already verified that buf.Len() != 0
p.enforceReadDeadline = false
p.rwCond.Broadcast() p.rwCond.Broadcast()
return n, err return n, err
} }
@ -67,7 +64,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
enforceTimeout := true
if p.buf == nil { if p.buf == nil {
p.buf = new(bytes.Buffer) p.buf = new(bytes.Buffer)
} }
@ -76,14 +72,12 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
return 0, io.EOF return 0, io.EOF
} }
if !p.rDeadline.IsZero() && enforceTimeout && time.Until(p.rDeadline) <= 0 { hasRDeadline := !p.rDeadline.IsZero()
return 0, ErrTimeout if hasRDeadline {
if time.Until(p.rDeadline) <= 0 {
return 0, ErrTimeout
}
} }
if p.wtTimeout != 0 {
p.rDeadline = time.Now().Add(p.wtTimeout)
}
if p.buf.Len() > 0 { if p.buf.Len() > 0 {
written, er := p.buf.WriteTo(w) written, er := p.buf.WriteTo(w)
n += written n += written
@ -91,14 +85,14 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.Broadcast() p.rwCond.Broadcast()
return n, er return n, er
} }
enforceTimeout = false
p.rwCond.Broadcast() p.rwCond.Broadcast()
} else { } else {
if p.wtTimeout == 0 { if p.wtTimeout == 0 {
if !p.rDeadline.IsZero() { if hasRDeadline {
p.broadcastAfter(time.Until(p.rDeadline)) p.broadcastAfter(time.Until(p.rDeadline))
} }
} else if enforceTimeout { } else {
p.rDeadline = time.Now().Add(p.wtTimeout)
p.broadcastAfter(p.wtTimeout) p.broadcastAfter(p.wtTimeout)
} }

View File

@ -275,8 +275,6 @@ func serveSession(sesh *mux.Session, ci ClientInfo, user *ActiveUser, sta *State
} }
log.Tracef("%v endpoint has been successfully connected", ci.ProxyMethod) log.Tracef("%v endpoint has been successfully connected", ci.ProxyMethod)
// if stream has nothing to send to proxy server for sta.Timeout period of time, stream will return error
newStream.(*mux.Stream).SetWriteToTimeout(sta.Timeout)
go func() { go func() {
if _, err := common.Copy(localConn, newStream); err != nil { if _, err := common.Copy(localConn, newStream); err != nil {
log.Tracef("copying stream to proxy server: %v", err) log.Tracef("copying stream to proxy server: %v", err)