| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- package main
- import (
- "context"
- "flag"
- "log"
- "os"
- "os/signal"
- "strconv"
- "syscall"
- "time"
- "github.com/coder/websocket"
- "github.com/coder/websocket/wsjson"
- "realtime-gateway/internal/model"
- )
- func main() {
- url := flag.String("url", "ws://127.0.0.1:8080/ws", "gateway websocket url")
- token := flag.String("token", "dev-producer-token", "producer token")
- channelID := flag.String("channel-id", "", "channel id to join before publish")
- deviceID := flag.String("device-id", "child-001", "target device id")
- groupID := flag.String("group-id", "", "target group id")
- sourceID := flag.String("source-id", "mock-producer-001", "source id")
- mode := flag.String("mode", "mock", "source mode")
- topic := flag.String("topic", "telemetry.location", "publish topic")
- lat := flag.Float64("lat", 31.2304, "start latitude")
- lng := flag.Float64("lng", 121.4737, "start longitude")
- speed := flag.Float64("speed", 1.2, "speed value")
- accuracy := flag.Float64("accuracy", 6, "accuracy value")
- bpm := flag.Int("bpm", 120, "heart rate value when topic is telemetry.heart_rate")
- confidence := flag.Float64("confidence", 0, "heart rate confidence when topic is telemetry.heart_rate")
- interval := flag.Duration("interval", time.Second, "publish interval")
- count := flag.Int("count", 0, "number of messages to send, 0 means unlimited")
- flag.Parse()
- ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
- defer stop()
- conn, _, err := websocket.Dial(ctx, *url, nil)
- if err != nil {
- log.Fatalf("dial gateway: %v", err)
- }
- defer conn.Close(websocket.StatusNormalClosure, "producer closed")
- var welcome model.ServerMessage
- if err := wsjson.Read(ctx, conn, &welcome); err != nil {
- log.Fatalf("read welcome: %v", err)
- }
- log.Printf("connected: session=%s", welcome.SessionID)
- authReq := model.ClientMessage{
- Type: "authenticate",
- Role: model.RoleProducer,
- Token: *token,
- }
- if *channelID != "" {
- authReq.Type = "join_channel"
- authReq.ChannelID = *channelID
- }
- if err := wsjson.Write(ctx, conn, authReq); err != nil {
- log.Fatalf("send authenticate: %v", err)
- }
- var authResp model.ServerMessage
- if err := wsjson.Read(ctx, conn, &authResp); err != nil {
- log.Fatalf("read authenticate response: %v", err)
- }
- if authResp.Type == "error" {
- log.Fatalf("authenticate failed: %s", authResp.Error)
- }
- log.Printf("authenticated: session=%s", authResp.SessionID)
- ticker := time.NewTicker(*interval)
- defer ticker.Stop()
- sent := 0
- for {
- select {
- case <-ctx.Done():
- log.Println("producer stopping")
- return
- case <-ticker.C:
- envelope := model.Envelope{
- SchemaVersion: 1,
- MessageID: messageID(sent + 1),
- Timestamp: time.Now().UnixMilli(),
- Topic: *topic,
- Source: model.Source{
- Kind: model.RoleProducer,
- ID: *sourceID,
- Mode: *mode,
- },
- Target: model.Target{
- DeviceID: *deviceID,
- GroupID: *groupID,
- },
- Payload: payloadForTopic(*topic, *lat, *lng, *speed, *accuracy, *bpm, *confidence, sent),
- }
- req := model.ClientMessage{
- Type: "publish",
- Envelope: &envelope,
- }
- if err := wsjson.Write(ctx, conn, req); err != nil {
- log.Fatalf("publish failed: %v", err)
- }
- var resp model.ServerMessage
- if err := wsjson.Read(ctx, conn, &resp); err != nil {
- log.Fatalf("read publish response: %v", err)
- }
- if resp.Type == "error" {
- log.Fatalf("publish rejected: %s", resp.Error)
- }
- sent++
- log.Printf("published #%d %s", sent, describePublished(*topic, *deviceID, *lat, *lng, *speed, *accuracy, *bpm, *confidence, sent))
- if *count > 0 && sent >= *count {
- log.Println("producer completed")
- return
- }
- }
- }
- }
- func payloadForTopic(topic string, baseLat, baseLng, speed, accuracy float64, bpm int, confidence float64, step int) []byte {
- if topic == "telemetry.heart_rate" {
- return heartRatePayload(bpm, confidence)
- }
- return locationPayload(baseLat, baseLng, speed, accuracy, step)
- }
- func locationPayload(baseLat, baseLng, speed, accuracy float64, step int) []byte {
- lat := currentLat(baseLat, step)
- lng := currentLng(baseLng, step)
- return []byte(
- `{"lat":` + formatFloat(lat) +
- `,"lng":` + formatFloat(lng) +
- `,"speed":` + formatFloat(speed) +
- `,"bearing":90` +
- `,"accuracy":` + formatFloat(accuracy) +
- `,"coordSystem":"GCJ02"}`,
- )
- }
- func heartRatePayload(bpm int, confidence float64) []byte {
- if confidence > 0 {
- return []byte(
- `{"bpm":` + strconv.Itoa(maxInt(1, bpm)) +
- `,"confidence":` + formatFloat(confidence) +
- `}`,
- )
- }
- return []byte(`{"bpm":` + strconv.Itoa(maxInt(1, bpm)) + `}`)
- }
- func currentLat(base float64, step int) float64 {
- return base + float64(step)*0.00002
- }
- func currentLng(base float64, step int) float64 {
- return base + float64(step)*0.00003
- }
- func messageID(step int) string {
- return "msg-" + strconv.Itoa(step)
- }
- func formatFloat(v float64) string {
- return strconv.FormatFloat(v, 'f', 6, 64)
- }
- func describePublished(topic string, deviceID string, lat, lng, speed, accuracy float64, bpm int, confidence float64, step int) string {
- if topic == "telemetry.heart_rate" {
- if confidence > 0 {
- return "device=" + deviceID + " topic=" + topic + " bpm=" + strconv.Itoa(maxInt(1, bpm)) + " confidence=" + formatFloat(confidence)
- }
- return "device=" + deviceID + " topic=" + topic + " bpm=" + strconv.Itoa(maxInt(1, bpm))
- }
- return "device=" + deviceID + " topic=" + topic + " lat=" + formatFloat(currentLat(lat, step)) + " lng=" + formatFloat(currentLng(lng, step)) + " speed=" + formatFloat(speed) + " accuracy=" + formatFloat(accuracy)
- }
- func maxInt(left int, right int) int {
- if left > right {
- return left
- }
- return right
- }
|