Explorar el Código

broadcast websocket message with protocol defined as string to string mapping.

master
sp hace 4 años
padre
commit
c80d3f101e
Se han modificado 6 ficheros con 175 adiciones y 6 borrados
  1. +1
    -1
      apiV1login.go
  2. +1
    -1
      apiV1logout.go
  3. +3
    -0
      apiv1.go
  4. +52
    -4
      websocket.go
  5. +25
    -0
      websocket_message.go
  6. +93
    -0
      websocket_queue.go

+ 1
- 1
apiV1login.go Ver fichero

@@ -68,7 +68,7 @@ func apiV1Login(w http.ResponseWriter, r *http.Request, ss *loan.Session) {
res.Env.Body["debug_session_user_error"] = e.Error()
}
}
WsNotifyNewLogin(ss)
//send out
apiV1AddTrackingCookie(w, r, ss) //always the last one to set cookies
res.sendJson(w)

+ 1
- 1
apiV1logout.go Ver fichero

@@ -9,7 +9,7 @@ import (

func apiV1Logout(w http.ResponseWriter, r *http.Request, ss *loan.Session) {
res := apiV1ResponseBlank()
WsNotifyLogout(ss)
ss.Expire = time.Now().Add(-10000) //make sure it expired

ssEmpty := loan.Session{}

+ 3
- 0
apiv1.go Ver fichero

@@ -271,6 +271,9 @@ func apiV1GetMachineId(r *http.Request) string {
return mid
}

func apiV1GetMachineIdFromSession(ss *loan.Session) string {
return ss.GetStr("Biukop-Mid")
}
func apiV1InitSessionByBrowserId(r *http.Request) (session loan.Session, e error) {
var sid string
mid := apiV1GetMachineId(r)

+ 52
- 4
websocket.go Ver fichero

@@ -1,7 +1,9 @@
package main

import (
"encoding/json"
"fmt"
"github.com/brianvoe/gofakeit/v6"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"net/http"
@@ -19,26 +21,40 @@ func apiV1WebSocketHandler(w http.ResponseWriter, r *http.Request) {
upgradeToWs.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgradeToWs.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
log.Println("cannot upgrade websocket", err)
return
}

mid := apiV1GetMachineId(r)
wsq.MapMidToConnection(mid, ws)

// helpful log statement to show connections
log.Println("Websocket Api/V1: Client Connected")
log.Println("Websocket Api/V1: Client Connected", mid)

wsReader(ws)
}

func wsReader(conn *websocket.Conn) {
wsq.addConnection(conn)
for {

stillExist := wsq.has(conn)
if !stillExist {
return // the connection has been removed.
}

// read in a message
messageType, p, err := conn.ReadMessage()
if err != nil {
wsq.del(conn)
log.Println(err)
return
}
// print out that message for clarity
fmt.Println(string(p))
// fmt.Println(string(p))

if err := conn.WriteMessage(messageType, p); err != nil {
wsq.del(conn)
log.Println(err)
return
}
@@ -49,13 +65,40 @@ func wsReader(conn *websocket.Conn) {
}
}

func WsBroadCast(msg string) {
wsq.wsDoBroadcast(msg, "")
}

func WsSend(To string, msg map[string]string) {
b, e := json.Marshal(msg)
if e != nil {
log.Error("cannot broadcast websocket message, cannot convert to json", msg)
}

conn := wsq.getConnBySessionId(To)
if conn != nil {
err := conn.WriteMessage(websocket.TextMessage, b)
if err != nil {
wsq.del(conn)
}
}
}

func WsBroadCastExceptMe(me string, msg map[string]string) {
b, e := json.Marshal(msg)
if e == nil {
wsq.wsDoBroadcast(string(b), me)
}
}

func wsDummySender(conn *websocket.Conn) {
//write subsequent 5 copies, each after 1 second
log.Info("start sending server data to client ..")
p := "dummy string from server "
for i := 1; i < 500; i++ {
time.Sleep(1 * time.Second)
msg := fmt.Sprintf("copy %d, %s", i, p)
//msg := fmt.Sprintf("copy %d, %s, %s", i, p, wsDummyString()) // 4M long string no issue
msg := fmt.Sprintf("copy %d, %s ", i, p)
log.Info("dummy sender is working, ", msg)
if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
log.Println("wsDummySender stopped on error: ", err)
@@ -63,3 +106,8 @@ func wsDummySender(conn *websocket.Conn) {
}
}
}

func wsDummyString() string {
s := gofakeit.LetterN(4096000) // 4M
return s
}

+ 25
- 0
websocket_message.go Ver fichero

@@ -0,0 +1,25 @@
package main

import "biukop.com/sfm/loan"

func WsNotifyNewLogin(ss *loan.Session) {

msg := make(map[string]string)
msg["T"] = "login"
msg["Mid"] = apiV1GetMachineIdFromSession(ss)
msg["Sid"] = ss.Id
msg["Uid"] = ss.User
msg["Role"] = ss.GetRole()
WsBroadCastExceptMe(ss.Id, msg)
}

func WsNotifyLogout(ss *loan.Session) {

msg := make(map[string]string)
msg["T"] = "logout"
msg["Mid"] = apiV1GetMachineIdFromSession(ss)
msg["Uid"] = ss.User
msg["Sid"] = ss.Id
msg["Role"] = ss.GetRole()
WsBroadCastExceptMe(ss.Id, msg)
}

+ 93
- 0
websocket_queue.go Ver fichero

@@ -0,0 +1,93 @@
package main

import (
"github.com/gorilla/websocket"
"sync"
)

type wsQ struct {
wsMutex sync.Mutex
Clients map[*websocket.Conn]struct{}
ClientsBySession map[string]*websocket.Conn
}

var wsq = wsQ{
Clients: make(map[*websocket.Conn]struct{}),
ClientsBySession: make(map[string]*websocket.Conn),
}

func (m *wsQ) addConnection(c *websocket.Conn) {
m.wsMutex.Lock()
m.Clients[c] = struct{}{}
m.wsMutex.Unlock()
}

func (m *wsQ) MapMidToConnection(mid string, c *websocket.Conn) {
m.wsMutex.Lock()
m.ClientsBySession[mid] = c
m.wsMutex.Unlock()
}

func (m *wsQ) has(conn *websocket.Conn) (yes bool) {
m.wsMutex.Lock()
_, ok := m.Clients[conn]
m.wsMutex.Unlock()
return ok
}

func (m *wsQ) getConnBySessionId(sid string) (conn *websocket.Conn) {
conn = nil
m.wsMutex.Lock()
for k, v := range m.ClientsBySession {
if k == sid {
conn = v
break
}
}
m.wsMutex.Unlock()
return
}

func (m *wsQ) del(conn *websocket.Conn) {
m.wsMutex.Lock()
delete(m.Clients, conn)
for k, v := range m.ClientsBySession {
if v == conn {
delete(m.ClientsBySession, k)
break
}
}
m.wsMutex.Unlock()
}

func (m *wsQ) delBySessionId(sid string) {
m.wsMutex.Lock()
for k, v := range m.ClientsBySession {
if k == sid {
delete(m.ClientsBySession, k)
break
}
delete(m.Clients, v)
}
m.wsMutex.Unlock()
}

func (m *wsQ) wsDoBroadcast(msg string, except string) {
toBeRemoved := make([]*websocket.Conn, 0, 20)
m.wsMutex.Lock()
for k, _ := range m.Clients {
if m.ClientsBySession[except] == k {
continue
}
err := k.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
toBeRemoved = append(toBeRemoved, k)
}
}

// remove all those have errors for communication
for _, v := range toBeRemoved {
delete(m.Clients, v)
}
m.wsMutex.Unlock()
}

Cargando…
Cancelar
Guardar