diff --git a/internal/multiplex/datagramBufferedPipe.go b/internal/multiplex/datagramBufferedPipe.go index e804c41..7ee2fe4 100644 --- a/internal/multiplex/datagramBufferedPipe.go +++ b/internal/multiplex/datagramBufferedPipe.go @@ -21,12 +21,14 @@ type datagramBufferedPipe struct { wtTimeout time.Duration rDeadline time.Time - timeoutTimer *time.Timer + timeoutTimer *time.Timer + enforceReadDeadline bool } func NewDatagramBufferedPipe() *datagramBufferedPipe { d := &datagramBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), + enforceReadDeadline: true, } return d } @@ -42,7 +44,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { return 0, io.EOF } - hasRDeadline := !d.rDeadline.IsZero() + hasRDeadline := !d.rDeadline.IsZero() && d.enforceReadDeadline if hasRDeadline { if time.Until(d.rDeadline) <= 0 { return 0, ErrTimeout @@ -65,6 +67,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { d.pLens = d.pLens[1:] d.buf.Read(target[:dataLen]) // err will always be nil because we have already verified that buf.Len() != 0 + d.enforceReadDeadline = false d.rwCond.Broadcast() return dataLen, nil } diff --git a/internal/multiplex/streamBufferedPipe.go b/internal/multiplex/streamBufferedPipe.go index cb51574..7737da5 100644 --- a/internal/multiplex/streamBufferedPipe.go +++ b/internal/multiplex/streamBufferedPipe.go @@ -19,12 +19,14 @@ type streamBufferedPipe struct { rDeadline time.Time wtTimeout time.Duration - timeoutTimer *time.Timer + timeoutTimer *time.Timer + enforceReadDeadline bool } func NewStreamBufferedPipe() *streamBufferedPipe { p := &streamBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), + enforceReadDeadline: true, } return p } @@ -40,7 +42,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { return 0, io.EOF } - hasRDeadline := !p.rDeadline.IsZero() + hasRDeadline := !p.rDeadline.IsZero() && p.enforceReadDeadline if hasRDeadline { if time.Until(p.rDeadline) <= 0 { return 0, ErrTimeout @@ -57,6 +59,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { } n, err := p.buf.Read(target) // err will always be nil because we have already verified that buf.Len() != 0 + p.enforceReadDeadline = false p.rwCond.Broadcast() return n, err }