Procházet zdrojové kódy

session manager working now. 偶也

master
Patrick Peng Sun před 8 roky
rodič
revize
2c32bdadb7
3 změnil soubory, kde provedl 45 přidání a 17 odebrání
  1. +2
    -2
      main.go
  2. +10
    -2
      server.go
  3. +33
    -13
      sessionManager.go

+ 2
- 2
main.go Zobrazit soubor

@@ -54,8 +54,8 @@ func setupHTTPHandler() {

func startSessionManager(concurrent int) {
m := SessionManager{}
all := make(chan InWechatMsg, concurrent)
go m.startSessionManager(all)
AllInMessage = make(chan InWechatMsg, concurrent)
go m.startSessionManager(AllInMessage)
}

func setupRootFileServer() {

+ 10
- 2
server.go Zobrazit soubor

@@ -13,7 +13,6 @@ import (
"reflect"
"sort"
"strings"
"time"
)

//apiV1Main version 1 main entry for all wechat callbacks
@@ -71,11 +70,20 @@ func answerWechatPost(w http.ResponseWriter, r *http.Request) {
if !valid {
log.Println("Error: Invalid Input ")
}

AllInMessage <- in

openID := in.header.FromUserName
if openID == "" {
log.Println("nothing")
}
time.Sleep(5 * time.Second)

//dummy reply
reply, _ := BuildTextMsg(openID, "澳")
w.Header().Set("Content-Type", "text/xml; charset=utf-8")
fmt.Fprint(w, reply)

//time.Sleep(5 * time.Second)
v := reflect.ValueOf(answerWechatPost)
log.Printf("Current Pointer: %d", v.Pointer())
//debug.PrintStack()

+ 33
- 13
sessionManager.go Zobrazit soubor

@@ -29,7 +29,7 @@ type SessionManager struct {
// we collect them and redistribute it according to
// openID, all messages for a single openID will be
// sequentially processed
var AllInMessage <-chan InWechatMsg
var AllInMessage chan InWechatMsg

//a stand alone routine that manage all
// live sessions
@@ -37,6 +37,8 @@ var AllInMessage <-chan InWechatMsg
// for each openid, its message are manipulated in serial manner.
func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
log.Println("session manager start..")
m.sessions = map[string]openIDSession{}
m.done = make(chan workDone, 2000)
for { //forever looping
select {
case msg := <-in:
@@ -50,6 +52,13 @@ func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
}

func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
cnt := m.addJob(v)
if cnt == 1 { //there is no worker thread working
go m.startJob(v.header.FromUserName)
}
}

func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) {
openID := v.header.FromUserName
s, found := m.sessions[openID]
if !found { //create one
@@ -57,14 +66,14 @@ func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
}
s.jobs <- v //add job to channel
s.count++
m.sessions[openID] = s

log.Printf("Incoming message in queue %d", s.count)
if s.count == 1 { //there is no worker thread working
m.startJob(openID)
} else if s.count <= 0 {
log.Println(s)
log.Fatal("new job added, but count = 0")
log.Printf("Total Session count = %d", len(m.sessions))
if s.count <= 0 {
log.Fatal("new job added, but count <= 0")
}
return s.count
}

func (m *SessionManager) createSession(openID string) openIDSession {
@@ -79,6 +88,7 @@ func (m *SessionManager) createSession(openID string) openIDSession {

func (m *SessionManager) clearJobDone(d workDone) {
s, found := m.sessions[d.openID]
log.Println(s)
if found {
s.count -= d.consumed
if s.count == 0 { //no job to do
@@ -87,8 +97,9 @@ func (m *SessionManager) clearJobDone(d workDone) {
data.Save()
//remove from memory
m.destroySession(d.openID)
log.Printf("destroy session %s", d.openID)
} else if s.count > 0 {
m.startJob(d.openID) //processing any newly coming jobs
go m.startJob(d.openID) //processing any newly coming jobs
} else {
log.Println(s)
log.Fatal("session job count cannot be negative, problem session")
@@ -104,18 +115,27 @@ func (m *SessionManager) destroySession(openID string) {
close(s.jobs)
//delete it from memory
delete(m.sessions, openID)
log.Printf("total session count=%d", len(m.sessions))
}

//worker thread
func (m *SessionManager) startJob(openID string) {
log.Println("start job worker...")
s := m.sessions[openID]
jobFinished := workDone{openID, 0}
//process all jobs in the channel
for v := range s.jobs {
log.Println(" Processing job..")
log.Println(v)
time.Sleep(5 * time.Second)
jobFinished.consumed++
hasJob := true
for hasJob {
select {
case v := <-s.jobs:
log.Println(" Processing job..")
log.Println(v)
time.Sleep(5 * time.Second)
kfSendTxt(openID, "Job Processed "+v.body.(TextMsg).Content)
jobFinished.consumed++
default:
hasJob = false
}
}
m.done <- jobFinished //notify parent that we have done
return

Načítá se…
Zrušit
Uložit