hub.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package router
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "realtime-gateway/internal/model"
  6. )
  7. type Subscriber interface {
  8. ID() string
  9. Send(message model.ServerMessage) error
  10. }
  11. type Hub struct {
  12. mu sync.RWMutex
  13. subscribers map[string]Subscriber
  14. filters map[string][]model.Subscription
  15. latestState map[string]model.Envelope
  16. liveFeeds map[uint64]chan model.Envelope
  17. nextLiveID uint64
  18. stats trafficStats
  19. maxLatest int
  20. }
  21. type TrafficSnapshot struct {
  22. Published uint64 `json:"published"`
  23. Dropped uint64 `json:"dropped"`
  24. Fanout uint64 `json:"fanout"`
  25. Topics []TopicTrafficItem `json:"topics"`
  26. Channels []ChannelTrafficItem `json:"channels"`
  27. }
  28. type TopicTrafficItem struct {
  29. Topic string `json:"topic"`
  30. Published uint64 `json:"published"`
  31. Dropped uint64 `json:"dropped"`
  32. Fanout uint64 `json:"fanout"`
  33. }
  34. type ChannelTrafficItem struct {
  35. ChannelID string `json:"channelId"`
  36. Published uint64 `json:"published"`
  37. Dropped uint64 `json:"dropped"`
  38. Fanout uint64 `json:"fanout"`
  39. }
  40. type trafficStats struct {
  41. Published uint64
  42. Dropped uint64
  43. Fanout uint64
  44. Topics map[string]*trafficCounter
  45. Channels map[string]*trafficCounter
  46. }
  47. type trafficCounter struct {
  48. Published uint64
  49. Dropped uint64
  50. Fanout uint64
  51. }
  52. type PublishResult struct {
  53. Matched int `json:"matched"`
  54. Stored bool `json:"stored"`
  55. Dropped bool `json:"dropped"`
  56. }
  57. func NewHub(maxLatest int) *Hub {
  58. return &Hub{
  59. subscribers: make(map[string]Subscriber),
  60. filters: make(map[string][]model.Subscription),
  61. latestState: make(map[string]model.Envelope),
  62. liveFeeds: make(map[uint64]chan model.Envelope),
  63. stats: trafficStats{
  64. Topics: make(map[string]*trafficCounter),
  65. Channels: make(map[string]*trafficCounter),
  66. },
  67. maxLatest: maxLatest,
  68. }
  69. }
  70. func (h *Hub) Register(subscriber Subscriber, subscriptions []model.Subscription) {
  71. h.mu.Lock()
  72. defer h.mu.Unlock()
  73. h.subscribers[subscriber.ID()] = subscriber
  74. h.filters[subscriber.ID()] = subscriptions
  75. }
  76. func (h *Hub) Unregister(subscriberID string) {
  77. h.mu.Lock()
  78. defer h.mu.Unlock()
  79. delete(h.subscribers, subscriberID)
  80. delete(h.filters, subscriberID)
  81. }
  82. func (h *Hub) UpdateSubscriptions(subscriberID string, subscriptions []model.Subscription) {
  83. h.mu.Lock()
  84. defer h.mu.Unlock()
  85. h.filters[subscriberID] = subscriptions
  86. }
  87. func (h *Hub) Publish(envelope model.Envelope, deliveryMode string) PublishResult {
  88. h.mu.RLock()
  89. matches := make([]Subscriber, 0, len(h.subscribers))
  90. for subscriberID, subscriber := range h.subscribers {
  91. subscriptions := h.filters[subscriberID]
  92. if !matchesAny(envelope, subscriptions) {
  93. continue
  94. }
  95. matches = append(matches, subscriber)
  96. }
  97. h.mu.RUnlock()
  98. if deliveryMode == "drop_if_no_consumer" && len(matches) == 0 {
  99. h.recordTraffic(envelope, 0, true)
  100. return PublishResult{
  101. Matched: 0,
  102. Stored: false,
  103. Dropped: true,
  104. }
  105. }
  106. h.storeLatest(envelope)
  107. h.publishLive(envelope)
  108. h.recordTraffic(envelope, len(matches), false)
  109. for _, subscriber := range matches {
  110. _ = subscriber.Send(model.ServerMessage{
  111. Type: "event",
  112. Envelope: &envelope,
  113. })
  114. }
  115. return PublishResult{
  116. Matched: len(matches),
  117. Stored: true,
  118. Dropped: false,
  119. }
  120. }
  121. func (h *Hub) Snapshot(channelID string, deviceID string) (json.RawMessage, bool) {
  122. h.mu.RLock()
  123. defer h.mu.RUnlock()
  124. envelope, ok := h.latestState[latestStateKey(channelID, deviceID)]
  125. if !ok {
  126. return nil, false
  127. }
  128. data, err := json.Marshal(envelope)
  129. if err != nil {
  130. return nil, false
  131. }
  132. return data, true
  133. }
  134. func (h *Hub) Stats() (subscriberCount int, latestStateCount int) {
  135. h.mu.RLock()
  136. defer h.mu.RUnlock()
  137. return len(h.subscribers), len(h.latestState)
  138. }
  139. func (h *Hub) LatestStates() []model.Envelope {
  140. h.mu.RLock()
  141. defer h.mu.RUnlock()
  142. items := make([]model.Envelope, 0, len(h.latestState))
  143. for _, envelope := range h.latestState {
  144. items = append(items, envelope)
  145. }
  146. return items
  147. }
  148. func (h *Hub) TrafficSnapshot() TrafficSnapshot {
  149. h.mu.RLock()
  150. defer h.mu.RUnlock()
  151. topics := make([]TopicTrafficItem, 0, len(h.stats.Topics))
  152. for topic, counter := range h.stats.Topics {
  153. topics = append(topics, TopicTrafficItem{
  154. Topic: topic,
  155. Published: counter.Published,
  156. Dropped: counter.Dropped,
  157. Fanout: counter.Fanout,
  158. })
  159. }
  160. channels := make([]ChannelTrafficItem, 0, len(h.stats.Channels))
  161. for channelID, counter := range h.stats.Channels {
  162. channels = append(channels, ChannelTrafficItem{
  163. ChannelID: channelID,
  164. Published: counter.Published,
  165. Dropped: counter.Dropped,
  166. Fanout: counter.Fanout,
  167. })
  168. }
  169. return TrafficSnapshot{
  170. Published: h.stats.Published,
  171. Dropped: h.stats.Dropped,
  172. Fanout: h.stats.Fanout,
  173. Topics: topics,
  174. Channels: channels,
  175. }
  176. }
  177. func (h *Hub) SubscribeLive(buffer int) (uint64, <-chan model.Envelope) {
  178. if buffer <= 0 {
  179. buffer = 32
  180. }
  181. h.mu.Lock()
  182. defer h.mu.Unlock()
  183. h.nextLiveID += 1
  184. id := h.nextLiveID
  185. ch := make(chan model.Envelope, buffer)
  186. h.liveFeeds[id] = ch
  187. return id, ch
  188. }
  189. func (h *Hub) UnsubscribeLive(id uint64) {
  190. h.mu.Lock()
  191. defer h.mu.Unlock()
  192. ch, ok := h.liveFeeds[id]
  193. if !ok {
  194. return
  195. }
  196. delete(h.liveFeeds, id)
  197. close(ch)
  198. }
  199. func (h *Hub) storeLatest(envelope model.Envelope) {
  200. if envelope.Target.DeviceID == "" {
  201. return
  202. }
  203. h.mu.Lock()
  204. defer h.mu.Unlock()
  205. if len(h.latestState) >= h.maxLatest {
  206. for key := range h.latestState {
  207. delete(h.latestState, key)
  208. break
  209. }
  210. }
  211. h.latestState[latestStateKey(envelope.Target.ChannelID, envelope.Target.DeviceID)] = envelope
  212. }
  213. func (h *Hub) publishLive(envelope model.Envelope) {
  214. h.mu.RLock()
  215. feeds := make([]chan model.Envelope, 0, len(h.liveFeeds))
  216. for _, ch := range h.liveFeeds {
  217. feeds = append(feeds, ch)
  218. }
  219. h.mu.RUnlock()
  220. for _, ch := range feeds {
  221. select {
  222. case ch <- envelope:
  223. default:
  224. }
  225. }
  226. }
  227. func (h *Hub) recordTraffic(envelope model.Envelope, matched int, dropped bool) {
  228. h.mu.Lock()
  229. defer h.mu.Unlock()
  230. h.stats.Published += 1
  231. h.stats.Fanout += uint64(matched)
  232. if dropped {
  233. h.stats.Dropped += 1
  234. }
  235. topicKey := envelope.Topic
  236. if topicKey == "" {
  237. topicKey = "--"
  238. }
  239. topicCounter := h.stats.Topics[topicKey]
  240. if topicCounter == nil {
  241. topicCounter = &trafficCounter{}
  242. h.stats.Topics[topicKey] = topicCounter
  243. }
  244. topicCounter.Published += 1
  245. topicCounter.Fanout += uint64(matched)
  246. if dropped {
  247. topicCounter.Dropped += 1
  248. }
  249. channelKey := envelope.Target.ChannelID
  250. if channelKey == "" {
  251. channelKey = "--"
  252. }
  253. channelCounter := h.stats.Channels[channelKey]
  254. if channelCounter == nil {
  255. channelCounter = &trafficCounter{}
  256. h.stats.Channels[channelKey] = channelCounter
  257. }
  258. channelCounter.Published += 1
  259. channelCounter.Fanout += uint64(matched)
  260. if dropped {
  261. channelCounter.Dropped += 1
  262. }
  263. }
  264. func matchesAny(envelope model.Envelope, subscriptions []model.Subscription) bool {
  265. if len(subscriptions) == 0 {
  266. return false
  267. }
  268. for _, subscription := range subscriptions {
  269. if matches(envelope, subscription) {
  270. return true
  271. }
  272. }
  273. return false
  274. }
  275. func matches(envelope model.Envelope, subscription model.Subscription) bool {
  276. if subscription.ChannelID != "" && subscription.ChannelID != envelope.Target.ChannelID {
  277. return false
  278. }
  279. if subscription.DeviceID != "" && subscription.DeviceID != envelope.Target.DeviceID {
  280. return false
  281. }
  282. if subscription.GroupID != "" && subscription.GroupID != envelope.Target.GroupID {
  283. return false
  284. }
  285. if subscription.Topic != "" && subscription.Topic != envelope.Topic {
  286. return false
  287. }
  288. return true
  289. }
  290. func latestStateKey(channelID string, deviceID string) string {
  291. if channelID == "" {
  292. return deviceID
  293. }
  294. return channelID + "::" + deviceID
  295. }