Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

151 lines
3.9KB

  1. package main
  2. import (
  3. "log"
  4. )
  5. //openIDSession for a given openID
  6. // regardless howmany
  7. type openIDSession struct {
  8. openID string //who is this?
  9. count int // number of message in the Queue, channel
  10. jobs chan InWechatMsg
  11. data openIDSessionData //session data, that needs to be saved to disk
  12. }
  13. type workDone struct {
  14. openID string //which user
  15. consumed int //job done
  16. }
  17. //SessionManager manage all sessions
  18. type SessionManager struct {
  19. sessions map[string]openIDSession
  20. done chan workDone
  21. }
  22. //AllInMessage every API post that we get
  23. // we collect them and redistribute it according to
  24. // openID, all messages for a single openID will be
  25. // sequentially processed
  26. var AllInMessage chan InWechatMsg
  27. //a stand alone routine that manage all
  28. // live sessions
  29. // take all message and fanout to different openid handlers
  30. // for each openid, its message are manipulated in serial manner.
  31. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
  32. log.Println("session manager start..")
  33. m.sessions = map[string]openIDSession{}
  34. m.done = make(chan workDone, 2000)
  35. for { //forever looping
  36. select {
  37. case msg := <-in:
  38. log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType)
  39. m.processIncomingMsg(msg)
  40. case d := <-m.done:
  41. log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed)
  42. m.clearJobDone(d)
  43. }
  44. }
  45. }
  46. func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
  47. cnt := m.addJob(v)
  48. if cnt == 1 { //there is no worker thread working
  49. go m.startJob(v.header.FromUserName)
  50. }
  51. }
  52. func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) {
  53. openID := v.header.FromUserName
  54. s, found := m.sessions[openID]
  55. if !found { //create one
  56. s = m.createSession(openID)
  57. }
  58. s.jobs <- v //add job to channel
  59. s.count++
  60. m.sessions[openID] = s
  61. log.Printf("Incoming message in queue %d", s.count)
  62. log.Printf("Total Session count = %d", len(m.sessions))
  63. if s.count <= 0 {
  64. log.Fatal("new job added, but count <= 0")
  65. }
  66. return s.count
  67. }
  68. func (m *SessionManager) createSession(openID string) openIDSession {
  69. s := openIDSession{}
  70. s.openID = openID
  71. s.count = 0
  72. s.jobs = make(chan InWechatMsg, 50)
  73. s.data = openIDSessionData{}
  74. s.data.Load(openID) //either load or create new
  75. m.sessions[openID] = s //register it to memory
  76. return s
  77. }
  78. func (m *SessionManager) clearJobDone(d workDone) {
  79. s, found := m.sessions[d.openID]
  80. log.Println(s)
  81. if found {
  82. s.count -= d.consumed
  83. if s.count == 0 { //no job to do
  84. //remove from memory
  85. m.destroySession(d.openID)
  86. log.Printf("destroy session %s", d.openID)
  87. } else if s.count > 0 {
  88. go m.startJob(d.openID) //processing any newly coming jobs
  89. } else {
  90. log.Println(s)
  91. log.Fatal("session job count cannot be negative, problem session")
  92. }
  93. } else {
  94. log.Println(d)
  95. log.Fatal("When job done, we canot find proper session")
  96. }
  97. }
  98. func (m *SessionManager) destroySession(openID string) {
  99. //save session data to disk
  100. data := m.sessions[openID].data
  101. data.Save()
  102. //close job channels
  103. s := m.sessions[openID]
  104. close(s.jobs)
  105. //delete it from memory
  106. delete(m.sessions, openID)
  107. log.Printf("total session count=%d", len(m.sessions))
  108. }
  109. //worker thread
  110. func (m *SessionManager) startJob(openID string) {
  111. log.Println("start job worker...")
  112. jobFinished := workDone{openID, 0}
  113. //process all jobs in the channel
  114. hasJob := true
  115. for hasJob {
  116. select {
  117. case v := <-m.sessions[openID].jobs:
  118. m.checkOpenID(openID, v)
  119. s := m.sessions[openID]
  120. s.data.incomingMsg(v) //<=== main logic for processing each incoming message
  121. jobFinished.consumed++
  122. default:
  123. hasJob = false
  124. }
  125. }
  126. m.done <- jobFinished //notify parent that we have done
  127. return
  128. }
  129. func (m *SessionManager) checkOpenID(openID string, v InWechatMsg) {
  130. if v.header.FromUserName != openID {
  131. log.Println("Error: Weird Message below ...")
  132. log.Println(v)
  133. log.Fatalf("Error: worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
  134. }
  135. }