| //main entry point for processing each incoming message | //main entry point for processing each incoming message | ||||
| //this stage has session date available | //this stage has session date available | ||||
| func (ss *openIDSessionData) incomingMsg(v InWechatMsg) { | |||||
| func (ss *openIDSessionData) incomingMsg(v *InWechatMsg) { | |||||
| openID := v.header.FromUserName | openID := v.header.FromUserName | ||||
| //kfSendTxtAs(openID, "信息收到"+v.header.MsgType, "孙鹏") | //kfSendTxtAs(openID, "信息收到"+v.header.MsgType, "孙鹏") | ||||
| //are we in an existing procedure | //are we in an existing procedure |
| "time" | "time" | ||||
| ) | ) | ||||
| func onSubscribe(in InWechatMsg) { | |||||
| func onSubscribe(in* InWechatMsg) { | |||||
| openID := in.header.FromUserName | openID := in.header.FromUserName | ||||
| //check whether we have his own record in the CRM system | //check whether we have his own record in the CRM system | ||||
| info, found, err := crmFindLeadByOpenID(openID) | info, found, err := crmFindLeadByOpenID(openID) | ||||
| templateSendJoinCommunity(in.header.FromUserName, url, first, remark, communityName, joinDate) | templateSendJoinCommunity(in.header.FromUserName, url, first, remark, communityName, joinDate) | ||||
| } | } | ||||
| func sendGreeting4ExistingUser(in InWechatMsg, info crmdLead) { | |||||
| func sendGreeting4ExistingUser(in* InWechatMsg, info crmdLead) { | |||||
| first := "欢迎" + info.Name + "返回澳洲校友会" | first := "欢迎" + info.Name + "返回澳洲校友会" | ||||
| //url := "http://wechat.hitxy.org.au/profile_newly_register" | //url := "http://wechat.hitxy.org.au/profile_newly_register" | ||||
| } | } | ||||
| //when user left | //when user left | ||||
| func onUnSubscribe(in InWechatMsg) { | |||||
| func onUnSubscribe(in* InWechatMsg) { | |||||
| info, found, err := crmFindLeadByOpenID(in.header.FromUserName) | info, found, err := crmFindLeadByOpenID(in.header.FromUserName) | ||||
| if err != nil { | if err != nil { |
| } | } | ||||
| //record location | //record location | ||||
| func onLocation(in InWechatMsg) { | |||||
| func onLocation(in* InWechatMsg) { | |||||
| openID := in.header.FromUserName | openID := in.header.FromUserName | ||||
| lead, found, err := crmFindLeadByOpenID(openID) | lead, found, err := crmFindLeadByOpenID(openID) |
| func startSessionManager(concurrent int) { | func startSessionManager(concurrent int) { | ||||
| m := SessionManager{} | m := SessionManager{} | ||||
| AllInMessage = make(chan InWechatMsg, concurrent) | |||||
| AllInMessage = make(chan *InWechatMsg, concurrent) | |||||
| go m.startSessionManager(AllInMessage) | go m.startSessionManager(AllInMessage) | ||||
| } | } | ||||
| package main | package main | ||||
| func onClick(in InWechatMsg) { | |||||
| func onClick(in* InWechatMsg) { | |||||
| e := in.body.(EventMsg) | e := in.body.(EventMsg) | ||||
| openid := in.header.FromUserName | openid := in.header.FromUserName | ||||
| switch e.EventKey { | switch e.EventKey { |
| ss.Procedure = "" | ss.Procedure = "" | ||||
| } | } | ||||
| func (m *procDummyDef) start(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procDummyDef) start(ss *openIDSessionData, in* InWechatMsg) { | |||||
| } | } | ||||
| func (m *procDummyDef) serve(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procDummyDef) serve(ss *openIDSessionData, in* InWechatMsg) { | |||||
| } | } | ||||
| func (m *procDummyDef) summary(ss *openIDSessionData) { | func (m *procDummyDef) summary(ss *openIDSessionData) { | ||||
| } | } | ||||
| func (m *procDummyDef) intro(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procDummyDef) intro(ss *openIDSessionData, in* InWechatMsg) { | |||||
| } | } |
| //log.Println(*ss) | //log.Println(*ss) | ||||
| } | } | ||||
| func (m *procEchoDef) start(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procEchoDef) start(ss *openIDSessionData, in* InWechatMsg) { | |||||
| ss.setKvPair("started at", time.Now().Format("2006/03:04:05")) | ss.setKvPair("started at", time.Now().Format("2006/03:04:05")) | ||||
| } | } | ||||
| func (m *procEchoDef) serve(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procEchoDef) serve(ss *openIDSessionData, in* InWechatMsg) { | |||||
| stopEcho := false | stopEcho := false | ||||
| if in.header.MsgType == "text" { | if in.header.MsgType == "text" { | ||||
| if in.body.(TextMsg).Content == "结束Echo" || in.body.(TextMsg).Content == "结束echo" { | if in.body.(TextMsg).Content == "结束Echo" || in.body.(TextMsg).Content == "结束echo" { | ||||
| kfSendTxt(ss.OpenID, msg) | kfSendTxt(ss.OpenID, msg) | ||||
| } | } | ||||
| func (m *procEchoDef) intro(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procEchoDef) intro(ss *openIDSessionData, in* InWechatMsg) { | |||||
| in.replyText("请输入不同类型的微信信息,比如文字,图片,视频,地址,链接,我们将原样回应您") | in.replyText("请输入不同类型的微信信息,比如文字,图片,视频,地址,链接,我们将原样回应您") | ||||
| kfSendTxt(ss.OpenID, "10分钟静默之后 Echo将自动结束,\n输入 [结束Echo],或者语音 ‘退出退出', 清除Echo模式") | kfSendTxt(ss.OpenID, "10分钟静默之后 Echo将自动结束,\n输入 [结束Echo],或者语音 ‘退出退出', 清除Echo模式") | ||||
| } | } | ||||
| func (m *procEchoDef) doCommand(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procEchoDef) doCommand(ss *openIDSessionData, in* InWechatMsg) { | |||||
| openID := in.header.FromUserName | openID := in.header.FromUserName | ||||
| str, err := BuildTextMsg(openID, "default") | str, err := BuildTextMsg(openID, "default") | ||||
| log.Println("echoCommand :" + in.header.MsgType) | log.Println("echoCommand :" + in.header.MsgType) | ||||
| return | return | ||||
| } | } | ||||
| func (m *procEchoDef) doEvents(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *procEchoDef) doEvents(ss *openIDSessionData, in* InWechatMsg) { | |||||
| openID := in.header.FromUserName | openID := in.header.FromUserName | ||||
| str := "" | str := "" | ||||
| e := in.body.(EventMsg) | e := in.body.(EventMsg) |
| func (m *getBasicUserInfoDef) clean(ss *openIDSessionData) { | func (m *getBasicUserInfoDef) clean(ss *openIDSessionData) { | ||||
| ss.Procedure = "" | ss.Procedure = "" | ||||
| } | } | ||||
| func (m *getBasicUserInfoDef) start(ss *openIDSessionData, in InWechatMsg) { | |||||
| procGetBasicUserInfo.summary(ss) | |||||
| in.replyText("get user info") | |||||
| func (m *getBasicUserInfoDef) start(ss *openIDSessionData, in *InWechatMsg) { | |||||
| //procGetBasicUserInfo.summary(ss) | |||||
| in.replyText("panic") | |||||
| } | } | ||||
| func (m *getBasicUserInfoDef) serve(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *getBasicUserInfoDef) serve(ss *openIDSessionData, in *InWechatMsg) { | |||||
| in.replyText("servve get user info") | in.replyText("servve get user info") | ||||
| procGetBasicUserInfo.clean(ss) | procGetBasicUserInfo.clean(ss) | ||||
| } | } | ||||
| func (m *getBasicUserInfoDef) intro(ss *openIDSessionData, in InWechatMsg) { | |||||
| func (m *getBasicUserInfoDef) intro(ss *openIDSessionData, in *InWechatMsg) { | |||||
| in.replyText("111") | |||||
| } | } |
| init(*openIDSessionData) //house keeping | init(*openIDSessionData) //house keeping | ||||
| clean(*openIDSessionData) //house keeping | clean(*openIDSessionData) //house keeping | ||||
| start(*openIDSessionData, InWechatMsg) //for first message | |||||
| serve(*openIDSessionData, InWechatMsg) //for all subsequent message | |||||
| summary(*openIDSessionData) //after all message has been done | |||||
| intro(*openIDSessionData, InWechatMsg) //initial text/video/voice introduction | |||||
| start(*openIDSessionData, *InWechatMsg) //for first message | |||||
| serve(*openIDSessionData, *InWechatMsg) //for all subsequent message | |||||
| summary(*openIDSessionData) //after all message has been done | |||||
| intro(*openIDSessionData, *InWechatMsg) //initial text/video/voice introduction | |||||
| } | } | ||||
| //AllProc all procedure that we implemented | //AllProc all procedure that we implemented |
| "time" | "time" | ||||
| ) | ) | ||||
| type commandFunction func(*openIDSessionData, InWechatMsg) bool | |||||
| type commandFunction func(*openIDSessionData, *InWechatMsg) bool | |||||
| var commandMap = map[string]commandFunction{ | var commandMap = map[string]commandFunction{ | ||||
| "版本信息": cmdVersion, | "版本信息": cmdVersion, | ||||
| "回音": cmdEcho, | "回音": cmdEcho, | ||||
| } | } | ||||
| func (ss *openIDSessionData) serveCommand(in InWechatMsg) (processed bool) { | |||||
| func (ss *openIDSessionData) serveCommand(in *InWechatMsg) (processed bool) { | |||||
| switch in.header.MsgType { | switch in.header.MsgType { | ||||
| case "text": | case "text": | ||||
| processed = ss.serveTextCommand(in) | processed = ss.serveTextCommand(in) | ||||
| } | } | ||||
| func (ss *openIDSessionData) serveTextCommand(in InWechatMsg) (processed bool) { | |||||
| func (ss *openIDSessionData) serveTextCommand(in *InWechatMsg) (processed bool) { | |||||
| cmd := "" | cmd := "" | ||||
| if in.header.MsgType == "text" { | if in.header.MsgType == "text" { | ||||
| cmd = in.body.(TextMsg).Content | cmd = in.body.(TextMsg).Content | ||||
| return false //不认识的命令,我们选择这个信息不处理 | return false //不认识的命令,我们选择这个信息不处理 | ||||
| } | } | ||||
| func allCommand(ss *openIDSessionData, in InWechatMsg) (processed bool) { | |||||
| func allCommand(ss *openIDSessionData, in *InWechatMsg) (processed bool) { | |||||
| processed = true | processed = true | ||||
| msg := "命令如下:\n" | msg := "命令如下:\n" | ||||
| count := 0 | count := 0 | ||||
| return | return | ||||
| } | } | ||||
| func cmdVersion(ss *openIDSessionData, in InWechatMsg) (processed bool) { | |||||
| func cmdVersion(ss *openIDSessionData, in *InWechatMsg) (processed bool) { | |||||
| processed = true | processed = true | ||||
| in.replyText("这是测试版本" + time.Now().Format("2006/01/02 03:04:05")) | in.replyText("这是测试版本" + time.Now().Format("2006/01/02 03:04:05")) | ||||
| return | return | ||||
| } | } | ||||
| func cmdEcho(ss *openIDSessionData, in InWechatMsg) (processed bool) { | |||||
| func cmdEcho(ss *openIDSessionData, in *InWechatMsg) (processed bool) { | |||||
| processed = true | processed = true | ||||
| procEcho.init(ss) | procEcho.init(ss) | ||||
| procEcho.intro(ss, in) | procEcho.intro(ss, in) |
| package main | package main | ||||
| func (ss *openIDSessionData) serveEvents(in InWechatMsg) (processed bool) { | |||||
| func (ss *openIDSessionData) serveEvents(in* InWechatMsg) (processed bool) { | |||||
| processed = true | processed = true | ||||
| e := in.body.(EventMsg) | e := in.body.(EventMsg) | ||||
| switch e.Event { | switch e.Event { |
| log.Println("Error: Invalid Input ") | log.Println("Error: Invalid Input ") | ||||
| } else { | } else { | ||||
| //put into user session based pipeline | //put into user session based pipeline | ||||
| AllInMessage <- in | |||||
| AllInMessage <- &in | |||||
| //read instant response | //read instant response | ||||
| reply = <-in.instantResponse | reply = <-in.instantResponse | ||||
| } | } |
| type openIDSession struct { | type openIDSession struct { | ||||
| openID string //who is this? | openID string //who is this? | ||||
| count int // number of message in the Queue, channel | count int // number of message in the Queue, channel | ||||
| jobs chan InWechatMsg | |||||
| jobs chan *InWechatMsg | |||||
| data openIDSessionData //session data, that needs to be saved to disk | data openIDSessionData //session data, that needs to be saved to disk | ||||
| } | } | ||||
| // we collect them and redistribute it according to | // we collect them and redistribute it according to | ||||
| // openID, all messages for a single openID will be | // openID, all messages for a single openID will be | ||||
| // sequentially processed | // sequentially processed | ||||
| var AllInMessage chan InWechatMsg | |||||
| var AllInMessage chan *InWechatMsg | |||||
| //a stand alone routine that manage all | //a stand alone routine that manage all | ||||
| // live sessions | // live sessions | ||||
| // take all message and fanout to different openid handlers | // take all message and fanout to different openid handlers | ||||
| // for each openid, its message are manipulated in serial manner. | // for each openid, its message are manipulated in serial manner. | ||||
| func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) { | |||||
| func (m *SessionManager) startSessionManager(in <-chan *InWechatMsg) { | |||||
| log.Println("session manager start..") | log.Println("session manager start..") | ||||
| m.sessions = map[string]openIDSession{} | m.sessions = map[string]openIDSession{} | ||||
| m.done = make(chan workDone, 2000) //2000 different openID simultaneously. | m.done = make(chan workDone, 2000) //2000 different openID simultaneously. | ||||
| } | } | ||||
| } | } | ||||
| func (m *SessionManager) processIncomingMsg(v InWechatMsg) { | |||||
| func (m *SessionManager) processIncomingMsg(v *InWechatMsg) { | |||||
| cnt := m.addJob(v) | cnt := m.addJob(v) | ||||
| if cnt == 1 { //there is no worker thread working | if cnt == 1 { //there is no worker thread working | ||||
| go m.startJob(v.header.FromUserName) | go m.startJob(v.header.FromUserName) | ||||
| } | } | ||||
| } | } | ||||
| func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) { | |||||
| func (m *SessionManager) addJob(v *InWechatMsg) (jobCount int) { | |||||
| openID := v.header.FromUserName | openID := v.header.FromUserName | ||||
| s, found := m.sessions[openID] | s, found := m.sessions[openID] | ||||
| if !found { //create one | if !found { //create one | ||||
| s := openIDSession{} | s := openIDSession{} | ||||
| s.openID = openID | s.openID = openID | ||||
| s.count = 0 | s.count = 0 | ||||
| s.jobs = make(chan InWechatMsg, 50) | |||||
| s.jobs = make(chan *InWechatMsg, 50) | |||||
| s.data = openIDSessionData{} | s.data = openIDSessionData{} | ||||
| s.data.Load(openID) //either load or create new | s.data.Load(openID) //either load or create new | ||||
| m.sessions[openID] = s //register it to memory | m.sessions[openID] = s //register it to memory | ||||
| return | return | ||||
| } | } | ||||
| func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) { | |||||
| func (m *SessionManager) checkOpenID(openID string, v *InWechatMsg) { | |||||
| if v.header.FromUserName != openID { | if v.header.FromUserName != openID { | ||||
| log.Println("Error:[SessionMgr/checkOpenID] Weird Message below ...") | log.Println("Error:[SessionMgr/checkOpenID] Weird Message below ...") | ||||
| log.Println(v) | log.Println(v) |