diff --git a/apiV1login.go b/apiV1login.go index 4fa16ac..483ede8 100644 --- a/apiV1login.go +++ b/apiV1login.go @@ -68,7 +68,7 @@ func apiV1Login(w http.ResponseWriter, r *http.Request, ss *loan.Session) { res.Env.Body["debug_session_user_error"] = e.Error() } } - + WsNotifyNewLogin(ss) //send out apiV1AddTrackingCookie(w, r, ss) //always the last one to set cookies res.sendJson(w) diff --git a/apiV1logout.go b/apiV1logout.go index 66c4219..1b7b690 100644 --- a/apiV1logout.go +++ b/apiV1logout.go @@ -9,7 +9,7 @@ import ( func apiV1Logout(w http.ResponseWriter, r *http.Request, ss *loan.Session) { res := apiV1ResponseBlank() - + WsNotifyLogout(ss) ss.Expire = time.Now().Add(-10000) //make sure it expired ssEmpty := loan.Session{} diff --git a/apiv1.go b/apiv1.go index 07017f6..7e07af6 100644 --- a/apiv1.go +++ b/apiv1.go @@ -271,6 +271,9 @@ func apiV1GetMachineId(r *http.Request) string { return mid } +func apiV1GetMachineIdFromSession(ss *loan.Session) string { + return ss.GetStr("Biukop-Mid") +} func apiV1InitSessionByBrowserId(r *http.Request) (session loan.Session, e error) { var sid string mid := apiV1GetMachineId(r) diff --git a/websocket.go b/websocket.go index ad20365..d0ea717 100644 --- a/websocket.go +++ b/websocket.go @@ -1,7 +1,9 @@ package main import ( + "encoding/json" "fmt" + "github.com/brianvoe/gofakeit/v6" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "net/http" @@ -19,26 +21,40 @@ func apiV1WebSocketHandler(w http.ResponseWriter, r *http.Request) { upgradeToWs.CheckOrigin = func(r *http.Request) bool { return true } ws, err := upgradeToWs.Upgrade(w, r, nil) if err != nil { - log.Println(err) + log.Println("cannot upgrade websocket", err) + return } + + mid := apiV1GetMachineId(r) + wsq.MapMidToConnection(mid, ws) + // helpful log statement to show connections - log.Println("Websocket Api/V1: Client Connected") + log.Println("Websocket Api/V1: Client Connected", mid) wsReader(ws) } func wsReader(conn *websocket.Conn) { + wsq.addConnection(conn) for { + + stillExist := wsq.has(conn) + if !stillExist { + return // the connection has been removed. + } + // read in a message messageType, p, err := conn.ReadMessage() if err != nil { + wsq.del(conn) log.Println(err) return } // print out that message for clarity - fmt.Println(string(p)) + // fmt.Println(string(p)) if err := conn.WriteMessage(messageType, p); err != nil { + wsq.del(conn) log.Println(err) return } @@ -49,13 +65,40 @@ func wsReader(conn *websocket.Conn) { } } +func WsBroadCast(msg string) { + wsq.wsDoBroadcast(msg, "") +} + +func WsSend(To string, msg map[string]string) { + b, e := json.Marshal(msg) + if e != nil { + log.Error("cannot broadcast websocket message, cannot convert to json", msg) + } + + conn := wsq.getConnBySessionId(To) + if conn != nil { + err := conn.WriteMessage(websocket.TextMessage, b) + if err != nil { + wsq.del(conn) + } + } +} + +func WsBroadCastExceptMe(me string, msg map[string]string) { + b, e := json.Marshal(msg) + if e == nil { + wsq.wsDoBroadcast(string(b), me) + } +} + func wsDummySender(conn *websocket.Conn) { //write subsequent 5 copies, each after 1 second log.Info("start sending server data to client ..") p := "dummy string from server " for i := 1; i < 500; i++ { time.Sleep(1 * time.Second) - msg := fmt.Sprintf("copy %d, %s", i, p) + //msg := fmt.Sprintf("copy %d, %s, %s", i, p, wsDummyString()) // 4M long string no issue + msg := fmt.Sprintf("copy %d, %s ", i, p) log.Info("dummy sender is working, ", msg) if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil { log.Println("wsDummySender stopped on error: ", err) @@ -63,3 +106,8 @@ func wsDummySender(conn *websocket.Conn) { } } } + +func wsDummyString() string { + s := gofakeit.LetterN(4096000) // 4M + return s +} diff --git a/websocket_message.go b/websocket_message.go new file mode 100644 index 0000000..c9b30d1 --- /dev/null +++ b/websocket_message.go @@ -0,0 +1,25 @@ +package main + +import "biukop.com/sfm/loan" + +func WsNotifyNewLogin(ss *loan.Session) { + + msg := make(map[string]string) + msg["T"] = "login" + msg["Mid"] = apiV1GetMachineIdFromSession(ss) + msg["Sid"] = ss.Id + msg["Uid"] = ss.User + msg["Role"] = ss.GetRole() + WsBroadCastExceptMe(ss.Id, msg) +} + +func WsNotifyLogout(ss *loan.Session) { + + msg := make(map[string]string) + msg["T"] = "logout" + msg["Mid"] = apiV1GetMachineIdFromSession(ss) + msg["Uid"] = ss.User + msg["Sid"] = ss.Id + msg["Role"] = ss.GetRole() + WsBroadCastExceptMe(ss.Id, msg) +} diff --git a/websocket_queue.go b/websocket_queue.go new file mode 100644 index 0000000..6d0106b --- /dev/null +++ b/websocket_queue.go @@ -0,0 +1,93 @@ +package main + +import ( + "github.com/gorilla/websocket" + "sync" +) + +type wsQ struct { + wsMutex sync.Mutex + Clients map[*websocket.Conn]struct{} + ClientsBySession map[string]*websocket.Conn +} + +var wsq = wsQ{ + Clients: make(map[*websocket.Conn]struct{}), + ClientsBySession: make(map[string]*websocket.Conn), +} + +func (m *wsQ) addConnection(c *websocket.Conn) { + m.wsMutex.Lock() + m.Clients[c] = struct{}{} + m.wsMutex.Unlock() +} + +func (m *wsQ) MapMidToConnection(mid string, c *websocket.Conn) { + m.wsMutex.Lock() + m.ClientsBySession[mid] = c + m.wsMutex.Unlock() +} + +func (m *wsQ) has(conn *websocket.Conn) (yes bool) { + m.wsMutex.Lock() + _, ok := m.Clients[conn] + m.wsMutex.Unlock() + return ok +} + +func (m *wsQ) getConnBySessionId(sid string) (conn *websocket.Conn) { + conn = nil + m.wsMutex.Lock() + for k, v := range m.ClientsBySession { + if k == sid { + conn = v + break + } + } + m.wsMutex.Unlock() + return +} + +func (m *wsQ) del(conn *websocket.Conn) { + m.wsMutex.Lock() + delete(m.Clients, conn) + for k, v := range m.ClientsBySession { + if v == conn { + delete(m.ClientsBySession, k) + break + } + } + m.wsMutex.Unlock() +} + +func (m *wsQ) delBySessionId(sid string) { + m.wsMutex.Lock() + for k, v := range m.ClientsBySession { + if k == sid { + delete(m.ClientsBySession, k) + break + } + delete(m.Clients, v) + } + m.wsMutex.Unlock() +} + +func (m *wsQ) wsDoBroadcast(msg string, except string) { + toBeRemoved := make([]*websocket.Conn, 0, 20) + m.wsMutex.Lock() + for k, _ := range m.Clients { + if m.ClientsBySession[except] == k { + continue + } + err := k.WriteMessage(websocket.TextMessage, []byte(msg)) + if err != nil { + toBeRemoved = append(toBeRemoved, k) + } + } + + // remove all those have errors for communication + for _, v := range toBeRemoved { + delete(m.Clients, v) + } + m.wsMutex.Unlock() +}