// Package gateway assembles the realtime gateway's HTTP server: it builds the
// shared managers, registers the health, metrics and WebSocket endpoints plus
// the channel and admin routes, and runs the server until it is asked to stop.
package gateway

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"time"

	"realtime-gateway/internal/channel"
	"realtime-gateway/internal/config"
	"realtime-gateway/internal/plugin"
	"realtime-gateway/internal/router"
	"realtime-gateway/internal/session"
)

// Server bundles the HTTP server with the components its handlers share.
type Server struct {
	cfg     config.Config
	logger  *slog.Logger
	httpSrv *http.Server

	channels *channel.Manager
	hub      *router.Hub
	plugins  *plugin.Bus
	sessions *session.Manager

	// startedAt is the wall-clock time at which NewServer built this value.
	startedAt time.Time
}

// NewServer builds a Server from cfg and registers every HTTP route on a
// fresh ServeMux. It does not start listening; call Run for that.
//
// A nil logger falls back to slog.Default() instead of panicking when the
// plugin bus logger is derived from it.
//
// It returns an error only when registering the admin routes fails.
func NewServer(cfg config.Config, logger *slog.Logger) (*Server, error) {
	if logger == nil {
		logger = slog.Default()
	}

	// NOTE(review): the 8h argument is presumably a channel lifetime/TTL —
	// confirm against channel.NewManager.
	channels := channel.NewManager(8 * time.Hour)
	hub := router.NewHub(cfg.Gateway.MaxLatestStateEntries)
	plugins := plugin.NewBus(logger.With("component", "plugin-bus"))
	sessions := session.NewManager()

	mux := http.NewServeMux()
	server := &Server{
		cfg:       cfg,
		logger:    logger,
		channels:  channels,
		hub:       hub,
		plugins:   plugins,
		sessions:  sessions,
		startedAt: time.Now(),
		httpSrv: &http.Server{
			Addr:         cfg.Server.HTTPListen,
			ReadTimeout:  cfg.Server.ReadTimeout(),
			WriteTimeout: cfg.Server.WriteTimeout(),
			IdleTimeout:  cfg.Server.IdleTimeout(),
		},
	}

	mux.HandleFunc("/healthz", server.handleHealth)
	mux.HandleFunc("/metrics", server.handleMetrics)
	mux.HandleFunc("/ws", server.handleWS)
	server.registerChannelRoutes(mux)
	if err := server.registerAdminRoutes(mux); err != nil {
		return nil, err
	}

	// The handler is attached last, once every route has been registered.
	server.httpSrv.Handler = mux
	return server, nil
}

// Run serves HTTP until ctx is cancelled or the listener stops on its own.
//
// When ctx is cancelled, Run calls Shutdown with a deadline taken from
// cfg.Server.ShutdownTimeout() and returns Shutdown's result. When the
// listener stops first, Run returns its error, except that
// http.ErrServerClosed is reported as nil.
func (s *Server) Run(ctx context.Context) error {
	// Buffered so the serving goroutine can always deliver its result and
	// exit, even after Run has already returned through the shutdown path.
	errCh := make(chan error, 1)
	go func() {
		s.logger.Info("gateway listening", "addr", s.cfg.Server.HTTPListen)
		errCh <- s.httpSrv.ListenAndServe()
	}()

	select {
	case <-ctx.Done():
		// ctx is already cancelled, so the shutdown deadline needs its own
		// root context.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Server.ShutdownTimeout())
		defer cancel()
		s.logger.Info("shutting down gateway")
		return s.httpSrv.Shutdown(shutdownCtx)
	case err := <-errCh:
		// errors.Is also matches a wrapped sentinel, unlike a plain ==.
		if errors.Is(err, http.ErrServerClosed) {
			return nil
		}
		return err
	}
}

// handleHealth answers with 200 and a constant {"status": "ok"} JSON body.
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
	writeJSON(w, http.StatusOK, map[string]any{
		"status": "ok",
	})
}

// handleMetrics answers with a JSON snapshot of hub, session and plugin
// counters together with the listen address and the anonymous-consumer flag.
func (s *Server) handleMetrics(w http.ResponseWriter, _ *http.Request) {
	subscriberCount, latestStateCount := s.hub.Stats()
	writeJSON(w, http.StatusOK, map[string]any{
		"sessions":        s.sessions.Count(),
		"subscribers":     subscriberCount,
		"latestState":     latestStateCount,
		"pluginHandlers":  s.plugins.HandlerCount(),
		"httpListen":      s.cfg.Server.HTTPListen,
		"anonymousClient": s.cfg.Auth.AllowAnonymousConsumers,
	})
}

// handleWS hands the request to serveClient along with the server's shared
// components.
func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
	serveClient(w, r, s.logger, s.cfg, s.hub, s.channels, s.plugins, s.sessions)
}

// writeJSON sets a JSON content type, writes the given status code, and
// streams value to w as JSON.
func writeJSON(w http.ResponseWriter, status int, value any) {
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(status)
	// The status line has already been written, so an encode or write
	// failure can no longer be reported to the client; it is dropped on
	// purpose.
	_ = json.NewEncoder(w).Encode(value)
}