Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

154 Zeilen
4.3KB

  1. package main
  2. import (
  3. "log"
  4. )
  5. //openIDSession for a given openID
  6. // regardless howmany
  7. type openIDSession struct {
  8. openID string //who is this?
  9. count int // number of message in the Queue, channel
  10. jobs chan InWechatMsg
  11. data openIDSessionData //session data, that needs to be saved to disk
  12. }
  // workDone is the report a worker sends back once it has stopped draining a
  // session queue.
  type workDone struct {
  	// openID names the session (user) the worker was serving.
  	openID string
  	// consumed is the number of messages the worker processed.
  	consumed int
  }
  17. //SessionManager manage all sessions
  18. type SessionManager struct {
  19. sessions map[string]openIDSession
  20. done chan workDone
  21. }
  22. //AllInMessage every API post that we get
  23. // we collect them and redistribute it according to
  24. // openID, all messages for a single openID will be
  25. // sequentially processed
  26. var AllInMessage chan InWechatMsg
  27. //a stand alone routine that manage all
  28. // live sessions
  29. // take all message and fanout to different openid handlers
  30. // for each openid, its message are manipulated in serial manner.
  31. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
  32. log.Println("session manager start..")
  33. m.sessions = map[string]openIDSession{}
  34. m.done = make(chan workDone, 2000) //2000 different openID simultaneously.
  35. for { //forever looping
  36. select {
  37. case msg := <-in:
  38. log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType)
  39. m.processIncomingMsg(msg)
  40. case d := <-m.done:
  41. log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed)
  42. m.clearJobDone(d)
  43. }
  44. }
  45. }
  46. func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
  47. cnt := m.addJob(v)
  48. if cnt == 1 { //there is no worker thread working
  49. go m.startJob(v.header.FromUserName)
  50. }
  51. }
  52. func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) {
  53. openID := v.header.FromUserName
  54. s, found := m.sessions[openID]
  55. if !found { //create one
  56. s = m.createSession(openID)
  57. }
  58. s.jobs <- v //add job to channel
  59. s.count++
  60. m.sessions[openID] = s
  61. log.Printf("SessionMgr/addJob: Incoming message in queue %d", s.count)
  62. log.Printf("SessionMgr/addJob: Total Session count = %d", len(m.sessions))
  63. if s.count <= 0 {
  64. log.Fatal("SessionMgr/Addjob: new job added, but count <= 0")
  65. }
  66. return s.count
  67. }
  68. func (m *SessionManager) createSession(openID string) openIDSession {
  69. s := openIDSession{}
  70. s.openID = openID
  71. s.count = 0
  72. s.jobs = make(chan InWechatMsg, 50)
  73. s.data = openIDSessionData{}
  74. s.data.Load(openID) //either load or create new
  75. m.sessions[openID] = s //register it to memory
  76. return s
  77. }
  78. func (m *SessionManager) clearJobDone(d workDone) {
  79. s, found := m.sessions[d.openID]
  80. log.Println("SessionMgr/clearJobDone: start clearning jobs ... ")
  81. log.Println(s)
  82. log.Println(d)
  83. if found {
  84. s.count -= d.consumed
  85. m.sessions[d.openID] = s //update
  86. if s.count == 0 { //no job to do
  87. //remove from memory
  88. m.destroySession(d.openID)
  89. log.Printf("SessionMgr/clearJobDone: destroy session %s", d.openID)
  90. } else if s.count > 0 {
  91. go m.startJob(d.openID) //processing any newly coming jobs
  92. } else {
  93. log.Println(s)
  94. log.Fatal("SessionMgr/clearJobDone: session job count cannot be negative, problem session")
  95. }
  96. } else {
  97. log.Println(d)
  98. log.Fatal("SessionMgr/clearJobDone: When job done, we canot find proper session")
  99. }
  100. }
  101. func (m *SessionManager) destroySession(openID string) {
  102. //save session data to disk
  103. data := m.sessions[openID].data
  104. data.Save()
  105. //close job channels
  106. s := m.sessions[openID]
  107. close(s.jobs)
  108. //delete it from memory
  109. delete(m.sessions, openID)
  110. log.Printf("SessionMgr/destroySession: total session count=%d", len(m.sessions))
  111. }
  112. //worker thread, cannot change session info
  113. func (m *SessionManager) startJob(openID string) {
  114. log.Println("SessionMgr/startJob: enter...")
  115. jobFinished := workDone{openID, 0}
  116. //process all jobs in the channel
  117. hasJob := true
  118. for hasJob {
  119. select {
  120. case v := <-m.sessions[openID].jobs:
  121. m.checkOpenID(openID, v)
  122. s := m.sessions[openID]
  123. s.data.incomingMsg(v) //<=== main logic for processing each incoming message
  124. jobFinished.consumed++
  125. default:
  126. hasJob = false
  127. }
  128. }
  129. m.done <- jobFinished //notify parent that we have done
  130. return
  131. }
  132. func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) {
  133. if v.header.FromUserName != openID {
  134. log.Println("Error:[SessionMgr/checkOpenID] Weird Message below ...")
  135. log.Println(v)
  136. log.Fatalf("Error:[[SessionMgr/checkOpenID]] worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
  137. }
  138. }