Explicitly allow buf overlap in obfs

This commit is contained in:
Andy Wang 2020-04-13 22:48:28 +01:00
parent 97eb6aa096
commit 53c0c6bafe
8 changed files with 104 additions and 95 deletions

View File

@ -9,10 +9,9 @@ import (
"github.com/cbeuw/Cloak/internal/util"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/crypto/salsa20"
"io"
)
type Obfser func(*Frame, []byte) (int, error)
type Obfser func(*Frame, []byte, int) (int, error)
type Deobfser func([]byte) (*Frame, error)
var u32 = binary.BigEndian.Uint32
@ -39,13 +38,17 @@ type Obfuscator struct {
}
func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser {
obfs := func(f *Frame, buf []byte) (int, error) {
obfs := func(f *Frame, buf []byte, payloadOffsetInBuf int) (int, error) {
// we need the encrypted data to be at least 8 bytes to be used as nonce for salsa20 stream header encryption
// this will be the case if the encryption method is an AEAD cipher, however for plain, it's well possible
// that the frame payload is smaller than 8 bytes, so we need to add on the difference
payloadLen := len(f.Payload)
if payloadLen == 0 {
return 0, errors.New("payload cannot be empty")
}
var extraLen int
if payloadCipher == nil {
if extraLen = 8 - len(f.Payload); extraLen < 0 {
if extraLen = 8 - payloadLen; extraLen < 0 {
extraLen = 0
}
} else {
@ -54,31 +57,34 @@ func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser {
return 0, errors.New("AEAD's Overhead cannot be fewer than 8 bytes")
}
}
// usefulLen is the amount of bytes that will be eventually sent off
usefulLen := HEADER_LEN + len(f.Payload) + extraLen
if usefulLen < HEADER_LEN || len(buf) < usefulLen { // compiler hint to eliminate bound check
return 0, io.ErrShortBuffer
usefulLen := HEADER_LEN + payloadLen + extraLen
if len(buf) < usefulLen {
return 0, errors.New("obfs buffer too small")
}
// we do as much in-place as possible to save allocation
header := buf[:HEADER_LEN]
encryptedPayloadWithExtra := buf[HEADER_LEN:usefulLen]
payload := buf[HEADER_LEN : HEADER_LEN+payloadLen]
if payloadOffsetInBuf != HEADER_LEN {
// if payload is not at the correct location in buffer
copy(payload, f.Payload)
}
header := buf[:HEADER_LEN]
putU32(header[0:4], f.StreamID)
putU64(header[4:12], f.Seq)
header[12] = f.Closing
header[13] = byte(extraLen)
if payloadCipher == nil {
copy(encryptedPayloadWithExtra, f.Payload)
if extraLen != 0 { // read nonce
util.CryptoRandRead(encryptedPayloadWithExtra[len(encryptedPayloadWithExtra)-extraLen:])
extra := buf[usefulLen-extraLen : usefulLen]
util.CryptoRandRead(extra)
}
} else {
ciphertext := payloadCipher.Seal(nil, header[:12], f.Payload, nil)
copy(encryptedPayloadWithExtra, ciphertext)
payloadCipher.Seal(payload[:0], header[:12], payload, nil)
}
nonce := encryptedPayloadWithExtra[len(encryptedPayloadWithExtra)-8:]
nonce := buf[usefulLen-8 : usefulLen]
salsa20.XORKeyStream(header, header, nonce, &salsaKey)
return usefulLen, nil

View File

@ -20,7 +20,7 @@ func TestGenerateObfs(t *testing.T) {
f := &Frame{}
_testFrame, _ := quick.Value(reflect.TypeOf(f), rand.New(rand.NewSource(42)))
testFrame := _testFrame.Interface().(*Frame)
i, err := obfuscator.Obfs(testFrame, obfsBuf)
i, err := obfuscator.Obfs(testFrame, obfsBuf, 0)
if err != nil {
ct.Error("failed to obfs ", err)
return
@ -92,7 +92,7 @@ func BenchmarkObfs(b *testing.B) {
b.SetBytes(int64(len(testFrame.Payload)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
obfs(testFrame, obfsBuf)
obfs(testFrame, obfsBuf, 0)
}
})
b.Run("AES128GCM", func(b *testing.B) {
@ -103,7 +103,7 @@ func BenchmarkObfs(b *testing.B) {
b.SetBytes(int64(len(testFrame.Payload)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
obfs(testFrame, obfsBuf)
obfs(testFrame, obfsBuf, 0)
}
})
b.Run("plain", func(b *testing.B) {
@ -111,7 +111,7 @@ func BenchmarkObfs(b *testing.B) {
b.SetBytes(int64(len(testFrame.Payload)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
obfs(testFrame, obfsBuf)
obfs(testFrame, obfsBuf, 0)
}
})
b.Run("chacha20Poly1305", func(b *testing.B) {
@ -121,7 +121,7 @@ func BenchmarkObfs(b *testing.B) {
b.SetBytes(int64(len(testFrame.Payload)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
obfs(testFrame, obfsBuf)
obfs(testFrame, obfsBuf, 0)
}
})
}
@ -145,7 +145,7 @@ func BenchmarkDeobfs(b *testing.B) {
payloadCipher, _ := cipher.NewGCM(c)
obfs := MakeObfs(key, payloadCipher)
n, _ := obfs(testFrame, obfsBuf)
n, _ := obfs(testFrame, obfsBuf, 0)
deobfs := MakeDeobfs(key, payloadCipher)
b.SetBytes(int64(n))
@ -159,7 +159,7 @@ func BenchmarkDeobfs(b *testing.B) {
payloadCipher, _ := cipher.NewGCM(c)
obfs := MakeObfs(key, payloadCipher)
n, _ := obfs(testFrame, obfsBuf)
n, _ := obfs(testFrame, obfsBuf, 0)
deobfs := MakeDeobfs(key, payloadCipher)
b.ResetTimer()
@ -170,7 +170,7 @@ func BenchmarkDeobfs(b *testing.B) {
})
b.Run("plain", func(b *testing.B) {
obfs := MakeObfs(key, nil)
n, _ := obfs(testFrame, obfsBuf)
n, _ := obfs(testFrame, obfsBuf, 0)
deobfs := MakeDeobfs(key, nil)
b.ResetTimer()
@ -183,7 +183,7 @@ func BenchmarkDeobfs(b *testing.B) {
payloadCipher, _ := chacha20poly1305.New(key[:16])
obfs := MakeObfs(key, payloadCipher)
n, _ := obfs(testFrame, obfsBuf)
n, _ := obfs(testFrame, obfsBuf, 0)
deobfs := MakeDeobfs(key, payloadCipher)
b.ResetTimer()

View File

@ -161,7 +161,7 @@ func (sesh *Session) closeStream(s *Stream, active bool) error {
s.nextSendSeq++
obfsBuf := make([]byte, len(padding)+64)
i, err := sesh.Obfs(f, obfsBuf)
i, err := sesh.Obfs(f, obfsBuf, 0)
if err != nil {
return err
}
@ -286,7 +286,7 @@ func (sesh *Session) Close() error {
Payload: pad,
}
obfsBuf := make([]byte, len(pad)+64)
i, err := sesh.Obfs(f, obfsBuf)
i, err := sesh.Obfs(f, obfsBuf, 0)
if err != nil {
return err
}

View File

@ -35,7 +35,7 @@ func TestRecvDataFromRemote(t *testing.T) {
obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey)
seshConfigOrdered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
err := sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
@ -62,7 +62,7 @@ func TestRecvDataFromRemote(t *testing.T) {
obfuscator, _ := MakeObfuscator(E_METHOD_AES_GCM, sessionKey)
seshConfigOrdered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
err := sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
@ -89,7 +89,7 @@ func TestRecvDataFromRemote(t *testing.T) {
obfuscator, _ := MakeObfuscator(E_METHOD_CHACHA20_POLY1305, sessionKey)
seshConfigOrdered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
err := sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
@ -117,7 +117,7 @@ func TestRecvDataFromRemote(t *testing.T) {
obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey)
seshConfigUnordered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
err := sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
@ -163,7 +163,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) {
testPayload,
}
// create stream 1
n, _ := sesh.Obfs(f1, obfsBuf)
n, _ := sesh.Obfs(f1, obfsBuf, 0)
err := sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving normal frame for stream 1: %v", err)
@ -183,7 +183,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) {
C_NOOP,
testPayload,
}
n, _ = sesh.Obfs(f2, obfsBuf)
n, _ = sesh.Obfs(f2, obfsBuf, 0)
err = sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving normal frame for stream 2: %v", err)
@ -203,7 +203,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) {
C_STREAM,
testPayload,
}
n, _ = sesh.Obfs(f1CloseStream, obfsBuf)
n, _ = sesh.Obfs(f1CloseStream, obfsBuf, 0)
err = sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving stream closing frame for stream 1: %v", err)
@ -230,7 +230,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) {
}
// close stream 1 again
n, _ = sesh.Obfs(f1CloseStream, obfsBuf)
n, _ = sesh.Obfs(f1CloseStream, obfsBuf, 0)
err = sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving stream closing frame for stream 1 %v", err)
@ -250,7 +250,7 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) {
Closing: C_SESSION,
Payload: testPayload,
}
n, _ = sesh.Obfs(fCloseSession, obfsBuf)
n, _ = sesh.Obfs(fCloseSession, obfsBuf, 0)
err = sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving session closing frame: %v", err)
@ -294,7 +294,7 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) {
C_STREAM,
testPayload,
}
n, _ := sesh.Obfs(f1CloseStream, obfsBuf)
n, _ := sesh.Obfs(f1CloseStream, obfsBuf, 0)
err := sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving out of order stream closing frame for stream 1: %v", err)
@ -314,7 +314,7 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) {
C_NOOP,
testPayload,
}
n, _ = sesh.Obfs(f1, obfsBuf)
n, _ = sesh.Obfs(f1, obfsBuf, 0)
err = sesh.recvDataFromRemote(obfsBuf[:n])
if err != nil {
t.Fatalf("receiving normal frame for stream 1: %v", err)
@ -375,7 +375,7 @@ func TestParallel(t *testing.T) {
wg.Add(1)
go func(frame *Frame) {
data := make([]byte, 1000)
n, _ := sesh.Obfs(frame, data)
n, _ := sesh.Obfs(frame, data, 0)
data = data[0:n]
err := sesh.recvDataFromRemote(data)
@ -463,7 +463,7 @@ func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) {
obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey)
seshConfigOrdered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
b.SetBytes(int64(len(f.Payload)))
b.ResetTimer()
@ -476,7 +476,7 @@ func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) {
obfuscator, _ := MakeObfuscator(E_METHOD_AES_GCM, sessionKey)
seshConfigOrdered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
b.SetBytes(int64(len(f.Payload)))
b.ResetTimer()
@ -489,7 +489,7 @@ func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) {
obfuscator, _ := MakeObfuscator(E_METHOD_CHACHA20_POLY1305, sessionKey)
seshConfigOrdered.Obfuscator = obfuscator
sesh := MakeSession(0, seshConfigOrdered)
n, _ := sesh.Obfs(f, obfsBuf)
n, _ := sesh.Obfs(f, obfsBuf, 0)
b.SetBytes(int64(len(f.Payload)))
b.ResetTimer()

View File

@ -90,9 +90,9 @@ func (s *Stream) WriteTo(w io.Writer) (int64, error) {
return n, nil
}
func (s *Stream) sendFrame(f *Frame) error {
func (s *Stream) sendFrame(f *Frame, framePayloadOffset int) error {
var cipherTextLen int
cipherTextLen, err := s.session.Obfs(f, s.obfsBuf)
cipherTextLen, err := s.session.Obfs(f, s.obfsBuf, framePayloadOffset)
if err != nil {
return err
}
@ -138,7 +138,7 @@ func (s *Stream) Write(in []byte) (n int, err error) {
Payload: framePayload,
}
s.nextSendSeq++
err = s.sendFrame(f)
err = s.sendFrame(f, 0)
if err != nil {
return
}
@ -168,7 +168,7 @@ func (s *Stream) ReadFrom(r io.Reader) (n int64, err error) {
Payload: s.obfsBuf[HEADER_LEN : HEADER_LEN+read],
}
s.nextSendSeq++
err = s.sendFrame(f)
err = s.sendFrame(f, HEADER_LEN)
s.writingM.Unlock()
if err != nil {

View File

@ -13,8 +13,8 @@ import (
const payloadLen = 1000
func setupSesh(unordered bool, key [32]byte) *Session {
obfuscator, _ := MakeObfuscator(0x00, key)
func setupSesh(unordered bool, key [32]byte, encryptionMethod byte) *Session {
obfuscator, _ := MakeObfuscator(encryptionMethod, key)
seshConfig := SessionConfig{
Obfuscator: obfuscator,
@ -28,17 +28,28 @@ func BenchmarkStream_Write_Ordered(b *testing.B) {
hole := connutil.Discard()
var sessionKey [32]byte
rand.Read(sessionKey[:])
sesh := setupSesh(false, sessionKey)
sesh.AddConnection(hole)
testData := make([]byte, payloadLen)
rand.Read(testData)
const testDataLen = 65536
testData := make([]byte, testDataLen)
rand.Read(testData)
eMethods := map[string]byte{
"plain": E_METHOD_PLAIN,
"chacha20-poly1305": E_METHOD_CHACHA20_POLY1305,
"aes-gcm": E_METHOD_AES_GCM,
}
for name, method := range eMethods {
b.Run(name, func(b *testing.B) {
sesh := setupSesh(false, sessionKey, method)
sesh.AddConnection(hole)
stream, _ := sesh.OpenStream()
b.SetBytes(payloadLen)
b.SetBytes(testDataLen)
b.ResetTimer()
for i := 0; i < b.N; i++ {
stream.Write(testData)
}
})
}
}
/*
@ -96,7 +107,7 @@ func TestStream_Write(t *testing.T) {
hole := connutil.Discard()
var sessionKey [32]byte
rand.Read(sessionKey[:])
sesh := setupSesh(false, sessionKey)
sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
sesh.AddConnection(hole)
testData := make([]byte, payloadLen)
rand.Read(testData)
@ -115,8 +126,8 @@ func TestStream_WriteSync(t *testing.T) {
// Close calls made after write MUST have a higher seq
var sessionKey [32]byte
rand.Read(sessionKey[:])
clientSesh := setupSesh(false, sessionKey)
serverSesh := setupSesh(false, sessionKey)
clientSesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
serverSesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
w, r := connutil.AsyncPipe()
clientSesh.AddConnection(&common.TLSConn{Conn: w})
serverSesh.AddConnection(&common.TLSConn{Conn: r})
@ -161,7 +172,7 @@ func TestStream_WriteSync(t *testing.T) {
func TestStream_Close(t *testing.T) {
var sessionKey [32]byte
rand.Read(sessionKey[:])
sesh := setupSesh(false, sessionKey)
sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
testPayload := []byte{42, 42, 42}
f := &Frame{
@ -174,7 +185,7 @@ func TestStream_Close(t *testing.T) {
conn, writingEnd := connutil.AsyncPipe()
sesh.AddConnection(conn)
obfsBuf := make([]byte, 512)
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
stream, err := sesh.Accept()
@ -206,7 +217,7 @@ func TestStream_Close(t *testing.T) {
func TestStream_Read(t *testing.T) {
var sessionKey [32]byte
rand.Read(sessionKey[:])
sesh := setupSesh(false, sessionKey)
sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
testPayload := []byte{42, 42, 42}
const smallPayloadLen = 3
@ -226,7 +237,7 @@ func TestStream_Read(t *testing.T) {
obfsBuf := make([]byte, 512)
t.Run("Plain read", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -252,7 +263,7 @@ func TestStream_Read(t *testing.T) {
})
t.Run("Nil buf", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -265,7 +276,7 @@ func TestStream_Read(t *testing.T) {
})
t.Run("Read after stream close", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -290,7 +301,7 @@ func TestStream_Read(t *testing.T) {
})
t.Run("Read after session close", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -319,7 +330,7 @@ func TestStream_Read(t *testing.T) {
func TestStream_UnorderedRead(t *testing.T) {
var sessionKey [32]byte
rand.Read(sessionKey[:])
sesh := setupSesh(false, sessionKey)
sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
testPayload := []byte{42, 42, 42}
const smallPayloadLen = 3
@ -339,7 +350,7 @@ func TestStream_UnorderedRead(t *testing.T) {
obfsBuf := make([]byte, 512)
t.Run("Plain read", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -361,7 +372,7 @@ func TestStream_UnorderedRead(t *testing.T) {
})
t.Run("Nil buf", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -374,7 +385,7 @@ func TestStream_UnorderedRead(t *testing.T) {
})
t.Run("Read after stream close", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)
@ -399,7 +410,7 @@ func TestStream_UnorderedRead(t *testing.T) {
})
t.Run("Read after session close", func(t *testing.T) {
f.StreamID = streamID
i, _ := sesh.Obfs(f, obfsBuf)
i, _ := sesh.Obfs(f, obfsBuf, 0)
streamID++
writingEnd.Write(obfsBuf[:i])
time.Sleep(100 * time.Microsecond)

View File

@ -135,7 +135,7 @@ func TestSwitchboard_TxCredit(t *testing.T) {
func TestSwitchboard_CloseOnOneDisconn(t *testing.T) {
var sessionKey [32]byte
rand.Read(sessionKey[:])
sesh := setupSesh(false, sessionKey)
sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN)
conn0client, conn0server := connutil.AsyncPipe()
sesh.AddConnection(conn0client)

View File

@ -381,7 +381,7 @@ func BenchmarkThroughput(b *testing.B) {
}
b.Run("single conn", func(b *testing.B) {
more := make(chan int, 100)
more := make(chan int, 10)
go func() {
writeBuf := make([]byte, bufSize+100)
serverConn, _ := pxyServerL.Accept()
@ -396,45 +396,37 @@ func BenchmarkThroughput(b *testing.B) {
b.SetBytes(bufSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
clientConn.Read(readBuf)
io.ReadFull(clientConn, readBuf)
more <- 0
}
})
/*
b.Run("multiconn", func(b *testing.B) {
var connsIds sync.Pool
conns := make([]net.Conn, numConns)
more := make([]chan int, numConns)
for i := 0; i < numConns; i++ {
conns[i], _ = pxyClientD.Dial("", "")
conns[i].Write([]byte{1}) // to make server accept
connsIds.Put(i)
moreChan := make(chan int, 100)
more[i] = moreChan
writeBuf := make([]byte, bufSize + 100)
writeBuf := make([]byte, bufSize)
b.SetBytes(bufSize)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
ready := make(chan int, 10)
go func() {
serverConn, _ := pxyServerL.Accept()
for {
serverConn.Write(writeBuf)
<- moreChan
<-ready
}
}()
}
b.SetParallelism(numConns)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
buf := make([]byte, bufSize)
connNum := connsIds.Get().(int)
readBuf := make([]byte, bufSize)
clientConn, _ := pxyClientD.Dial("", "")
clientConn.Write([]byte{1}) // to make server accept
for pb.Next() {
n, _ := conns[connNum].Read(buf)
more[connNum] <- 0
b.SetBytes(int64(n))
io.ReadFull(clientConn,readBuf)
ready <- 0
}
})
})
*/
})
}