You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

149 lines
4.0KB

  1. package main
  2. import (
  3. "log"
  4. "time"
  5. )
  6. //openIDSession for a given openID
  7. // regardless howmany
  8. type openIDSession struct {
  9. openID string //who is this?
  10. count int // number of message in the Queue, channel
  11. jobs chan InWechatMsg
  12. data openIDSessionData //session data, that needs to be saved to disk
  13. states map[string]chatState //map from procedure to its current states
  14. }
  // workDone is the report a worker sends back after draining a job queue.
  // Keep the field order: the worker builds this value positionally.
  type workDone struct {
  	openID   string // user whose queue was processed
  	consumed int    // number of jobs the worker handled
  }
  19. //SessionManager manage all sessions
  20. type SessionManager struct {
  21. sessions map[string]openIDSession
  22. done chan workDone
  23. }
  24. //AllInMessage every API post that we get
  25. // we collect them and redistribute it according to
  26. // openID, all messages for a single openID will be
  27. // sequentially processed
  28. var AllInMessage chan InWechatMsg
  29. //a stand alone routine that manage all
  30. // live sessions
  31. // take all message and fanout to different openid handlers
  32. // for each openid, its message are manipulated in serial manner.
  33. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
  34. log.Println("session manager start..")
  35. m.sessions = map[string]openIDSession{}
  36. m.done = make(chan workDone, 2000)
  37. for { //forever looping
  38. select {
  39. case msg := <-in:
  40. log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType)
  41. m.processIncomingMsg(msg)
  42. case d := <-m.done:
  43. log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed)
  44. m.clearJobDone(d)
  45. }
  46. }
  47. }
  48. func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
  49. cnt := m.addJob(v)
  50. if cnt == 1 { //there is no worker thread working
  51. go m.startJob(v.header.FromUserName)
  52. }
  53. }
  54. func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) {
  55. openID := v.header.FromUserName
  56. s, found := m.sessions[openID]
  57. if !found { //create one
  58. s = m.createSession(openID)
  59. }
  60. s.jobs <- v //add job to channel
  61. s.count++
  62. m.sessions[openID] = s
  63. log.Printf("Incoming message in queue %d", s.count)
  64. log.Printf("Total Session count = %d", len(m.sessions))
  65. if s.count <= 0 {
  66. log.Fatal("new job added, but count <= 0")
  67. }
  68. return s.count
  69. }
  70. func (m *SessionManager) createSession(openID string) openIDSession {
  71. s := openIDSession{}
  72. s.openID = openID
  73. s.count = 0
  74. s.jobs = make(chan InWechatMsg, 200)
  75. s.data, _ = getCurrentSesssion(openID) //either load or create new
  76. m.sessions[openID] = s //register it to memory
  77. return s
  78. }
  79. func (m *SessionManager) clearJobDone(d workDone) {
  80. s, found := m.sessions[d.openID]
  81. log.Println(s)
  82. if found {
  83. s.count -= d.consumed
  84. if s.count == 0 { //no job to do
  85. //save session data to disk
  86. data := m.sessions[d.openID].data
  87. data.Save()
  88. //remove from memory
  89. m.destroySession(d.openID)
  90. log.Printf("destroy session %s", d.openID)
  91. } else if s.count > 0 {
  92. go m.startJob(d.openID) //processing any newly coming jobs
  93. } else {
  94. log.Println(s)
  95. log.Fatal("session job count cannot be negative, problem session")
  96. }
  97. } else {
  98. log.Println(d)
  99. log.Fatal("When job done, we canot find proper session")
  100. }
  101. }
  102. func (m *SessionManager) destroySession(openID string) {
  103. //close job channels
  104. s := m.sessions[openID]
  105. close(s.jobs)
  106. //delete it from memory
  107. delete(m.sessions, openID)
  108. log.Printf("total session count=%d", len(m.sessions))
  109. }
  110. //worker thread
  111. func (m *SessionManager) startJob(openID string) {
  112. log.Println("start job worker...")
  113. jobFinished := workDone{openID, 0}
  114. //process all jobs in the channel
  115. hasJob := true
  116. for hasJob {
  117. select {
  118. case v := <-m.sessions[openID].jobs:
  119. if v.header.FromUserName != openID {
  120. log.Println("Error: Weird Message below ...")
  121. log.Println(v)
  122. log.Fatalf("Error: worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
  123. }
  124. log.Println(" Processing job..")
  125. log.Println(v)
  126. time.Sleep(5 * time.Second)
  127. kfSendTxt(openID, "Job Processed "+v.body.(TextMsg).Content)
  128. jobFinished.consumed++
  129. default:
  130. hasJob = false
  131. }
  132. }
  133. m.done <- jobFinished //notify parent that we have done
  134. return
  135. }