Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

148 lines
3.9KB

  1. package main
  2. import (
  3. "log"
  4. "time"
  5. )
  6. //openIDSession for a given openID
  7. // regardless howmany
  8. type openIDSession struct {
  9. openID string //who is this?
  10. count int // number of message in the Queue, channel
  11. jobs chan InWechatMsg
  12. data openIDSessionData //session data, that needs to be saved to disk
  13. }
  14. type workDone struct {
  15. openID string //which user
  16. consumed int //job done
  17. }
  18. //SessionManager manage all sessions
  19. type SessionManager struct {
  20. sessions map[string]openIDSession
  21. done chan workDone
  22. }
  23. //AllInMessage every API post that we get
  24. // we collect them and redistribute it according to
  25. // openID, all messages for a single openID will be
  26. // sequentially processed
  27. var AllInMessage chan InWechatMsg
  28. //a stand alone routine that manage all
  29. // live sessions
  30. // take all message and fanout to different openid handlers
  31. // for each openid, its message are manipulated in serial manner.
  32. func (m *SessionManager) startSessionManager(in <-chan InWechatMsg) {
  33. log.Println("session manager start..")
  34. m.sessions = map[string]openIDSession{}
  35. m.done = make(chan workDone, 2000)
  36. for { //forever looping
  37. select {
  38. case msg := <-in:
  39. log.Printf("SessionMgr : incoming msg %s (%s)", msg.header.FromUserName, msg.header.MsgType)
  40. m.processIncomingMsg(msg)
  41. case d := <-m.done:
  42. log.Printf("SessionMgr : worker done openid=%s (done)=%d", d.openID, d.consumed)
  43. m.clearJobDone(d)
  44. }
  45. }
  46. }
  47. func (m *SessionManager) processIncomingMsg(v InWechatMsg) {
  48. cnt := m.addJob(v)
  49. if cnt == 1 { //there is no worker thread working
  50. go m.startJob(v.header.FromUserName)
  51. }
  52. }
  53. func (m *SessionManager) addJob(v InWechatMsg) (jobCount int) {
  54. openID := v.header.FromUserName
  55. s, found := m.sessions[openID]
  56. if !found { //create one
  57. s = m.createSession(openID)
  58. }
  59. s.jobs <- v //add job to channel
  60. s.count++
  61. m.sessions[openID] = s
  62. log.Printf("Incoming message in queue %d", s.count)
  63. log.Printf("Total Session count = %d", len(m.sessions))
  64. if s.count <= 0 {
  65. log.Fatal("new job added, but count <= 0")
  66. }
  67. return s.count
  68. }
  69. func (m *SessionManager) createSession(openID string) openIDSession {
  70. s := openIDSession{}
  71. s.openID = openID
  72. s.count = 0
  73. s.jobs = make(chan InWechatMsg, 200)
  74. s.data, _ = getCurrentSesssion(openID) //either load or create new
  75. m.sessions[openID] = s //register it to memory
  76. return s
  77. }
  78. func (m *SessionManager) clearJobDone(d workDone) {
  79. s, found := m.sessions[d.openID]
  80. log.Println(s)
  81. if found {
  82. s.count -= d.consumed
  83. if s.count == 0 { //no job to do
  84. //save session data to disk
  85. data := m.sessions[d.openID].data
  86. data.Save()
  87. //remove from memory
  88. m.destroySession(d.openID)
  89. log.Printf("destroy session %s", d.openID)
  90. } else if s.count > 0 {
  91. go m.startJob(d.openID) //processing any newly coming jobs
  92. } else {
  93. log.Println(s)
  94. log.Fatal("session job count cannot be negative, problem session")
  95. }
  96. } else {
  97. log.Println(d)
  98. log.Fatal("When job done, we canot find proper session")
  99. }
  100. }
  101. func (m *SessionManager) destroySession(openID string) {
  102. //close job channels
  103. s := m.sessions[openID]
  104. close(s.jobs)
  105. //delete it from memory
  106. delete(m.sessions, openID)
  107. log.Printf("total session count=%d", len(m.sessions))
  108. }
  109. //worker thread
  110. func (m *SessionManager) startJob(openID string) {
  111. log.Println("start job worker...")
  112. jobFinished := workDone{openID, 0}
  113. //process all jobs in the channel
  114. hasJob := true
  115. for hasJob {
  116. select {
  117. case v := <-m.sessions[openID].jobs:
  118. if v.header.FromUserName != openID {
  119. log.Println("Error: Weird Message below ...")
  120. log.Println(v)
  121. log.Fatalf("Error: worker thread for %s, see different id=%s \n", openID, v.header.FromUserName)
  122. }
  123. log.Println(" Processing job..")
  124. log.Println(v)
  125. time.Sleep(5 * time.Second)
  126. kfSendTxt(openID, "Job Processed "+v.body.(TextMsg).Content)
  127. jobFinished.consumed++
  128. default:
  129. hasJob = false
  130. }
  131. }
  132. m.done <- jobFinished //notify parent that we have done
  133. return
  134. }