From 9d6ab39a7737a43fa1f681a3fa42a6d47f176575 Mon Sep 17 00:00:00 2001 From: notsure2 Date: Sat, 12 Dec 2020 02:37:06 +0200 Subject: [PATCH] Revert changes to multiplex module. Remove timeout from caller. --- internal/multiplex/datagramBufferedPipe.go | 25 +++++++++------------ internal/multiplex/streamBufferedPipe.go | 26 +++++++++------------- internal/server/dispatcher.go | 2 -- 3 files changed, 20 insertions(+), 33 deletions(-) diff --git a/internal/multiplex/datagramBufferedPipe.go b/internal/multiplex/datagramBufferedPipe.go index 7ee2fe4..e1a0462 100644 --- a/internal/multiplex/datagramBufferedPipe.go +++ b/internal/multiplex/datagramBufferedPipe.go @@ -21,14 +21,12 @@ type datagramBufferedPipe struct { wtTimeout time.Duration rDeadline time.Time - timeoutTimer *time.Timer - enforceReadDeadline bool + timeoutTimer *time.Timer } func NewDatagramBufferedPipe() *datagramBufferedPipe { d := &datagramBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), - enforceReadDeadline: true, } return d } @@ -44,7 +42,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { return 0, io.EOF } - hasRDeadline := !d.rDeadline.IsZero() && d.enforceReadDeadline + hasRDeadline := !d.rDeadline.IsZero() if hasRDeadline { if time.Until(d.rDeadline) <= 0 { return 0, ErrTimeout @@ -67,7 +65,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { d.pLens = d.pLens[1:] d.buf.Read(target[:dataLen]) // err will always be nil because we have already verified that buf.Len() != 0 - d.enforceReadDeadline = false d.rwCond.Broadcast() return dataLen, nil } @@ -75,7 +72,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() - enforceTimeout := true if d.buf == nil { d.buf = new(bytes.Buffer) } @@ -84,12 +80,11 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { return 0, io.EOF } - if !d.rDeadline.IsZero() && enforceTimeout && time.Until(d.rDeadline) <= 0 { - return 0, ErrTimeout - } - - if d.wtTimeout != 0 { - d.rDeadline = time.Now().Add(d.wtTimeout) + hasRDeadline := !d.rDeadline.IsZero() + if hasRDeadline { + if time.Until(d.rDeadline) <= 0 { + return 0, ErrTimeout + } } if len(d.pLens) > 0 { @@ -101,14 +96,14 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { d.rwCond.Broadcast() return n, er } - enforceTimeout = false d.rwCond.Broadcast() } else { if d.wtTimeout == 0 { - if !d.rDeadline.IsZero() { + if hasRDeadline { d.broadcastAfter(time.Until(d.rDeadline)) } - } else if enforceTimeout { + } else { + d.rDeadline = time.Now().Add(d.wtTimeout) d.broadcastAfter(d.wtTimeout) } diff --git a/internal/multiplex/streamBufferedPipe.go b/internal/multiplex/streamBufferedPipe.go index 7737da5..66dacec 100644 --- a/internal/multiplex/streamBufferedPipe.go +++ b/internal/multiplex/streamBufferedPipe.go @@ -19,14 +19,12 @@ type streamBufferedPipe struct { rDeadline time.Time wtTimeout time.Duration - timeoutTimer *time.Timer - enforceReadDeadline bool + timeoutTimer *time.Timer } func NewStreamBufferedPipe() *streamBufferedPipe { p := &streamBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), - enforceReadDeadline: true, } return p } @@ -42,7 +40,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { return 0, io.EOF } - hasRDeadline := !p.rDeadline.IsZero() && p.enforceReadDeadline + hasRDeadline := !p.rDeadline.IsZero() if hasRDeadline { if time.Until(p.rDeadline) <= 0 { return 0, ErrTimeout @@ -59,7 +57,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { } n, err := p.buf.Read(target) // err will always be nil because we have already verified that buf.Len() != 0 - p.enforceReadDeadline = false p.rwCond.Broadcast() return n, err } @@ -67,7 +64,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() - enforceTimeout := true if p.buf == nil { p.buf = new(bytes.Buffer) } @@ -76,14 +72,12 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { return 0, io.EOF } - if !p.rDeadline.IsZero() && enforceTimeout && time.Until(p.rDeadline) <= 0 { - return 0, ErrTimeout + hasRDeadline := !p.rDeadline.IsZero() + if hasRDeadline { + if time.Until(p.rDeadline) <= 0 { + return 0, ErrTimeout + } } - - if p.wtTimeout != 0 { - p.rDeadline = time.Now().Add(p.wtTimeout) - } - if p.buf.Len() > 0 { written, er := p.buf.WriteTo(w) n += written @@ -91,14 +85,14 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.rwCond.Broadcast() return n, er } - enforceTimeout = false p.rwCond.Broadcast() } else { if p.wtTimeout == 0 { - if !p.rDeadline.IsZero() { + if hasRDeadline { p.broadcastAfter(time.Until(p.rDeadline)) } - } else if enforceTimeout { + } else { + p.rDeadline = time.Now().Add(p.wtTimeout) p.broadcastAfter(p.wtTimeout) } diff --git a/internal/server/dispatcher.go b/internal/server/dispatcher.go index 56a556f..4fa0698 100644 --- a/internal/server/dispatcher.go +++ b/internal/server/dispatcher.go @@ -275,8 +275,6 @@ func serveSession(sesh *mux.Session, ci ClientInfo, user *ActiveUser, sta *State } log.Tracef("%v endpoint has been successfully connected", ci.ProxyMethod) - // if stream has nothing to send to proxy server for sta.Timeout period of time, stream will return error - newStream.(*mux.Stream).SetWriteToTimeout(sta.Timeout) go func() { if _, err := common.Copy(localConn, newStream); err != nil { log.Tracef("copying stream to proxy server: %v", err)