diff --git a/internal/test/integration_test.go b/internal/test/integration_test.go index f9820f9..797bb78 100644 --- a/internal/test/integration_test.go +++ b/internal/test/integration_test.go @@ -310,27 +310,51 @@ func TestClosingStreamsFromProxy(t *testing.T) { t.Fatal(err) } - // closing stream on server side - clientConn, _ := pxyClientD.Dial("", "") - clientConn.Write(make([]byte, 16)) - serverConn, _ := pxyServerL.Accept() - serverConn.Close() + t.Run("closing from server", func(t *testing.T) { + clientConn, _ := pxyClientD.Dial("", "") + clientConn.Write(make([]byte, 16)) + serverConn, _ := pxyServerL.Accept() + serverConn.Close() - time.Sleep(100 * time.Millisecond) - if _, err := clientConn.Read(make([]byte, 16)); err == nil { - t.Errorf("closing stream on server side is not reflected to the client: %v", err) - } + time.Sleep(100 * time.Millisecond) + if _, err := clientConn.Read(make([]byte, 16)); err == nil { + t.Errorf("closing stream on server side is not reflected to the client: %v", err) + } + }) - // closing stream on client side - clientConn, _ = pxyClientD.Dial("", "") - clientConn.Write(make([]byte, 16)) - serverConn, _ = pxyServerL.Accept() - clientConn.Close() + t.Run("closing from client", func(t *testing.T) { + // closing stream on client side + clientConn, _ := pxyClientD.Dial("", "") + clientConn.Write(make([]byte, 16)) + serverConn, _ := pxyServerL.Accept() + clientConn.Close() - time.Sleep(100 * time.Millisecond) - if _, err := serverConn.Read(make([]byte, 16)); err == nil { - t.Errorf("closing stream on client side is not reflected to the server: %v", err) - } + time.Sleep(100 * time.Millisecond) + if _, err := serverConn.Read(make([]byte, 16)); err == nil { + t.Errorf("closing stream on client side is not reflected to the server: %v", err) + } + }) + + t.Run("send then close", func(t *testing.T) { + testData := make([]byte, 24*1024) + rand.Read(testData) + clientConn, _ := pxyClientD.Dial("", "") + go func() { + clientConn.Write(testData) + // TODO: this is time dependent. It could be due to the time it took for this + // connutil.StreamPipe's Close to be reflected on the copy function, instead of inherent bad sync + // in multiplexer + time.Sleep(10 * time.Millisecond) + clientConn.Close() + }() + + readBuf := make([]byte, len(testData)) + serverConn, _ := pxyServerL.Accept() + _, err := io.ReadFull(serverConn, readBuf) + if err != nil { + t.Errorf("failed to read data sent before closing: %v", err) + } + }) } func BenchmarkThroughput(b *testing.B) { @@ -357,39 +381,60 @@ func BenchmarkThroughput(b *testing.B) { } b.Run("single conn", func(b *testing.B) { + more := make(chan int, 100) go func() { + writeBuf := make([]byte, bufSize+100) serverConn, _ := pxyServerL.Accept() - io.Copy(ioutil.Discard, serverConn) + for { + serverConn.Write(writeBuf) + <-more + } }() clientConn, _ := pxyClientD.Dial("", "") - writeBuf := make([]byte, bufSize) + readBuf := make([]byte, bufSize) + clientConn.Write([]byte{1}) // to make server accept b.ResetTimer() for i := 0; i < b.N; i++ { - n, _ := clientConn.Write(writeBuf) + n, _ := clientConn.Read(readBuf) b.SetBytes(int64(n)) + more <- 0 } }) - b.Run("multi conn", func(b *testing.B) { - for i := 0; i < numConns; i++ { - go func() { - serverConn, _ := pxyServerL.Accept() - io.Copy(ioutil.Discard, serverConn) - }() - } - conns := make([]net.Conn, numConns) - for i := 0; i < numConns; i++ { - conns[i], _ = pxyClientD.Dial("", "") - } - writeBuf := make([]byte, bufSize) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - n, _ := conns[rand.Intn(numConns)].Write(writeBuf) - b.SetBytes(int64(n)) + /* + b.Run("multi conn", func(b *testing.B) { + var connsIds sync.Pool + conns := make([]net.Conn, numConns) + more := make([]chan int, numConns) + for i := 0; i < numConns; i++ { + conns[i], _ = pxyClientD.Dial("", "") + conns[i].Write([]byte{1}) // to make server accept + connsIds.Put(i) + moreChan := make(chan int, 100) + more[i] = moreChan + writeBuf := make([]byte, bufSize + 100) + go func() { + serverConn, _ := pxyServerL.Accept() + for { + serverConn.Write(writeBuf) + <- moreChan + } + }() } + b.SetParallelism(numConns) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + buf := make([]byte, bufSize) + connNum := connsIds.Get().(int) + for pb.Next() { + n, _ := conns[connNum].Read(buf) + more[connNum] <- 0 + b.SetBytes(int64(n)) + } + }) }) - }) + + */ }) }