From 53c0c6bafec9c2b9883840c534024e68a73e88ad Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Mon, 13 Apr 2020 22:48:28 +0100 Subject: [PATCH] Explicitly allow buf overlap in obfs --- internal/multiplex/obfs.go | 36 +++++++++------ internal/multiplex/obfs_test.go | 18 ++++---- internal/multiplex/session.go | 4 +- internal/multiplex/session_test.go | 30 ++++++------ internal/multiplex/stream.go | 8 ++-- internal/multiplex/stream_test.go | 63 +++++++++++++++----------- internal/multiplex/switchboard_test.go | 2 +- internal/test/integration_test.go | 38 ++++++---------- 8 files changed, 104 insertions(+), 95 deletions(-) diff --git a/internal/multiplex/obfs.go b/internal/multiplex/obfs.go index c771f0c..457a4a2 100644 --- a/internal/multiplex/obfs.go +++ b/internal/multiplex/obfs.go @@ -9,10 +9,9 @@ import ( "github.com/cbeuw/Cloak/internal/util" "golang.org/x/crypto/chacha20poly1305" "golang.org/x/crypto/salsa20" - "io" ) -type Obfser func(*Frame, []byte) (int, error) +type Obfser func(*Frame, []byte, int) (int, error) type Deobfser func([]byte) (*Frame, error) var u32 = binary.BigEndian.Uint32 @@ -39,13 +38,17 @@ type Obfuscator struct { } func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser { - obfs := func(f *Frame, buf []byte) (int, error) { + obfs := func(f *Frame, buf []byte, payloadOffsetInBuf int) (int, error) { // we need the encrypted data to be at least 8 bytes to be used as nonce for salsa20 stream header encryption // this will be the case if the encryption method is an AEAD cipher, however for plain, it's well possible // that the frame payload is smaller than 8 bytes, so we need to add on the difference + payloadLen := len(f.Payload) + if payloadLen == 0 { + return 0, errors.New("payload cannot be empty") + } var extraLen int if payloadCipher == nil { - if extraLen = 8 - len(f.Payload); extraLen < 0 { + if extraLen = 8 - payloadLen; extraLen < 0 { extraLen = 0 } } else { @@ -54,31 +57,34 @@ func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser { return 0, errors.New("AEAD's Overhead cannot be fewer than 8 bytes") } } - // usefulLen is the amount of bytes that will be eventually sent off - usefulLen := HEADER_LEN + len(f.Payload) + extraLen - if usefulLen < HEADER_LEN || len(buf) < usefulLen { // compiler hint to eliminate bound check - return 0, io.ErrShortBuffer + + usefulLen := HEADER_LEN + payloadLen + extraLen + if len(buf) < usefulLen { + return 0, errors.New("obfs buffer too small") } // we do as much in-place as possible to save allocation - header := buf[:HEADER_LEN] - encryptedPayloadWithExtra := buf[HEADER_LEN:usefulLen] + payload := buf[HEADER_LEN : HEADER_LEN+payloadLen] + if payloadOffsetInBuf != HEADER_LEN { + // if payload is not at the correct location in buffer + copy(payload, f.Payload) + } + header := buf[:HEADER_LEN] putU32(header[0:4], f.StreamID) putU64(header[4:12], f.Seq) header[12] = f.Closing header[13] = byte(extraLen) if payloadCipher == nil { - copy(encryptedPayloadWithExtra, f.Payload) if extraLen != 0 { // read nonce - util.CryptoRandRead(encryptedPayloadWithExtra[len(encryptedPayloadWithExtra)-extraLen:]) + extra := buf[usefulLen-extraLen : usefulLen] + util.CryptoRandRead(extra) } } else { - ciphertext := payloadCipher.Seal(nil, header[:12], f.Payload, nil) - copy(encryptedPayloadWithExtra, ciphertext) + payloadCipher.Seal(payload[:0], header[:12], payload, nil) } - nonce := encryptedPayloadWithExtra[len(encryptedPayloadWithExtra)-8:] + nonce := buf[usefulLen-8 : usefulLen] salsa20.XORKeyStream(header, header, nonce, &salsaKey) return usefulLen, nil diff --git a/internal/multiplex/obfs_test.go b/internal/multiplex/obfs_test.go index f5cc4a9..b95a314 100644 --- a/internal/multiplex/obfs_test.go +++ b/internal/multiplex/obfs_test.go @@ -20,7 +20,7 @@ func TestGenerateObfs(t *testing.T) { f := &Frame{} _testFrame, _ := quick.Value(reflect.TypeOf(f), rand.New(rand.NewSource(42))) testFrame := _testFrame.Interface().(*Frame) - i, err := obfuscator.Obfs(testFrame, obfsBuf) + i, err := obfuscator.Obfs(testFrame, obfsBuf, 0) if err != nil { ct.Error("failed to obfs ", err) return @@ -92,7 +92,7 @@ func BenchmarkObfs(b *testing.B) { b.SetBytes(int64(len(testFrame.Payload))) b.ResetTimer() for i := 0; i < b.N; i++ { - obfs(testFrame, obfsBuf) + obfs(testFrame, obfsBuf, 0) } }) b.Run("AES128GCM", func(b *testing.B) { @@ -103,7 +103,7 @@ func BenchmarkObfs(b *testing.B) { b.SetBytes(int64(len(testFrame.Payload))) b.ResetTimer() for i := 0; i < b.N; i++ { - obfs(testFrame, obfsBuf) + obfs(testFrame, obfsBuf, 0) } }) b.Run("plain", func(b *testing.B) { @@ -111,7 +111,7 @@ func BenchmarkObfs(b *testing.B) { b.SetBytes(int64(len(testFrame.Payload))) b.ResetTimer() for i := 0; i < b.N; i++ { - obfs(testFrame, obfsBuf) + obfs(testFrame, obfsBuf, 0) } }) b.Run("chacha20Poly1305", func(b *testing.B) { @@ -121,7 +121,7 @@ func BenchmarkObfs(b *testing.B) { b.SetBytes(int64(len(testFrame.Payload))) b.ResetTimer() for i := 0; i < b.N; i++ { - obfs(testFrame, obfsBuf) + obfs(testFrame, obfsBuf, 0) } }) } @@ -145,7 +145,7 @@ func BenchmarkDeobfs(b *testing.B) { payloadCipher, _ := cipher.NewGCM(c) obfs := MakeObfs(key, payloadCipher) - n, _ := obfs(testFrame, obfsBuf) + n, _ := obfs(testFrame, obfsBuf, 0) deobfs := MakeDeobfs(key, payloadCipher) b.SetBytes(int64(n)) @@ -159,7 +159,7 @@ func BenchmarkDeobfs(b *testing.B) { payloadCipher, _ := cipher.NewGCM(c) obfs := MakeObfs(key, payloadCipher) - n, _ := obfs(testFrame, obfsBuf) + n, _ := obfs(testFrame, obfsBuf, 0) deobfs := MakeDeobfs(key, payloadCipher) b.ResetTimer() @@ -170,7 +170,7 @@ func BenchmarkDeobfs(b *testing.B) { }) b.Run("plain", func(b *testing.B) { obfs := MakeObfs(key, nil) - n, _ := obfs(testFrame, obfsBuf) + n, _ := obfs(testFrame, obfsBuf, 0) deobfs := MakeDeobfs(key, nil) b.ResetTimer() @@ -183,7 +183,7 @@ func BenchmarkDeobfs(b *testing.B) { payloadCipher, _ := chacha20poly1305.New(key[:16]) obfs := MakeObfs(key, payloadCipher) - n, _ := obfs(testFrame, obfsBuf) + n, _ := obfs(testFrame, obfsBuf, 0) deobfs := MakeDeobfs(key, payloadCipher) b.ResetTimer() diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 49572cc..e05fcf4 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -161,7 +161,7 @@ func (sesh *Session) closeStream(s *Stream, active bool) error { s.nextSendSeq++ obfsBuf := make([]byte, len(padding)+64) - i, err := sesh.Obfs(f, obfsBuf) + i, err := sesh.Obfs(f, obfsBuf, 0) if err != nil { return err } @@ -286,7 +286,7 @@ func (sesh *Session) Close() error { Payload: pad, } obfsBuf := make([]byte, len(pad)+64) - i, err := sesh.Obfs(f, obfsBuf) + i, err := sesh.Obfs(f, obfsBuf, 0) if err != nil { return err } diff --git a/internal/multiplex/session_test.go b/internal/multiplex/session_test.go index f1a2f55..69924a6 100644 --- a/internal/multiplex/session_test.go +++ b/internal/multiplex/session_test.go @@ -35,7 +35,7 @@ func TestRecvDataFromRemote(t *testing.T) { obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) err := sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { @@ -62,7 +62,7 @@ func TestRecvDataFromRemote(t *testing.T) { obfuscator, _ := MakeObfuscator(E_METHOD_AES_GCM, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) err := sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { @@ -89,7 +89,7 @@ func TestRecvDataFromRemote(t *testing.T) { obfuscator, _ := MakeObfuscator(E_METHOD_CHACHA20_POLY1305, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) err := sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { @@ -117,7 +117,7 @@ func TestRecvDataFromRemote(t *testing.T) { obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) seshConfigUnordered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) err := sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { @@ -163,7 +163,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) { testPayload, } // create stream 1 - n, _ := sesh.Obfs(f1, obfsBuf) + n, _ := sesh.Obfs(f1, obfsBuf, 0) err := sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving normal frame for stream 1: %v", err) @@ -183,7 +183,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) { C_NOOP, testPayload, } - n, _ = sesh.Obfs(f2, obfsBuf) + n, _ = sesh.Obfs(f2, obfsBuf, 0) err = sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving normal frame for stream 2: %v", err) @@ -203,7 +203,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) { C_STREAM, testPayload, } - n, _ = sesh.Obfs(f1CloseStream, obfsBuf) + n, _ = sesh.Obfs(f1CloseStream, obfsBuf, 0) err = sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving stream closing frame for stream 1: %v", err) @@ -230,7 +230,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) { } // close stream 1 again - n, _ = sesh.Obfs(f1CloseStream, obfsBuf) + n, _ = sesh.Obfs(f1CloseStream, obfsBuf, 0) err = sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving stream closing frame for stream 1 %v", err) @@ -250,7 +250,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) { Closing: C_SESSION, Payload: testPayload, } - n, _ = sesh.Obfs(fCloseSession, obfsBuf) + n, _ = sesh.Obfs(fCloseSession, obfsBuf, 0) err = sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving session closing frame: %v", err) @@ -294,7 +294,7 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) { C_STREAM, testPayload, } - n, _ := sesh.Obfs(f1CloseStream, obfsBuf) + n, _ := sesh.Obfs(f1CloseStream, obfsBuf, 0) err := sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving out of order stream closing frame for stream 1: %v", err) @@ -314,7 +314,7 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) { C_NOOP, testPayload, } - n, _ = sesh.Obfs(f1, obfsBuf) + n, _ = sesh.Obfs(f1, obfsBuf, 0) err = sesh.recvDataFromRemote(obfsBuf[:n]) if err != nil { t.Fatalf("receiving normal frame for stream 1: %v", err) @@ -375,7 +375,7 @@ func TestParallel(t *testing.T) { wg.Add(1) go func(frame *Frame) { data := make([]byte, 1000) - n, _ := sesh.Obfs(frame, data) + n, _ := sesh.Obfs(frame, data, 0) data = data[0:n] err := sesh.recvDataFromRemote(data) @@ -463,7 +463,7 @@ func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) { obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) b.SetBytes(int64(len(f.Payload))) b.ResetTimer() @@ -476,7 +476,7 @@ func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) { obfuscator, _ := MakeObfuscator(E_METHOD_AES_GCM, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) b.SetBytes(int64(len(f.Payload))) b.ResetTimer() @@ -489,7 +489,7 @@ func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) { obfuscator, _ := MakeObfuscator(E_METHOD_CHACHA20_POLY1305, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) - n, _ := sesh.Obfs(f, obfsBuf) + n, _ := sesh.Obfs(f, obfsBuf, 0) b.SetBytes(int64(len(f.Payload))) b.ResetTimer() diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 0b6974e..69dde45 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -90,9 +90,9 @@ func (s *Stream) WriteTo(w io.Writer) (int64, error) { return n, nil } -func (s *Stream) sendFrame(f *Frame) error { +func (s *Stream) sendFrame(f *Frame, framePayloadOffset int) error { var cipherTextLen int - cipherTextLen, err := s.session.Obfs(f, s.obfsBuf) + cipherTextLen, err := s.session.Obfs(f, s.obfsBuf, framePayloadOffset) if err != nil { return err } @@ -138,7 +138,7 @@ func (s *Stream) Write(in []byte) (n int, err error) { Payload: framePayload, } s.nextSendSeq++ - err = s.sendFrame(f) + err = s.sendFrame(f, 0) if err != nil { return } @@ -168,7 +168,7 @@ func (s *Stream) ReadFrom(r io.Reader) (n int64, err error) { Payload: s.obfsBuf[HEADER_LEN : HEADER_LEN+read], } s.nextSendSeq++ - err = s.sendFrame(f) + err = s.sendFrame(f, HEADER_LEN) s.writingM.Unlock() if err != nil { diff --git a/internal/multiplex/stream_test.go b/internal/multiplex/stream_test.go index 9d0fa27..77a1463 100644 --- a/internal/multiplex/stream_test.go +++ b/internal/multiplex/stream_test.go @@ -13,8 +13,8 @@ import ( const payloadLen = 1000 -func setupSesh(unordered bool, key [32]byte) *Session { - obfuscator, _ := MakeObfuscator(0x00, key) +func setupSesh(unordered bool, key [32]byte, encryptionMethod byte) *Session { + obfuscator, _ := MakeObfuscator(encryptionMethod, key) seshConfig := SessionConfig{ Obfuscator: obfuscator, @@ -28,16 +28,27 @@ func BenchmarkStream_Write_Ordered(b *testing.B) { hole := connutil.Discard() var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey) - sesh.AddConnection(hole) - testData := make([]byte, payloadLen) - rand.Read(testData) - stream, _ := sesh.OpenStream() - b.SetBytes(payloadLen) - b.ResetTimer() - for i := 0; i < b.N; i++ { - stream.Write(testData) + const testDataLen = 65536 + testData := make([]byte, testDataLen) + rand.Read(testData) + eMethods := map[string]byte{ + "plain": E_METHOD_PLAIN, + "chacha20-poly1305": E_METHOD_CHACHA20_POLY1305, + "aes-gcm": E_METHOD_AES_GCM, + } + + for name, method := range eMethods { + b.Run(name, func(b *testing.B) { + sesh := setupSesh(false, sessionKey, method) + sesh.AddConnection(hole) + stream, _ := sesh.OpenStream() + b.SetBytes(testDataLen) + b.ResetTimer() + for i := 0; i < b.N; i++ { + stream.Write(testData) + } + }) } } @@ -96,7 +107,7 @@ func TestStream_Write(t *testing.T) { hole := connutil.Discard() var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey) + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) sesh.AddConnection(hole) testData := make([]byte, payloadLen) rand.Read(testData) @@ -115,8 +126,8 @@ func TestStream_WriteSync(t *testing.T) { // Close calls made after write MUST have a higher seq var sessionKey [32]byte rand.Read(sessionKey[:]) - clientSesh := setupSesh(false, sessionKey) - serverSesh := setupSesh(false, sessionKey) + clientSesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) + serverSesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) w, r := connutil.AsyncPipe() clientSesh.AddConnection(&common.TLSConn{Conn: w}) serverSesh.AddConnection(&common.TLSConn{Conn: r}) @@ -161,7 +172,7 @@ func TestStream_WriteSync(t *testing.T) { func TestStream_Close(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey) + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) testPayload := []byte{42, 42, 42} f := &Frame{ @@ -174,7 +185,7 @@ func TestStream_Close(t *testing.T) { conn, writingEnd := connutil.AsyncPipe() sesh.AddConnection(conn) obfsBuf := make([]byte, 512) - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) stream, err := sesh.Accept() @@ -206,7 +217,7 @@ func TestStream_Close(t *testing.T) { func TestStream_Read(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey) + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) testPayload := []byte{42, 42, 42} const smallPayloadLen = 3 @@ -226,7 +237,7 @@ func TestStream_Read(t *testing.T) { obfsBuf := make([]byte, 512) t.Run("Plain read", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -252,7 +263,7 @@ func TestStream_Read(t *testing.T) { }) t.Run("Nil buf", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -265,7 +276,7 @@ func TestStream_Read(t *testing.T) { }) t.Run("Read after stream close", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -290,7 +301,7 @@ func TestStream_Read(t *testing.T) { }) t.Run("Read after session close", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -319,7 +330,7 @@ func TestStream_Read(t *testing.T) { func TestStream_UnorderedRead(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey) + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) testPayload := []byte{42, 42, 42} const smallPayloadLen = 3 @@ -339,7 +350,7 @@ func TestStream_UnorderedRead(t *testing.T) { obfsBuf := make([]byte, 512) t.Run("Plain read", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -361,7 +372,7 @@ func TestStream_UnorderedRead(t *testing.T) { }) t.Run("Nil buf", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -374,7 +385,7 @@ func TestStream_UnorderedRead(t *testing.T) { }) t.Run("Read after stream close", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) @@ -399,7 +410,7 @@ func TestStream_UnorderedRead(t *testing.T) { }) t.Run("Read after session close", func(t *testing.T) { f.StreamID = streamID - i, _ := sesh.Obfs(f, obfsBuf) + i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) time.Sleep(100 * time.Microsecond) diff --git a/internal/multiplex/switchboard_test.go b/internal/multiplex/switchboard_test.go index c21de01..395aa59 100644 --- a/internal/multiplex/switchboard_test.go +++ b/internal/multiplex/switchboard_test.go @@ -135,7 +135,7 @@ func TestSwitchboard_TxCredit(t *testing.T) { func TestSwitchboard_CloseOnOneDisconn(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey) + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) conn0client, conn0server := connutil.AsyncPipe() sesh.AddConnection(conn0client) diff --git a/internal/test/integration_test.go b/internal/test/integration_test.go index da515ee..99bd6df 100644 --- a/internal/test/integration_test.go +++ b/internal/test/integration_test.go @@ -381,7 +381,7 @@ func BenchmarkThroughput(b *testing.B) { } b.Run("single conn", func(b *testing.B) { - more := make(chan int, 100) + more := make(chan int, 10) go func() { writeBuf := make([]byte, bufSize+100) serverConn, _ := pxyServerL.Accept() @@ -396,45 +396,37 @@ func BenchmarkThroughput(b *testing.B) { b.SetBytes(bufSize) b.ResetTimer() for i := 0; i < b.N; i++ { - clientConn.Read(readBuf) + io.ReadFull(clientConn, readBuf) more <- 0 } }) /* - b.Run("multi conn", func(b *testing.B) { - var connsIds sync.Pool - conns := make([]net.Conn, numConns) - more := make([]chan int, numConns) - for i := 0; i < numConns; i++ { - conns[i], _ = pxyClientD.Dial("", "") - conns[i].Write([]byte{1}) // to make server accept - connsIds.Put(i) - moreChan := make(chan int, 100) - more[i] = moreChan - writeBuf := make([]byte, bufSize + 100) + b.Run("multiconn", func(b *testing.B) { + writeBuf := make([]byte, bufSize) + b.SetBytes(bufSize) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + ready := make(chan int, 10) go func() { serverConn, _ := pxyServerL.Accept() for { serverConn.Write(writeBuf) - <- moreChan + <-ready } }() - } - b.SetParallelism(numConns) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - buf := make([]byte, bufSize) - connNum := connsIds.Get().(int) + readBuf := make([]byte, bufSize) + clientConn, _ := pxyClientD.Dial("", "") + clientConn.Write([]byte{1}) // to make server accept for pb.Next() { - n, _ := conns[connNum].Read(buf) - more[connNum] <- 0 - b.SetBytes(int64(n)) + io.ReadFull(clientConn,readBuf) + ready <- 0 } }) }) */ + }) }