From 74876004342f7f0e49ed56b0b98bee396025a266 Mon Sep 17 00:00:00 2001 From: Qian Wang Date: Sat, 20 Oct 2018 11:35:50 +0100 Subject: [PATCH] Mostly works --- cmd/ck-client/ck-client.go | 11 ++++++++++- cmd/ck-server/ck-server.go | 11 ++++++++++- internal/multiplex/frameSorter.go | 3 ++- internal/multiplex/stream.go | 15 ++++++--------- internal/multiplex/switchboard.go | 17 +++++++++++------ 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index db24b9a..75ee00e 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -20,9 +20,18 @@ import ( var version string func pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) { + buf := make([]byte, 20000) for { - i, err := io.Copy(dst, src) + i, err := io.ReadAtLeast(src, buf, 1) if err != nil || i == 0 { + log.Println(err) + go dst.Close() + go src.Close() + return + } + i, err = dst.Write(buf[:i]) + if err != nil || i == 0 { + log.Println(err) go dst.Close() go src.Close() return diff --git a/cmd/ck-server/ck-server.go b/cmd/ck-server/ck-server.go index 2536b12..953b356 100644 --- a/cmd/ck-server/ck-server.go +++ b/cmd/ck-server/ck-server.go @@ -21,9 +21,18 @@ import ( var version string func pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) { + buf := make([]byte, 20000) for { - i, err := io.Copy(dst, src) + i, err := io.ReadAtLeast(src, buf, 1) if err != nil || i == 0 { + log.Println(err) + go dst.Close() + go src.Close() + return + } + i, err = dst.Write(buf[:i]) + if err != nil || i == 0 { + log.Println(err) go dst.Close() go src.Close() return diff --git a/internal/multiplex/frameSorter.go b/internal/multiplex/frameSorter.go index eac1df3..9f71c6a 100644 --- a/internal/multiplex/frameSorter.go +++ b/internal/multiplex/frameSorter.go @@ -91,7 +91,8 @@ func (s *Stream) recvNewFrame() { // Keep popping from the heap until empty or to the point that the wanted seq was not received for len(s.sh) > 0 && s.sh[0].seq == s.nextRecvSeq { - s.sortedBufCh <- heap.Pop(&s.sh).(*frameNode).frame.Payload + payload := heap.Pop(&s.sh).(*frameNode).frame.Payload + s.sortedBufCh <- payload s.nextRecvSeq += 1 if s.nextRecvSeq == 0 { diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 9169c5a..5172fa2 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -2,7 +2,6 @@ package multiplex import ( "errors" - "io" "log" "sync" ) @@ -43,7 +42,7 @@ func makeStream(id uint32, sesh *Session) *Stream { die: make(chan struct{}), sh: []*frameNode{}, newFrameCh: make(chan *Frame, 1024), - sortedBufCh: make(chan []byte, readBuffer), + sortedBufCh: make(chan []byte, 4096), } go stream.recvNewFrame() return stream @@ -63,15 +62,13 @@ func (stream *Stream) Read(buf []byte) (n int, err error) { case <-stream.die: log.Printf("Stream %v dying\n", stream.id) return 0, errors.New(errBrokenPipe) - default: - } - data := <-stream.sortedBufCh - if len(data) > 0 { + case data := <-stream.sortedBufCh: + if len(buf) < len(data) { + log.Println(len(data)) + return 0, errors.New("buf too small") + } copy(buf, data) return len(data), nil - } else { - // TODO: close stream here or not? - return 0, io.EOF } } diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 3d7ed60..9477e71 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -8,7 +8,7 @@ import ( const ( sentNotifyBacklog = 1024 - dispatchBacklog = 102400 + dispatchBacklog = 10240 newConnBacklog = 8 ) @@ -141,14 +141,19 @@ func (sb *switchboard) deplex(ce *connEnclave) { return } frame := sb.session.deobfs(buf[:i]) - var stream *Stream - if stream = sb.session.getStream(frame.StreamID); stream == nil { - stream = sb.session.addStream(frame.StreamID) - } if closing := sb.session.getStream(frame.ClosingStreamID); closing != nil { log.Printf("HeaderClosing: %v\n", frame.ClosingStreamID) closing.Close() } - stream.newFrameCh <- frame + sb.session.nextStreamIDM.Lock() + nextID := sb.session.nextStreamID + sb.session.nextStreamIDM.Unlock() + var stream *Stream + if stream = sb.session.getStream(frame.StreamID); nextID <= frame.StreamID && stream == nil { + stream = sb.session.addStream(frame.StreamID) + } + if stream != nil { + stream.newFrameCh <- frame + } } }