manager.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package channel
  2. import (
  3. "crypto/rand"
  4. "encoding/hex"
  5. "errors"
  6. "sort"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "realtime-gateway/internal/model"
  11. )
  12. const (
  13. DeliveryModeCacheLatest = "cache_latest"
  14. DeliveryModeDropIfNoConsumer = "drop_if_no_consumer"
  15. )
  16. var (
  17. ErrChannelNotFound = errors.New("channel not found")
  18. ErrChannelExpired = errors.New("channel expired")
  19. ErrChannelUnauthorized = errors.New("channel token invalid")
  20. ErrInvalidRole = errors.New("invalid channel role")
  21. )
  22. type CreateRequest struct {
  23. Label string
  24. DeliveryMode string
  25. TTLSeconds int
  26. }
  27. type Snapshot struct {
  28. ID string `json:"id"`
  29. Label string `json:"label,omitempty"`
  30. DeliveryMode string `json:"deliveryMode"`
  31. CreatedAt time.Time `json:"createdAt"`
  32. ExpiresAt time.Time `json:"expiresAt"`
  33. ActiveProducers int `json:"activeProducers"`
  34. ActiveConsumers int `json:"activeConsumers"`
  35. ActiveControllers int `json:"activeControllers"`
  36. }
  37. type CreateResponse struct {
  38. Snapshot Snapshot `json:"snapshot"`
  39. ProducerToken string `json:"producerToken"`
  40. ConsumerToken string `json:"consumerToken"`
  41. ControllerToken string `json:"controllerToken"`
  42. }
  43. type Manager struct {
  44. mu sync.RWMutex
  45. defaultTTL time.Duration
  46. channels map[string]*channelState
  47. }
  48. type channelState struct {
  49. id string
  50. label string
  51. deliveryMode string
  52. createdAt time.Time
  53. expiresAt time.Time
  54. producerToken string
  55. consumerToken string
  56. controllerToken string
  57. activeProducers int
  58. activeConsumers int
  59. activeControllers int
  60. }
  61. func NewManager(defaultTTL time.Duration) *Manager {
  62. if defaultTTL <= 0 {
  63. defaultTTL = 8 * time.Hour
  64. }
  65. return &Manager{
  66. defaultTTL: defaultTTL,
  67. channels: make(map[string]*channelState),
  68. }
  69. }
  70. func (m *Manager) Create(request CreateRequest) (CreateResponse, error) {
  71. m.mu.Lock()
  72. defer m.mu.Unlock()
  73. now := time.Now()
  74. deliveryMode := normalizeDeliveryMode(request.DeliveryMode)
  75. ttl := m.defaultTTL
  76. if request.TTLSeconds > 0 {
  77. ttl = time.Duration(request.TTLSeconds) * time.Second
  78. }
  79. state := &channelState{
  80. id: "ch-" + randomHex(6),
  81. label: request.Label,
  82. deliveryMode: deliveryMode,
  83. createdAt: now,
  84. expiresAt: now.Add(ttl),
  85. producerToken: randomHex(16),
  86. consumerToken: randomHex(16),
  87. controllerToken: randomHex(16),
  88. }
  89. m.channels[state.id] = state
  90. return CreateResponse{
  91. Snapshot: snapshotOf(state),
  92. ProducerToken: state.producerToken,
  93. ConsumerToken: state.consumerToken,
  94. ControllerToken: state.controllerToken,
  95. }, nil
  96. }
  97. func (m *Manager) Join(channelID string, token string, role model.Role) (Snapshot, error) {
  98. m.mu.RLock()
  99. defer m.mu.RUnlock()
  100. state, ok := m.channels[channelID]
  101. if !ok {
  102. return Snapshot{}, ErrChannelNotFound
  103. }
  104. if state.expiresAt.Before(time.Now()) {
  105. return Snapshot{}, ErrChannelExpired
  106. }
  107. if !authorizeToken(state, role, token) {
  108. return Snapshot{}, ErrChannelUnauthorized
  109. }
  110. return snapshotOf(state), nil
  111. }
  112. func (m *Manager) Bind(channelID string, role model.Role) error {
  113. m.mu.Lock()
  114. defer m.mu.Unlock()
  115. state, ok := m.channels[channelID]
  116. if !ok {
  117. return ErrChannelNotFound
  118. }
  119. if state.expiresAt.Before(time.Now()) {
  120. return ErrChannelExpired
  121. }
  122. switch role {
  123. case model.RoleProducer:
  124. state.activeProducers++
  125. case model.RoleConsumer:
  126. state.activeConsumers++
  127. case model.RoleController:
  128. state.activeControllers++
  129. default:
  130. return ErrInvalidRole
  131. }
  132. return nil
  133. }
  134. func (m *Manager) Unbind(channelID string, role model.Role) {
  135. m.mu.Lock()
  136. defer m.mu.Unlock()
  137. state, ok := m.channels[channelID]
  138. if !ok {
  139. return
  140. }
  141. switch role {
  142. case model.RoleProducer:
  143. if state.activeProducers > 0 {
  144. state.activeProducers--
  145. }
  146. case model.RoleConsumer:
  147. if state.activeConsumers > 0 {
  148. state.activeConsumers--
  149. }
  150. case model.RoleController:
  151. if state.activeControllers > 0 {
  152. state.activeControllers--
  153. }
  154. }
  155. }
  156. func (m *Manager) DeliveryMode(channelID string) string {
  157. m.mu.RLock()
  158. defer m.mu.RUnlock()
  159. state, ok := m.channels[channelID]
  160. if !ok {
  161. return DeliveryModeCacheLatest
  162. }
  163. return state.deliveryMode
  164. }
  165. func (m *Manager) HasConsumers(channelID string) bool {
  166. m.mu.RLock()
  167. defer m.mu.RUnlock()
  168. state, ok := m.channels[channelID]
  169. if !ok {
  170. return false
  171. }
  172. return state.activeConsumers > 0
  173. }
  174. func (m *Manager) List() []Snapshot {
  175. m.mu.RLock()
  176. defer m.mu.RUnlock()
  177. now := time.Now()
  178. items := make([]Snapshot, 0, len(m.channels))
  179. for _, state := range m.channels {
  180. if state.expiresAt.Before(now) {
  181. continue
  182. }
  183. items = append(items, snapshotOf(state))
  184. }
  185. sort.Slice(items, func(i int, j int) bool {
  186. return items[i].CreatedAt.After(items[j].CreatedAt)
  187. })
  188. return items
  189. }
  190. func normalizeDeliveryMode(value string) string {
  191. switch value {
  192. case DeliveryModeDropIfNoConsumer:
  193. return DeliveryModeDropIfNoConsumer
  194. default:
  195. return DeliveryModeCacheLatest
  196. }
  197. }
  198. func authorizeToken(state *channelState, role model.Role, token string) bool {
  199. switch role {
  200. case model.RoleProducer:
  201. return state.producerToken == token
  202. case model.RoleConsumer:
  203. return state.consumerToken == token
  204. case model.RoleController:
  205. return state.controllerToken == token
  206. default:
  207. return false
  208. }
  209. }
  210. func snapshotOf(state *channelState) Snapshot {
  211. return Snapshot{
  212. ID: state.id,
  213. Label: state.label,
  214. DeliveryMode: state.deliveryMode,
  215. CreatedAt: state.createdAt,
  216. ExpiresAt: state.expiresAt,
  217. ActiveProducers: state.activeProducers,
  218. ActiveConsumers: state.activeConsumers,
  219. ActiveControllers: state.activeControllers,
  220. }
  221. }
  222. func randomHex(size int) string {
  223. if size <= 0 {
  224. size = 8
  225. }
  226. buf := make([]byte, size)
  227. if _, err := rand.Read(buf); err != nil {
  228. return strconv.FormatInt(time.Now().UnixNano(), 16)
  229. }
  230. return hex.EncodeToString(buf)
  231. }