package main import ( "log" ) //openIDSession for a given openID // regardless howmany type openIDSession struct { openID string //who is this? count int // number of message in the Queue, channel jobs chan InWechatMsg data openIDSessionData //session data, that needs to be saved to disk states map[string]chatState //map from procedure to its current states } type workDone struct { openID string //which user consumed int //job done } //SessionManager manage all sessions type SessionManager struct { sessions map[string]openIDSession done chan workDone } //AllInMessage every API post that we get // we collect them and redistribute it according to // openID, all messages for a single openID will be // sequentially processed var AllInMessage chan InWechatMsg //a stand alone routine that manage all // live sessions // take all message and fanout to different openid handlers // for each openid, its message are manipulated in serial manner. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { log.Println("session manager start..") m.sessions = map[string]openIDSession{} m.done = make(chan workDone, 2000) for { //forever looping select { case msg := <-in: log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType) m.processIncomingMsg(msg) case d := <-m.done: log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed) m.clearJobDone(d) } } } func (m *SessionManager) processIncomingMsg(v InWechatMsg) { cnt := m.addJob(v) if cnt == 1 { //there is no worker thread working go m.startJob(v.header.FromUserName) } } func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) { openID := v.header.FromUserName s, found := m.sessions[openID] if !found { //create one s = m.createSession(openID) } s.jobs <- v //add job to channel s.count++ m.sessions[openID] = s log.Printf("Incoming message in queue %d", s.count) log.Printf("Total Session count = %d", len(m.sessions)) if s.count <= 0 { log.Fatal("new job added, but count <= 0") } return s.count } func (m *SessionManager) createSession(openID string) openIDSession { s := openIDSession{} s.openID = openID s.count = 0 s.jobs = make(chan InWechatMsg, 50) s.data, _ = getCurrentSesssion(openID) //either load or create new m.sessions[openID] = s //register it to memory return s } func (m *SessionManager) clearJobDone(d workDone) { s, found := m.sessions[d.openID] log.Println(s) if found { s.count -= d.consumed if s.count == 0 { //no job to do //save session data to disk data := m.sessions[d.openID].data data.Save() //remove from memory m.destroySession(d.openID) log.Printf("destroy session %s", d.openID) } else if s.count > 0 { go m.startJob(d.openID) //processing any newly coming jobs } else { log.Println(s) log.Fatal("session job count cannot be negative, problem session") } } else { log.Println(d) log.Fatal("When job done, we canot find proper session") } } func (m *SessionManager) destroySession(openID string) { //close job channels s := m.sessions[openID] close(s.jobs) //delete it from memory delete(m.sessions, openID) log.Printf("total session count=%d", len(m.sessions)) } //worker thread func (m *SessionManager) startJob(openID string) { log.Println("start job worker...") jobFinished := workDone{openID, 0} //process all jobs in the channel hasJob := true for hasJob { select { case v := <-m.sessions[openID].jobs: m.checkOpenID(openID, v) s := m.sessions[openID] s.data.incomingMsg(v) //<=== main logic for processing each incoming message jobFinished.consumed++ default: hasJob = false } } m.done <- jobFinished //notify parent that we have done return } func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) { if v.header.FromUserName != openID { log.Println("Error: Weird Message below ...") log.Println(v) log.Fatalf("Error: worker thread for %s, see different id=%s \n", openID, v.header.FromUserName) } }