| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- package gateway
- import (
- "embed"
- "encoding/json"
- "fmt"
- "io/fs"
- "net/http"
- "sort"
- "strings"
- "time"
- "realtime-gateway/internal/model"
- )
- //go:embed adminui/*
- var adminUIFiles embed.FS
- type adminOverview struct {
- Status string `json:"status"`
- StartedAt time.Time `json:"startedAt"`
- Now time.Time `json:"now"`
- UptimeSeconds int64 `json:"uptimeSeconds"`
- HTTPListen string `json:"httpListen"`
- Anonymous bool `json:"anonymousConsumers"`
- Metrics map[string]any `json:"metrics"`
- Auth map[string]any `json:"auth"`
- Endpoints map[string]any `json:"endpoints"`
- }
- func (s *Server) registerAdminRoutes(mux *http.ServeMux) error {
- sub, err := fs.Sub(adminUIFiles, "adminui")
- if err != nil {
- return err
- }
- fileServer := http.FileServer(http.FS(sub))
- mux.Handle("/assets/", http.StripPrefix("/assets/", noStoreHandler(fileServer)))
- mux.HandleFunc("/", s.handleAdminIndex)
- mux.HandleFunc("/admin", s.handleAdminIndex)
- mux.HandleFunc("/api/admin/overview", s.handleAdminOverview)
- mux.HandleFunc("/api/admin/sessions", s.handleAdminSessions)
- mux.HandleFunc("/api/admin/latest", s.handleAdminLatest)
- mux.HandleFunc("/api/admin/traffic", s.handleAdminTraffic)
- mux.HandleFunc("/api/admin/live", s.handleAdminLive)
- return nil
- }
- func (s *Server) handleAdminIndex(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/" && r.URL.Path != "/admin" {
- http.NotFound(w, r)
- return
- }
- data, err := adminUIFiles.ReadFile("adminui/index.html")
- if err != nil {
- http.Error(w, "admin ui unavailable", http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "text/html; charset=utf-8")
- w.Header().Set("Cache-Control", "no-store")
- _, _ = w.Write(data)
- }
- func noStoreHandler(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Cache-Control", "no-store")
- next.ServeHTTP(w, r)
- })
- }
- func (s *Server) handleAdminOverview(w http.ResponseWriter, _ *http.Request) {
- subscriberCount, latestStateCount := s.hub.Stats()
- traffic := s.hub.TrafficSnapshot()
- now := time.Now()
- writeJSON(w, http.StatusOK, adminOverview{
- Status: "ok",
- StartedAt: s.startedAt,
- Now: now,
- UptimeSeconds: int64(now.Sub(s.startedAt).Seconds()),
- HTTPListen: s.cfg.Server.HTTPListen,
- Anonymous: s.cfg.Auth.AllowAnonymousConsumers,
- Metrics: map[string]any{
- "sessions": s.sessions.Count(),
- "subscribers": subscriberCount,
- "latestState": latestStateCount,
- "channels": len(s.channels.List()),
- "pluginHandlers": s.plugins.HandlerCount(),
- "published": traffic.Published,
- "dropped": traffic.Dropped,
- "fanout": traffic.Fanout,
- },
- Auth: map[string]any{
- "producerTokens": len(s.cfg.Auth.ProducerTokens),
- "consumerTokens": len(s.cfg.Auth.ConsumerTokens),
- "controllerTokens": len(s.cfg.Auth.ControllerTokens),
- },
- Endpoints: map[string]any{
- "websocket": "/ws",
- "health": "/healthz",
- "metrics": "/metrics",
- "createChannel": "/api/channel/create",
- "channels": "/api/admin/channels",
- "traffic": "/api/admin/traffic",
- "admin": "/admin",
- },
- })
- }
- func (s *Server) handleAdminSessions(w http.ResponseWriter, _ *http.Request) {
- snapshots := s.sessions.List()
- sort.Slice(snapshots, func(i int, j int) bool {
- return snapshots[i].CreatedAt.After(snapshots[j].CreatedAt)
- })
- writeJSON(w, http.StatusOK, map[string]any{
- "items": snapshots,
- "count": len(snapshots),
- })
- }
- func (s *Server) handleAdminLatest(w http.ResponseWriter, r *http.Request) {
- envelopes := s.hub.LatestStates()
- sort.Slice(envelopes, func(i int, j int) bool {
- return envelopes[i].Timestamp > envelopes[j].Timestamp
- })
- query := strings.TrimSpace(r.URL.Query().Get("topic"))
- if query != "" {
- filtered := make([]any, 0, len(envelopes))
- for _, envelope := range envelopes {
- if envelope.Topic != query {
- continue
- }
- filtered = append(filtered, adminLatestItem(envelope))
- }
- writeJSON(w, http.StatusOK, map[string]any{
- "items": filtered,
- "count": len(filtered),
- })
- return
- }
- items := make([]any, 0, len(envelopes))
- for _, envelope := range envelopes {
- items = append(items, adminLatestItem(envelope))
- }
- writeJSON(w, http.StatusOK, map[string]any{
- "items": items,
- "count": len(items),
- })
- }
- func (s *Server) handleAdminLive(w http.ResponseWriter, r *http.Request) {
- flusher, ok := w.(http.Flusher)
- if !ok {
- http.Error(w, "streaming unsupported", http.StatusInternalServerError)
- return
- }
- topic := strings.TrimSpace(r.URL.Query().Get("topic"))
- channelID := strings.TrimSpace(r.URL.Query().Get("channelId"))
- deviceID := strings.TrimSpace(r.URL.Query().Get("deviceId"))
- w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
- w.Header().Set("Cache-Control", "no-store")
- w.Header().Set("Connection", "keep-alive")
- w.Header().Set("X-Accel-Buffering", "no")
- id, stream := s.hub.SubscribeLive(64)
- defer s.hub.UnsubscribeLive(id)
- fmt.Fprint(w, ": live stream ready\n\n")
- flusher.Flush()
- ping := time.NewTicker(15 * time.Second)
- defer ping.Stop()
- ctx := r.Context()
- for {
- select {
- case <-ctx.Done():
- return
- case <-ping.C:
- fmt.Fprint(w, ": ping\n\n")
- flusher.Flush()
- case envelope, ok := <-stream:
- if !ok {
- return
- }
- if topic != "" && envelope.Topic != topic {
- continue
- }
- if channelID != "" && envelope.Target.ChannelID != channelID {
- continue
- }
- if deviceID != "" && envelope.Target.DeviceID != deviceID {
- continue
- }
- data, err := json.Marshal(adminLatestItem(envelope))
- if err != nil {
- continue
- }
- fmt.Fprintf(w, "event: envelope\ndata: %s\n\n", data)
- flusher.Flush()
- }
- }
- }
- func (s *Server) handleAdminTraffic(w http.ResponseWriter, _ *http.Request) {
- writeJSON(w, http.StatusOK, s.hub.TrafficSnapshot())
- }
- func adminLatestItem(envelope model.Envelope) map[string]any {
- payload := map[string]any{}
- _ = json.Unmarshal(envelope.Payload, &payload)
- return map[string]any{
- "timestamp": envelope.Timestamp,
- "topic": envelope.Topic,
- "channelId": envelope.Target.ChannelID,
- "deviceId": envelope.Target.DeviceID,
- "groupId": envelope.Target.GroupID,
- "sourceId": envelope.Source.ID,
- "mode": envelope.Source.Mode,
- "payload": payload,
- }
- }
|