admin_ui.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package gateway
  2. import (
  3. "embed"
  4. "encoding/json"
  5. "fmt"
  6. "io/fs"
  7. "net/http"
  8. "sort"
  9. "strings"
  10. "time"
  11. "realtime-gateway/internal/model"
  12. )
  13. //go:embed adminui/*
  14. var adminUIFiles embed.FS
  15. type adminOverview struct {
  16. Status string `json:"status"`
  17. StartedAt time.Time `json:"startedAt"`
  18. Now time.Time `json:"now"`
  19. UptimeSeconds int64 `json:"uptimeSeconds"`
  20. HTTPListen string `json:"httpListen"`
  21. Anonymous bool `json:"anonymousConsumers"`
  22. Metrics map[string]any `json:"metrics"`
  23. Auth map[string]any `json:"auth"`
  24. Endpoints map[string]any `json:"endpoints"`
  25. }
  26. func (s *Server) registerAdminRoutes(mux *http.ServeMux) error {
  27. sub, err := fs.Sub(adminUIFiles, "adminui")
  28. if err != nil {
  29. return err
  30. }
  31. fileServer := http.FileServer(http.FS(sub))
  32. mux.Handle("/assets/", http.StripPrefix("/assets/", noStoreHandler(fileServer)))
  33. mux.HandleFunc("/", s.handleAdminIndex)
  34. mux.HandleFunc("/admin", s.handleAdminIndex)
  35. mux.HandleFunc("/api/admin/overview", s.handleAdminOverview)
  36. mux.HandleFunc("/api/admin/sessions", s.handleAdminSessions)
  37. mux.HandleFunc("/api/admin/latest", s.handleAdminLatest)
  38. mux.HandleFunc("/api/admin/traffic", s.handleAdminTraffic)
  39. mux.HandleFunc("/api/admin/live", s.handleAdminLive)
  40. return nil
  41. }
  42. func (s *Server) handleAdminIndex(w http.ResponseWriter, r *http.Request) {
  43. if r.URL.Path != "/" && r.URL.Path != "/admin" {
  44. http.NotFound(w, r)
  45. return
  46. }
  47. data, err := adminUIFiles.ReadFile("adminui/index.html")
  48. if err != nil {
  49. http.Error(w, "admin ui unavailable", http.StatusInternalServerError)
  50. return
  51. }
  52. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  53. w.Header().Set("Cache-Control", "no-store")
  54. _, _ = w.Write(data)
  55. }
  56. func noStoreHandler(next http.Handler) http.Handler {
  57. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  58. w.Header().Set("Cache-Control", "no-store")
  59. next.ServeHTTP(w, r)
  60. })
  61. }
  62. func (s *Server) handleAdminOverview(w http.ResponseWriter, _ *http.Request) {
  63. subscriberCount, latestStateCount := s.hub.Stats()
  64. traffic := s.hub.TrafficSnapshot()
  65. now := time.Now()
  66. writeJSON(w, http.StatusOK, adminOverview{
  67. Status: "ok",
  68. StartedAt: s.startedAt,
  69. Now: now,
  70. UptimeSeconds: int64(now.Sub(s.startedAt).Seconds()),
  71. HTTPListen: s.cfg.Server.HTTPListen,
  72. Anonymous: s.cfg.Auth.AllowAnonymousConsumers,
  73. Metrics: map[string]any{
  74. "sessions": s.sessions.Count(),
  75. "subscribers": subscriberCount,
  76. "latestState": latestStateCount,
  77. "channels": len(s.channels.List()),
  78. "pluginHandlers": s.plugins.HandlerCount(),
  79. "published": traffic.Published,
  80. "dropped": traffic.Dropped,
  81. "fanout": traffic.Fanout,
  82. },
  83. Auth: map[string]any{
  84. "producerTokens": len(s.cfg.Auth.ProducerTokens),
  85. "consumerTokens": len(s.cfg.Auth.ConsumerTokens),
  86. "controllerTokens": len(s.cfg.Auth.ControllerTokens),
  87. },
  88. Endpoints: map[string]any{
  89. "websocket": "/ws",
  90. "health": "/healthz",
  91. "metrics": "/metrics",
  92. "createChannel": "/api/channel/create",
  93. "channels": "/api/admin/channels",
  94. "traffic": "/api/admin/traffic",
  95. "admin": "/admin",
  96. },
  97. })
  98. }
  99. func (s *Server) handleAdminSessions(w http.ResponseWriter, _ *http.Request) {
  100. snapshots := s.sessions.List()
  101. sort.Slice(snapshots, func(i int, j int) bool {
  102. return snapshots[i].CreatedAt.After(snapshots[j].CreatedAt)
  103. })
  104. writeJSON(w, http.StatusOK, map[string]any{
  105. "items": snapshots,
  106. "count": len(snapshots),
  107. })
  108. }
  109. func (s *Server) handleAdminLatest(w http.ResponseWriter, r *http.Request) {
  110. envelopes := s.hub.LatestStates()
  111. sort.Slice(envelopes, func(i int, j int) bool {
  112. return envelopes[i].Timestamp > envelopes[j].Timestamp
  113. })
  114. query := strings.TrimSpace(r.URL.Query().Get("topic"))
  115. if query != "" {
  116. filtered := make([]any, 0, len(envelopes))
  117. for _, envelope := range envelopes {
  118. if envelope.Topic != query {
  119. continue
  120. }
  121. filtered = append(filtered, adminLatestItem(envelope))
  122. }
  123. writeJSON(w, http.StatusOK, map[string]any{
  124. "items": filtered,
  125. "count": len(filtered),
  126. })
  127. return
  128. }
  129. items := make([]any, 0, len(envelopes))
  130. for _, envelope := range envelopes {
  131. items = append(items, adminLatestItem(envelope))
  132. }
  133. writeJSON(w, http.StatusOK, map[string]any{
  134. "items": items,
  135. "count": len(items),
  136. })
  137. }
  138. func (s *Server) handleAdminLive(w http.ResponseWriter, r *http.Request) {
  139. flusher, ok := w.(http.Flusher)
  140. if !ok {
  141. http.Error(w, "streaming unsupported", http.StatusInternalServerError)
  142. return
  143. }
  144. topic := strings.TrimSpace(r.URL.Query().Get("topic"))
  145. channelID := strings.TrimSpace(r.URL.Query().Get("channelId"))
  146. deviceID := strings.TrimSpace(r.URL.Query().Get("deviceId"))
  147. w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
  148. w.Header().Set("Cache-Control", "no-store")
  149. w.Header().Set("Connection", "keep-alive")
  150. w.Header().Set("X-Accel-Buffering", "no")
  151. id, stream := s.hub.SubscribeLive(64)
  152. defer s.hub.UnsubscribeLive(id)
  153. fmt.Fprint(w, ": live stream ready\n\n")
  154. flusher.Flush()
  155. ping := time.NewTicker(15 * time.Second)
  156. defer ping.Stop()
  157. ctx := r.Context()
  158. for {
  159. select {
  160. case <-ctx.Done():
  161. return
  162. case <-ping.C:
  163. fmt.Fprint(w, ": ping\n\n")
  164. flusher.Flush()
  165. case envelope, ok := <-stream:
  166. if !ok {
  167. return
  168. }
  169. if topic != "" && envelope.Topic != topic {
  170. continue
  171. }
  172. if channelID != "" && envelope.Target.ChannelID != channelID {
  173. continue
  174. }
  175. if deviceID != "" && envelope.Target.DeviceID != deviceID {
  176. continue
  177. }
  178. data, err := json.Marshal(adminLatestItem(envelope))
  179. if err != nil {
  180. continue
  181. }
  182. fmt.Fprintf(w, "event: envelope\ndata: %s\n\n", data)
  183. flusher.Flush()
  184. }
  185. }
  186. }
  187. func (s *Server) handleAdminTraffic(w http.ResponseWriter, _ *http.Request) {
  188. writeJSON(w, http.StatusOK, s.hub.TrafficSnapshot())
  189. }
  190. func adminLatestItem(envelope model.Envelope) map[string]any {
  191. payload := map[string]any{}
  192. _ = json.Unmarshal(envelope.Payload, &payload)
  193. return map[string]any{
  194. "timestamp": envelope.Timestamp,
  195. "topic": envelope.Topic,
  196. "channelId": envelope.Target.ChannelID,
  197. "deviceId": envelope.Target.DeviceID,
  198. "groupId": envelope.Target.GroupID,
  199. "sourceId": envelope.Source.ID,
  200. "mode": envelope.Source.Mode,
  201. "payload": payload,
  202. }
  203. }