package gateway import ( "embed" "encoding/json" "fmt" "io/fs" "net/http" "sort" "strings" "time" "realtime-gateway/internal/model" ) //go:embed adminui/* var adminUIFiles embed.FS type adminOverview struct { Status string `json:"status"` StartedAt time.Time `json:"startedAt"` Now time.Time `json:"now"` UptimeSeconds int64 `json:"uptimeSeconds"` HTTPListen string `json:"httpListen"` Anonymous bool `json:"anonymousConsumers"` Metrics map[string]any `json:"metrics"` Auth map[string]any `json:"auth"` Endpoints map[string]any `json:"endpoints"` } func (s *Server) registerAdminRoutes(mux *http.ServeMux) error { sub, err := fs.Sub(adminUIFiles, "adminui") if err != nil { return err } fileServer := http.FileServer(http.FS(sub)) mux.Handle("/assets/", http.StripPrefix("/assets/", noStoreHandler(fileServer))) mux.HandleFunc("/", s.handleAdminIndex) mux.HandleFunc("/admin", s.handleAdminIndex) mux.HandleFunc("/api/admin/overview", s.handleAdminOverview) mux.HandleFunc("/api/admin/sessions", s.handleAdminSessions) mux.HandleFunc("/api/admin/latest", s.handleAdminLatest) mux.HandleFunc("/api/admin/traffic", s.handleAdminTraffic) mux.HandleFunc("/api/admin/live", s.handleAdminLive) return nil } func (s *Server) handleAdminIndex(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" && r.URL.Path != "/admin" { http.NotFound(w, r) return } data, err := adminUIFiles.ReadFile("adminui/index.html") if err != nil { http.Error(w, "admin ui unavailable", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Cache-Control", "no-store") _, _ = w.Write(data) } func noStoreHandler(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-store") next.ServeHTTP(w, r) }) } func (s *Server) handleAdminOverview(w http.ResponseWriter, _ *http.Request) { subscriberCount, latestStateCount := s.hub.Stats() traffic := s.hub.TrafficSnapshot() now := time.Now() writeJSON(w, http.StatusOK, adminOverview{ Status: "ok", StartedAt: s.startedAt, Now: now, UptimeSeconds: int64(now.Sub(s.startedAt).Seconds()), HTTPListen: s.cfg.Server.HTTPListen, Anonymous: s.cfg.Auth.AllowAnonymousConsumers, Metrics: map[string]any{ "sessions": s.sessions.Count(), "subscribers": subscriberCount, "latestState": latestStateCount, "channels": len(s.channels.List()), "pluginHandlers": s.plugins.HandlerCount(), "published": traffic.Published, "dropped": traffic.Dropped, "fanout": traffic.Fanout, }, Auth: map[string]any{ "producerTokens": len(s.cfg.Auth.ProducerTokens), "consumerTokens": len(s.cfg.Auth.ConsumerTokens), "controllerTokens": len(s.cfg.Auth.ControllerTokens), }, Endpoints: map[string]any{ "websocket": "/ws", "health": "/healthz", "metrics": "/metrics", "createChannel": "/api/channel/create", "channels": "/api/admin/channels", "traffic": "/api/admin/traffic", "admin": "/admin", }, }) } func (s *Server) handleAdminSessions(w http.ResponseWriter, _ *http.Request) { snapshots := s.sessions.List() sort.Slice(snapshots, func(i int, j int) bool { return snapshots[i].CreatedAt.After(snapshots[j].CreatedAt) }) writeJSON(w, http.StatusOK, map[string]any{ "items": snapshots, "count": len(snapshots), }) } func (s *Server) handleAdminLatest(w http.ResponseWriter, r *http.Request) { envelopes := s.hub.LatestStates() sort.Slice(envelopes, func(i int, j int) bool { return envelopes[i].Timestamp > envelopes[j].Timestamp }) query := strings.TrimSpace(r.URL.Query().Get("topic")) if query != "" { filtered := make([]any, 0, len(envelopes)) for _, envelope := range envelopes { if envelope.Topic != query { continue } filtered = append(filtered, adminLatestItem(envelope)) } writeJSON(w, http.StatusOK, map[string]any{ "items": filtered, "count": len(filtered), }) return } items := make([]any, 0, len(envelopes)) for _, envelope := range envelopes { items = append(items, adminLatestItem(envelope)) } writeJSON(w, http.StatusOK, map[string]any{ "items": items, "count": len(items), }) } func (s *Server) handleAdminLive(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } topic := strings.TrimSpace(r.URL.Query().Get("topic")) channelID := strings.TrimSpace(r.URL.Query().Get("channelId")) deviceID := strings.TrimSpace(r.URL.Query().Get("deviceId")) w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") w.Header().Set("Cache-Control", "no-store") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") id, stream := s.hub.SubscribeLive(64) defer s.hub.UnsubscribeLive(id) fmt.Fprint(w, ": live stream ready\n\n") flusher.Flush() ping := time.NewTicker(15 * time.Second) defer ping.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case <-ping.C: fmt.Fprint(w, ": ping\n\n") flusher.Flush() case envelope, ok := <-stream: if !ok { return } if topic != "" && envelope.Topic != topic { continue } if channelID != "" && envelope.Target.ChannelID != channelID { continue } if deviceID != "" && envelope.Target.DeviceID != deviceID { continue } data, err := json.Marshal(adminLatestItem(envelope)) if err != nil { continue } fmt.Fprintf(w, "event: envelope\ndata: %s\n\n", data) flusher.Flush() } } } func (s *Server) handleAdminTraffic(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, s.hub.TrafficSnapshot()) } func adminLatestItem(envelope model.Envelope) map[string]any { payload := map[string]any{} _ = json.Unmarshal(envelope.Payload, &payload) return map[string]any{ "timestamp": envelope.Timestamp, "topic": envelope.Topic, "channelId": envelope.Target.ChannelID, "deviceId": envelope.Target.DeviceID, "groupId": envelope.Target.GroupID, "sourceId": envelope.Source.ID, "mode": envelope.Source.Mode, "payload": payload, } }