server.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package gateway
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log/slog"
  6. "net/http"
  7. "time"
  8. "realtime-gateway/internal/channel"
  9. "realtime-gateway/internal/config"
  10. "realtime-gateway/internal/plugin"
  11. "realtime-gateway/internal/router"
  12. "realtime-gateway/internal/session"
  13. )
  14. type Server struct {
  15. cfg config.Config
  16. logger *slog.Logger
  17. httpSrv *http.Server
  18. channels *channel.Manager
  19. hub *router.Hub
  20. plugins *plugin.Bus
  21. sessions *session.Manager
  22. startedAt time.Time
  23. }
  24. func NewServer(cfg config.Config, logger *slog.Logger) (*Server, error) {
  25. channels := channel.NewManager(8 * time.Hour)
  26. hub := router.NewHub(cfg.Gateway.MaxLatestStateEntries)
  27. plugins := plugin.NewBus(logger.With("component", "plugin-bus"))
  28. sessions := session.NewManager()
  29. mux := http.NewServeMux()
  30. server := &Server{
  31. cfg: cfg,
  32. logger: logger,
  33. channels: channels,
  34. hub: hub,
  35. plugins: plugins,
  36. sessions: sessions,
  37. startedAt: time.Now(),
  38. httpSrv: &http.Server{
  39. Addr: cfg.Server.HTTPListen,
  40. ReadTimeout: cfg.Server.ReadTimeout(),
  41. WriteTimeout: cfg.Server.WriteTimeout(),
  42. IdleTimeout: cfg.Server.IdleTimeout(),
  43. },
  44. }
  45. mux.HandleFunc("/healthz", server.handleHealth)
  46. mux.HandleFunc("/metrics", server.handleMetrics)
  47. mux.HandleFunc("/ws", server.handleWS)
  48. server.registerChannelRoutes(mux)
  49. if err := server.registerAdminRoutes(mux); err != nil {
  50. return nil, err
  51. }
  52. server.httpSrv.Handler = mux
  53. return server, nil
  54. }
  55. func (s *Server) Run(ctx context.Context) error {
  56. errCh := make(chan error, 1)
  57. go func() {
  58. s.logger.Info("gateway listening", "addr", s.cfg.Server.HTTPListen)
  59. errCh <- s.httpSrv.ListenAndServe()
  60. }()
  61. select {
  62. case <-ctx.Done():
  63. shutdownCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Server.ShutdownTimeout())
  64. defer cancel()
  65. s.logger.Info("shutting down gateway")
  66. return s.httpSrv.Shutdown(shutdownCtx)
  67. case err := <-errCh:
  68. if err == http.ErrServerClosed {
  69. return nil
  70. }
  71. return err
  72. }
  73. }
  74. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  75. writeJSON(w, http.StatusOK, map[string]any{
  76. "status": "ok",
  77. })
  78. }
  79. func (s *Server) handleMetrics(w http.ResponseWriter, _ *http.Request) {
  80. subscriberCount, latestStateCount := s.hub.Stats()
  81. writeJSON(w, http.StatusOK, map[string]any{
  82. "sessions": s.sessions.Count(),
  83. "subscribers": subscriberCount,
  84. "latestState": latestStateCount,
  85. "pluginHandlers": s.plugins.HandlerCount(),
  86. "httpListen": s.cfg.Server.HTTPListen,
  87. "anonymousClient": s.cfg.Auth.AllowAnonymousConsumers,
  88. })
  89. }
  90. func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
  91. serveClient(w, r, s.logger, s.cfg, s.hub, s.channels, s.plugins, s.sessions)
  92. }
  93. func writeJSON(w http.ResponseWriter, status int, value any) {
  94. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  95. w.WriteHeader(status)
  96. _ = json.NewEncoder(w).Encode(value)
  97. }