// Package router fans published envelopes out to registered subscribers,
// remembers the most recent envelope per device, feeds live observers, and
// keeps traffic counters per topic and per channel.
package router

import (
	"encoding/json"
	"sync"

	"realtime-gateway/internal/model"
)

// Subscriber is a sink the Hub can deliver server messages to.
type Subscriber interface {
	// ID returns the key under which the Hub stores the subscriber.
	ID() string
	// Send delivers one message to the subscriber.
	Send(message model.ServerMessage) error
}

// Hub routes envelopes to subscribers whose subscriptions match them.
// All fields below mu are guarded by mu.
type Hub struct {
	mu sync.RWMutex

	// subscribers and filters are both keyed by Subscriber.ID().
	subscribers map[string]Subscriber
	filters     map[string][]model.Subscription

	// latestState holds the last stored envelope per latestStateKey.
	latestState map[string]model.Envelope

	// liveFeeds holds the channels handed out by SubscribeLive, keyed by feed id.
	liveFeeds  map[uint64]chan model.Envelope
	nextLiveID uint64

	stats     trafficStats
	maxLatest int
}

// TrafficSnapshot is a point-in-time copy of the Hub's traffic counters.
type TrafficSnapshot struct {
	Published uint64               `json:"published"`
	Dropped   uint64               `json:"dropped"`
	Fanout    uint64               `json:"fanout"`
	Topics    []TopicTrafficItem   `json:"topics"`
	Channels  []ChannelTrafficItem `json:"channels"`
}

// TopicTrafficItem carries the counters recorded for a single topic.
type TopicTrafficItem struct {
	Topic     string `json:"topic"`
	Published uint64 `json:"published"`
	Dropped   uint64 `json:"dropped"`
	Fanout    uint64 `json:"fanout"`
}

// ChannelTrafficItem carries the counters recorded for a single channel.
type ChannelTrafficItem struct {
	ChannelID string `json:"channelId"`
	Published uint64 `json:"published"`
	Dropped   uint64 `json:"dropped"`
	Fanout    uint64 `json:"fanout"`
}

// trafficStats is the Hub's internal counter set: global totals plus one
// trafficCounter per topic and per channel.
type trafficStats struct {
	Published uint64
	Dropped   uint64
	Fanout    uint64
	Topics    map[string]*trafficCounter
	Channels  map[string]*trafficCounter
}

// trafficCounter accumulates publish, drop and fan-out counts for one key.
type trafficCounter struct {
	Published uint64
	Dropped   uint64
	Fanout    uint64
}

// PublishResult reports what Publish did with an envelope.
type PublishResult struct {
	// Matched is the number of subscribers whose subscriptions matched.
	Matched int `json:"matched"`
	// Stored is true when the envelope went through the store/deliver path.
	Stored bool `json:"stored"`
	// Dropped is true when the envelope was discarded without delivery.
	Dropped bool `json:"dropped"`
}

// NewHub returns an empty Hub. maxLatest is the size at which storing the
// state of a new device first evicts an existing latest-state entry.
func NewHub(maxLatest int) *Hub {
	return &Hub{
		subscribers: make(map[string]Subscriber),
		filters:     make(map[string][]model.Subscription),
		latestState: make(map[string]model.Envelope),
		liveFeeds:   make(map[uint64]chan model.Envelope),
		stats: trafficStats{
			Topics:   make(map[string]*trafficCounter),
			Channels: make(map[string]*trafficCounter),
		},
		maxLatest: maxLatest,
	}
}

// Register adds subscriber under its ID together with its subscriptions,
// replacing any previous registration with the same ID. The subscriptions
// slice is retained as-is, not copied.
func (h *Hub) Register(subscriber Subscriber, subscriptions []model.Subscription) {
	id := subscriber.ID()

	h.mu.Lock()
	defer h.mu.Unlock()

	h.subscribers[id] = subscriber
	h.filters[id] = subscriptions
}

// Unregister removes the subscriber and its subscriptions. Unknown IDs are a
// no-op.
func (h *Hub) Unregister(subscriberID string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	delete(h.subscribers, subscriberID)
	delete(h.filters, subscriberID)
}

// UpdateSubscriptions replaces the subscriptions of a registered subscriber.
// Updates for IDs that are not registered are ignored: Publish only consults
// filters of registered subscribers and Register always overwrites them, so
// storing such an entry would only leak it.
func (h *Hub) UpdateSubscriptions(subscriberID string, subscriptions []model.Subscription) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if _, registered := h.subscribers[subscriberID]; !registered {
		return
	}
	h.filters[subscriberID] = subscriptions
}

// Publish delivers envelope to every subscriber with a matching subscription.
//
// When deliveryMode is "drop_if_no_consumer" and nobody matches, the envelope
// is only counted as dropped. Otherwise it is stored as latest state, offered
// to the live feeds, counted, and sent to each matching subscriber.
func (h *Hub) Publish(envelope model.Envelope, deliveryMode string) PublishResult {
	// Collect the recipients under the read lock, then deliver without it so
	// Send is never called while the Hub is locked.
	h.mu.RLock()
	recipients := make([]Subscriber, 0, len(h.subscribers))
	for id, sub := range h.subscribers {
		if matchesAny(envelope, h.filters[id]) {
			recipients = append(recipients, sub)
		}
	}
	h.mu.RUnlock()

	if len(recipients) == 0 && deliveryMode == "drop_if_no_consumer" {
		h.recordTraffic(envelope, 0, true)
		return PublishResult{Matched: 0, Stored: false, Dropped: true}
	}

	h.storeLatest(envelope)
	h.publishLive(envelope)
	h.recordTraffic(envelope, len(recipients), false)

	for _, sub := range recipients {
		// The Send error is discarded so that one failing subscriber does not
		// stop delivery to the remaining ones.
		_ = sub.Send(model.ServerMessage{
			Type:     "event",
			Envelope: &envelope,
		})
	}

	return PublishResult{Matched: len(recipients), Stored: true, Dropped: false}
}

// Snapshot returns the JSON encoding of the latest envelope stored for the
// channel/device pair. The boolean is false when nothing is stored or the
// envelope cannot be marshalled.
func (h *Hub) Snapshot(channelID string, deviceID string) (json.RawMessage, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	envelope, found := h.latestState[latestStateKey(channelID, deviceID)]
	if !found {
		return nil, false
	}

	encoded, err := json.Marshal(envelope)
	if err != nil {
		return nil, false
	}
	return encoded, true
}

// Stats returns the number of registered subscribers and the number of stored
// latest-state entries.
func (h *Hub) Stats() (subscriberCount int, latestStateCount int) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	return len(h.subscribers), len(h.latestState)
}

// LatestStates returns all stored latest-state envelopes in map iteration
// order, which is unspecified.
func (h *Hub) LatestStates() []model.Envelope {
	h.mu.RLock()
	defer h.mu.RUnlock()

	states := make([]model.Envelope, 0, len(h.latestState))
	for _, envelope := range h.latestState {
		states = append(states, envelope)
	}
	return states
}

// TrafficSnapshot returns a copy of the global, per-topic and per-channel
// counters. Topics and Channels are always non-nil; their order follows map
// iteration and is unspecified.
func (h *Hub) TrafficSnapshot() TrafficSnapshot {
	h.mu.RLock()
	defer h.mu.RUnlock()

	snapshot := TrafficSnapshot{
		Published: h.stats.Published,
		Dropped:   h.stats.Dropped,
		Fanout:    h.stats.Fanout,
		Topics:    make([]TopicTrafficItem, 0, len(h.stats.Topics)),
		Channels:  make([]ChannelTrafficItem, 0, len(h.stats.Channels)),
	}

	for topic, counter := range h.stats.Topics {
		snapshot.Topics = append(snapshot.Topics, TopicTrafficItem{
			Topic:     topic,
			Published: counter.Published,
			Dropped:   counter.Dropped,
			Fanout:    counter.Fanout,
		})
	}

	for channelID, counter := range h.stats.Channels {
		snapshot.Channels = append(snapshot.Channels, ChannelTrafficItem{
			ChannelID: channelID,
			Published: counter.Published,
			Dropped:   counter.Dropped,
			Fanout:    counter.Fanout,
		})
	}

	return snapshot
}

// SubscribeLive opens a live feed and returns its id and receive channel.
// A non-positive buffer falls back to 32. Envelopes are offered without
// blocking, so a feed whose buffer is full misses them.
func (h *Hub) SubscribeLive(buffer int) (uint64, <-chan model.Envelope) {
	if buffer <= 0 {
		buffer = 32
	}
	feed := make(chan model.Envelope, buffer)

	h.mu.Lock()
	defer h.mu.Unlock()

	h.nextLiveID++
	h.liveFeeds[h.nextLiveID] = feed
	return h.nextLiveID, feed
}

// UnsubscribeLive removes the live feed with the given id and closes its
// channel. Unknown ids are a no-op.
func (h *Hub) UnsubscribeLive(id uint64) {
	h.mu.Lock()
	defer h.mu.Unlock()

	feed, found := h.liveFeeds[id]
	if !found {
		return
	}
	delete(h.liveFeeds, id)
	// Closing under the write lock keeps the close from overlapping with the
	// sends in publishLive, which run under the read lock.
	close(feed)
}

// storeLatest remembers envelope as the latest state of its target device.
// Envelopes without a device ID are not stored. When a new key would be added
// while the map already holds maxLatest entries, one arbitrary entry is
// evicted first; overwriting an existing key never evicts.
func (h *Hub) storeLatest(envelope model.Envelope) {
	if envelope.Target.DeviceID == "" {
		return
	}
	key := latestStateKey(envelope.Target.ChannelID, envelope.Target.DeviceID)

	h.mu.Lock()
	defer h.mu.Unlock()

	// Only a genuinely new key grows the map, so only then make room.
	if _, exists := h.latestState[key]; !exists && len(h.latestState) >= h.maxLatest {
		for victim := range h.latestState {
			delete(h.latestState, victim)
			break
		}
	}
	h.latestState[key] = envelope
}

// publishLive offers envelope to every live feed without blocking; feeds
// whose buffer is full skip it.
func (h *Hub) publishLive(envelope model.Envelope) {
	// The read lock is held across the sends on purpose: UnsubscribeLive
	// closes feed channels under the write lock, and a send on a closed
	// channel panics. The sends never block, so the lock is held only briefly.
	h.mu.RLock()
	defer h.mu.RUnlock()

	for _, feed := range h.liveFeeds {
		select {
		case feed <- envelope:
		default:
		}
	}
}

// recordTraffic updates the global, per-topic and per-channel counters for one
// publish attempt. Published is incremented on every call, Fanout grows by
// matched, and Dropped is incremented when dropped is true. An empty topic or
// channel ID is counted under the key "--".
func (h *Hub) recordTraffic(envelope model.Envelope, matched int, dropped bool) {
	fanout := uint64(matched)

	h.mu.Lock()
	defer h.mu.Unlock()

	h.stats.Published++
	h.stats.Fanout += fanout
	if dropped {
		h.stats.Dropped++
	}

	// bump applies the same update to the counter stored under key, creating
	// the counter on first use.
	bump := func(counters map[string]*trafficCounter, key string) {
		if key == "" {
			key = "--"
		}
		counter := counters[key]
		if counter == nil {
			counter = &trafficCounter{}
			counters[key] = counter
		}
		counter.Published++
		counter.Fanout += fanout
		if dropped {
			counter.Dropped++
		}
	}

	bump(h.stats.Topics, envelope.Topic)
	bump(h.stats.Channels, envelope.Target.ChannelID)
}

// matchesAny reports whether at least one subscription matches envelope.
// An empty subscription list matches nothing.
func matchesAny(envelope model.Envelope, subscriptions []model.Subscription) bool {
	for _, subscription := range subscriptions {
		if matches(envelope, subscription) {
			return true
		}
	}
	return false
}

// matches reports whether envelope satisfies subscription. Each non-empty
// subscription field must equal the corresponding envelope field; empty
// fields act as wildcards.
func matches(envelope model.Envelope, subscription model.Subscription) bool {
	target := envelope.Target
	switch {
	case subscription.ChannelID != "" && subscription.ChannelID != target.ChannelID:
		return false
	case subscription.DeviceID != "" && subscription.DeviceID != target.DeviceID:
		return false
	case subscription.GroupID != "" && subscription.GroupID != target.GroupID:
		return false
	case subscription.Topic != "" && subscription.Topic != envelope.Topic:
		return false
	}
	return true
}

// latestStateKey builds the latestState map key for a channel/device pair:
// "channelID::deviceID", or just deviceID when channelID is empty.
func latestStateKey(channelID string, deviceID string) string {
	if channelID == "" {
		return deviceID
	}
	return channelID + "::" + deviceID
}