| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package gateway
import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"time"

	"realtime-gateway/internal/channel"
	"realtime-gateway/internal/config"
	"realtime-gateway/internal/plugin"
	"realtime-gateway/internal/router"
	"realtime-gateway/internal/session"
)
- type Server struct {
- cfg config.Config
- logger *slog.Logger
- httpSrv *http.Server
- channels *channel.Manager
- hub *router.Hub
- plugins *plugin.Bus
- sessions *session.Manager
- startedAt time.Time
- }
- func NewServer(cfg config.Config, logger *slog.Logger) (*Server, error) {
- channels := channel.NewManager(8 * time.Hour)
- hub := router.NewHub(cfg.Gateway.MaxLatestStateEntries)
- plugins := plugin.NewBus(logger.With("component", "plugin-bus"))
- sessions := session.NewManager()
- mux := http.NewServeMux()
- server := &Server{
- cfg: cfg,
- logger: logger,
- channels: channels,
- hub: hub,
- plugins: plugins,
- sessions: sessions,
- startedAt: time.Now(),
- httpSrv: &http.Server{
- Addr: cfg.Server.HTTPListen,
- ReadTimeout: cfg.Server.ReadTimeout(),
- WriteTimeout: cfg.Server.WriteTimeout(),
- IdleTimeout: cfg.Server.IdleTimeout(),
- },
- }
- mux.HandleFunc("/healthz", server.handleHealth)
- mux.HandleFunc("/metrics", server.handleMetrics)
- mux.HandleFunc("/ws", server.handleWS)
- server.registerChannelRoutes(mux)
- if err := server.registerAdminRoutes(mux); err != nil {
- return nil, err
- }
- server.httpSrv.Handler = mux
- return server, nil
- }
- func (s *Server) Run(ctx context.Context) error {
- errCh := make(chan error, 1)
- go func() {
- s.logger.Info("gateway listening", "addr", s.cfg.Server.HTTPListen)
- errCh <- s.httpSrv.ListenAndServe()
- }()
- select {
- case <-ctx.Done():
- shutdownCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Server.ShutdownTimeout())
- defer cancel()
- s.logger.Info("shutting down gateway")
- return s.httpSrv.Shutdown(shutdownCtx)
- case err := <-errCh:
- if err == http.ErrServerClosed {
- return nil
- }
- return err
- }
- }
- func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
- writeJSON(w, http.StatusOK, map[string]any{
- "status": "ok",
- })
- }
- func (s *Server) handleMetrics(w http.ResponseWriter, _ *http.Request) {
- subscriberCount, latestStateCount := s.hub.Stats()
- writeJSON(w, http.StatusOK, map[string]any{
- "sessions": s.sessions.Count(),
- "subscribers": subscriberCount,
- "latestState": latestStateCount,
- "pluginHandlers": s.plugins.HandlerCount(),
- "httpListen": s.cfg.Server.HTTPListen,
- "anonymousClient": s.cfg.Auth.AllowAnonymousConsumers,
- })
- }
- func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
- serveClient(w, r, s.logger, s.cfg, s.hub, s.channels, s.plugins, s.sessions)
- }
// writeJSON sends value as a JSON response with the given status code and a
// "application/json; charset=utf-8" Content-Type.
//
// The value is encoded before the status line is written. If encoding fails,
// the response is HTTP 500 with a small JSON error body, rather than the
// requested status with an empty body. On success the body is the JSON
// encoding followed by a newline, the same bytes json.Encoder.Encode
// produces.
func writeJSON(w http.ResponseWriter, status int, value any) {
	w.Header().Set("Content-Type", "application/json; charset=utf-8")

	body, err := json.Marshal(value)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		// Best effort: a failed write means the client is gone.
		_, _ = w.Write([]byte("{\"error\":\"failed to encode response\"}\n"))
		return
	}
	// Keep the trailing newline that json.Encoder.Encode emits.
	body = append(body, '\n')

	w.WriteHeader(status)
	// Best effort: a failed write means the client is gone and there is
	// nothing left to report to it.
	_, _ = w.Write(body)
}
|