// Package channel keeps an in-memory registry of channels. Each channel has
// an expiry time, one access token per role, and counters for the connections
// currently bound to it in each role.
package channel

import (
	"crypto/rand"
	"crypto/subtle"
	"encoding/hex"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"sync"
	"time"

	"realtime-gateway/internal/model"
)

// Delivery modes a channel can be created with. Any value other than
// DeliveryModeDropIfNoConsumer is normalized to DeliveryModeCacheLatest.
//
// NOTE(review): this file only stores and reports the mode; the behaviour the
// names suggest is presumably implemented by callers of DeliveryMode and
// HasConsumers — confirm against the dispatcher.
const (
	DeliveryModeCacheLatest      = "cache_latest"
	DeliveryModeDropIfNoConsumer = "drop_if_no_consumer"
)

const (
	// channelIDBytes and tokenBytes are the number of random bytes (before
	// hex encoding) in a channel id suffix and in a role token.
	channelIDBytes = 6
	tokenBytes     = 16

	// maxIDAttempts bounds how often Create retries after an id collision.
	maxIDAttempts = 8

	// maxTTLSeconds is the largest whole number of seconds that still fits
	// in a time.Duration without overflowing.
	maxTTLSeconds = int64((1<<63 - 1) / time.Second)
)

// Errors returned by Manager methods.
var (
	ErrChannelNotFound     = errors.New("channel not found")
	ErrChannelExpired      = errors.New("channel expired")
	ErrChannelUnauthorized = errors.New("channel token invalid")
	ErrInvalidRole         = errors.New("invalid channel role")
)

// errChannelIDUnavailable is returned by Create when no unused channel id was
// found within maxIDAttempts tries.
var errChannelIDUnavailable = errors.New("could not allocate a unique channel id")

// CreateRequest carries the caller-supplied settings for a new channel.
// A TTLSeconds of zero or less selects the Manager's default TTL.
type CreateRequest struct {
	Label        string
	DeliveryMode string
	TTLSeconds   int
}

// Snapshot is a point-in-time, token-free copy of a channel's state.
type Snapshot struct {
	ID                string    `json:"id"`
	Label             string    `json:"label,omitempty"`
	DeliveryMode      string    `json:"deliveryMode"`
	CreatedAt         time.Time `json:"createdAt"`
	ExpiresAt         time.Time `json:"expiresAt"`
	ActiveProducers   int       `json:"activeProducers"`
	ActiveConsumers   int       `json:"activeConsumers"`
	ActiveControllers int       `json:"activeControllers"`
}

// CreateResponse is the result of Create: the new channel's snapshot plus the
// token for each role.
type CreateResponse struct {
	Snapshot        Snapshot `json:"snapshot"`
	ProducerToken   string   `json:"producerToken"`
	ConsumerToken   string   `json:"consumerToken"`
	ControllerToken string   `json:"controllerToken"`
}

// Manager owns the channel registry. All methods take mu, so a Manager may be
// shared between goroutines.
//
// NOTE(review): nothing in this file removes expired channels from the map;
// confirm whether eviction happens elsewhere.
type Manager struct {
	mu         sync.RWMutex
	defaultTTL time.Duration
	channels   map[string]*channelState
}

// channelState is the mutable per-channel record stored in Manager.channels.
// It must only be read or written while holding Manager.mu.
type channelState struct {
	id                string
	label             string
	deliveryMode      string
	createdAt         time.Time
	expiresAt         time.Time
	producerToken     string
	consumerToken     string
	controllerToken   string
	activeProducers   int
	activeConsumers   int
	activeControllers int
}

// NewManager returns an empty Manager. A defaultTTL of zero or less is
// replaced by eight hours.
func NewManager(defaultTTL time.Duration) *Manager {
	if defaultTTL <= 0 {
		defaultTTL = 8 * time.Hour
	}
	return &Manager{
		defaultTTL: defaultTTL,
		channels:   make(map[string]*channelState),
	}
}

// Create registers a new channel and returns its snapshot together with the
// producer, consumer and controller tokens.
//
// It returns an error when the system random source fails (tokens are never
// derived from a predictable fallback) or when no unused channel id could be
// found.
func (m *Manager) Create(request CreateRequest) (CreateResponse, error) {
	// Generate the secrets before taking the lock: this needs no shared
	// state and lets us bail out without having touched the registry.
	var tokens [3]string
	for i := range tokens {
		token, err := secureHex(tokenBytes)
		if err != nil {
			return CreateResponse{}, fmt.Errorf("generating channel token: %w", err)
		}
		tokens[i] = token
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	ttl := m.defaultTTL
	if request.TTLSeconds > 0 {
		// Clamp so the multiplication below cannot overflow into a
		// negative duration, which would create an already-expired channel.
		seconds := min(int64(request.TTLSeconds), maxTTLSeconds)
		ttl = time.Duration(seconds) * time.Second
	}

	id, err := m.newChannelIDLocked()
	if err != nil {
		return CreateResponse{}, err
	}

	now := time.Now()
	state := &channelState{
		id:              id,
		label:           request.Label,
		deliveryMode:    normalizeDeliveryMode(request.DeliveryMode),
		createdAt:       now,
		expiresAt:       now.Add(ttl),
		producerToken:   tokens[0],
		consumerToken:   tokens[1],
		controllerToken: tokens[2],
	}
	m.channels[state.id] = state

	return CreateResponse{
		Snapshot:        snapshotOf(state),
		ProducerToken:   state.producerToken,
		ConsumerToken:   state.consumerToken,
		ControllerToken: state.controllerToken,
	}, nil
}

// newChannelIDLocked returns a "ch-"-prefixed id that is not yet present in
// m.channels, so an existing channel is never overwritten. The caller must
// hold m.mu for writing.
func (m *Manager) newChannelIDLocked() (string, error) {
	for attempt := 0; attempt < maxIDAttempts; attempt++ {
		id := "ch-" + randomHex(channelIDBytes)
		if _, taken := m.channels[id]; !taken {
			return id, nil
		}
	}
	return "", errChannelIDUnavailable
}

// Join checks that channelID exists, has not expired, and that token is the
// token for role. On success it returns the channel's current snapshot; it
// does not change any connection counter (see Bind).
//
// Errors: ErrChannelNotFound, ErrChannelExpired, ErrChannelUnauthorized.
func (m *Manager) Join(channelID string, token string, role model.Role) (Snapshot, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	state, ok := m.channels[channelID]
	if !ok {
		return Snapshot{}, ErrChannelNotFound
	}
	if state.expiresAt.Before(time.Now()) {
		return Snapshot{}, ErrChannelExpired
	}
	if !authorizeToken(state, role, token) {
		return Snapshot{}, ErrChannelUnauthorized
	}
	return snapshotOf(state), nil
}

// Bind increments the active-connection counter for role on channelID.
//
// Errors: ErrChannelNotFound, ErrChannelExpired, and ErrInvalidRole for a role
// other than producer, consumer or controller.
func (m *Manager) Bind(channelID string, role model.Role) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	state, ok := m.channels[channelID]
	if !ok {
		return ErrChannelNotFound
	}
	if state.expiresAt.Before(time.Now()) {
		return ErrChannelExpired
	}

	switch role {
	case model.RoleProducer:
		state.activeProducers++
	case model.RoleConsumer:
		state.activeConsumers++
	case model.RoleController:
		state.activeControllers++
	default:
		return ErrInvalidRole
	}
	return nil
}

// Unbind decrements the active-connection counter for role on channelID,
// never going below zero. Unknown channels and unknown roles are ignored, and
// expiry is deliberately not checked so connections can still be released
// after a channel has expired.
func (m *Manager) Unbind(channelID string, role model.Role) {
	m.mu.Lock()
	defer m.mu.Unlock()

	state, ok := m.channels[channelID]
	if !ok {
		return
	}

	switch role {
	case model.RoleProducer:
		if state.activeProducers > 0 {
			state.activeProducers--
		}
	case model.RoleConsumer:
		if state.activeConsumers > 0 {
			state.activeConsumers--
		}
	case model.RoleController:
		if state.activeControllers > 0 {
			state.activeControllers--
		}
	}
}

// DeliveryMode returns the delivery mode stored for channelID, or
// DeliveryModeCacheLatest when the channel is unknown. Expiry is not checked.
func (m *Manager) DeliveryMode(channelID string) string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	state, ok := m.channels[channelID]
	if !ok {
		return DeliveryModeCacheLatest
	}
	return state.deliveryMode
}

// HasConsumers reports whether channelID currently has at least one bound
// consumer. It returns false for an unknown channel.
func (m *Manager) HasConsumers(channelID string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	state, ok := m.channels[channelID]
	if !ok {
		return false
	}
	return state.activeConsumers > 0
}

// List returns snapshots of all channels that have not expired, ordered from
// most recently created to oldest. The result is never nil.
func (m *Manager) List() []Snapshot {
	m.mu.RLock()
	defer m.mu.RUnlock()

	now := time.Now()
	items := make([]Snapshot, 0, len(m.channels))
	for _, state := range m.channels {
		if state.expiresAt.Before(now) {
			continue
		}
		items = append(items, snapshotOf(state))
	}

	sort.Slice(items, func(i int, j int) bool {
		return items[i].CreatedAt.After(items[j].CreatedAt)
	})
	return items
}

// normalizeDeliveryMode maps value onto one of the known delivery modes,
// treating anything unrecognized (including "") as DeliveryModeCacheLatest.
func normalizeDeliveryMode(value string) string {
	switch value {
	case DeliveryModeDropIfNoConsumer:
		return DeliveryModeDropIfNoConsumer
	default:
		return DeliveryModeCacheLatest
	}
}

// authorizeToken reports whether token is the channel's token for role.
// Unknown roles are never authorized.
func authorizeToken(state *channelState, role model.Role, token string) bool {
	var expected string
	switch role {
	case model.RoleProducer:
		expected = state.producerToken
	case model.RoleConsumer:
		expected = state.consumerToken
	case model.RoleController:
		expected = state.controllerToken
	default:
		return false
	}
	// Constant-time comparison so response timing does not reveal how many
	// leading characters of a guessed token were correct.
	return subtle.ConstantTimeCompare([]byte(expected), []byte(token)) == 1
}

// snapshotOf copies the non-secret fields of state into a Snapshot. The
// caller must hold the Manager lock that guards state.
func snapshotOf(state *channelState) Snapshot {
	return Snapshot{
		ID:                state.id,
		Label:             state.label,
		DeliveryMode:      state.deliveryMode,
		CreatedAt:         state.createdAt,
		ExpiresAt:         state.expiresAt,
		ActiveProducers:   state.activeProducers,
		ActiveConsumers:   state.activeConsumers,
		ActiveControllers: state.activeControllers,
	}
}

// secureHex returns size cryptographically random bytes, hex encoded. Unlike
// randomHex it reports a failure of the random source instead of falling back
// to a predictable value, which makes it suitable for secrets.
func secureHex(size int) (string, error) {
	buf := make([]byte, size)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// randomHex returns size random bytes, hex encoded; a size of zero or less is
// treated as 8. If the random source fails it falls back to the current time
// in nanoseconds (base 16), so the result is best-effort only and must not be
// used for secrets — use secureHex for those.
func randomHex(size int) string {
	if size <= 0 {
		size = 8
	}
	buf := make([]byte, size)
	if _, err := rand.Read(buf); err != nil {
		return strconv.FormatInt(time.Now().UnixNano(), 16)
	}
	return hex.EncodeToString(buf)
}