package main import ( "context" "flag" "log" "os" "os/signal" "strconv" "syscall" "time" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" "realtime-gateway/internal/model" ) func main() { url := flag.String("url", "ws://127.0.0.1:8080/ws", "gateway websocket url") token := flag.String("token", "dev-producer-token", "producer token") channelID := flag.String("channel-id", "", "channel id to join before publish") deviceID := flag.String("device-id", "child-001", "target device id") groupID := flag.String("group-id", "", "target group id") sourceID := flag.String("source-id", "mock-producer-001", "source id") mode := flag.String("mode", "mock", "source mode") topic := flag.String("topic", "telemetry.location", "publish topic") lat := flag.Float64("lat", 31.2304, "start latitude") lng := flag.Float64("lng", 121.4737, "start longitude") speed := flag.Float64("speed", 1.2, "speed value") accuracy := flag.Float64("accuracy", 6, "accuracy value") bpm := flag.Int("bpm", 120, "heart rate value when topic is telemetry.heart_rate") confidence := flag.Float64("confidence", 0, "heart rate confidence when topic is telemetry.heart_rate") interval := flag.Duration("interval", time.Second, "publish interval") count := flag.Int("count", 0, "number of messages to send, 0 means unlimited") flag.Parse() ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() conn, _, err := websocket.Dial(ctx, *url, nil) if err != nil { log.Fatalf("dial gateway: %v", err) } defer conn.Close(websocket.StatusNormalClosure, "producer closed") var welcome model.ServerMessage if err := wsjson.Read(ctx, conn, &welcome); err != nil { log.Fatalf("read welcome: %v", err) } log.Printf("connected: session=%s", welcome.SessionID) authReq := model.ClientMessage{ Type: "authenticate", Role: model.RoleProducer, Token: *token, } if *channelID != "" { authReq.Type = "join_channel" authReq.ChannelID = *channelID } if err := wsjson.Write(ctx, conn, authReq); err != nil { log.Fatalf("send authenticate: %v", err) } var authResp model.ServerMessage if err := wsjson.Read(ctx, conn, &authResp); err != nil { log.Fatalf("read authenticate response: %v", err) } if authResp.Type == "error" { log.Fatalf("authenticate failed: %s", authResp.Error) } log.Printf("authenticated: session=%s", authResp.SessionID) ticker := time.NewTicker(*interval) defer ticker.Stop() sent := 0 for { select { case <-ctx.Done(): log.Println("producer stopping") return case <-ticker.C: envelope := model.Envelope{ SchemaVersion: 1, MessageID: messageID(sent + 1), Timestamp: time.Now().UnixMilli(), Topic: *topic, Source: model.Source{ Kind: model.RoleProducer, ID: *sourceID, Mode: *mode, }, Target: model.Target{ DeviceID: *deviceID, GroupID: *groupID, }, Payload: payloadForTopic(*topic, *lat, *lng, *speed, *accuracy, *bpm, *confidence, sent), } req := model.ClientMessage{ Type: "publish", Envelope: &envelope, } if err := wsjson.Write(ctx, conn, req); err != nil { log.Fatalf("publish failed: %v", err) } var resp model.ServerMessage if err := wsjson.Read(ctx, conn, &resp); err != nil { log.Fatalf("read publish response: %v", err) } if resp.Type == "error" { log.Fatalf("publish rejected: %s", resp.Error) } sent++ log.Printf("published #%d %s", sent, describePublished(*topic, *deviceID, *lat, *lng, *speed, *accuracy, *bpm, *confidence, sent)) if *count > 0 && sent >= *count { log.Println("producer completed") return } } } } func payloadForTopic(topic string, baseLat, baseLng, speed, accuracy float64, bpm int, confidence float64, step int) []byte { if topic == "telemetry.heart_rate" { return heartRatePayload(bpm, confidence) } return locationPayload(baseLat, baseLng, speed, accuracy, step) } func locationPayload(baseLat, baseLng, speed, accuracy float64, step int) []byte { lat := currentLat(baseLat, step) lng := currentLng(baseLng, step) return []byte( `{"lat":` + formatFloat(lat) + `,"lng":` + formatFloat(lng) + `,"speed":` + formatFloat(speed) + `,"bearing":90` + `,"accuracy":` + formatFloat(accuracy) + `,"coordSystem":"GCJ02"}`, ) } func heartRatePayload(bpm int, confidence float64) []byte { if confidence > 0 { return []byte( `{"bpm":` + strconv.Itoa(maxInt(1, bpm)) + `,"confidence":` + formatFloat(confidence) + `}`, ) } return []byte(`{"bpm":` + strconv.Itoa(maxInt(1, bpm)) + `}`) } func currentLat(base float64, step int) float64 { return base + float64(step)*0.00002 } func currentLng(base float64, step int) float64 { return base + float64(step)*0.00003 } func messageID(step int) string { return "msg-" + strconv.Itoa(step) } func formatFloat(v float64) string { return strconv.FormatFloat(v, 'f', 6, 64) } func describePublished(topic string, deviceID string, lat, lng, speed, accuracy float64, bpm int, confidence float64, step int) string { if topic == "telemetry.heart_rate" { if confidence > 0 { return "device=" + deviceID + " topic=" + topic + " bpm=" + strconv.Itoa(maxInt(1, bpm)) + " confidence=" + formatFloat(confidence) } return "device=" + deviceID + " topic=" + topic + " bpm=" + strconv.Itoa(maxInt(1, bpm)) } return "device=" + deviceID + " topic=" + topic + " lat=" + formatFloat(currentLat(lat, step)) + " lng=" + formatFloat(currentLng(lng, step)) + " speed=" + formatFloat(speed) + " accuracy=" + formatFloat(accuracy) } func maxInt(left int, right int) int { if left > right { return left } return right }