Explorar el Código

bug fix in startJob, we should not assign s back to m.sessions[openID] as it conflicts with sessionmanager with 'counter' attribute.

but for session data, it should be fine, as it is only loaded once, and sequentially handled by each openID.
master
Patrick Peng Sun hace 8 años
padre
commit
dc51f0d3f2
Se han modificado 3 ficheros con 36 adiciones y 22 borrados
  1. +9
    -7
      eventSubscribe.go
  2. +19
    -15
      sessionManager.go
  3. +8
    -0
      sync_wechat_token.sh

+ 9
- 7
eventSubscribe.go Ver fichero

func onSubscribe(in InWechatMsg) { func onSubscribe(in InWechatMsg) {
openID := in.header.FromUserName openID := in.header.FromUserName
//getUser fromCRM //getUser fromCRM
log.Println(openID)
info := WechatUserInfo{}
info.getUserInfo(in.header.FromUserName, "zh-CN")
log.Println(openID + " information get above onSubscribe")
} }


//WechatUserInfo response for wechat user info. //WechatUserInfo response for wechat user info.
Subscribe int32 `json:"subscribe"` Subscribe int32 `json:"subscribe"`
OpenID string `json:"openid"` OpenID string `json:"openid"`
NickName string `json:"nickname"` NickName string `json:"nickname"`
Sex int32 `json:"sex"`
Sex int32 `json:"sex"` //0 未知 1 男 2 女
Language string `json:"language"` Language string `json:"language"`
City string `json:"city"` City string `json:"city"`
Province string `json:"province"` Province string `json:"province"`
TagIDList []int32 `json:"tagid_list"` TagIDList []int32 `json:"tagid_list"`
} }


func getUserInfo(openID string, lang string) (result WechatUserInfo) {
url := result.getURL(openID, lang)
func (m *WechatUserInfo) getUserInfo(openID string, lang string) {
url := m.getURL(openID, lang)
resp, err := http.Get(url) resp, err := http.Get(url)
if err != nil { if err != nil {
} }
b, err := ioutil.ReadAll(resp.Body) b, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(b, &result)
return
err = json.Unmarshal(b, m)
log.Println(m)
} }
//lang 返回国家地区语言版本,zh_CN 简体,zh_TW 繁体,en 英语
func (m *WechatUserInfo) getURL(openID, lang string) string { func (m *WechatUserInfo) getURL(openID, lang string) string {
atk, _ := GetAccessToken() atk, _ := GetAccessToken()
u := fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/user/info?access_token=%s&openid=%s&lang=%s", atk, openID, lang) u := fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/user/info?access_token=%s&openid=%s&lang=%s", atk, openID, lang)

+ 19
- 15
sessionManager.go Ver fichero

func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
log.Println("session manager start..") log.Println("session manager start..")
m.sessions = map[string]openIDSession{} m.sessions = map[string]openIDSession{}
m.done = make(chan workDone, 2000)
for { //forever looping
m.done = make(chan workDone, 2000) //2000 different openID simultaneously.
for { //forever looping
select { select {
case msg := <-in: case msg := <-in:
log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType) log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType)
s.count++ s.count++
m.sessions[openID] = s m.sessions[openID] = s


log.Printf("Incoming message in queue %d", s.count)
log.Printf("Total Session count = %d", len(m.sessions))
log.Printf("SessionMgr/addJob: Incoming message in queue %d", s.count)
log.Printf("SessionMgr/addJob: Total Session count = %d", len(m.sessions))
if s.count <= 0 { if s.count <= 0 {
log.Fatal("new job added, but count <= 0")
log.Fatal("SessionMgr/Addjob: new job added, but count <= 0")
} }
return s.count return s.count
} }


func (m *SessionManager) clearJobDone(d workDone) { func (m *SessionManager) clearJobDone(d workDone) {
s, found := m.sessions[d.openID] s, found := m.sessions[d.openID]
log.Println("SessionMgr/clearJobDone: start clearning jobs ... ")
log.Println(s) log.Println(s)
log.Println(d)
if found { if found {
s.count -= d.consumed s.count -= d.consumed
if s.count == 0 { //no job to do
m.sessions[d.openID] = s //update
if s.count == 0 { //no job to do
//remove from memory //remove from memory
m.destroySession(d.openID) m.destroySession(d.openID)
log.Printf("destroy session %s", d.openID)
log.Printf("SessionMgr/clearJobDone: destroy session %s", d.openID)
} else if s.count > 0 { } else if s.count > 0 {
go m.startJob(d.openID) //processing any newly coming jobs go m.startJob(d.openID) //processing any newly coming jobs
} else { } else {
log.Println(s) log.Println(s)
log.Fatal("session job count cannot be negative, problem session")
log.Fatal("SessionMgr/clearJobDone: session job count cannot be negative, problem session")
} }
} else { } else {
log.Println(d) log.Println(d)
log.Fatal("When job done, we canot find proper session")
log.Fatal("SessionMgr/clearJobDone: When job done, we canot find proper session")
} }
} }


close(s.jobs) close(s.jobs)
//delete it from memory //delete it from memory
delete(m.sessions, openID) delete(m.sessions, openID)
log.Printf("total session count=%d", len(m.sessions))
log.Printf("SessionMgr/destroySession: total session count=%d", len(m.sessions))
} }


//worker thread
//worker thread, cannot change session info
func (m *SessionManager) startJob(openID string) { func (m *SessionManager) startJob(openID string) {
log.Println("start job worker...")
log.Println("SessionMgr/startJob: enter...")
jobFinished := workDone{openID, 0} jobFinished := workDone{openID, 0}
//process all jobs in the channel //process all jobs in the channel
hasJob := true hasJob := true
m.checkOpenID(openID, v) m.checkOpenID(openID, v)
s := m.sessions[openID] s := m.sessions[openID]
s.data.incomingMsg(v) //<=== main logic for processing each incoming message s.data.incomingMsg(v) //<=== main logic for processing each incoming message
m.sessions[openID] = s
log.Println("in worker thread, set s to session")
log.Println(s)
jobFinished.consumed++ jobFinished.consumed++
default: default:
hasJob = false hasJob = false


func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) { func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) {
if v.header.FromUserName != openID { if v.header.FromUserName != openID {
log.Println("Error: Weird Message below ...")
log.Println("Error:[SessionMgr/checkOpenID] Weird Message below ...")
log.Println(v) log.Println(v)
log.Fatalf("Error: worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
log.Fatalf("Error:[[SessionMgr/checkOpenID]] worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
} }
} }

+ 8
- 0
sync_wechat_token.sh Ver fichero

#!/bin/bash
FILE=`find /tmp -path "/tmp/ssh-*" -user sp -type s -iname "agent.*" 2>/dev/null`
export SSH_AUTH_SOCK="$FILE"

#download
SSH_AUTH_SOCK="$FILE" rsync --update root@biukop.com.au:/tmp/wechat_hitxy_token /tmp/wechat_hitxy_token
#upload
SSH_AUTH_SOCK="$FILE" rsync --update /tmp/wechat_hitxy_token root@biukop.com.au:/tmp/wechat_hitxy_token

Cargando…
Cancelar
Guardar