Mostly works

This commit is contained in:
Qian Wang 2018-10-20 11:35:50 +01:00
parent 02fa072964
commit 7487600434
5 changed files with 39 additions and 18 deletions

View File

@@ -20,9 +20,18 @@ import (
 var version string
 func pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) {
+	buf := make([]byte, 20000)
 	for {
-		i, err := io.Copy(dst, src)
+		i, err := io.ReadAtLeast(src, buf, 1)
 		if err != nil || i == 0 {
+			log.Println(err)
+			go dst.Close()
+			go src.Close()
+			return
+		}
+		i, err = dst.Write(buf[:i])
+		if err != nil || i == 0 {
+			log.Println(err)
 			go dst.Close()
 			go src.Close()
 			return

View File

@@ -21,9 +21,18 @@ import (
 var version string
 func pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) {
+	buf := make([]byte, 20000)
 	for {
-		i, err := io.Copy(dst, src)
+		i, err := io.ReadAtLeast(src, buf, 1)
 		if err != nil || i == 0 {
+			log.Println(err)
+			go dst.Close()
+			go src.Close()
+			return
+		}
+		i, err = dst.Write(buf[:i])
+		if err != nil || i == 0 {
+			log.Println(err)
 			go dst.Close()
 			go src.Close()
 			return

View File

@@ -91,7 +91,8 @@ func (s *Stream) recvNewFrame() {
 		// Keep popping from the heap until empty or to the point that the wanted seq was not received
 		for len(s.sh) > 0 && s.sh[0].seq == s.nextRecvSeq {
-			s.sortedBufCh <- heap.Pop(&s.sh).(*frameNode).frame.Payload
+			payload := heap.Pop(&s.sh).(*frameNode).frame.Payload
+			s.sortedBufCh <- payload
 			s.nextRecvSeq += 1
 			if s.nextRecvSeq == 0 {

View File

@@ -2,7 +2,6 @@ package multiplex
 import (
 	"errors"
-	"io"
 	"log"
 	"sync"
 )
@@ -43,7 +42,7 @@ func makeStream(id uint32, sesh *Session) *Stream {
 		die:         make(chan struct{}),
 		sh:          []*frameNode{},
 		newFrameCh:  make(chan *Frame, 1024),
-		sortedBufCh: make(chan []byte, readBuffer),
+		sortedBufCh: make(chan []byte, 4096),
 	}
 	go stream.recvNewFrame()
 	return stream
@@ -63,15 +62,13 @@ func (stream *Stream) Read(buf []byte) (n int, err error) {
 	case <-stream.die:
 		log.Printf("Stream %v dying\n", stream.id)
 		return 0, errors.New(errBrokenPipe)
-	default:
-		data := <-stream.sortedBufCh
-		if len(data) > 0 {
-			copy(buf, data)
-			return len(data), nil
-		} else {
-			// TODO: close stream here or not?
-			return 0, io.EOF
-		}
+	case data := <-stream.sortedBufCh:
+		if len(buf) < len(data) {
+			log.Println(len(data))
+			return 0, errors.New("buf too small")
+		}
+		copy(buf, data)
+		return len(data), nil
 	}

View File

@@ -8,7 +8,7 @@ import (
 const (
 	sentNotifyBacklog = 1024
-	dispatchBacklog   = 102400
+	dispatchBacklog   = 10240
 	newConnBacklog    = 8
 )
@@ -141,14 +141,19 @@ func (sb *switchboard) deplex(ce *connEnclave) {
 			return
 		}
 		frame := sb.session.deobfs(buf[:i])
-		var stream *Stream
-		if stream = sb.session.getStream(frame.StreamID); stream == nil {
-			stream = sb.session.addStream(frame.StreamID)
-		}
 		if closing := sb.session.getStream(frame.ClosingStreamID); closing != nil {
 			log.Printf("HeaderClosing: %v\n", frame.ClosingStreamID)
 			closing.Close()
 		}
+		sb.session.nextStreamIDM.Lock()
+		nextID := sb.session.nextStreamID
+		sb.session.nextStreamIDM.Unlock()
+		var stream *Stream
+		if stream = sb.session.getStream(frame.StreamID); nextID <= frame.StreamID && stream == nil {
+			stream = sb.session.addStream(frame.StreamID)
+		}
-		stream.newFrameCh <- frame
+		if stream != nil {
+			stream.newFrameCh <- frame
+		}
 	}
}