| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- package main
- import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "log"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "github.com/coder/websocket"
- "github.com/coder/websocket/wsjson"
- "realtime-gateway/internal/model"
- )
- type consumerConfig struct {
- URL string `json:"url"`
- Token string `json:"token"`
- ChannelID string `json:"channelId"`
- DeviceID string `json:"deviceId"`
- GroupID string `json:"groupId"`
- Topic string `json:"topic"`
- Topics []string `json:"topics"`
- Subscriptions []model.Subscription `json:"subscriptions"`
- Snapshot *bool `json:"snapshot"`
- }
- func main() {
- configPath := findConfigPath(os.Args[1:])
- fileConfig, err := loadConsumerConfig(configPath)
- if err != nil {
- log.Fatalf("load config: %v", err)
- }
- flag.StringVar(&configPath, "config", configPath, "path to consumer config file")
- url := flag.String("url", valueOr(fileConfig.URL, "ws://127.0.0.1:8080/ws"), "gateway websocket url")
- token := flag.String("token", fileConfig.Token, "consumer token, leave empty if anonymous consumers are allowed")
- channelID := flag.String("channel-id", fileConfig.ChannelID, "channel id to join before subscribe")
- deviceID := flag.String("device-id", valueOr(fileConfig.DeviceID, "child-001"), "device id to subscribe")
- groupID := flag.String("group-id", fileConfig.GroupID, "group id to subscribe")
- topic := flag.String("topic", valueOr(fileConfig.Topic, "telemetry.location"), "single topic to subscribe")
- topics := flag.String("topics", strings.Join(fileConfig.Topics, ","), "comma-separated topics to subscribe, overrides -topic when set")
- snapshot := flag.Bool("snapshot", boolValue(fileConfig.Snapshot, true), "request latest snapshot after subscribe")
- flag.Parse()
- subscriptions := resolveSubscriptions(fileConfig, *deviceID, *groupID, *topic, *topics)
- if len(subscriptions) == 0 {
- log.Fatalf("no subscriptions configured")
- }
- ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
- defer stop()
- conn, _, err := websocket.Dial(ctx, *url, nil)
- if err != nil {
- log.Fatalf("dial gateway: %v", err)
- }
- defer conn.Close(websocket.StatusNormalClosure, "consumer closed")
- var welcome model.ServerMessage
- if err := wsjson.Read(ctx, conn, &welcome); err != nil {
- log.Fatalf("read welcome: %v", err)
- }
- log.Printf("connected: session=%s", welcome.SessionID)
- if *token != "" || *channelID != "" {
- authReq := model.ClientMessage{
- Type: "authenticate",
- Role: model.RoleConsumer,
- Token: *token,
- }
- if *channelID != "" {
- authReq.Type = "join_channel"
- authReq.ChannelID = *channelID
- }
- if err := wsjson.Write(ctx, conn, authReq); err != nil {
- log.Fatalf("send authenticate: %v", err)
- }
- var authResp model.ServerMessage
- if err := wsjson.Read(ctx, conn, &authResp); err != nil {
- log.Fatalf("read authenticate response: %v", err)
- }
- if authResp.Type == "error" {
- log.Fatalf("authenticate failed: %s", authResp.Error)
- }
- log.Printf("authenticated: session=%s", authResp.SessionID)
- }
- subReq := model.ClientMessage{
- Type: "subscribe",
- Subscriptions: subscriptions,
- }
- if err := wsjson.Write(ctx, conn, subReq); err != nil {
- log.Fatalf("send subscribe: %v", err)
- }
- var subResp model.ServerMessage
- if err := wsjson.Read(ctx, conn, &subResp); err != nil {
- log.Fatalf("read subscribe response: %v", err)
- }
- if subResp.Type == "error" {
- log.Fatalf("subscribe failed: %s", subResp.Error)
- }
- log.Printf("subscribed: %s", describeSubscriptions(subscriptions))
- if *snapshot {
- req := model.ClientMessage{
- Type: "snapshot",
- Subscriptions: []model.Subscription{
- {DeviceID: firstSnapshotDeviceID(subscriptions, *deviceID)},
- },
- }
- if err := wsjson.Write(ctx, conn, req); err != nil {
- log.Fatalf("send snapshot: %v", err)
- }
- var resp model.ServerMessage
- if err := wsjson.Read(ctx, conn, &resp); err != nil {
- log.Fatalf("read snapshot response: %v", err)
- }
- if resp.Type == "snapshot" {
- log.Printf("snapshot: %s", compactJSON(resp.State))
- } else if resp.Type == "error" {
- log.Printf("snapshot unavailable: %s", resp.Error)
- }
- }
- for {
- var message model.ServerMessage
- if err := wsjson.Read(ctx, conn, &message); err != nil {
- log.Fatalf("read event: %v", err)
- }
- switch message.Type {
- case "event":
- log.Printf("event: %s", describeEnvelope(message.Envelope))
- case "error":
- log.Printf("server error: %s", message.Error)
- default:
- log.Printf("message: type=%s", message.Type)
- }
- }
- }
- func loadConsumerConfig(path string) (consumerConfig, error) {
- if strings.TrimSpace(path) == "" {
- return consumerConfig{}, nil
- }
- data, err := os.ReadFile(path)
- if err != nil {
- return consumerConfig{}, fmt.Errorf("read %s: %w", path, err)
- }
- var cfg consumerConfig
- if err := json.Unmarshal(data, &cfg); err != nil {
- return consumerConfig{}, fmt.Errorf("parse %s: %w", path, err)
- }
- return cfg, nil
- }
// findConfigPath scans args for the first config option and returns its
// value with surrounding whitespace removed.
//
// It recognises -config=PATH, --config=PATH, -config PATH and --config PATH.
// A bare -config/--config as the last argument is ignored. It returns ""
// when no config option is found.
func findConfigPath(args []string) string {
	for i, arg := range args {
		switch {
		case strings.HasPrefix(arg, "-config="):
			return strings.TrimSpace(arg[len("-config="):])
		case strings.HasPrefix(arg, "--config="):
			return strings.TrimSpace(arg[len("--config="):])
		case arg == "-config" || arg == "--config":
			// The value is the following argument, if there is one.
			if next := i + 1; next < len(args) {
				return strings.TrimSpace(args[next])
			}
		}
	}
	return ""
}
// valueOr returns value unchanged unless it is empty or whitespace-only,
// in which case it returns fallback.
func valueOr(value string, fallback string) string {
	if strings.TrimSpace(value) != "" {
		return value
	}
	return fallback
}
// boolValue dereferences value, or returns fallback when value is nil.
func boolValue(value *bool, fallback bool) bool {
	if value != nil {
		return *value
	}
	return fallback
}
- func resolveSubscriptions(fileConfig consumerConfig, deviceID string, groupID string, topic string, topicsCSV string) []model.Subscription {
- if len(fileConfig.Subscriptions) > 0 && strings.TrimSpace(topicsCSV) == "" && strings.TrimSpace(topic) == valueOr(fileConfig.Topic, "telemetry.location") {
- return filterValidSubscriptions(fileConfig.Subscriptions)
- }
- topics := parseTopics(topicsCSV)
- if len(topics) == 0 {
- topics = []string{topic}
- }
- subscriptions := make([]model.Subscription, 0, len(topics))
- for _, entry := range topics {
- subscriptions = append(subscriptions, model.Subscription{
- DeviceID: deviceID,
- GroupID: groupID,
- Topic: entry,
- })
- }
- return filterValidSubscriptions(subscriptions)
- }
// parseTopics splits a comma-separated list into trimmed, non-empty topic
// names. It returns nil when value is empty or whitespace-only.
func parseTopics(value string) []string {
	if strings.TrimSpace(value) == "" {
		return nil
	}

	fields := strings.Split(value, ",")
	names := make([]string, 0, len(fields))
	for i := range fields {
		if name := strings.TrimSpace(fields[i]); name != "" {
			names = append(names, name)
		}
	}
	return names
}
- func filterValidSubscriptions(items []model.Subscription) []model.Subscription {
- subscriptions := make([]model.Subscription, 0, len(items))
- for _, item := range items {
- if strings.TrimSpace(item.ChannelID) == "" && strings.TrimSpace(item.DeviceID) == "" && strings.TrimSpace(item.GroupID) == "" {
- continue
- }
- subscriptions = append(subscriptions, model.Subscription{
- ChannelID: strings.TrimSpace(item.ChannelID),
- DeviceID: strings.TrimSpace(item.DeviceID),
- GroupID: strings.TrimSpace(item.GroupID),
- Topic: strings.TrimSpace(item.Topic),
- })
- }
- return subscriptions
- }
- func describeSubscriptions(items []model.Subscription) string {
- parts := make([]string, 0, len(items))
- for _, item := range items {
- scope := item.DeviceID
- if scope == "" {
- if item.GroupID != "" {
- scope = "group:" + item.GroupID
- } else {
- scope = "all"
- }
- }
- if item.ChannelID != "" {
- scope = "channel:" + item.ChannelID + " / " + scope
- }
- if item.Topic != "" {
- scope += "/" + item.Topic
- }
- parts = append(parts, scope)
- }
- return strings.Join(parts, ", ")
- }
- func firstSnapshotDeviceID(items []model.Subscription, fallback string) string {
- for _, item := range items {
- if strings.TrimSpace(item.DeviceID) != "" {
- return item.DeviceID
- }
- }
- return fallback
- }
- func describeEnvelope(envelope *model.Envelope) string {
- if envelope == nil {
- return "{}"
- }
- switch envelope.Topic {
- case "telemetry.location":
- return describeLocationEnvelope(envelope)
- case "telemetry.heart_rate":
- return describeHeartRateEnvelope(envelope)
- default:
- return compactEnvelope(envelope)
- }
- }
- func describeLocationEnvelope(envelope *model.Envelope) string {
- var payload struct {
- Lat float64 `json:"lat"`
- Lng float64 `json:"lng"`
- Speed float64 `json:"speed"`
- Bearing float64 `json:"bearing"`
- Accuracy float64 `json:"accuracy"`
- CoordSystem string `json:"coordSystem"`
- }
- if err := json.Unmarshal(envelope.Payload, &payload); err != nil {
- return compactEnvelope(envelope)
- }
- return fmt.Sprintf(
- "gps device=%s lat=%.6f lng=%.6f speed=%.2f bearing=%.1f accuracy=%.1f coord=%s",
- envelope.Target.DeviceID,
- payload.Lat,
- payload.Lng,
- payload.Speed,
- payload.Bearing,
- payload.Accuracy,
- payload.CoordSystem,
- )
- }
- func describeHeartRateEnvelope(envelope *model.Envelope) string {
- var payload struct {
- BPM int `json:"bpm"`
- Confidence float64 `json:"confidence"`
- }
- if err := json.Unmarshal(envelope.Payload, &payload); err != nil {
- return compactEnvelope(envelope)
- }
- if payload.Confidence > 0 {
- return fmt.Sprintf("heart_rate device=%s bpm=%d confidence=%.2f", envelope.Target.DeviceID, payload.BPM, payload.Confidence)
- }
- return fmt.Sprintf("heart_rate device=%s bpm=%d", envelope.Target.DeviceID, payload.BPM)
- }
- func compactEnvelope(envelope *model.Envelope) string {
- if envelope == nil {
- return "{}"
- }
- data, err := json.Marshal(envelope)
- if err != nil {
- return "{}"
- }
- return compactJSON(data)
- }
// compactJSON re-encodes data by decoding it into a generic value and
// marshalling that value again, which removes insignificant whitespace.
// When data is not valid JSON, or re-encoding fails, the input is returned
// unchanged as a string.
func compactJSON(data []byte) string {
	var decoded any
	if json.Unmarshal(data, &decoded) != nil {
		return string(data)
	}
	if encoded, err := json.Marshal(decoded); err == nil {
		return string(encoded)
	}
	return string(data)
}
|