diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 948d323..f347ea7 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -124,11 +124,7 @@ func (sesh *Session) OpenStream() (*Stream, error) { } id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1 // Because atomic.AddUint32 returns the value after incrementation - connId, _, err := sesh.sb.pickRandConn() - if err != nil { - return nil, err - } - stream := makeStream(sesh, id, connId) + stream := makeStream(sesh, id) sesh.streams.Store(id, stream) sesh.streamCountIncr() log.Tracef("stream %v of session %v opened", id, sesh.id) @@ -202,9 +198,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error { return sesh.passiveClose() } - connId, _, _ := sesh.sb.pickRandConn() - // we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write - newStream := makeStream(sesh, frame.StreamID, connId) + newStream := makeStream(sesh, frame.StreamID) existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream) if existing { if existingStreamI == nil { diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index a4d5dd6..0350d31 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -39,7 +39,7 @@ type Stream struct { assignedConnId uint32 } -func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { +func makeStream(sesh *Session, id uint32) *Stream { var recvBuf recvBuffer if sesh.Unordered { recvBuf = NewDatagramBuffer() @@ -48,10 +48,9 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { } stream := &Stream{ - id: id, - session: sesh, - recvBuf: recvBuf, - assignedConnId: assignedConnId, + id: id, + session: sesh, + recvBuf: recvBuf, } return stream diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 6f57666..407d273 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -38,6 +38,7 @@ func makeSwitchboard(sesh *Session, config switchboardConfig) *switchboard { sb := &switchboard{ session: sesh, switchboardConfig: config, + nextConnId: 1, } return sb } @@ -160,6 +161,7 @@ func (sb *switchboard) deplex(connId uint32, conn net.Conn) { sb.valve.AddRx(int64(n)) if err != nil { log.Debugf("a connection for session %v has closed: %v", sb.session.id, err) + sb.conns.Delete(connId) sb.close("a connection has dropped unexpectedly") return }