| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- package channel
- import (
- "crypto/rand"
- "encoding/hex"
- "errors"
- "sort"
- "strconv"
- "sync"
- "time"
- "realtime-gateway/internal/model"
- )
- const (
- DeliveryModeCacheLatest = "cache_latest"
- DeliveryModeDropIfNoConsumer = "drop_if_no_consumer"
- )
- var (
- ErrChannelNotFound = errors.New("channel not found")
- ErrChannelExpired = errors.New("channel expired")
- ErrChannelUnauthorized = errors.New("channel token invalid")
- ErrInvalidRole = errors.New("invalid channel role")
- )
- type CreateRequest struct {
- Label string
- DeliveryMode string
- TTLSeconds int
- }
- type Snapshot struct {
- ID string `json:"id"`
- Label string `json:"label,omitempty"`
- DeliveryMode string `json:"deliveryMode"`
- CreatedAt time.Time `json:"createdAt"`
- ExpiresAt time.Time `json:"expiresAt"`
- ActiveProducers int `json:"activeProducers"`
- ActiveConsumers int `json:"activeConsumers"`
- ActiveControllers int `json:"activeControllers"`
- }
- type CreateResponse struct {
- Snapshot Snapshot `json:"snapshot"`
- ProducerToken string `json:"producerToken"`
- ConsumerToken string `json:"consumerToken"`
- ControllerToken string `json:"controllerToken"`
- }
- type Manager struct {
- mu sync.RWMutex
- defaultTTL time.Duration
- channels map[string]*channelState
- }
- type channelState struct {
- id string
- label string
- deliveryMode string
- createdAt time.Time
- expiresAt time.Time
- producerToken string
- consumerToken string
- controllerToken string
- activeProducers int
- activeConsumers int
- activeControllers int
- }
- func NewManager(defaultTTL time.Duration) *Manager {
- if defaultTTL <= 0 {
- defaultTTL = 8 * time.Hour
- }
- return &Manager{
- defaultTTL: defaultTTL,
- channels: make(map[string]*channelState),
- }
- }
- func (m *Manager) Create(request CreateRequest) (CreateResponse, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
- now := time.Now()
- deliveryMode := normalizeDeliveryMode(request.DeliveryMode)
- ttl := m.defaultTTL
- if request.TTLSeconds > 0 {
- ttl = time.Duration(request.TTLSeconds) * time.Second
- }
- state := &channelState{
- id: "ch-" + randomHex(6),
- label: request.Label,
- deliveryMode: deliveryMode,
- createdAt: now,
- expiresAt: now.Add(ttl),
- producerToken: randomHex(16),
- consumerToken: randomHex(16),
- controllerToken: randomHex(16),
- }
- m.channels[state.id] = state
- return CreateResponse{
- Snapshot: snapshotOf(state),
- ProducerToken: state.producerToken,
- ConsumerToken: state.consumerToken,
- ControllerToken: state.controllerToken,
- }, nil
- }
- func (m *Manager) Join(channelID string, token string, role model.Role) (Snapshot, error) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- state, ok := m.channels[channelID]
- if !ok {
- return Snapshot{}, ErrChannelNotFound
- }
- if state.expiresAt.Before(time.Now()) {
- return Snapshot{}, ErrChannelExpired
- }
- if !authorizeToken(state, role, token) {
- return Snapshot{}, ErrChannelUnauthorized
- }
- return snapshotOf(state), nil
- }
- func (m *Manager) Bind(channelID string, role model.Role) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- state, ok := m.channels[channelID]
- if !ok {
- return ErrChannelNotFound
- }
- if state.expiresAt.Before(time.Now()) {
- return ErrChannelExpired
- }
- switch role {
- case model.RoleProducer:
- state.activeProducers++
- case model.RoleConsumer:
- state.activeConsumers++
- case model.RoleController:
- state.activeControllers++
- default:
- return ErrInvalidRole
- }
- return nil
- }
- func (m *Manager) Unbind(channelID string, role model.Role) {
- m.mu.Lock()
- defer m.mu.Unlock()
- state, ok := m.channels[channelID]
- if !ok {
- return
- }
- switch role {
- case model.RoleProducer:
- if state.activeProducers > 0 {
- state.activeProducers--
- }
- case model.RoleConsumer:
- if state.activeConsumers > 0 {
- state.activeConsumers--
- }
- case model.RoleController:
- if state.activeControllers > 0 {
- state.activeControllers--
- }
- }
- }
- func (m *Manager) DeliveryMode(channelID string) string {
- m.mu.RLock()
- defer m.mu.RUnlock()
- state, ok := m.channels[channelID]
- if !ok {
- return DeliveryModeCacheLatest
- }
- return state.deliveryMode
- }
- func (m *Manager) HasConsumers(channelID string) bool {
- m.mu.RLock()
- defer m.mu.RUnlock()
- state, ok := m.channels[channelID]
- if !ok {
- return false
- }
- return state.activeConsumers > 0
- }
- func (m *Manager) List() []Snapshot {
- m.mu.RLock()
- defer m.mu.RUnlock()
- now := time.Now()
- items := make([]Snapshot, 0, len(m.channels))
- for _, state := range m.channels {
- if state.expiresAt.Before(now) {
- continue
- }
- items = append(items, snapshotOf(state))
- }
- sort.Slice(items, func(i int, j int) bool {
- return items[i].CreatedAt.After(items[j].CreatedAt)
- })
- return items
- }
- func normalizeDeliveryMode(value string) string {
- switch value {
- case DeliveryModeDropIfNoConsumer:
- return DeliveryModeDropIfNoConsumer
- default:
- return DeliveryModeCacheLatest
- }
- }
- func authorizeToken(state *channelState, role model.Role, token string) bool {
- switch role {
- case model.RoleProducer:
- return state.producerToken == token
- case model.RoleConsumer:
- return state.consumerToken == token
- case model.RoleController:
- return state.controllerToken == token
- default:
- return false
- }
- }
- func snapshotOf(state *channelState) Snapshot {
- return Snapshot{
- ID: state.id,
- Label: state.label,
- DeliveryMode: state.deliveryMode,
- CreatedAt: state.createdAt,
- ExpiresAt: state.expiresAt,
- ActiveProducers: state.activeProducers,
- ActiveConsumers: state.activeConsumers,
- ActiveControllers: state.activeControllers,
- }
- }
- func randomHex(size int) string {
- if size <= 0 {
- size = 8
- }
- buf := make([]byte, size)
- if _, err := rand.Read(buf); err != nil {
- return strconv.FormatInt(time.Now().UnixNano(), 16)
- }
- return hex.EncodeToString(buf)
- }
|