From c4048dcceaa67be36332c7144d63b22f0d1c6eeb Mon Sep 17 00:00:00 2001 From: Qian Wang Date: Thu, 22 Nov 2018 21:55:23 +0000 Subject: [PATCH] Basic remote control --- cmd/ck-client/ck-client.go | 25 +- cmd/ck-server/ck-server.go | 51 ++- internal/multiplex/qos.go | 24 +- internal/multiplex/session.go | 7 +- internal/multiplex/stream.go | 8 +- internal/multiplex/switchboard.go | 28 +- internal/server/state.go | 19 ++ internal/server/usermanager/user.go | 60 ++-- internal/server/usermanager/userpanel.go | 400 +++++++++++++++++------ internal/util/obfs.go | 25 +- 10 files changed, 490 insertions(+), 157 deletions(-) diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index 2eeb923..b8a2f62 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -91,6 +91,8 @@ func main() { var remotePort string var pluginOpts string + var isAdmin *bool + log.SetFlags(log.LstdFlags | log.Lshortfile) log_init() @@ -108,6 +110,7 @@ func main() { flag.StringVar(&remotePort, "p", "443", "remotePort: proxy port, should be 443") flag.StringVar(&pluginOpts, "c", "ckclient.json", "pluginOpts: path to ckclient.json or options seperated with semicolons") askVersion := flag.Bool("v", false, "Print the version number") + isAdmin = flag.Bool("a", false, "Admin mode") printUsage := flag.Bool("h", false, "Print this message") flag.Parse() @@ -136,6 +139,25 @@ func main() { log.Fatal(err) } + if *isAdmin { + a := adminHandshake(sta) + buf := make([]byte, 16000) + for { + req := a.getCommand() + a.adminConn.Write(req) + n, err := a.adminConn.Read(buf) + if err != nil { + log.Println(err) + return + } + resp, err := a.checkAndDecrypt(buf[:n]) + if err != nil { + log.Println(err) + } + fmt.Println(string(resp)) + } + return + } if sta.SS_LOCAL_PORT == "" { log.Fatal("Must specify localPort") } @@ -146,7 +168,8 @@ func main() { log.Fatal("TicketTimeHint cannot be empty or 0") } - valve := mux.MakeValve(1e12, 1e12, 1e12, 1e12) + var UNLIMITED int64 = 1e12 + valve := mux.MakeValve(1e12, 1e12, &UNLIMITED, &UNLIMITED) obfs := util.MakeObfs(sta.UID) deobfs := util.MakeDeobfs(sta.UID) sesh := mux.MakeSession(0, valve, obfs, deobfs, util.ReadTLS) diff --git a/cmd/ck-server/ck-server.go b/cmd/ck-server/ck-server.go index d8f8820..a5fde7d 100644 --- a/cmd/ck-server/ck-server.go +++ b/cmd/ck-server/ck-server.go @@ -1,7 +1,7 @@ package main import ( - "encoding/hex" + "bytes" "flag" "fmt" "io" @@ -78,12 +78,50 @@ func dispatchConnection(conn net.Conn, sta *server.State) { return } - var arrUID [32]byte - copy(arrUID[:], UID) - user, err := sta.Userpanel.GetAndActivateUser(arrUID) + if bytes.Equal(UID, sta.AdminUID) { + reply := server.ComposeReply(ch) + _, err = conn.Write(reply) + if err != nil { + log.Printf("Sending reply to remote: %v\n", err) + go conn.Close() + return + } + + // Two discarded messages: ChangeCipherSpec and Finished + discardBuf := make([]byte, 1024) + for c := 0; c < 2; c++ { + _, err = util.ReadTLS(conn, discardBuf) + if err != nil { + log.Printf("Reading discarded message %v: %v\n", c, err) + go conn.Close() + return + } + } + + c := sta.Userpanel.MakeController(sta.AdminUID) + for { + n, err := conn.Read(data) + if err != nil { + log.Println(err) + return + } + resp, err := c.HandleRequest(data[:n]) + if err != nil { + return + } + _, err = conn.Write(resp) + if err != nil { + log.Println(err) + return + } + } + + } + user, err := sta.Userpanel.GetAndActivateUser(UID) if err != nil { log.Printf("+1 unauthorised user from %v, uid: %x\n", conn.RemoteAddr(), UID) goWeb(data) + return } reply := server.ComposeReply(ch) @@ -185,11 +223,6 @@ func main() { } sta, _ := server.InitState(localHost, localPort, remoteHost, remotePort, time.Now, "userinfo.db") - //debug - var arrUID [32]byte - UID, _ := hex.DecodeString("50d858e0985ecc7f60418aaf0cc5ab587f42c2570a884095a9e8ccacd0f6545c") - copy(arrUID[:], UID) - sta.Userpanel.AddNewUser(arrUID, 10, 1e12, 1e12, 1e12, 1e12) err := sta.ParseConfig(pluginOpts) if err != nil { log.Fatalf("Configuration file error: %v", err) diff --git a/internal/multiplex/qos.go b/internal/multiplex/qos.go index ca51b0e..933bb9c 100644 --- a/internal/multiplex/qos.go +++ b/internal/multiplex/qos.go @@ -17,11 +17,11 @@ type Valve struct { rxtb atomic.Value // *ratelimit.Bucket txtb atomic.Value // *ratelimit.Bucket - rxCredit int64 - txCredit int64 + rxCredit *int64 + txCredit *int64 } -func MakeValve(rxRate, txRate, rxCredit, txCredit int64) *Valve { +func MakeValve(rxRate, txRate int64, rxCredit, txCredit *int64) *Valve { v := &Valve{ rxCredit: rxCredit, txCredit: txCredit, @@ -31,6 +31,8 @@ func MakeValve(rxRate, txRate, rxCredit, txCredit int64) *Valve { return v } +// TODO: inline formatting + func (v *Valve) SetRxRate(rate int64) { v.rxtb.Store(ratelimit.NewBucketWithRate(float64(rate), rate)) } @@ -47,20 +49,28 @@ func (v *Valve) txWait(n int) { v.txtb.Load().(*ratelimit.Bucket).Wait(int64(n)) } +func (v *Valve) SetRxCredit(n int64) { + atomic.StoreInt64(v.rxCredit, n) +} + +func (v *Valve) SetTxCredit(n int64) { + atomic.StoreInt64(v.txCredit, n) +} + func (v *Valve) GetRxCredit() int64 { - return atomic.LoadInt64(&v.rxCredit) + return atomic.LoadInt64(v.rxCredit) } func (v *Valve) GetTxCredit() int64 { - return atomic.LoadInt64(&v.txCredit) + return atomic.LoadInt64(v.txCredit) } // n can be negative func (v *Valve) AddRxCredit(n int64) int64 { - return atomic.AddInt64(&v.rxCredit, n) + return atomic.AddInt64(v.rxCredit, n) } // n can be negative func (v *Valve) AddTxCredit(n int64) int64 { - return atomic.AddInt64(&v.txCredit, n) + return atomic.AddInt64(v.txCredit, n) } diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 1301008..6f2cfb2 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -105,8 +105,9 @@ func (sesh *Session) isStream(id uint32) bool { func (sesh *Session) getStream(id uint32) *Stream { sesh.streamsM.RLock() - defer sesh.streamsM.RUnlock() - return sesh.streams[id] + ret := sesh.streams[id] + sesh.streamsM.RUnlock() + return ret } // addStream is used when the remote opened a new stream and we got notified @@ -123,8 +124,8 @@ func (sesh *Session) addStream(id uint32) *Stream { func (sesh *Session) Close() error { // Because closing a closed channel causes panic sesh.closingM.Lock() - defer sesh.closingM.Unlock() if sesh.closing { + sesh.closingM.Unlock() return errRepeatSessionClosing } sesh.closing = true diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index 0625b92..3d89e25 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -85,9 +85,9 @@ func (stream *Stream) Write(in []byte) (n int, err error) { // in the middle of the execution of Write. This may cause the closing frame // to be sent before the data frame and cause loss of packet. stream.closingM.RLock() - defer stream.closingM.RUnlock() select { case <-stream.die: + stream.closingM.RUnlock() return 0, errBrokenStream default: } @@ -101,6 +101,7 @@ func (stream *Stream) Write(in []byte) (n int, err error) { tlsRecord := stream.session.obfs(f) n, err = stream.session.sb.send(tlsRecord) + stream.closingM.RUnlock() return @@ -109,12 +110,13 @@ func (stream *Stream) Write(in []byte) (n int, err error) { func (stream *Stream) shutdown() error { // Lock here because closing a closed channel causes panic stream.closingM.Lock() - defer stream.closingM.Unlock() if stream.closing { + stream.closingM.Unlock() return errRepeatStreamClosing } stream.closing = true close(stream.die) + stream.closingM.Unlock() return nil } @@ -149,6 +151,8 @@ func (stream *Stream) Close() error { Payload: pad, } tlsRecord := stream.session.obfs(f) + // FIXME: despite sb.send being always called after Write(), the actual TCP sending + // may still be out of order stream.session.sb.send(tlsRecord) stream.session.delStream(stream.id) diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 9723281..eca81ff 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -19,9 +19,11 @@ type switchboard struct { cesM sync.RWMutex ces []*connEnclave - //debug - hM sync.Mutex - used map[uint32]bool + /* + //debug + hM sync.Mutex + used map[uint32]bool + */ } func (sb *switchboard) getOptimum() *connEnclave { @@ -52,7 +54,8 @@ func makeSwitchboard(sesh *Session, valve *Valve) *switchboard { session: sesh, Valve: valve, ces: []*connEnclave{}, - used: make(map[uint32]bool), + //debug + // used: make(map[uint32]bool), } return sb } @@ -151,9 +154,8 @@ func (sb *switchboard) deplex(ce *connEnclave) { } frame := sb.session.deobfs(buf[:n]) - //debug - var stream *Stream + // FIXME: get-then-put without lock if stream = sb.session.getStream(frame.StreamID); stream == nil { if frame.Closing == 1 { // if the frame is telling us to close a closed stream @@ -162,12 +164,14 @@ func (sb *switchboard) deplex(ce *connEnclave) { continue } //debug - sb.hM.Lock() - if sb.used[frame.StreamID] { - log.Printf("%v lost!\n", frame.StreamID) - } - sb.used[frame.StreamID] = true - sb.hM.Unlock() + /* + sb.hM.Lock() + if sb.used[frame.StreamID] { + log.Printf("%v lost!\n", frame.StreamID) + } + sb.used[frame.StreamID] = true + sb.hM.Unlock() + */ stream = sb.session.addStream(frame.StreamID) } stream.writeNewFrame(frame) diff --git a/internal/server/state.go b/internal/server/state.go index 8807478..679d061 100644 --- a/internal/server/state.go +++ b/internal/server/state.go @@ -15,6 +15,7 @@ import ( type rawConfig struct { WebServerAddr string Key string + AdminUID string } type stateManager interface { ParseConfig(string) error @@ -30,6 +31,7 @@ type State struct { SS_REMOTE_PORT string Now func() time.Time + AdminUID []byte staticPv crypto.PrivateKey Userpanel *usermanager.Userpanel usedRandomM sync.RWMutex @@ -91,6 +93,17 @@ func parseKey(b64 string) (crypto.PrivateKey, error) { return &pv, nil } +// base64 encoded 32 byte adminUID +func parseAdminUID(b64 string) ([]byte, error) { + uid, err := base64.StdEncoding.DecodeString(b64) + if err != nil { + return nil, err + } + return uid, nil +} + +// TODO: specify which parse fails + // ParseConfig parses the config (either a path to json or in-line ssv config) into a State variable func (sta *State) ParseConfig(conf string) (err error) { var content []byte @@ -115,6 +128,12 @@ func (sta *State) ParseConfig(conf string) (err error) { return err } sta.staticPv = pv + + adminUID, err := parseAdminUID(preParse.AdminUID) + if err != nil { + return err + } + sta.AdminUID = adminUID return nil } diff --git a/internal/server/usermanager/user.go b/internal/server/usermanager/user.go index 4c92b9e..b018c5e 100644 --- a/internal/server/usermanager/user.go +++ b/internal/server/usermanager/user.go @@ -9,22 +9,26 @@ import ( mux "github.com/cbeuw/Cloak/internal/multiplex" ) -/* -type userParams struct { - sessionsCap uint32 - upRate int64 - downRate int64 - upCredit int64 - downCredit int64 +// for the ease of using json package +type UserInfo struct { + UID []byte + // ALL of the following fields have to be accessed atomically + SessionsCap uint32 + UpRate int64 + DownRate int64 + UpCredit int64 + DownCredit int64 + ExpiryTime int64 } -*/ type User struct { up *Userpanel - uid [32]byte + arrUID [32]byte - sessionsCap uint32 //userParams + // TODO: use pointer here instead because we don't want to accidentally read + // UserInfo's Credits? + UserInfo valve *mux.Valve @@ -32,20 +36,35 @@ type User struct { sessions map[uint32]*mux.Session } -func MakeUser(up *Userpanel, uid [32]byte, sessionsCap uint32, upRate, downRate, upCredit, downCredit int64) *User { - valve := mux.MakeValve(upRate, downRate, upCredit, downCredit) +func MakeUser(up *Userpanel, uinfo UserInfo) *User { + // this instance of valve is shared across ALL sessions of a user + valve := mux.MakeValve(uinfo.UpRate, uinfo.DownRate, &uinfo.UpCredit, &uinfo.DownCredit) u := &User{ - up: up, - uid: uid, - valve: valve, - sessionsCap: sessionsCap, - sessions: make(map[uint32]*mux.Session), + up: up, + UserInfo: uinfo, + valve: valve, + sessions: make(map[uint32]*mux.Session), } + copy(u.arrUID[:], uinfo.UID) return u } -func (u *User) setSessionsCap(cap uint32) { - atomic.StoreUint32(&u.sessionsCap, cap) +func (u *User) addUpCredit(delta int64) { u.valve.AddRxCredit(delta) } +func (u *User) addDownCredit(delta int64) { u.valve.AddTxCredit(delta) } +func (u *User) setSessionsCap(cap uint32) { atomic.StoreUint32(&u.SessionsCap, cap) } +func (u *User) setUpRate(rate int64) { u.valve.SetRxRate(rate) } +func (u *User) setDownRate(rate int64) { u.valve.SetTxRate(rate) } +func (u *User) setUpCredit(n int64) { u.valve.SetRxCredit(n) } +func (u *User) setDownCredit(n int64) { u.valve.SetTxCredit(n) } +func (u *User) setExpiryTime(time int64) { atomic.StoreInt64(&u.ExpiryTime, time) } + +func (u *User) updateInfo(uinfo UserInfo) { + u.setSessionsCap(uinfo.SessionsCap) + u.setUpCredit(uinfo.UpCredit) + u.setDownCredit(uinfo.DownCredit) + u.setUpRate(uinfo.UpRate) + u.setDownRate(uinfo.DownRate) + u.setExpiryTime(uinfo.ExpiryTime) } func (u *User) GetSession(sessionID uint32) *mux.Session { @@ -65,13 +84,14 @@ func (u *User) DelSession(sessionID uint32) { delete(u.sessions, sessionID) if len(u.sessions) == 0 { u.sessionsM.Unlock() - u.up.delActiveUser(u.uid) + u.up.delActiveUser(u.UID) return } u.sessionsM.Unlock() } func (u *User) GetOrCreateSession(sessionID uint32, obfs func(*mux.Frame) []byte, deobfs func([]byte) *mux.Frame, obfsedRead func(net.Conn, []byte) (int, error)) (sesh *mux.Session, existing bool) { + // TODO: session cap u.sessionsM.Lock() defer u.sessionsM.Unlock() if sesh = u.sessions[sessionID]; sesh != nil { diff --git a/internal/server/usermanager/userpanel.go b/internal/server/usermanager/userpanel.go index ab48fdd..27e5863 100644 --- a/internal/server/usermanager/userpanel.go +++ b/internal/server/usermanager/userpanel.go @@ -2,13 +2,22 @@ package usermanager import ( "encoding/binary" + "encoding/hex" "errors" + "os" + "strconv" "sync" "time" "github.com/boltdb/bolt" ) +var Uint32 = binary.BigEndian.Uint32 +var Uint64 = binary.BigEndian.Uint64 +var PutUint16 = binary.BigEndian.PutUint16 +var PutUint32 = binary.BigEndian.PutUint32 +var PutUint64 = binary.BigEndian.PutUint64 + type Userpanel struct { db *bolt.DB @@ -26,73 +35,108 @@ func MakeUserpanel(dbPath string) (*Userpanel, error) { activeUsers: make(map[[32]byte]*User), } go func() { - time.Sleep(time.Second * 10) - up.updateCredits() + for { + time.Sleep(time.Second * 10) + up.updateCredits() + } }() return up, nil } -var ErrUserNotFound = errors.New("User does not exist in memory or db") +// credits of all users are updated together so that there is only 1 goroutine managing it +func (up *Userpanel) updateCredits() { + up.activeUsersM.RLock() + for _, u := range up.activeUsers { + up.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(u.arrUID[:]) + if b == nil { + return ErrUserNotFound + } + if err := b.Put([]byte("UpCredit"), i64ToB(u.valve.GetRxCredit())); err != nil { + return err + } + if err := b.Put([]byte("DownCredit"), i64ToB(u.valve.GetTxCredit())); err != nil { + return err + } + return nil + + }) + } + up.activeUsersM.RUnlock() + +} + +// TODO: prefixed backup path +func (up *Userpanel) backupDB(bakPath string) error { + _, err := os.Stat(bakPath) + if err == nil { + return errors.New("Attempting to overwrite a file during backup!") + } + var bak *os.File + if os.IsNotExist(err) { + bak, err = os.Create(bakPath) + if err != nil { + return err + } + } + err = up.db.View(func(tx *bolt.Tx) error { + _, err := tx.WriteTo(bak) + if err != nil { + return err + } + return nil + }) + return err +} + +var ErrUserNotFound = errors.New("User does not exist in db") +var ErrUserNotActive = errors.New("User is not active") + +// TODO: expiry check // GetUser is used to retrieve a user if s/he is active, or to retrieve the user's infor // from the db and mark it as an active user -func (up *Userpanel) GetAndActivateUser(UID [32]byte) (*User, error) { +func (up *Userpanel) GetAndActivateUser(UID []byte) (*User, error) { up.activeUsersM.Lock() - defer up.activeUsersM.Unlock() - if user, ok := up.activeUsers[UID]; ok { + var arrUID [32]byte + copy(arrUID[:], UID) + if user, ok := up.activeUsers[arrUID]; ok { + up.activeUsersM.Unlock() return user, nil } - var sessionsCap uint32 - var upRate, downRate, upCredit, downCredit int64 + var uinfo UserInfo + uinfo.UID = UID err := up.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(UID[:]) if b == nil { return ErrUserNotFound } - sessionsCap = binary.BigEndian.Uint32(b.Get([]byte("sessionsCap"))) - upRate = int64(binary.BigEndian.Uint64(b.Get([]byte("upRate")))) - downRate = int64(binary.BigEndian.Uint64(b.Get([]byte("downRate")))) - upCredit = int64(binary.BigEndian.Uint64(b.Get([]byte("upCredit")))) // reee brackets - downCredit = int64(binary.BigEndian.Uint64(b.Get([]byte("downCredit")))) + uinfo.SessionsCap = Uint32(b.Get([]byte("SessionsCap"))) + uinfo.UpRate = int64(Uint64(b.Get([]byte("UpRate")))) + uinfo.DownRate = int64(Uint64(b.Get([]byte("DownRate")))) + uinfo.UpCredit = int64(Uint64(b.Get([]byte("UpCredit")))) // reee brackets + uinfo.DownCredit = int64(Uint64(b.Get([]byte("DownCredit")))) + uinfo.ExpiryTime = int64(Uint64(b.Get([]byte("ExpiryTime")))) return nil }) if err != nil { + up.activeUsersM.Unlock() return nil, err } - // TODO: put all of these parameters in a struct instead - u := MakeUser(up, UID, sessionsCap, upRate, downRate, upCredit, downCredit) - up.activeUsers[UID] = u + u := MakeUser(up, uinfo) + up.activeUsers[arrUID] = u + up.activeUsersM.Unlock() return u, nil } -func (up *Userpanel) AddNewUser(UID [32]byte, sessionsCap uint32, upRate, downRate, upCredit, downCredit int64) error { +func (up *Userpanel) updateDBEntryUint32(UID []byte, key string, value uint32) error { err := up.db.Update(func(tx *bolt.Tx) error { - b, err := tx.CreateBucket(UID[:]) - if err != nil { - return err + b := tx.Bucket(UID) + if b == nil { + return ErrUserNotFound } - // FIXME: obnoxious code - quad := make([]byte, 4) - binary.BigEndian.PutUint32(quad, sessionsCap) - if err = b.Put([]byte("sessionsCap"), quad); err != nil { - return err - } - oct := make([]byte, 8) - binary.BigEndian.PutUint64(oct, uint64(upRate)) - if err = b.Put([]byte("upRate"), oct); err != nil { - return err - } - binary.BigEndian.PutUint64(oct, uint64(downRate)) - if err = b.Put([]byte("downRate"), oct); err != nil { - return err - } - binary.BigEndian.PutUint64(oct, uint64(upCredit)) - if err = b.Put([]byte("upCredit"), oct); err != nil { - return err - } - binary.BigEndian.PutUint64(oct, uint64(downCredit)) - if err = b.Put([]byte("downCredit"), oct); err != nil { + if err := b.Put([]byte(key), u32ToB(value)); err != nil { return err } return nil @@ -100,31 +144,13 @@ func (up *Userpanel) AddNewUser(UID [32]byte, sessionsCap uint32, upRate, downRa return err } -func (up *Userpanel) updateDBEntryUint32(UID [32]byte, key string, value uint32) error { +func (up *Userpanel) updateDBEntryInt64(UID []byte, key string, value int64) error { err := up.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(UID[:]) + b := tx.Bucket(UID) if b == nil { return ErrUserNotFound } - quad := make([]byte, 4) - binary.BigEndian.PutUint32(quad, value) - if err := b.Put([]byte(key), quad); err != nil { - return err - } - return nil - }) - return err -} - -func (up *Userpanel) updateDBEntryInt64(UID [32]byte, key string, value int64) error { - err := up.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(UID[:]) - if b == nil { - return ErrUserNotFound - } - oct := make([]byte, 8) - binary.BigEndian.PutUint64(oct, uint64(value)) - if err := b.Put([]byte(key), oct); err != nil { + if err := b.Put([]byte(key), i64ToB(value)); err != nil { return err } return nil @@ -133,46 +159,236 @@ func (up *Userpanel) updateDBEntryInt64(UID [32]byte, key string, value int64) e } // This is used when all sessions of a user close -func (up *Userpanel) delActiveUser(UID [32]byte) { +func (up *Userpanel) delActiveUser(UID []byte) { + var arrUID [32]byte + copy(arrUID[:], UID) up.activeUsersM.Lock() - delete(up.activeUsers, UID) + delete(up.activeUsers, arrUID) up.activeUsersM.Unlock() } -func (up *Userpanel) getActiveUser(UID [32]byte) *User { +func (up *Userpanel) getActiveUser(UID []byte) *User { + var arrUID [32]byte + copy(arrUID[:], UID) up.activeUsersM.RLock() - defer up.activeUsersM.RUnlock() - return up.activeUsers[UID] + ret := up.activeUsers[arrUID] + up.activeUsersM.RUnlock() + return ret } -func (up *Userpanel) SetSessionsCap(UID [32]byte, newSessionsCap uint32) error { - if u := up.getActiveUser(UID); u != nil { - u.setSessionsCap(newSessionsCap) +// below are remote control utilised functions + +func (up *Userpanel) listActiveUsers() [][]byte { + var ret [][]byte + up.activeUsersM.RLock() + for _, u := range up.activeUsers { + ret = append(ret, u.UID) } - err := up.updateDBEntryUint32(UID, "sessionsCap", newSessionsCap) + up.activeUsersM.RUnlock() + return ret +} + +func (up *Userpanel) listAllUsers() []UserInfo { + var ret []UserInfo + up.db.View(func(tx *bolt.Tx) error { + tx.ForEach(func(UID []byte, b *bolt.Bucket) error { + // if we want to avoid writing every single key out, + // we would have to either make UserInfo a map, + // or use reflect. + // neither is convinient + var uinfo UserInfo + uinfo.UID = UID + uinfo.SessionsCap = Uint32(b.Get([]byte("SessionsCap"))) + uinfo.UpRate = int64(Uint64(b.Get([]byte("UpRate")))) + uinfo.DownRate = int64(Uint64(b.Get([]byte("DownRate")))) + uinfo.UpCredit = int64(Uint64(b.Get([]byte("UpCredit")))) + uinfo.DownCredit = int64(Uint64(b.Get([]byte("DownCredit")))) + uinfo.ExpiryTime = int64(Uint64(b.Get([]byte("ExpiryTime")))) + ret = append(ret, uinfo) + return nil + }) + return nil + }) + return ret +} + +func (up *Userpanel) getUserInfo(UID []byte) (UserInfo, error) { + var uinfo UserInfo + err := up.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(UID) + if b == nil { + return ErrUserNotFound + } + uinfo.UID = UID + uinfo.SessionsCap = Uint32(b.Get([]byte("SessionsCap"))) + uinfo.UpRate = int64(Uint64(b.Get([]byte("UpRate")))) + uinfo.DownRate = int64(Uint64(b.Get([]byte("DownRate")))) + uinfo.UpCredit = int64(Uint64(b.Get([]byte("UpCredit")))) + uinfo.DownCredit = int64(Uint64(b.Get([]byte("DownCredit")))) + uinfo.ExpiryTime = int64(Uint64(b.Get([]byte("ExpiryTime")))) + return nil + }) + return uinfo, err +} + +// In boltdb, the value argument for bucket.Put has to be valid for the duration +// of the transaction. +// This basically means that you cannot reuse a byte slice for two different keys +// in a transaction. So we need to allocate a fresh byte slice for each value +func u32ToB(value uint32) []byte { + quad := make([]byte, 4) + PutUint32(quad, value) + return quad +} + +func i64ToB(value int64) []byte { + oct := make([]byte, 8) + PutUint64(oct, uint64(value)) + return oct +} + +func (up *Userpanel) addNewUser(uinfo UserInfo) error { + err := up.db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket(uinfo.UID[:]) + if err != nil { + return err + } + // FIXME: obnoxious code + if err = b.Put([]byte("SessionsCap"), u32ToB(uinfo.SessionsCap)); err != nil { + return err + } + if err = b.Put([]byte("UpRate"), i64ToB(uinfo.UpRate)); err != nil { + return err + } + if err = b.Put([]byte("DownRate"), i64ToB(uinfo.DownRate)); err != nil { + return err + } + if err = b.Put([]byte("UpCredit"), i64ToB(uinfo.UpCredit)); err != nil { + return err + } + if err = b.Put([]byte("DownCredit"), i64ToB(uinfo.DownCredit)); err != nil { + return err + } + if err = b.Put([]byte("ExpiryTime"), i64ToB(uinfo.ExpiryTime)); err != nil { + return err + } + return nil + }) return err } -func (up *Userpanel) updateCredits() { - up.activeUsersM.RLock() - defer u.activeUsersM.RUnlock() - for _, user := range up.activeUsers { - up.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(u.uid[:]) - if b == nil { - return ErrUserNotFound - } - oct := make([]byte, 8) - binary.BigEndian.PutUint64(oct, uint64(u.valve.GetRxCredit())) - if err := b.Put([]byte("rxCredit"), oct); err != nil { - return err - } - binary.BigEndian.PutUint64(oct, uint64(u.valve.GetTxCredit())) - if err := b.Put([]byte("txCredit"), oct); err != nil { - return err - } - return nil - - }) +func (up *Userpanel) delUser(UID []byte) error { + err := up.backupDB(strconv.FormatInt(time.Now().Unix(), 10) + "_pre_del_" + hex.EncodeToString(UID) + ".bak") + if err != nil { + return err } + err = up.db.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket(UID) + }) + return err +} + +func (up *Userpanel) syncMemFromDB(UID []byte) error { + var uinfo UserInfo + err := up.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(UID) + if b == nil { + return ErrUserNotFound + } + uinfo.UID = UID + uinfo.SessionsCap = Uint32(b.Get([]byte("SessionsCap"))) + uinfo.UpRate = int64(Uint64(b.Get([]byte("UpRate")))) + uinfo.DownRate = int64(Uint64(b.Get([]byte("DownRate")))) + uinfo.UpCredit = int64(Uint64(b.Get([]byte("UpCredit")))) + uinfo.DownCredit = int64(Uint64(b.Get([]byte("DownCredit")))) + uinfo.ExpiryTime = int64(Uint64(b.Get([]byte("ExpiryTime")))) + return nil + }) + if err != nil { + return err + } + + u := up.getActiveUser(UID) + if u == nil { + return ErrUserNotActive + } + u.updateInfo(uinfo) + return nil +} + +// the following functions will return err==nil if user is not active + +func (up *Userpanel) setSessionsCap(UID []byte, cap uint32) error { + err := up.updateDBEntryUint32(UID, "SessionsCap", cap) + if err != nil { + return err + } + u := up.getActiveUser(UID) + if u == nil { + return nil + } + u.setSessionsCap(cap) + return nil +} + +func (up *Userpanel) setUpRate(UID []byte, rate int64) error { + err := up.updateDBEntryInt64(UID, "UpRate", rate) + if err != nil { + return err + } + u := up.getActiveUser(UID) + if u == nil { + return nil + } + u.setUpRate(rate) + return nil +} +func (up *Userpanel) setDownRate(UID []byte, rate int64) error { + err := up.updateDBEntryInt64(UID, "DownRate", rate) + if err != nil { + return err + } + u := up.getActiveUser(UID) + if u == nil { + return nil + } + u.setDownRate(rate) + return nil +} +func (up *Userpanel) setUpCredit(UID []byte, n int64) error { + err := up.updateDBEntryInt64(UID, "UpCredit", n) + if err != nil { + return err + } + u := up.getActiveUser(UID) + if u == nil { + return nil + } + u.setUpCredit(n) + return nil +} +func (up *Userpanel) setDownCredit(UID []byte, n int64) error { + err := up.updateDBEntryInt64(UID, "DownCredit", n) + if err != nil { + return err + } + u := up.getActiveUser(UID) + if u == nil { + return nil + } + u.setDownCredit(n) + return nil +} + +func (up *Userpanel) setExpiryTime(UID []byte, time int64) error { + err := up.updateDBEntryInt64(UID, "ExpiryTime", time) + if err != nil { + return err + } + u := up.getActiveUser(UID) + if u == nil { + return nil + } + u.setExpiryTime(time) + return nil } diff --git a/internal/util/obfs.go b/internal/util/obfs.go index 8286c69..6fc9f0c 100644 --- a/internal/util/obfs.go +++ b/internal/util/obfs.go @@ -3,7 +3,7 @@ package util import ( "encoding/binary" - xxhash "github.com/OneOfOne/xxhash" + //xxhash "github.com/OneOfOne/xxhash" mux "github.com/cbeuw/Cloak/internal/multiplex" ) @@ -11,16 +11,19 @@ import ( // The keys are generated from the SID and the payload of the frame. // FIXME: this code will panic if len(data)<18. func genXorKeys(secret []byte, data []byte) (i uint32, ii uint32, iii uint32) { - h := xxhash.New32() - ret := make([]uint32, 3) - preHash := make([]byte, 16) - for j := 0; j < 3; j++ { - copy(preHash[0:10], secret[j*10:j*10+10]) - copy(preHash[10:16], data[j*6:j*6+6]) - h.Write(preHash) - ret[j] = h.Sum32() - } - return ret[0], ret[1], ret[2] + /* + h := xxhash.New32() + ret := make([]uint32, 3) + preHash := make([]byte, 16) + for j := 0; j < 3; j++ { + copy(preHash[0:10], secret[j*10:j*10+10]) + copy(preHash[10:16], data[j*6:j*6+6]) + h.Write(preHash) + ret[j] = h.Sum32() + } + return ret[0], ret[1], ret[2] + */ + return 0, 0, 0 } func MakeObfs(key []byte) func(*mux.Frame) []byte {