package main import ( "log" "time" ) //openIDSession for a given openID // regardless howmany type openIDSession struct { openID string //who is this? count int // number of message in the Queue, channel jobs chan InWechatMsg data openIDSessionData //session data, that needs to be saved to disk } type workDone struct { openID string //which user consumed int //job done } //SessionManager manage all sessions type SessionManager struct { sessions map[string]openIDSession done chan workDone } //AllInMessage every API post that we get // we collect them and redistribute it according to // openID, all messages for a single openID will be // sequentially processed var AllInMessage <-chan InWechatMsg //a stand alone routine that manage all // live sessions // take all message and fanout to different openid handlers // for each openid, its message are manipulated in serial manner. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { log.Println("session manager start..") for { //forever looping select { case msg := <-in: log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType) m.processIncomingMsg(msg) case d := <-m.done: log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed) m.clearJobDone(d) } } } func (m *SessionManager) processIncomingMsg(v InWechatMsg) { openID := v.header.FromUserName s, found := m.sessions[openID] if !found { //create one s = m.createSession(openID) } s.jobs <- v //add job to channel s.count++ log.Printf("Incoming message in queue %d", s.count) if s.count == 1 { //there is no worker thread working m.startJob(openID) } else if s.count <= 0 { log.Println(s) log.Fatal("new job added, but count = 0") } } func (m *SessionManager) createSession(openID string) openIDSession { s := openIDSession{} s.openID = openID s.count = 0 s.jobs = make(chan InWechatMsg, 200) s.data, _ = getCurrentSesssion(openID) //either load or create new m.sessions[openID] = s //register it to memory return s } func (m *SessionManager) clearJobDone(d workDone) { s, found := m.sessions[d.openID] if found { s.count -= d.consumed if s.count == 0 { //no job to do //save session data to disk data := m.sessions[d.openID].data data.Save() //remove from memory m.destroySession(d.openID) } else if s.count > 0 { m.startJob(d.openID) //processing any newly coming jobs } else { log.Println(s) log.Fatal("session job count cannot be negative, problem session") } } else { log.Fatal("When job done, we canot find proper session") } } func (m *SessionManager) destroySession(openID string) { //close job channels s := m.sessions[openID] close(s.jobs) //delete it from memory delete(m.sessions, openID) } //worker thread func (m *SessionManager) startJob(openID string) { s := m.sessions[openID] jobFinished := workDone{openID, 0} //process all jobs in the channel for v := range s.jobs { log.Println(" Processing job..") log.Println(v) time.Sleep(5 * time.Second) jobFinished.consumed++ } m.done <- jobFinished //notify parent that we have done return }