diff --git a/chatSession.go b/chatSession.go index 07f3784..f0a6bcc 100644 --- a/chatSession.go +++ b/chatSession.go @@ -28,35 +28,17 @@ var sessionDir = "sessions" //where to find the procedure state information var ProcedureDir = "procedure" -//openIDSession openID user's session -type openIDSession struct { - Procedure string `json:"Procedure"` //name of the current current process, alphanumerical - CreateAt int32 `json:"CreateAt"` //when is this session created - UpdateAt int32 `json:"UpdateAt"` //when is this session updated - Expire int32 `json:"Expire"` //unix timestamp of when this Procedure expires +//openIDSessionData openID user's session +type openIDSessionData struct { + OpenID string `json:"OpenID"` //who's session this is belongs to + Procedure string `json:"Procedure"` //name of the current current process, alphanumerical + CreateAt int32 `json:"CreateAt"` //when is this session created + UpdateAt int32 `json:"UpdateAt"` //when is this session updated + Expire int32 `json:"Expire"` //unix timestamp of when this Procedure expires + States map[string]chatState `json:"States"` //each procedure will have a state } -//read sessions/openID.json to check whether its a Procedure for ongoing dialog -func getCurrentSesssion(openID string) (result openIDSession, err error) { - path := getSessionPath(openID) - log.Printf("read session from %s\r\n", path) - body, err := ioutil.ReadFile(path) - if err != nil { //read file error - if isFileExist(path) { - log.Println("Error session reading " + path) - } - return //empty and expired session - } - err = json.Unmarshal(body, &result) - if err != nil { - log.Printf("Session Content [path=%s] not correct: ", path) - } - //we don't check Expire, we give the caller full control on - //how to deal wiht expired session - return -} - -func writeSession(openid string, ss openIDSession) (err error) { +func writeSession(openid string, ss openIDSessionData) (err error) { path := getSessionPath(openid) r, err := json.Marshal(ss) if err == nil { @@ -75,7 +57,7 @@ func writeSession(openid string, ss openIDSession) (err error) { } //create if not available -func setSessionProcedure(openID, procedure string, expireAfter int32) (updatedSession openIDSession, err error) { +func setSessionProcedure(openID, procedure string, expireAfter int32) (updatedSession openIDSessionData, err error) { s, err := getCurrentSesssion(openID) now := int32(time.Now().Unix()) if s.CreateAt == 0 { @@ -147,3 +129,38 @@ type Procedure struct { recv recvProcMsgFunc //function for receiving message, possibly transfer to new state clean cleanProcFunc //function for cleanning up } + +func (c *openIDSessionData) Save() { + //TODO: save real session date to disk +} + +//read sessions/openID.json to check whether its a Procedure for ongoing dialog +func getCurrentSesssion(openID string) (result openIDSessionData, err error) { + result = createEmptySession(openID, 3600) + path := getSessionPath(openID) + if isFileExist(path) { + log.Printf("read session from %s\r\n", path) + body, err := ioutil.ReadFile(path) + if err != nil { //read file error + log.Println("Error session reading " + path) + } else { + err = json.Unmarshal(body, &result) + if err != nil { + log.Printf("Session Content [path=%s] not correct: ", path) + result = createEmptySession(openID, 3600) + } + } + } + return +} + +func createEmptySession(openID string, expire int32) (result openIDSessionData) { + result.OpenID = openID + result.Procedure = "" + now := int32(time.Now().Unix()) + result.CreateAt = now + result.UpdateAt = now + result.Expire = now + expire + result.States = map[string]chatState{} + return +} diff --git a/main.go b/main.go index 2094980..bb9ed85 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,12 @@ func main() { setupRootFileServer() + startSessionManager(2048) + + setupHTTPHandler() +} + +func setupHTTPHandler() { //setup handler //http.HandleFunc("/", webrootHandler) http.HandleFunc("/api", apiV1Main) @@ -46,6 +52,12 @@ func main() { http.ListenAndServe(":65500", nil) } +func startSessionManager(concurrent int) { + m := SessionManager{} + all := make(chan InWechatMsg, concurrent) + go m.startSessionManager(all) +} + func setupRootFileServer() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, GlobalPath.Angular2App+r.URL.Path[1:]) diff --git a/server.go b/server.go index 480f31c..32c1d84 100644 --- a/server.go +++ b/server.go @@ -10,8 +10,10 @@ import ( "net/http/httputil" "net/url" "os" + "reflect" "sort" "strings" + "time" ) //apiV1Main version 1 main entry for all wechat callbacks @@ -65,42 +67,58 @@ type InWechatMsg struct { //answerWechatPost distribute PostRequest according to xml body info func answerWechatPost(w http.ResponseWriter, r *http.Request) { in, valid := readWechatInput(r) - reply := "" //nothing + //reply := "" //nothing if !valid { log.Println("Error: Invalid Input ") } openID := in.header.FromUserName - //are we in an existing procedure - inProc, state := isInProc(openID) //if inside a procedure, resume last saved state - if inProc { - state = serveProc(state, in) //transit to new state - reply = state.response //xml response - } else { - state, processed := serveCommand(openID, in) //menu or txt command e.g. search - if !processed { // transfer to Customer Service (kf) - reply = buildKfForwardMsg(openID, "") - kfSendTxt(openID, "未识别的命令,已转接校友会理事会,稍后答复您") - } else { - reply = state.response - } + if openID == "" { + log.Println("nothing") } - log.Println(reply) //instant reply, answering user's request - w.Header().Set("Content-Type", "text/xml; charset=utf-8") - fmt.Fprint(w, reply) + time.Sleep(5 * time.Second) + v := reflect.ValueOf(answerWechatPost) + log.Printf("Current Pointer: %d", v.Pointer()) + //debug.PrintStack() - if !isEndingState(state) { - err := saveChatState(openID, state.Procedure, state) - if err != nil { - log.Println("Error Cannot Save chat sate") - log.Println(err) - log.Println(state) - } - } else { //state ending - cleanProcedure(openID, state.Procedure) - } - return + //load session into memory + // session := startSession(openID, in, w, r) //who , input what? + // session.ProcessInput() + // //put session back to disk + // session.Save() } +// func (session) ProcessInput(w, r) { +// //are we in an existing procedure +// inProc, state := isInProc(openID) //if inside a procedure, resume last saved state +// if inProc { +// state = serveProc(state, in) //transit to new state +// reply = state.response //xml response +// } else { +// state, processed := serveCommand(openID, in) //menu or txt command e.g. search +// if !processed { // transfer to Customer Service (kf) +// reply = buildKfForwardMsg(openID, "") +// kfSendTxt(openID, "未识别的命令,已转接校友会理事会,稍后答复您") +// } else { +// reply = state.response +// } +// } +// log.Println(reply) //instant reply, answering user's request +// w.Header().Set("Content-Type", "text/xml; charset=utf-8") +// fmt.Fprint(w, reply) + +// if !isEndingState(state) { +// err := saveChatState(openID, state.Procedure, state) +// if err != nil { +// log.Println("Error Cannot Save chat sate") +// log.Println(err) +// log.Println(state) +// } +// } else { //state ending +// cleanProcedure(openID, state.Procedure) +// } +// return +// } + func readWechatInput(r *http.Request) (result InWechatMsg, valid bool) { body, err := ioutil.ReadAll(r.Body) if err != nil { diff --git a/sessionManager.go b/sessionManager.go new file mode 100644 index 0000000..4dcf8c6 --- /dev/null +++ b/sessionManager.go @@ -0,0 +1,122 @@ +package main + +import ( + "log" + "time" +) + +//openIDSession for a given openID +// regardless howmany +type openIDSession struct { + openID string //who is this? + count int // number of message in the Queue, channel + jobs chan InWechatMsg + data openIDSessionData //session data, that needs to be saved to disk +} + +type workDone struct { + openID string //which user + consumed int //job done +} + +//SessionManager manage all sessions +type SessionManager struct { + sessions map[string]openIDSession + done chan workDone +} + +//AllInMessage every API post that we get +// we collect them and redistribute it according to +// openID, all messages for a single openID will be +// sequentially processed +var AllInMessage <-chan InWechatMsg + +//a stand alone routine that manage all +// live sessions +// take all message and fanout to different openid handlers +// for each openid, its message are manipulated in serial manner. +func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { + log.Println("session manager start..") + for { //forever looping + select { + case msg := <-in: + log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType) + m.processIncomingMsg(msg) + case d := <-m.done: + log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed) + m.clearJobDone(d) + } + } +} + +func (m *SessionManager) processIncomingMsg(v InWechatMsg) { + openID := v.header.FromUserName + s, found := m.sessions[openID] + if !found { //create one + s = m.createSession(openID) + } + s.jobs <- v //add job to channel + s.count++ + log.Printf("Incoming message in queue %d", s.count) + if s.count == 1 { //there is no worker thread working + m.startJob(openID) + } else if s.count <= 0 { + log.Println(s) + log.Fatal("new job added, but count = 0") + } + +} + +func (m *SessionManager) createSession(openID string) openIDSession { + s := openIDSession{} + s.openID = openID + s.count = 0 + s.jobs = make(chan InWechatMsg, 200) + s.data, _ = getCurrentSesssion(openID) //either load or create new + m.sessions[openID] = s //register it to memory + return s +} + +func (m *SessionManager) clearJobDone(d workDone) { + s, found := m.sessions[d.openID] + if found { + s.count -= d.consumed + if s.count == 0 { //no job to do + //save session data to disk + data := m.sessions[d.openID].data + data.Save() + //remove from memory + m.destroySession(d.openID) + } else if s.count > 0 { + m.startJob(d.openID) //processing any newly coming jobs + } else { + log.Println(s) + log.Fatal("session job count cannot be negative, problem session") + } + } else { + log.Fatal("When job done, we canot find proper session") + } +} + +func (m *SessionManager) destroySession(openID string) { + //close job channels + s := m.sessions[openID] + close(s.jobs) + //delete it from memory + delete(m.sessions, openID) +} + +//worker thread +func (m *SessionManager) startJob(openID string) { + s := m.sessions[openID] + jobFinished := workDone{openID, 0} + //process all jobs in the channel + for v := range s.jobs { + log.Println(" Processing job..") + log.Println(v) + time.Sleep(5 * time.Second) + jobFinished.consumed++ + } + m.done <- jobFinished //notify parent that we have done + return +}