Pārlūkot izejas kodu

session and websocket refactored.

master
sp pirms 4 gadiem
vecāks
revīzija
aaf468c184
6 mainītis faili ar 86 papildinājumiem un 35 dzēšanām
  1. +2
    -2
      apiV1login.go
  2. +17
    -10
      apiv1.go
  3. +1
    -1
      deploy/deploy.sh
  4. +10
    -7
      websocket.go
  5. +3
    -1
      websocket_message.go
  6. +53
    -14
      websocket_queue.go

+ 2
- 2
apiV1login.go Parādīt failu

@@ -18,7 +18,7 @@ func apiV1Login(w http.ResponseWriter, r *http.Request, ss *loan.Session) {
res := apiV1ResponseBlank()

l := loginForm{}
e := l.getFromClient(r)
e := l.retrieveLoginForm(r)
if e != nil {
log.Warn("Failed login - cannot analyze request " + e.Error())
res.add("login", false)
@@ -116,7 +116,7 @@ func getUserExtraForLogin(u loan.User, ss *loan.Session) (ret UserExtra) {
return
}

func (m *loginForm) getFromClient(r *http.Request) (e error) {
func (m *loginForm) retrieveLoginForm(r *http.Request) (e error) {

e = apiV1DecodeRequestBody(m, r)
if e != nil {

+ 17
- 10
apiv1.go Parādīt failu

@@ -251,16 +251,17 @@ func apiV1NoNeedSession(r *http.Request) bool {
func apiV1InitSession(r *http.Request) (session loan.Session) {
session.MarkEmpty()

//track browser, and take session from cookie
cookieSession, e := apiV1InitSessionByCookie(r)
if e == nil {
session = cookieSession
}

//try session login first, if not an empty session will be created
headerSession, e := apiV1InitSessionByHttpHeader(r)
if e == nil {
session = headerSession
} else { // if session from header failed, we try cookie session
// track browser, and take session from cookie
// cookie has disadvantage that multiple tab will get overwritten
cookieSession, e := apiV1InitSessionByCookie(r)
if e == nil {
session = cookieSession
}
}

if session.IsEmpty() {
@@ -269,8 +270,9 @@ func apiV1InitSession(r *http.Request) (session loan.Session) {
session.RenewIfExpireSoon()
}
//we have a session anyway
session.Add("Biukop-Mid", apiV1GetMachineId(r)) //set machine id
session.SetRemote(r) //make sure they are using latest remote
session.Add("Biukop-Mid", apiV1GetMachineId(r)) //set machine id
session.Add("Biukop-Socket", apiV1GetSocketId(r)) //set machine id
session.SetRemote(r) //make sure they are using latest remote
return
}

@@ -284,7 +286,7 @@ func setupCrossOriginResponse(w *http.ResponseWriter, r *http.Request) {
(*w).Header().Set("Access-Control-Allow-Origin", origin) //for that specific origin
(*w).Header().Set("Access-Control-Allow-Credentials", "true")
(*w).Header().Set("Access-Control-Allow-Methods", removeDupHeaderOptions("POST, GET, OPTIONS, PUT, DELETE, "+method))
(*w).Header().Set("Access-Control-Allow-Headers", removeDupHeaderOptions("Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, Cookie, Biukop-Session, Biukop-Session-Token, Biukop-Session-Expire, "+requestedHeaders))
(*w).Header().Set("Access-Control-Allow-Headers", removeDupHeaderOptions("Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, Cookie, Biukop-Session, Biukop-Socket , "+requestedHeaders))
}

func apiV1GetMachineId(r *http.Request) string {
@@ -303,6 +305,11 @@ func apiV1GetMachineId(r *http.Request) string {
return mid
}

func apiV1GetSocketId(r *http.Request) string {
socketId := r.Header.Get("Biukop-Socket")
return socketId
}

func apiV1GetCORSHeaders(r *http.Request) (ret string) {
requestedHeaders := r.Header.Get("Access-control-Request-Headers")
ret = "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, Cookie, Biukop-Session, Biukop-Session-Token, Biukop-Session-Expire," + requestedHeaders
@@ -361,7 +368,7 @@ func apiV1AddTrackingSession(w http.ResponseWriter, r *http.Request, session *lo
sessionId := ""
if session == nil {
log.Warn("non-exist session, empty Id sent", session)
w.Header().Add("Biukop-Session", "")
// w.Header().Add("Biukop-Session", "")
} else {
if session.Id == "" {
log.Warn("empty session, empty Id sent", session)

+ 1
- 1
deploy/deploy.sh Parādīt failu

@@ -41,4 +41,4 @@ rsync -avh --delete /tmp/sfm_loan_rest_api_dist/ root@c5016.biukop.com.au:/var/s
ssh root@c5016.biukop.com.au '/usr/local/lib/sfmapi/local_update.sh'

# remove binary file
rm -f /home/sp/go/src/SFM_Loan_RestApi/api
rm -f /home/sp/go/src/SFM_Loan_RestApi/apiv1

+ 10
- 7
websocket.go Parādīt failu

@@ -107,19 +107,22 @@ func WsBroadCast(msg string) {
wsq.wsDoBroadcast(msg, "")
}

func WsSend(To string, msg map[string]string) {
func WsSend(To string, msg map[string]string) (e error) {
conn := wsq.getConnBySessionId(To)
return wsSendByConn(conn, msg)
}

func wsSendByConn(c *websocket.Conn, msg map[string]string) (e error) {
b, e := json.Marshal(msg)
if e != nil {
log.Error("cannot broadcast websocket message, cannot convert to json", msg)
}

conn := wsq.getConnBySessionId(To)
if conn != nil {
err := conn.WriteMessage(websocket.TextMessage, b)
if err != nil {
wsq.del(conn)
}
e = c.WriteMessage(websocket.TextMessage, b)
if e != nil {
wsq.del(c)
}
return
}

func WsBroadCastExceptMe(me string, msg map[string]string) {

+ 3
- 1
websocket_message.go Parādīt failu

@@ -10,6 +10,7 @@ func WsNotifyNewLogin(ss *loan.Session) {
msg["T"] = "login"
msg["Mid"] = apiV1GetMachineIdFromSession(ss)
msg["Sid"] = ss.Id
msg["SocketId"] = ss.GetStr("Biukop-Socket")
msg["Uid"] = ss.User
msg["Role"] = ss.GetRole()
WsBroadCastExceptMe(ss.Id, msg)
@@ -19,8 +20,9 @@ func WsNotifyLogout(ss *loan.Session) {
msg := make(map[string]string)
msg["T"] = "logout"
msg["Mid"] = apiV1GetMachineIdFromSession(ss)
msg["Uid"] = ss.User
msg["Sid"] = ss.Id
msg["SocketId"] = ss.GetStr("Biukop-Socket")
msg["Uid"] = ss.User
msg["Role"] = ss.GetRole()
WsBroadCastExceptMe(ss.Id, msg)
log.Info(ss, msg)

+ 53
- 14
websocket_queue.go Parādīt failu

@@ -1,31 +1,68 @@
package main

import (
"github.com/brianvoe/gofakeit/v6"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"sync"
)

type wsQ struct {
wsMutex sync.Mutex
Clients map[*websocket.Conn]string
ClientsBySession map[string]*websocket.Conn
wsMutex sync.Mutex
Clients map[*websocket.Conn]string
ClientsBySocketId map[string]*websocket.Conn
ClientToSessionId map[*websocket.Conn]string
}

var wsq = wsQ{
Clients: make(map[*websocket.Conn]string),
ClientsBySession: make(map[string]*websocket.Conn),
Clients: make(map[*websocket.Conn]string),
ClientsBySocketId: make(map[string]*websocket.Conn),
ClientToSessionId: make(map[*websocket.Conn]string),
}

func (m *wsQ) addConnection(c *websocket.Conn) {
func (m *wsQ) addConnection(c *websocket.Conn) (socketId string) {
m.wsMutex.Lock()
m.Clients[c] = ""
socketId = gofakeit.UUID()
m.Clients[c] = socketId
m.ClientsBySocketId[socketId] = c
m.wsMutex.Unlock()
m.debugNumOfClients()
m.informClient(c, socketId)
return
}

func (m *wsQ) informClient(conn *websocket.Conn, socketId string) {
msg := make(map[string]string)
msg["T"] = "assign-socketId"
msg["socketId"] = socketId
_ = wsSendByConn(conn, msg)
}

func (m *wsQ) debugNumOfClients() {
m.wsMutex.Lock()
numOfClients := len(m.Clients)
numOfClientsById := len(m.ClientsBySocketId)
m.wsMutex.Unlock()
log.Info("number of concurrent websocket is ", numOfClients, numOfClientsById)
}

func (m *wsQ) MapSessionToConnection(sid string, c *websocket.Conn) {
m.wsMutex.Lock()
m.ClientsBySession[sid] = c
m.Clients[c] = sid
m.ClientToSessionId[c] = sid
m.wsMutex.Unlock()
}

func (m *wsQ) RemoveSession(sessionId string, c *websocket.Conn) {
m.wsMutex.Lock()
for conn, v := range m.ClientToSessionId {
if v == sessionId {
delete(m.ClientToSessionId, conn) // remove connection to session map
if socketId, exist := m.Clients[conn]; exist {
delete(m.Clients, conn)
delete(m.ClientsBySocketId, socketId)
}
}
}
m.wsMutex.Unlock()
}

@@ -39,7 +76,7 @@ func (m *wsQ) has(conn *websocket.Conn) (yes bool) {
func (m *wsQ) getConnBySessionId(sid string) (conn *websocket.Conn) {
conn = nil
m.wsMutex.Lock()
for k, v := range m.ClientsBySession {
for k, v := range m.ClientsBySocketId {
if k == sid {
conn = v
break
@@ -53,25 +90,27 @@ func (m *wsQ) del(conn *websocket.Conn) {
m.wsMutex.Lock()
if sid, exist := m.Clients[conn]; exist {
delete(m.Clients, conn)
delete(m.ClientsBySession, sid)
delete(m.ClientsBySocketId, sid)
}
m.wsMutex.Unlock()
m.debugNumOfClients()
}

func (m *wsQ) delBySessionId(sid string) {
m.wsMutex.Lock()
if conn, exist := m.ClientsBySession[sid]; exist {
delete(m.ClientsBySession, sid)
if conn, exist := m.ClientsBySocketId[sid]; exist {
delete(m.ClientsBySocketId, sid)
delete(m.Clients, conn)
}
m.wsMutex.Unlock()
m.debugNumOfClients()
}

func (m *wsQ) wsDoBroadcast(msg string, except string) {
toBeRemoved := make([]*websocket.Conn, 0, 20)
m.wsMutex.Lock()
for k, _ := range m.Clients {
if m.ClientsBySession[except] == k {
if m.ClientsBySocketId[except] == k {
continue
}
err := k.WriteMessage(websocket.TextMessage, []byte(msg))

Notiek ielāde…
Atcelt
Saglabāt