From cbd71fae6d549179222a6f20939d58322b6e0293 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sat, 26 Dec 2020 16:34:25 +0000 Subject: [PATCH] Control flow optimisations --- internal/multiplex/datagramBufferedPipe.go | 13 ++------- internal/multiplex/streamBufferedPipe.go | 11 +------- internal/multiplex/switchboard.go | 33 ++++++++++------------ 3 files changed, 18 insertions(+), 39 deletions(-) diff --git a/internal/multiplex/datagramBufferedPipe.go b/internal/multiplex/datagramBufferedPipe.go index a7b99e4..7082264 100644 --- a/internal/multiplex/datagramBufferedPipe.go +++ b/internal/multiplex/datagramBufferedPipe.go @@ -13,8 +13,7 @@ import ( // instead of byte-oriented. The integrity of datagrams written into this buffer is preserved. // it won't get chopped up into individual bytes type datagramBufferedPipe struct { - pLens []int - // lazily allocated + pLens []int buf *bytes.Buffer closed bool rwCond *sync.Cond @@ -27,6 +26,7 @@ type datagramBufferedPipe struct { func NewDatagramBufferedPipe() *datagramBufferedPipe { d := &datagramBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), + buf: new(bytes.Buffer), } return d } @@ -34,9 +34,6 @@ func NewDatagramBufferedPipe() *datagramBufferedPipe { func (d *datagramBufferedPipe) Read(target []byte) (int, error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() - if d.buf == nil { - d.buf = new(bytes.Buffer) - } for { if d.closed && len(d.pLens) == 0 { return 0, io.EOF @@ -72,9 +69,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) { func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() - if d.buf == nil { - d.buf = new(bytes.Buffer) - } for { if d.closed && len(d.pLens) == 0 { return 0, io.EOF @@ -115,9 +109,6 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { func (d *datagramBufferedPipe) Write(f *Frame) (toBeClosed bool, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() - if d.buf == nil { - d.buf = new(bytes.Buffer) - } for { if d.closed { return true, io.ErrClosedPipe diff --git a/internal/multiplex/streamBufferedPipe.go b/internal/multiplex/streamBufferedPipe.go index 66dacec..0dd3e46 100644 --- a/internal/multiplex/streamBufferedPipe.go +++ b/internal/multiplex/streamBufferedPipe.go @@ -11,7 +11,6 @@ import ( // The point of a streamBufferedPipe is that Read() will block until data is available type streamBufferedPipe struct { - // only alloc when on first Read or Write buf *bytes.Buffer closed bool @@ -25,6 +24,7 @@ type streamBufferedPipe struct { func NewStreamBufferedPipe() *streamBufferedPipe { p := &streamBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), + buf: new(bytes.Buffer), } return p } @@ -32,9 +32,6 @@ func NewStreamBufferedPipe() *streamBufferedPipe { func (p *streamBufferedPipe) Read(target []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() - if p.buf == nil { - p.buf = new(bytes.Buffer) - } for { if p.closed && p.buf.Len() == 0 { return 0, io.EOF @@ -64,9 +61,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) { func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() - if p.buf == nil { - p.buf = new(bytes.Buffer) - } for { if p.closed && p.buf.Len() == 0 { return 0, io.EOF @@ -104,9 +98,6 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { func (p *streamBufferedPipe) Write(input []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() - if p.buf == nil { - p.buf = new(bytes.Buffer) - } for { if p.closed { return 0, io.ErrClosedPipe diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 6b254f6..84e43c9 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -72,46 +72,43 @@ func (sb *switchboard) addConn(conn net.Conn) { // a pointer to connId is passed here so that the switchboard can reassign it if that connId isn't usable func (sb *switchboard) send(data []byte, connId *uint32) (n int, err error) { - writeAndRegUsage := func(conn net.Conn, d []byte) (int, error) { - n, err = conn.Write(d) - if err != nil { - sb.conns.Delete(*connId) - sb.session.SetTerminalMsg("failed to write to remote " + err.Error()) - sb.session.passiveClose() - return n, err - } - sb.valve.AddTx(int64(n)) - return n, nil - } - sb.valve.txWait(len(data)) if atomic.LoadUint32(&sb.broken) == 1 || sb.connsCount() == 0 { return 0, errBrokenSwitchboard } + var conn net.Conn switch sb.strategy { case UNIFORM_SPREAD: - _, conn, err := sb.pickRandConn() + _, conn, err = sb.pickRandConn() if err != nil { return 0, errBrokenSwitchboard } - return writeAndRegUsage(conn, data) case FIXED_CONN_MAPPING: connI, ok := sb.conns.Load(*connId) if ok { - conn := connI.(net.Conn) - return writeAndRegUsage(conn, data) + conn = connI.(net.Conn) } else { - newConnId, conn, err := sb.pickRandConn() + var newConnId uint32 + newConnId, conn, err = sb.pickRandConn() if err != nil { return 0, errBrokenSwitchboard } *connId = newConnId - return writeAndRegUsage(conn, data) } default: return 0, errors.New("unsupported traffic distribution strategy") } + + n, err = conn.Write(data) + if err != nil { + sb.conns.Delete(*connId) + sb.session.SetTerminalMsg("failed to write to remote " + err.Error()) + sb.session.passiveClose() + return n, err + } + sb.valve.AddTx(int64(n)) + return n, nil } // returns a random connId