|
- package main
-
- import (
- "log"
- )
-
- //openIDSession for a given openID
- // regardless howmany
- type openIDSession struct {
- openID string //who is this?
- count int // number of message in the Queue, channel
- jobs chan InWechatMsg
- data openIDSessionData //session data, that needs to be saved to disk
- }
-
// workDone is the completion report a worker sends back to the session
// manager once it has drained a session's queue.
type workDone struct {
	// openID is the user whose messages were processed.
	openID string
	// consumed is the number of messages the worker processed.
	consumed int
}
-
- //SessionManager manage all sessions
- type SessionManager struct {
- sessions map[string]openIDSession
- done chan workDone
- }
-
- //AllInMessage every API post that we get
- // we collect them and redistribute it according to
- // openID, all messages for a single openID will be
- // sequentially processed
- var AllInMessage chan InWechatMsg
-
- //a stand alone routine that manage all
- // live sessions
- // take all message and fanout to different openid handlers
- // for each openid, its message are manipulated in serial manner.
- func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
- log.Println("session manager start..")
- m.sessions = map[string]openIDSession{}
- m.done = make(chan workDone, 2000) //2000 different openID simultaneously.
- for { //forever looping
- select {
- case msg := <-in:
- log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType)
- m.processIncomingMsg(msg)
- case d := <-m.done:
- log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed)
- m.clearJobDone(d)
- }
- }
- }
-
- func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
- cnt := m.addJob(v)
- if cnt == 1 { //there is no worker thread working
- go m.startJob(v.header.FromUserName)
- }
- }
-
- func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) {
- openID := v.header.FromUserName
- s, found := m.sessions[openID]
- if !found { //create one
- s = m.createSession(openID)
- }
- s.jobs <- v //add job to channel
- s.count++
- m.sessions[openID] = s
-
- log.Printf("SessionMgr/addJob: Incoming message in queue %d", s.count)
- log.Printf("SessionMgr/addJob: Total Session count = %d", len(m.sessions))
- if s.count <= 0 {
- log.Fatal("SessionMgr/Addjob: new job added, but count <= 0")
- }
- return s.count
- }
-
- func (m *SessionManager) createSession(openID string) openIDSession {
- s := openIDSession{}
- s.openID = openID
- s.count = 0
- s.jobs = make(chan InWechatMsg, 50)
- s.data = openIDSessionData{}
- s.data.Load(openID) //either load or create new
- m.sessions[openID] = s //register it to memory
- return s
- }
-
- func (m *SessionManager) clearJobDone(d workDone) {
- s, found := m.sessions[d.openID]
- log.Println("SessionMgr/clearJobDone: start clearning jobs ... ")
- log.Println(s)
- log.Println(d)
- if found {
- s.count -= d.consumed
- m.sessions[d.openID] = s //update
- if s.count == 0 { //no job to do
- //remove from memory
- m.destroySession(d.openID)
- log.Printf("SessionMgr/clearJobDone: destroy session %s", d.openID)
- } else if s.count > 0 {
- go m.startJob(d.openID) //processing any newly coming jobs
- } else {
- log.Println(s)
- log.Fatal("SessionMgr/clearJobDone: session job count cannot be negative, problem session")
- }
- } else {
- log.Println(d)
- log.Fatal("SessionMgr/clearJobDone: When job done, we canot find proper session")
- }
- }
-
- func (m *SessionManager) destroySession(openID string) {
- //save session data to disk
- data := m.sessions[openID].data
- data.Save()
-
- //close job channels
- s := m.sessions[openID]
- close(s.jobs)
- //delete it from memory
- delete(m.sessions, openID)
- log.Printf("SessionMgr/destroySession: total session count=%d", len(m.sessions))
- }
-
- //worker thread, cannot change session info
- func (m *SessionManager) startJob(openID string) {
- log.Println("SessionMgr/startJob: enter...")
- jobFinished := workDone{openID, 0}
- //process all jobs in the channel
- hasJob := true
- for hasJob {
- select {
- case v := <-m.sessions[openID].jobs:
- m.checkOpenID(openID, v)
- s := m.sessions[openID]
- s.data.incomingMsg(v) //<=== main logic for processing each incoming message
- jobFinished.consumed++
- default:
- hasJob = false
- }
- }
- m.done <- jobFinished //notify parent that we have done
- return
- }
-
- func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) {
- if v.header.FromUserName != openID {
- log.Println("Error:[SessionMgr/checkOpenID] Weird Message below ...")
- log.Println(v)
- log.Fatalf("Error:[[SessionMgr/checkOpenID]] worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
- }
- }
|