From aaf468c1843ec4748d97394b400ccc0d3b64488a Mon Sep 17 00:00:00 2001 From: sp Date: Tue, 11 May 2021 16:28:46 +1000 Subject: [PATCH] session and websocket refactored. --- apiV1login.go | 4 +-- apiv1.go | 27 +++++++++++------- deploy/deploy.sh | 2 +- websocket.go | 17 ++++++----- websocket_message.go | 4 ++- websocket_queue.go | 67 +++++++++++++++++++++++++++++++++++--------- 6 files changed, 86 insertions(+), 35 deletions(-) diff --git a/apiV1login.go b/apiV1login.go index 483ede8..44651d8 100644 --- a/apiV1login.go +++ b/apiV1login.go @@ -18,7 +18,7 @@ func apiV1Login(w http.ResponseWriter, r *http.Request, ss *loan.Session) { res := apiV1ResponseBlank() l := loginForm{} - e := l.getFromClient(r) + e := l.retrieveLoginForm(r) if e != nil { log.Warn("Failed login - cannot analyze request " + e.Error()) res.add("login", false) @@ -116,7 +116,7 @@ func getUserExtraForLogin(u loan.User, ss *loan.Session) (ret UserExtra) { return } -func (m *loginForm) getFromClient(r *http.Request) (e error) { +func (m *loginForm) retrieveLoginForm(r *http.Request) (e error) { e = apiV1DecodeRequestBody(m, r) if e != nil { diff --git a/apiv1.go b/apiv1.go index 87919c3..f0029cc 100644 --- a/apiv1.go +++ b/apiv1.go @@ -251,16 +251,17 @@ func apiV1NoNeedSession(r *http.Request) bool { func apiV1InitSession(r *http.Request) (session loan.Session) { session.MarkEmpty() - //track browser, and take session from cookie - cookieSession, e := apiV1InitSessionByCookie(r) - if e == nil { - session = cookieSession - } - //try session login first, if not an empty session will be created headerSession, e := apiV1InitSessionByHttpHeader(r) if e == nil { session = headerSession + } else { // if session from header failed, we try cookie session + // track browser, and take session from cookie + // cookie has disadvantage that multiple tab will get overwritten + cookieSession, e := apiV1InitSessionByCookie(r) + if e == nil { + session = cookieSession + } } if session.IsEmpty() { @@ -269,8 +270,9 @@ func apiV1InitSession(r *http.Request) (session loan.Session) { session.RenewIfExpireSoon() } //we have a session anyway - session.Add("Biukop-Mid", apiV1GetMachineId(r)) //set machine id - session.SetRemote(r) //make sure they are using latest remote + session.Add("Biukop-Mid", apiV1GetMachineId(r)) //set machine id + session.Add("Biukop-Socket", apiV1GetSocketId(r)) //set machine id + session.SetRemote(r) //make sure they are using latest remote return } @@ -284,7 +286,7 @@ func setupCrossOriginResponse(w *http.ResponseWriter, r *http.Request) { (*w).Header().Set("Access-Control-Allow-Origin", origin) //for that specific origin (*w).Header().Set("Access-Control-Allow-Credentials", "true") (*w).Header().Set("Access-Control-Allow-Methods", removeDupHeaderOptions("POST, GET, OPTIONS, PUT, DELETE, "+method)) - (*w).Header().Set("Access-Control-Allow-Headers", removeDupHeaderOptions("Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, Cookie, Biukop-Session, Biukop-Session-Token, Biukop-Session-Expire, "+requestedHeaders)) + (*w).Header().Set("Access-Control-Allow-Headers", removeDupHeaderOptions("Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, Cookie, Biukop-Session, Biukop-Socket , "+requestedHeaders)) } func apiV1GetMachineId(r *http.Request) string { @@ -303,6 +305,11 @@ func apiV1GetMachineId(r *http.Request) string { return mid } +func apiV1GetSocketId(r *http.Request) string { + socketId := r.Header.Get("Biukop-Socket") + return socketId +} + func apiV1GetCORSHeaders(r *http.Request) (ret string) { requestedHeaders := r.Header.Get("Access-control-Request-Headers") ret = "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, Cookie, Biukop-Session, Biukop-Session-Token, Biukop-Session-Expire," + requestedHeaders @@ -361,7 +368,7 @@ func apiV1AddTrackingSession(w http.ResponseWriter, r *http.Request, session *lo sessionId := "" if session == nil { log.Warn("non-exist session, empty Id sent", session) - w.Header().Add("Biukop-Session", "") + // w.Header().Add("Biukop-Session", "") } else { if session.Id == "" { log.Warn("empty session, empty Id sent", session) diff --git a/deploy/deploy.sh b/deploy/deploy.sh index def738a..66a2be3 100755 --- a/deploy/deploy.sh +++ b/deploy/deploy.sh @@ -41,4 +41,4 @@ rsync -avh --delete /tmp/sfm_loan_rest_api_dist/ root@c5016.biukop.com.au:/var/s ssh root@c5016.biukop.com.au '/usr/local/lib/sfmapi/local_update.sh' # remove binary file -rm -f /home/sp/go/src/SFM_Loan_RestApi/api \ No newline at end of file +rm -f /home/sp/go/src/SFM_Loan_RestApi/apiv1 \ No newline at end of file diff --git a/websocket.go b/websocket.go index 4588104..f8b37b2 100644 --- a/websocket.go +++ b/websocket.go @@ -107,19 +107,22 @@ func WsBroadCast(msg string) { wsq.wsDoBroadcast(msg, "") } -func WsSend(To string, msg map[string]string) { +func WsSend(To string, msg map[string]string) (e error) { + conn := wsq.getConnBySessionId(To) + return wsSendByConn(conn, msg) +} + +func wsSendByConn(c *websocket.Conn, msg map[string]string) (e error) { b, e := json.Marshal(msg) if e != nil { log.Error("cannot broadcast websocket message, cannot convert to json", msg) } - conn := wsq.getConnBySessionId(To) - if conn != nil { - err := conn.WriteMessage(websocket.TextMessage, b) - if err != nil { - wsq.del(conn) - } + e = c.WriteMessage(websocket.TextMessage, b) + if e != nil { + wsq.del(c) } + return } func WsBroadCastExceptMe(me string, msg map[string]string) { diff --git a/websocket_message.go b/websocket_message.go index 90921c8..e2a7e6c 100644 --- a/websocket_message.go +++ b/websocket_message.go @@ -10,6 +10,7 @@ func WsNotifyNewLogin(ss *loan.Session) { msg["T"] = "login" msg["Mid"] = apiV1GetMachineIdFromSession(ss) msg["Sid"] = ss.Id + msg["SocketId"] = ss.GetStr("Biukop-Socket") msg["Uid"] = ss.User msg["Role"] = ss.GetRole() WsBroadCastExceptMe(ss.Id, msg) @@ -19,8 +20,9 @@ func WsNotifyLogout(ss *loan.Session) { msg := make(map[string]string) msg["T"] = "logout" msg["Mid"] = apiV1GetMachineIdFromSession(ss) - msg["Uid"] = ss.User msg["Sid"] = ss.Id + msg["SocketId"] = ss.GetStr("Biukop-Socket") + msg["Uid"] = ss.User msg["Role"] = ss.GetRole() WsBroadCastExceptMe(ss.Id, msg) log.Info(ss, msg) diff --git a/websocket_queue.go b/websocket_queue.go index bf7b05d..fd46503 100644 --- a/websocket_queue.go +++ b/websocket_queue.go @@ -1,31 +1,68 @@ package main import ( + "github.com/brianvoe/gofakeit/v6" "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" "sync" ) type wsQ struct { - wsMutex sync.Mutex - Clients map[*websocket.Conn]string - ClientsBySession map[string]*websocket.Conn + wsMutex sync.Mutex + Clients map[*websocket.Conn]string + ClientsBySocketId map[string]*websocket.Conn + ClientToSessionId map[*websocket.Conn]string } var wsq = wsQ{ - Clients: make(map[*websocket.Conn]string), - ClientsBySession: make(map[string]*websocket.Conn), + Clients: make(map[*websocket.Conn]string), + ClientsBySocketId: make(map[string]*websocket.Conn), + ClientToSessionId: make(map[*websocket.Conn]string), } -func (m *wsQ) addConnection(c *websocket.Conn) { +func (m *wsQ) addConnection(c *websocket.Conn) (socketId string) { m.wsMutex.Lock() - m.Clients[c] = "" + socketId = gofakeit.UUID() + m.Clients[c] = socketId + m.ClientsBySocketId[socketId] = c m.wsMutex.Unlock() + m.debugNumOfClients() + m.informClient(c, socketId) + return +} + +func (m *wsQ) informClient(conn *websocket.Conn, socketId string) { + msg := make(map[string]string) + msg["T"] = "assign-socketId" + msg["socketId"] = socketId + _ = wsSendByConn(conn, msg) +} + +func (m *wsQ) debugNumOfClients() { + m.wsMutex.Lock() + numOfClients := len(m.Clients) + numOfClientsById := len(m.ClientsBySocketId) + m.wsMutex.Unlock() + log.Info("number of concurrent websocket is ", numOfClients, numOfClientsById) } func (m *wsQ) MapSessionToConnection(sid string, c *websocket.Conn) { m.wsMutex.Lock() - m.ClientsBySession[sid] = c - m.Clients[c] = sid + m.ClientToSessionId[c] = sid + m.wsMutex.Unlock() +} + +func (m *wsQ) RemoveSession(sessionId string, c *websocket.Conn) { + m.wsMutex.Lock() + for conn, v := range m.ClientToSessionId { + if v == sessionId { + delete(m.ClientToSessionId, conn) // remove connection to session map + if socketId, exist := m.Clients[conn]; exist { + delete(m.Clients, conn) + delete(m.ClientsBySocketId, socketId) + } + } + } m.wsMutex.Unlock() } @@ -39,7 +76,7 @@ func (m *wsQ) has(conn *websocket.Conn) (yes bool) { func (m *wsQ) getConnBySessionId(sid string) (conn *websocket.Conn) { conn = nil m.wsMutex.Lock() - for k, v := range m.ClientsBySession { + for k, v := range m.ClientsBySocketId { if k == sid { conn = v break @@ -53,25 +90,27 @@ func (m *wsQ) del(conn *websocket.Conn) { m.wsMutex.Lock() if sid, exist := m.Clients[conn]; exist { delete(m.Clients, conn) - delete(m.ClientsBySession, sid) + delete(m.ClientsBySocketId, sid) } m.wsMutex.Unlock() + m.debugNumOfClients() } func (m *wsQ) delBySessionId(sid string) { m.wsMutex.Lock() - if conn, exist := m.ClientsBySession[sid]; exist { - delete(m.ClientsBySession, sid) + if conn, exist := m.ClientsBySocketId[sid]; exist { + delete(m.ClientsBySocketId, sid) delete(m.Clients, conn) } m.wsMutex.Unlock() + m.debugNumOfClients() } func (m *wsQ) wsDoBroadcast(msg string, except string) { toBeRemoved := make([]*websocket.Conn, 0, 20) m.wsMutex.Lock() for k, _ := range m.Clients { - if m.ClientsBySession[except] == k { + if m.ClientsBySocketId[except] == k { continue } err := k.WriteMessage(websocket.TextMessage, []byte(msg))