bus.go 939 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package plugin
  2. import (
  3. "context"
  4. "log/slog"
  5. "sync"
  6. "realtime-gateway/internal/model"
  7. )
  8. type Handler interface {
  9. Name() string
  10. Handle(context.Context, model.Envelope) error
  11. }
  12. type Bus struct {
  13. logger *slog.Logger
  14. mu sync.RWMutex
  15. handlers []Handler
  16. }
  17. func NewBus(logger *slog.Logger) *Bus {
  18. return &Bus{
  19. logger: logger,
  20. }
  21. }
  22. func (b *Bus) Register(handler Handler) {
  23. b.mu.Lock()
  24. b.handlers = append(b.handlers, handler)
  25. b.mu.Unlock()
  26. }
  27. func (b *Bus) Publish(envelope model.Envelope) {
  28. b.mu.RLock()
  29. handlers := append([]Handler(nil), b.handlers...)
  30. b.mu.RUnlock()
  31. for _, handler := range handlers {
  32. handler := handler
  33. go func() {
  34. if err := handler.Handle(context.Background(), envelope); err != nil {
  35. b.logger.Warn("plugin handler failed", "handler", handler.Name(), "error", err)
  36. }
  37. }()
  38. }
  39. }
  40. func (b *Bus) HandlerCount() int {
  41. b.mu.RLock()
  42. defer b.mu.RUnlock()
  43. return len(b.handlers)
  44. }