diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBufferedPipe.go similarity index 81% rename from internal/multiplex/datagramBuffer.go rename to internal/multiplex/datagramBufferedPipe.go index e802e0a..0846849 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBufferedPipe.go @@ -9,10 +9,10 @@ import ( "time" ) -// datagramBuffer is the same as bufferedPipe with the exception that it's message-oriented, +// datagramBufferedPipe is the same as streamBufferedPipe with the exception that it's message-oriented, // instead of byte-oriented. The integrity of datagrams written into this buffer is preserved. // it won't get chopped up into individual bytes -type datagramBuffer struct { +type datagramBufferedPipe struct { pLens []int buf *bytes.Buffer closed bool @@ -21,14 +21,14 @@ type datagramBuffer struct { rDeadline time.Time } -func NewDatagramBuffer() *datagramBuffer { - d := &datagramBuffer{ +func NewDatagramBufferedPipe() *datagramBufferedPipe { + d := &datagramBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), } return d } -func (d *datagramBuffer) Read(target []byte) (int, error) { +func (d *datagramBufferedPipe) Read(target []byte) (int, error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() if d.buf == nil { @@ -63,7 +63,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) { return dataLen, nil } -func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) { +func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() if d.buf == nil { @@ -104,7 +104,7 @@ func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) { } } -func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { +func (d *datagramBufferedPipe) Write(f Frame) (toBeClosed bool, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() if d.buf == nil { @@ -135,7 +135,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { return false, nil } -func (d *datagramBuffer) Close() error { +func (d *datagramBufferedPipe) Close() error { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() @@ -144,7 +144,7 @@ func (d *datagramBuffer) Close() error { return nil } -func (d *datagramBuffer) SetReadDeadline(t time.Time) { +func (d *datagramBufferedPipe) SetReadDeadline(t time.Time) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() @@ -152,7 +152,7 @@ func (d *datagramBuffer) SetReadDeadline(t time.Time) { d.rwCond.Broadcast() } -func (d *datagramBuffer) SetWriteToTimeout(t time.Duration) { +func (d *datagramBufferedPipe) SetWriteToTimeout(t time.Duration) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() diff --git a/internal/multiplex/datagramBuffer_test.go b/internal/multiplex/datagramBufferedPipe_test.go similarity index 92% rename from internal/multiplex/datagramBuffer_test.go rename to internal/multiplex/datagramBufferedPipe_test.go index cc39b56..4f7e906 100644 --- a/internal/multiplex/datagramBuffer_test.go +++ b/internal/multiplex/datagramBufferedPipe_test.go @@ -9,7 +9,7 @@ import ( func TestDatagramBuffer_RW(t *testing.T) { b := []byte{0x01, 0x02, 0x03} t.Run("simple write", func(t *testing.T) { - pipe := NewDatagramBuffer() + pipe := NewDatagramBufferedPipe() _, err := pipe.Write(Frame{Payload: b}) if err != nil { t.Error( @@ -21,7 +21,7 @@ func TestDatagramBuffer_RW(t *testing.T) { }) t.Run("simple read", func(t *testing.T) { - pipe := NewDatagramBuffer() + pipe := NewDatagramBufferedPipe() _, _ = pipe.Write(Frame{Payload: b}) b2 := make([]byte, len(b)) n, err := pipe.Read(b2) @@ -54,7 +54,7 @@ func TestDatagramBuffer_RW(t *testing.T) { }) t.Run("writing closing frame", func(t *testing.T) { - pipe := NewDatagramBuffer() + pipe := NewDatagramBufferedPipe() toBeClosed, err := pipe.Write(Frame{Closing: C_STREAM}) if !toBeClosed { t.Error("should be to be closed") @@ -73,7 +73,7 @@ func TestDatagramBuffer_RW(t *testing.T) { } func TestDatagramBuffer_BlockingRead(t *testing.T) { - pipe := NewDatagramBuffer() + pipe := NewDatagramBufferedPipe() b := []byte{0x01, 0x02, 0x03} go func() { time.Sleep(100 * time.Millisecond) @@ -108,7 +108,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) { } func TestDatagramBuffer_CloseThenRead(t *testing.T) { - pipe := NewDatagramBuffer() + pipe := NewDatagramBufferedPipe() b := []byte{0x01, 0x02, 0x03} pipe.Write(Frame{Payload: b}) b2 := make([]byte, len(b)) diff --git a/internal/multiplex/obfs.go b/internal/multiplex/obfs.go index 18b1499..5711e48 100644 --- a/internal/multiplex/obfs.go +++ b/internal/multiplex/obfs.go @@ -32,11 +32,17 @@ type Obfuscator struct { // Used in Stream.Write. Add multiplexing headers, encrypt and add TLS header Obfs Obfser // Remove TLS header, decrypt and unmarshall frames - Deobfs Deobfser - SessionKey [32]byte - minOverhead int + Deobfs Deobfser + SessionKey [32]byte + + maxOverhead int } +// MakeObfs returns a function of type Obfser. An Obfser takes three arguments: +// a *Frame with all the field set correctly, a []byte as buffer to put encrypted +// message in, and an int called payloadOffsetInBuf to be used when *Frame.payload +// is in the byte slice used as buffer (2nd argument). payloadOffsetInBuf specifies +// the index at which data belonging to *Frame.Payload starts in the buffer. func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser { obfs := func(f *Frame, buf []byte, payloadOffsetInBuf int) (int, error) { // we need the encrypted data to be at least 8 bytes to be used as nonce for salsa20 stream header encryption @@ -48,7 +54,9 @@ func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser { } var extraLen int if payloadCipher == nil { - if extraLen = 8 - payloadLen; extraLen < 0 { + extraLen = 8 - payloadLen + if extraLen < 0 { + // if our payload is already greater than 8 bytes extraLen = 0 } } else { @@ -92,6 +100,9 @@ func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser { return obfs } +// MakeDeobfs returns a function Deobfser. A Deobfser takes in a single byte slice, +// containing the message to be decrypted, and returns a *Frame containing the frame +// information and plaintext func MakeDeobfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Deobfser { // stream header length + minimum data size (i.e. nonce size of salsa20) const minInputLen = HEADER_LEN + 8 @@ -151,7 +162,7 @@ func MakeObfuscator(encryptionMethod byte, sessionKey [32]byte) (obfuscator Obfu switch encryptionMethod { case E_METHOD_PLAIN: payloadCipher = nil - obfuscator.minOverhead = 0 + obfuscator.maxOverhead = 0 case E_METHOD_AES_GCM: var c cipher.Block c, err = aes.NewCipher(sessionKey[:]) @@ -162,13 +173,13 @@ func MakeObfuscator(encryptionMethod byte, sessionKey [32]byte) (obfuscator Obfu if err != nil { return } - obfuscator.minOverhead = payloadCipher.Overhead() + obfuscator.maxOverhead = payloadCipher.Overhead() case E_METHOD_CHACHA20_POLY1305: payloadCipher, err = chacha20poly1305.New(sessionKey[:]) if err != nil { return } - obfuscator.minOverhead = payloadCipher.Overhead() + obfuscator.maxOverhead = payloadCipher.Overhead() default: return obfuscator, errors.New("Unknown encryption method") } diff --git a/internal/multiplex/recvBuffer.go b/internal/multiplex/recvBuffer.go index 414cae2..d951682 100644 --- a/internal/multiplex/recvBuffer.go +++ b/internal/multiplex/recvBuffer.go @@ -1,15 +1,23 @@ package multiplex import ( + "errors" "io" "time" ) +var ErrTimeout = errors.New("deadline exceeded") + type recvBuffer interface { // Read calls' err must be nil | io.EOF | io.ErrShortBuffer + // Read should NOT return error on a closed streamBuffer with a non-empty buffer. + // Instead, it should behave as if it hasn't been closed. Closure is only relevant + // when the buffer is empty. io.ReadCloser io.WriterTo Write(Frame) (toBeClosed bool, err error) SetReadDeadline(time time.Time) + // SetWriteToTimeout sets the duration a recvBuffer waits in a WriteTo call when nothing + // has been written for a while. After that duration it should return ErrTimeout SetWriteToTimeout(d time.Duration) } diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index a53876e..4cd370d 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -34,7 +34,8 @@ type SessionConfig struct { Singleplex bool - MaxFrameSize int // maximum size of the frame, including the header + // maximum size of Frame.Payload + MaxFrameSize int SendBufferSize int ReceiveBufferSize int } @@ -64,7 +65,8 @@ type Session struct { terminalMsg atomic.Value - maxStreamUnitWrite int // the max size passed to Write calls before it splits it into multiple frames + // the max size passed to Write calls before it splits it into multiple frames + maxStreamUnitWrite int } func MakeSession(id uint32, config SessionConfig) *Session { @@ -89,7 +91,7 @@ func MakeSession(id uint32, config SessionConfig) *Session { sesh.MaxFrameSize = defaultSendRecvBufSize - 1024 } // todo: validation. this must be smaller than the buffer sizes - sesh.maxStreamUnitWrite = sesh.MaxFrameSize - HEADER_LEN - sesh.Obfuscator.minOverhead + sesh.maxStreamUnitWrite = sesh.MaxFrameSize - HEADER_LEN - sesh.Obfuscator.maxOverhead sbConfig := switchboardConfig{ valve: sesh.Valve, @@ -156,7 +158,7 @@ func (sesh *Session) closeStream(s *Stream, active bool) error { if atomic.SwapUint32(&s.closed, 1) == 1 { return fmt.Errorf("closing stream %v: %w", s.id, errRepeatStreamClosing) } - _ = s.recvBuf.Close() // both datagramBuffer and streamBuffer won't return err on Close() + _ = s.recvBuf.Close() // recvBuf.Close should not return error if active { // Notify remote that this stream is closed @@ -291,6 +293,7 @@ func (sesh *Session) Close() error { return true }) + // we send a notice frame telling remote to close the session pad := genRandomPadding() f := &Frame{ StreamID: 0xffffffff, diff --git a/internal/multiplex/session_test.go b/internal/multiplex/session_test.go index 69924a6..3fa6f8f 100644 --- a/internal/multiplex/session_test.go +++ b/internal/multiplex/session_test.go @@ -335,7 +335,7 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) { } } -func TestParallel(t *testing.T) { +func TestParallelStreams(t *testing.T) { rand.Seed(0) var sessionKey [32]byte diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 1e2810b..9c3bb88 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -43,7 +43,7 @@ type Stream struct { func makeStream(sesh *Session, id uint32) *Stream { var recvBuf recvBuffer if sesh.Unordered { - recvBuf = NewDatagramBuffer() + recvBuf = NewDatagramBufferedPipe() } else { recvBuf = NewStreamBuffer() } diff --git a/internal/multiplex/streamBuffer.go b/internal/multiplex/streamBuffer.go index 31c5937..9e5f53c 100644 --- a/internal/multiplex/streamBuffer.go +++ b/internal/multiplex/streamBuffer.go @@ -48,13 +48,17 @@ type streamBuffer struct { nextRecvSeq uint64 sh sorterHeap - buf *bufferedPipe + buf *streamBufferedPipe } +// streamBuffer is a wrapper around streamBufferedPipe. +// Its main function is to sort frames in order, and wait for frames to arrive +// if they have arrived out-of-order. Then it writes the payload of frames into +// a streamBufferedPipe. func NewStreamBuffer() *streamBuffer { sb := &streamBuffer{ sh: []*Frame{}, - buf: NewBufferedPipe(), + buf: NewStreamBufferedPipe(), } return sb } diff --git a/internal/multiplex/bufferedPipe.go b/internal/multiplex/streamBufferedPipe.go similarity index 80% rename from internal/multiplex/bufferedPipe.go rename to internal/multiplex/streamBufferedPipe.go index ea3f87e..89a58a6 100644 --- a/internal/multiplex/bufferedPipe.go +++ b/internal/multiplex/streamBufferedPipe.go @@ -4,7 +4,6 @@ package multiplex import ( "bytes" - "errors" "io" "sync" "time" @@ -12,10 +11,8 @@ import ( const BUF_SIZE_LIMIT = 1 << 20 * 500 -var ErrTimeout = errors.New("deadline exceeded") - -// The point of a bufferedPipe is that Read() will block until data is available -type bufferedPipe struct { +// The point of a streamBufferedPipe is that Read() will block until data is available +type streamBufferedPipe struct { // only alloc when on first Read or Write buf *bytes.Buffer @@ -25,14 +22,14 @@ type bufferedPipe struct { wtTimeout time.Duration } -func NewBufferedPipe() *bufferedPipe { - p := &bufferedPipe{ +func NewStreamBufferedPipe() *streamBufferedPipe { + p := &streamBufferedPipe{ rwCond: sync.NewCond(&sync.Mutex{}), } return p } -func (p *bufferedPipe) Read(target []byte) (int, error) { +func (p *streamBufferedPipe) Read(target []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() if p.buf == nil { @@ -60,7 +57,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) { return n, err } -func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) { +func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() if p.buf == nil { @@ -98,7 +95,7 @@ func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) { } } -func (p *bufferedPipe) Write(input []byte) (int, error) { +func (p *streamBufferedPipe) Write(input []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() if p.buf == nil { @@ -120,7 +117,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { return n, err } -func (p *bufferedPipe) Close() error { +func (p *streamBufferedPipe) Close() error { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() @@ -129,7 +126,7 @@ func (p *bufferedPipe) Close() error { return nil } -func (p *bufferedPipe) SetReadDeadline(t time.Time) { +func (p *streamBufferedPipe) SetReadDeadline(t time.Time) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() @@ -137,7 +134,7 @@ func (p *bufferedPipe) SetReadDeadline(t time.Time) { p.rwCond.Broadcast() } -func (p *bufferedPipe) SetWriteToTimeout(d time.Duration) { +func (p *streamBufferedPipe) SetWriteToTimeout(d time.Duration) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() diff --git a/internal/multiplex/bufferedPipe_test.go b/internal/multiplex/streamBufferedPipe_test.go similarity index 94% rename from internal/multiplex/bufferedPipe_test.go rename to internal/multiplex/streamBufferedPipe_test.go index 85b5f54..dcaad29 100644 --- a/internal/multiplex/bufferedPipe_test.go +++ b/internal/multiplex/streamBufferedPipe_test.go @@ -8,7 +8,7 @@ import ( ) func TestPipeRW(t *testing.T) { - pipe := NewBufferedPipe() + pipe := NewStreamBufferedPipe() b := []byte{0x01, 0x02, 0x03} n, err := pipe.Write(b) if n != len(b) { @@ -57,7 +57,7 @@ func TestPipeRW(t *testing.T) { } func TestReadBlock(t *testing.T) { - pipe := NewBufferedPipe() + pipe := NewStreamBufferedPipe() b := []byte{0x01, 0x02, 0x03} go func() { time.Sleep(100 * time.Millisecond) @@ -92,7 +92,7 @@ func TestReadBlock(t *testing.T) { } func TestPartialRead(t *testing.T) { - pipe := NewBufferedPipe() + pipe := NewStreamBufferedPipe() b := []byte{0x01, 0x02, 0x03} pipe.Write(b) b1 := make([]byte, 1) @@ -148,7 +148,7 @@ func TestPartialRead(t *testing.T) { } func TestReadAfterClose(t *testing.T) { - pipe := NewBufferedPipe() + pipe := NewStreamBufferedPipe() b := []byte{0x01, 0x02, 0x03} pipe.Write(b) b2 := make([]byte, len(b)) @@ -184,7 +184,7 @@ func BenchmarkBufferedPipe_RW(b *testing.B) { testData := make([]byte, PAYLOAD_LEN) rand.Read(testData) - pipe := NewBufferedPipe() + pipe := NewStreamBufferedPipe() smallBuf := make([]byte, PAYLOAD_LEN-10) go func() { diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 3a2b6e7..320700d 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -20,7 +20,12 @@ type switchboardConfig struct { recvBufferSize int } -// switchboard is responsible for keeping the reference of TCP connections between client and server +// switchboard is responsible for managing TCP connections between client and server. +// It has several purposes: constantly receiving incoming data from all connections +// and pass them to Session.recvDataFromRemote(); accepting data through +// switchboard.send(), in which it selects a connection according to its +// switchboardStrategy and send the data off using that; and counting, as well as +// rate limiting, data received and sent through its Valve. type switchboard struct { session *Session