package main import ( "context" "encoding/json" "flag" "fmt" "log" "os" "os/signal" "strings" "syscall" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" "realtime-gateway/internal/model" ) type consumerConfig struct { URL string `json:"url"` Token string `json:"token"` ChannelID string `json:"channelId"` DeviceID string `json:"deviceId"` GroupID string `json:"groupId"` Topic string `json:"topic"` Topics []string `json:"topics"` Subscriptions []model.Subscription `json:"subscriptions"` Snapshot *bool `json:"snapshot"` } func main() { configPath := findConfigPath(os.Args[1:]) fileConfig, err := loadConsumerConfig(configPath) if err != nil { log.Fatalf("load config: %v", err) } flag.StringVar(&configPath, "config", configPath, "path to consumer config file") url := flag.String("url", valueOr(fileConfig.URL, "ws://127.0.0.1:8080/ws"), "gateway websocket url") token := flag.String("token", fileConfig.Token, "consumer token, leave empty if anonymous consumers are allowed") channelID := flag.String("channel-id", fileConfig.ChannelID, "channel id to join before subscribe") deviceID := flag.String("device-id", valueOr(fileConfig.DeviceID, "child-001"), "device id to subscribe") groupID := flag.String("group-id", fileConfig.GroupID, "group id to subscribe") topic := flag.String("topic", valueOr(fileConfig.Topic, "telemetry.location"), "single topic to subscribe") topics := flag.String("topics", strings.Join(fileConfig.Topics, ","), "comma-separated topics to subscribe, overrides -topic when set") snapshot := flag.Bool("snapshot", boolValue(fileConfig.Snapshot, true), "request latest snapshot after subscribe") flag.Parse() subscriptions := resolveSubscriptions(fileConfig, *deviceID, *groupID, *topic, *topics) if len(subscriptions) == 0 { log.Fatalf("no subscriptions configured") } ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() conn, _, err := websocket.Dial(ctx, *url, nil) if err != nil { log.Fatalf("dial gateway: %v", err) } defer conn.Close(websocket.StatusNormalClosure, "consumer closed") var welcome model.ServerMessage if err := wsjson.Read(ctx, conn, &welcome); err != nil { log.Fatalf("read welcome: %v", err) } log.Printf("connected: session=%s", welcome.SessionID) if *token != "" || *channelID != "" { authReq := model.ClientMessage{ Type: "authenticate", Role: model.RoleConsumer, Token: *token, } if *channelID != "" { authReq.Type = "join_channel" authReq.ChannelID = *channelID } if err := wsjson.Write(ctx, conn, authReq); err != nil { log.Fatalf("send authenticate: %v", err) } var authResp model.ServerMessage if err := wsjson.Read(ctx, conn, &authResp); err != nil { log.Fatalf("read authenticate response: %v", err) } if authResp.Type == "error" { log.Fatalf("authenticate failed: %s", authResp.Error) } log.Printf("authenticated: session=%s", authResp.SessionID) } subReq := model.ClientMessage{ Type: "subscribe", Subscriptions: subscriptions, } if err := wsjson.Write(ctx, conn, subReq); err != nil { log.Fatalf("send subscribe: %v", err) } var subResp model.ServerMessage if err := wsjson.Read(ctx, conn, &subResp); err != nil { log.Fatalf("read subscribe response: %v", err) } if subResp.Type == "error" { log.Fatalf("subscribe failed: %s", subResp.Error) } log.Printf("subscribed: %s", describeSubscriptions(subscriptions)) if *snapshot { req := model.ClientMessage{ Type: "snapshot", Subscriptions: []model.Subscription{ {DeviceID: firstSnapshotDeviceID(subscriptions, *deviceID)}, }, } if err := wsjson.Write(ctx, conn, req); err != nil { log.Fatalf("send snapshot: %v", err) } var resp model.ServerMessage if err := wsjson.Read(ctx, conn, &resp); err != nil { log.Fatalf("read snapshot response: %v", err) } if resp.Type == "snapshot" { log.Printf("snapshot: %s", compactJSON(resp.State)) } else if resp.Type == "error" { log.Printf("snapshot unavailable: %s", resp.Error) } } for { var message model.ServerMessage if err := wsjson.Read(ctx, conn, &message); err != nil { log.Fatalf("read event: %v", err) } switch message.Type { case "event": log.Printf("event: %s", describeEnvelope(message.Envelope)) case "error": log.Printf("server error: %s", message.Error) default: log.Printf("message: type=%s", message.Type) } } } func loadConsumerConfig(path string) (consumerConfig, error) { if strings.TrimSpace(path) == "" { return consumerConfig{}, nil } data, err := os.ReadFile(path) if err != nil { return consumerConfig{}, fmt.Errorf("read %s: %w", path, err) } var cfg consumerConfig if err := json.Unmarshal(data, &cfg); err != nil { return consumerConfig{}, fmt.Errorf("parse %s: %w", path, err) } return cfg, nil } func findConfigPath(args []string) string { for index := 0; index < len(args); index++ { arg := args[index] if strings.HasPrefix(arg, "-config=") { return strings.TrimSpace(strings.TrimPrefix(arg, "-config=")) } if arg == "-config" && index+1 < len(args) { return strings.TrimSpace(args[index+1]) } if strings.HasPrefix(arg, "--config=") { return strings.TrimSpace(strings.TrimPrefix(arg, "--config=")) } if arg == "--config" && index+1 < len(args) { return strings.TrimSpace(args[index+1]) } } return "" } func valueOr(value string, fallback string) string { if strings.TrimSpace(value) == "" { return fallback } return value } func boolValue(value *bool, fallback bool) bool { if value == nil { return fallback } return *value } func resolveSubscriptions(fileConfig consumerConfig, deviceID string, groupID string, topic string, topicsCSV string) []model.Subscription { if len(fileConfig.Subscriptions) > 0 && strings.TrimSpace(topicsCSV) == "" && strings.TrimSpace(topic) == valueOr(fileConfig.Topic, "telemetry.location") { return filterValidSubscriptions(fileConfig.Subscriptions) } topics := parseTopics(topicsCSV) if len(topics) == 0 { topics = []string{topic} } subscriptions := make([]model.Subscription, 0, len(topics)) for _, entry := range topics { subscriptions = append(subscriptions, model.Subscription{ DeviceID: deviceID, GroupID: groupID, Topic: entry, }) } return filterValidSubscriptions(subscriptions) } func parseTopics(value string) []string { if strings.TrimSpace(value) == "" { return nil } parts := strings.Split(value, ",") topics := make([]string, 0, len(parts)) for _, part := range parts { topic := strings.TrimSpace(part) if topic == "" { continue } topics = append(topics, topic) } return topics } func filterValidSubscriptions(items []model.Subscription) []model.Subscription { subscriptions := make([]model.Subscription, 0, len(items)) for _, item := range items { if strings.TrimSpace(item.ChannelID) == "" && strings.TrimSpace(item.DeviceID) == "" && strings.TrimSpace(item.GroupID) == "" { continue } subscriptions = append(subscriptions, model.Subscription{ ChannelID: strings.TrimSpace(item.ChannelID), DeviceID: strings.TrimSpace(item.DeviceID), GroupID: strings.TrimSpace(item.GroupID), Topic: strings.TrimSpace(item.Topic), }) } return subscriptions } func describeSubscriptions(items []model.Subscription) string { parts := make([]string, 0, len(items)) for _, item := range items { scope := item.DeviceID if scope == "" { if item.GroupID != "" { scope = "group:" + item.GroupID } else { scope = "all" } } if item.ChannelID != "" { scope = "channel:" + item.ChannelID + " / " + scope } if item.Topic != "" { scope += "/" + item.Topic } parts = append(parts, scope) } return strings.Join(parts, ", ") } func firstSnapshotDeviceID(items []model.Subscription, fallback string) string { for _, item := range items { if strings.TrimSpace(item.DeviceID) != "" { return item.DeviceID } } return fallback } func describeEnvelope(envelope *model.Envelope) string { if envelope == nil { return "{}" } switch envelope.Topic { case "telemetry.location": return describeLocationEnvelope(envelope) case "telemetry.heart_rate": return describeHeartRateEnvelope(envelope) default: return compactEnvelope(envelope) } } func describeLocationEnvelope(envelope *model.Envelope) string { var payload struct { Lat float64 `json:"lat"` Lng float64 `json:"lng"` Speed float64 `json:"speed"` Bearing float64 `json:"bearing"` Accuracy float64 `json:"accuracy"` CoordSystem string `json:"coordSystem"` } if err := json.Unmarshal(envelope.Payload, &payload); err != nil { return compactEnvelope(envelope) } return fmt.Sprintf( "gps device=%s lat=%.6f lng=%.6f speed=%.2f bearing=%.1f accuracy=%.1f coord=%s", envelope.Target.DeviceID, payload.Lat, payload.Lng, payload.Speed, payload.Bearing, payload.Accuracy, payload.CoordSystem, ) } func describeHeartRateEnvelope(envelope *model.Envelope) string { var payload struct { BPM int `json:"bpm"` Confidence float64 `json:"confidence"` } if err := json.Unmarshal(envelope.Payload, &payload); err != nil { return compactEnvelope(envelope) } if payload.Confidence > 0 { return fmt.Sprintf("heart_rate device=%s bpm=%d confidence=%.2f", envelope.Target.DeviceID, payload.BPM, payload.Confidence) } return fmt.Sprintf("heart_rate device=%s bpm=%d", envelope.Target.DeviceID, payload.BPM) } func compactEnvelope(envelope *model.Envelope) string { if envelope == nil { return "{}" } data, err := json.Marshal(envelope) if err != nil { return "{}" } return compactJSON(data) } func compactJSON(data []byte) string { var value any if err := json.Unmarshal(data, &value); err != nil { return string(data) } compact, err := json.Marshal(value) if err != nil { return string(data) } return string(compact) }