From 2c32bdadb7093471208b6cd9372718896d6fcb40 Mon Sep 17 00:00:00 2001 From: Patrick Peng Sun Date: Mon, 29 May 2017 16:51:18 +1000 Subject: [PATCH] =?UTF-8?q?session=20manager=20working=20now.=20=20?= =?UTF-8?q?=E5=81=B6=E4=B9=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 4 ++-- server.go | 12 ++++++++++-- sessionManager.go | 46 +++++++++++++++++++++++++++++++++------------- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/main.go b/main.go index bb9ed85..bbea87c 100644 --- a/main.go +++ b/main.go @@ -54,8 +54,8 @@ func setupHTTPHandler() { func startSessionManager(concurrent int) { m := SessionManager{} - all := make(chan InWechatMsg, concurrent) - go m.startSessionManager(all) + AllInMessage = make(chan InWechatMsg, concurrent) + go m.startSessionManager(AllInMessage) } func setupRootFileServer() { diff --git a/server.go b/server.go index 32c1d84..4c39541 100644 --- a/server.go +++ b/server.go @@ -13,7 +13,6 @@ import ( "reflect" "sort" "strings" - "time" ) //apiV1Main version 1 main entry for all wechat callbacks @@ -71,11 +70,20 @@ func answerWechatPost(w http.ResponseWriter, r *http.Request) { if !valid { log.Println("Error: Invalid Input ") } + + AllInMessage <- in + openID := in.header.FromUserName if openID == "" { log.Println("nothing") } - time.Sleep(5 * time.Second) + + //dummy reply + reply, _ := BuildTextMsg(openID, "澳") + w.Header().Set("Content-Type", "text/xml; charset=utf-8") + fmt.Fprint(w, reply) + + //time.Sleep(5 * time.Second) v := reflect.ValueOf(answerWechatPost) log.Printf("Current Pointer: %d", v.Pointer()) //debug.PrintStack() diff --git a/sessionManager.go b/sessionManager.go index 4dcf8c6..efd5bfe 100644 --- a/sessionManager.go +++ b/sessionManager.go @@ -29,7 +29,7 @@ type SessionManager struct { // we collect them and redistribute it according to // openID, all messages for a single openID will be // sequentially processed -var AllInMessage <-chan InWechatMsg +var AllInMessage chan InWechatMsg //a stand alone routine that manage all // live sessions @@ -37,6 +37,8 @@ var AllInMessage <-chan InWechatMsg // for each openid, its message are manipulated in serial manner. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { log.Println("session manager start..") + m.sessions = map[string]openIDSession{} + m.done = make(chan workDone, 2000) for { //forever looping select { case msg := <-in: @@ -50,6 +52,13 @@ func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { } func (m *SessionManager) processIncomingMsg(v InWechatMsg) { + cnt := m.addJob(v) + if cnt == 1 { //there is no worker thread working + go m.startJob(v.header.FromUserName) + } +} + +func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) { openID := v.header.FromUserName s, found := m.sessions[openID] if !found { //create one @@ -57,14 +66,14 @@ func (m *SessionManager) processIncomingMsg(v InWechatMsg) { } s.jobs <- v //add job to channel s.count++ + m.sessions[openID] = s + log.Printf("Incoming message in queue %d", s.count) - if s.count == 1 { //there is no worker thread working - m.startJob(openID) - } else if s.count <= 0 { - log.Println(s) - log.Fatal("new job added, but count = 0") + log.Printf("Total Session count = %d", len(m.sessions)) + if s.count <= 0 { + log.Fatal("new job added, but count <= 0") } - + return s.count } func (m *SessionManager) createSession(openID string) openIDSession { @@ -79,6 +88,7 @@ func (m *SessionManager) createSession(openID string) openIDSession { func (m *SessionManager) clearJobDone(d workDone) { s, found := m.sessions[d.openID] + log.Println(s) if found { s.count -= d.consumed if s.count == 0 { //no job to do @@ -87,8 +97,9 @@ func (m *SessionManager) clearJobDone(d workDone) { data.Save() //remove from memory m.destroySession(d.openID) + log.Printf("destroy session %s", d.openID) } else if s.count > 0 { - m.startJob(d.openID) //processing any newly coming jobs + go m.startJob(d.openID) //processing any newly coming jobs } else { log.Println(s) log.Fatal("session job count cannot be negative, problem session") @@ -104,18 +115,27 @@ func (m *SessionManager) destroySession(openID string) { close(s.jobs) //delete it from memory delete(m.sessions, openID) + log.Printf("total session count=%d", len(m.sessions)) } //worker thread func (m *SessionManager) startJob(openID string) { + log.Println("start job worker...") s := m.sessions[openID] jobFinished := workDone{openID, 0} //process all jobs in the channel - for v := range s.jobs { - log.Println(" Processing job..") - log.Println(v) - time.Sleep(5 * time.Second) - jobFinished.consumed++ + hasJob := true + for hasJob { + select { + case v := <-s.jobs: + log.Println(" Processing job..") + log.Println(v) + time.Sleep(5 * time.Second) + kfSendTxt(openID, "Job Processed "+v.body.(TextMsg).Content) + jobFinished.consumed++ + default: + hasJob = false + } } m.done <- jobFinished //notify parent that we have done return