| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- package router
- import (
- "encoding/json"
- "sync"
- "realtime-gateway/internal/model"
- )
- type Subscriber interface {
- ID() string
- Send(message model.ServerMessage) error
- }
- type Hub struct {
- mu sync.RWMutex
- subscribers map[string]Subscriber
- filters map[string][]model.Subscription
- latestState map[string]model.Envelope
- liveFeeds map[uint64]chan model.Envelope
- nextLiveID uint64
- stats trafficStats
- maxLatest int
- }
- type TrafficSnapshot struct {
- Published uint64 `json:"published"`
- Dropped uint64 `json:"dropped"`
- Fanout uint64 `json:"fanout"`
- Topics []TopicTrafficItem `json:"topics"`
- Channels []ChannelTrafficItem `json:"channels"`
- }
// TopicTrafficItem carries the traffic counters recorded for one topic.
type TopicTrafficItem struct {
	Topic     string `json:"topic"`
	Published uint64 `json:"published"`
	Dropped   uint64 `json:"dropped"`
	Fanout    uint64 `json:"fanout"`
}
// ChannelTrafficItem carries the traffic counters recorded for one channel.
type ChannelTrafficItem struct {
	ChannelID string `json:"channelId"`
	Published uint64 `json:"published"`
	Dropped   uint64 `json:"dropped"`
	Fanout    uint64 `json:"fanout"`
}
- type trafficStats struct {
- Published uint64
- Dropped uint64
- Fanout uint64
- Topics map[string]*trafficCounter
- Channels map[string]*trafficCounter
- }
// trafficCounter accumulates the traffic recorded under a single key.
type trafficCounter struct {
	// Published counts every recorded publish, including dropped ones.
	Published uint64
	// Dropped counts the publishes that were recorded as dropped.
	Dropped uint64
	// Fanout sums the number of subscribers matched per publish.
	Fanout uint64
}
// PublishResult reports the outcome of a single Hub.Publish call.
type PublishResult struct {
	// Matched is the number of subscribers whose subscriptions matched.
	Matched int `json:"matched"`
	// Stored is true whenever the envelope was not dropped.
	Stored bool `json:"stored"`
	// Dropped is true when the envelope was discarded for lack of consumers.
	Dropped bool `json:"dropped"`
}
- func NewHub(maxLatest int) *Hub {
- return &Hub{
- subscribers: make(map[string]Subscriber),
- filters: make(map[string][]model.Subscription),
- latestState: make(map[string]model.Envelope),
- liveFeeds: make(map[uint64]chan model.Envelope),
- stats: trafficStats{
- Topics: make(map[string]*trafficCounter),
- Channels: make(map[string]*trafficCounter),
- },
- maxLatest: maxLatest,
- }
- }
- func (h *Hub) Register(subscriber Subscriber, subscriptions []model.Subscription) {
- h.mu.Lock()
- defer h.mu.Unlock()
- h.subscribers[subscriber.ID()] = subscriber
- h.filters[subscriber.ID()] = subscriptions
- }
- func (h *Hub) Unregister(subscriberID string) {
- h.mu.Lock()
- defer h.mu.Unlock()
- delete(h.subscribers, subscriberID)
- delete(h.filters, subscriberID)
- }
- func (h *Hub) UpdateSubscriptions(subscriberID string, subscriptions []model.Subscription) {
- h.mu.Lock()
- defer h.mu.Unlock()
- h.filters[subscriberID] = subscriptions
- }
- func (h *Hub) Publish(envelope model.Envelope, deliveryMode string) PublishResult {
- h.mu.RLock()
- matches := make([]Subscriber, 0, len(h.subscribers))
- for subscriberID, subscriber := range h.subscribers {
- subscriptions := h.filters[subscriberID]
- if !matchesAny(envelope, subscriptions) {
- continue
- }
- matches = append(matches, subscriber)
- }
- h.mu.RUnlock()
- if deliveryMode == "drop_if_no_consumer" && len(matches) == 0 {
- h.recordTraffic(envelope, 0, true)
- return PublishResult{
- Matched: 0,
- Stored: false,
- Dropped: true,
- }
- }
- h.storeLatest(envelope)
- h.publishLive(envelope)
- h.recordTraffic(envelope, len(matches), false)
- for _, subscriber := range matches {
- _ = subscriber.Send(model.ServerMessage{
- Type: "event",
- Envelope: &envelope,
- })
- }
- return PublishResult{
- Matched: len(matches),
- Stored: true,
- Dropped: false,
- }
- }
- func (h *Hub) Snapshot(channelID string, deviceID string) (json.RawMessage, bool) {
- h.mu.RLock()
- defer h.mu.RUnlock()
- envelope, ok := h.latestState[latestStateKey(channelID, deviceID)]
- if !ok {
- return nil, false
- }
- data, err := json.Marshal(envelope)
- if err != nil {
- return nil, false
- }
- return data, true
- }
- func (h *Hub) Stats() (subscriberCount int, latestStateCount int) {
- h.mu.RLock()
- defer h.mu.RUnlock()
- return len(h.subscribers), len(h.latestState)
- }
- func (h *Hub) LatestStates() []model.Envelope {
- h.mu.RLock()
- defer h.mu.RUnlock()
- items := make([]model.Envelope, 0, len(h.latestState))
- for _, envelope := range h.latestState {
- items = append(items, envelope)
- }
- return items
- }
- func (h *Hub) TrafficSnapshot() TrafficSnapshot {
- h.mu.RLock()
- defer h.mu.RUnlock()
- topics := make([]TopicTrafficItem, 0, len(h.stats.Topics))
- for topic, counter := range h.stats.Topics {
- topics = append(topics, TopicTrafficItem{
- Topic: topic,
- Published: counter.Published,
- Dropped: counter.Dropped,
- Fanout: counter.Fanout,
- })
- }
- channels := make([]ChannelTrafficItem, 0, len(h.stats.Channels))
- for channelID, counter := range h.stats.Channels {
- channels = append(channels, ChannelTrafficItem{
- ChannelID: channelID,
- Published: counter.Published,
- Dropped: counter.Dropped,
- Fanout: counter.Fanout,
- })
- }
- return TrafficSnapshot{
- Published: h.stats.Published,
- Dropped: h.stats.Dropped,
- Fanout: h.stats.Fanout,
- Topics: topics,
- Channels: channels,
- }
- }
- func (h *Hub) SubscribeLive(buffer int) (uint64, <-chan model.Envelope) {
- if buffer <= 0 {
- buffer = 32
- }
- h.mu.Lock()
- defer h.mu.Unlock()
- h.nextLiveID += 1
- id := h.nextLiveID
- ch := make(chan model.Envelope, buffer)
- h.liveFeeds[id] = ch
- return id, ch
- }
- func (h *Hub) UnsubscribeLive(id uint64) {
- h.mu.Lock()
- defer h.mu.Unlock()
- ch, ok := h.liveFeeds[id]
- if !ok {
- return
- }
- delete(h.liveFeeds, id)
- close(ch)
- }
- func (h *Hub) storeLatest(envelope model.Envelope) {
- if envelope.Target.DeviceID == "" {
- return
- }
- h.mu.Lock()
- defer h.mu.Unlock()
- if len(h.latestState) >= h.maxLatest {
- for key := range h.latestState {
- delete(h.latestState, key)
- break
- }
- }
- h.latestState[latestStateKey(envelope.Target.ChannelID, envelope.Target.DeviceID)] = envelope
- }
- func (h *Hub) publishLive(envelope model.Envelope) {
- h.mu.RLock()
- feeds := make([]chan model.Envelope, 0, len(h.liveFeeds))
- for _, ch := range h.liveFeeds {
- feeds = append(feeds, ch)
- }
- h.mu.RUnlock()
- for _, ch := range feeds {
- select {
- case ch <- envelope:
- default:
- }
- }
- }
- func (h *Hub) recordTraffic(envelope model.Envelope, matched int, dropped bool) {
- h.mu.Lock()
- defer h.mu.Unlock()
- h.stats.Published += 1
- h.stats.Fanout += uint64(matched)
- if dropped {
- h.stats.Dropped += 1
- }
- topicKey := envelope.Topic
- if topicKey == "" {
- topicKey = "--"
- }
- topicCounter := h.stats.Topics[topicKey]
- if topicCounter == nil {
- topicCounter = &trafficCounter{}
- h.stats.Topics[topicKey] = topicCounter
- }
- topicCounter.Published += 1
- topicCounter.Fanout += uint64(matched)
- if dropped {
- topicCounter.Dropped += 1
- }
- channelKey := envelope.Target.ChannelID
- if channelKey == "" {
- channelKey = "--"
- }
- channelCounter := h.stats.Channels[channelKey]
- if channelCounter == nil {
- channelCounter = &trafficCounter{}
- h.stats.Channels[channelKey] = channelCounter
- }
- channelCounter.Published += 1
- channelCounter.Fanout += uint64(matched)
- if dropped {
- channelCounter.Dropped += 1
- }
- }
- func matchesAny(envelope model.Envelope, subscriptions []model.Subscription) bool {
- if len(subscriptions) == 0 {
- return false
- }
- for _, subscription := range subscriptions {
- if matches(envelope, subscription) {
- return true
- }
- }
- return false
- }
- func matches(envelope model.Envelope, subscription model.Subscription) bool {
- if subscription.ChannelID != "" && subscription.ChannelID != envelope.Target.ChannelID {
- return false
- }
- if subscription.DeviceID != "" && subscription.DeviceID != envelope.Target.DeviceID {
- return false
- }
- if subscription.GroupID != "" && subscription.GroupID != envelope.Target.GroupID {
- return false
- }
- if subscription.Topic != "" && subscription.Topic != envelope.Topic {
- return false
- }
- return true
- }
// latestStateKey builds the latest-state map key for a device. With a
// channel ID the key is "<channelID>::<deviceID>"; without one it is the
// bare device ID.
func latestStateKey(channelID string, deviceID string) string {
	key := deviceID
	if channelID != "" {
		key = channelID + "::" + deviceID
	}
	return key
}
|