Assign connId only when needed

This commit is contained in:
Andy Wang 2020-04-12 12:51:00 +01:00
parent 14787a7057
commit f05cc19dbc
3 changed files with 8 additions and 13 deletions

View File

@ -124,11 +124,7 @@ func (sesh *Session) OpenStream() (*Stream, error) {
} }
id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1 id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1
// Because atomic.AddUint32 returns the value after incrementation // Because atomic.AddUint32 returns the value after incrementation
connId, _, err := sesh.sb.pickRandConn() stream := makeStream(sesh, id)
if err != nil {
return nil, err
}
stream := makeStream(sesh, id, connId)
sesh.streams.Store(id, stream) sesh.streams.Store(id, stream)
sesh.streamCountIncr() sesh.streamCountIncr()
log.Tracef("stream %v of session %v opened", id, sesh.id) log.Tracef("stream %v of session %v opened", id, sesh.id)
@ -202,9 +198,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
return sesh.passiveClose() return sesh.passiveClose()
} }
connId, _, _ := sesh.sb.pickRandConn() newStream := makeStream(sesh, frame.StreamID)
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
newStream := makeStream(sesh, frame.StreamID, connId)
existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream) existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream)
if existing { if existing {
if existingStreamI == nil { if existingStreamI == nil {

View File

@ -39,7 +39,7 @@ type Stream struct {
assignedConnId uint32 assignedConnId uint32
} }
func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { func makeStream(sesh *Session, id uint32) *Stream {
var recvBuf recvBuffer var recvBuf recvBuffer
if sesh.Unordered { if sesh.Unordered {
recvBuf = NewDatagramBuffer() recvBuf = NewDatagramBuffer()
@ -48,10 +48,9 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream {
} }
stream := &Stream{ stream := &Stream{
id: id, id: id,
session: sesh, session: sesh,
recvBuf: recvBuf, recvBuf: recvBuf,
assignedConnId: assignedConnId,
} }
return stream return stream

View File

@ -38,6 +38,7 @@ func makeSwitchboard(sesh *Session, config switchboardConfig) *switchboard {
sb := &switchboard{ sb := &switchboard{
session: sesh, session: sesh,
switchboardConfig: config, switchboardConfig: config,
nextConnId: 1,
} }
return sb return sb
} }
@ -160,6 +161,7 @@ func (sb *switchboard) deplex(connId uint32, conn net.Conn) {
sb.valve.AddRx(int64(n)) sb.valve.AddRx(int64(n))
if err != nil { if err != nil {
log.Debugf("a connection for session %v has closed: %v", sb.session.id, err) log.Debugf("a connection for session %v has closed: %v", sb.session.id, err)
sb.conns.Delete(connId)
sb.close("a connection has dropped unexpectedly") sb.close("a connection has dropped unexpectedly")
return return
} }