main.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strconv"
  9. "syscall"
  10. "time"
  11. "github.com/coder/websocket"
  12. "github.com/coder/websocket/wsjson"
  13. "realtime-gateway/internal/model"
  14. )
  15. func main() {
  16. url := flag.String("url", "ws://127.0.0.1:8080/ws", "gateway websocket url")
  17. token := flag.String("token", "dev-producer-token", "producer token")
  18. channelID := flag.String("channel-id", "", "channel id to join before publish")
  19. deviceID := flag.String("device-id", "child-001", "target device id")
  20. groupID := flag.String("group-id", "", "target group id")
  21. sourceID := flag.String("source-id", "mock-producer-001", "source id")
  22. mode := flag.String("mode", "mock", "source mode")
  23. topic := flag.String("topic", "telemetry.location", "publish topic")
  24. lat := flag.Float64("lat", 31.2304, "start latitude")
  25. lng := flag.Float64("lng", 121.4737, "start longitude")
  26. speed := flag.Float64("speed", 1.2, "speed value")
  27. accuracy := flag.Float64("accuracy", 6, "accuracy value")
  28. bpm := flag.Int("bpm", 120, "heart rate value when topic is telemetry.heart_rate")
  29. confidence := flag.Float64("confidence", 0, "heart rate confidence when topic is telemetry.heart_rate")
  30. interval := flag.Duration("interval", time.Second, "publish interval")
  31. count := flag.Int("count", 0, "number of messages to send, 0 means unlimited")
  32. flag.Parse()
  33. ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  34. defer stop()
  35. conn, _, err := websocket.Dial(ctx, *url, nil)
  36. if err != nil {
  37. log.Fatalf("dial gateway: %v", err)
  38. }
  39. defer conn.Close(websocket.StatusNormalClosure, "producer closed")
  40. var welcome model.ServerMessage
  41. if err := wsjson.Read(ctx, conn, &welcome); err != nil {
  42. log.Fatalf("read welcome: %v", err)
  43. }
  44. log.Printf("connected: session=%s", welcome.SessionID)
  45. authReq := model.ClientMessage{
  46. Type: "authenticate",
  47. Role: model.RoleProducer,
  48. Token: *token,
  49. }
  50. if *channelID != "" {
  51. authReq.Type = "join_channel"
  52. authReq.ChannelID = *channelID
  53. }
  54. if err := wsjson.Write(ctx, conn, authReq); err != nil {
  55. log.Fatalf("send authenticate: %v", err)
  56. }
  57. var authResp model.ServerMessage
  58. if err := wsjson.Read(ctx, conn, &authResp); err != nil {
  59. log.Fatalf("read authenticate response: %v", err)
  60. }
  61. if authResp.Type == "error" {
  62. log.Fatalf("authenticate failed: %s", authResp.Error)
  63. }
  64. log.Printf("authenticated: session=%s", authResp.SessionID)
  65. ticker := time.NewTicker(*interval)
  66. defer ticker.Stop()
  67. sent := 0
  68. for {
  69. select {
  70. case <-ctx.Done():
  71. log.Println("producer stopping")
  72. return
  73. case <-ticker.C:
  74. envelope := model.Envelope{
  75. SchemaVersion: 1,
  76. MessageID: messageID(sent + 1),
  77. Timestamp: time.Now().UnixMilli(),
  78. Topic: *topic,
  79. Source: model.Source{
  80. Kind: model.RoleProducer,
  81. ID: *sourceID,
  82. Mode: *mode,
  83. },
  84. Target: model.Target{
  85. DeviceID: *deviceID,
  86. GroupID: *groupID,
  87. },
  88. Payload: payloadForTopic(*topic, *lat, *lng, *speed, *accuracy, *bpm, *confidence, sent),
  89. }
  90. req := model.ClientMessage{
  91. Type: "publish",
  92. Envelope: &envelope,
  93. }
  94. if err := wsjson.Write(ctx, conn, req); err != nil {
  95. log.Fatalf("publish failed: %v", err)
  96. }
  97. var resp model.ServerMessage
  98. if err := wsjson.Read(ctx, conn, &resp); err != nil {
  99. log.Fatalf("read publish response: %v", err)
  100. }
  101. if resp.Type == "error" {
  102. log.Fatalf("publish rejected: %s", resp.Error)
  103. }
  104. sent++
  105. log.Printf("published #%d %s", sent, describePublished(*topic, *deviceID, *lat, *lng, *speed, *accuracy, *bpm, *confidence, sent))
  106. if *count > 0 && sent >= *count {
  107. log.Println("producer completed")
  108. return
  109. }
  110. }
  111. }
  112. }
  113. func payloadForTopic(topic string, baseLat, baseLng, speed, accuracy float64, bpm int, confidence float64, step int) []byte {
  114. if topic == "telemetry.heart_rate" {
  115. return heartRatePayload(bpm, confidence)
  116. }
  117. return locationPayload(baseLat, baseLng, speed, accuracy, step)
  118. }
  119. func locationPayload(baseLat, baseLng, speed, accuracy float64, step int) []byte {
  120. lat := currentLat(baseLat, step)
  121. lng := currentLng(baseLng, step)
  122. return []byte(
  123. `{"lat":` + formatFloat(lat) +
  124. `,"lng":` + formatFloat(lng) +
  125. `,"speed":` + formatFloat(speed) +
  126. `,"bearing":90` +
  127. `,"accuracy":` + formatFloat(accuracy) +
  128. `,"coordSystem":"GCJ02"}`,
  129. )
  130. }
  131. func heartRatePayload(bpm int, confidence float64) []byte {
  132. if confidence > 0 {
  133. return []byte(
  134. `{"bpm":` + strconv.Itoa(maxInt(1, bpm)) +
  135. `,"confidence":` + formatFloat(confidence) +
  136. `}`,
  137. )
  138. }
  139. return []byte(`{"bpm":` + strconv.Itoa(maxInt(1, bpm)) + `}`)
  140. }
  141. func currentLat(base float64, step int) float64 {
  142. return base + float64(step)*0.00002
  143. }
  144. func currentLng(base float64, step int) float64 {
  145. return base + float64(step)*0.00003
  146. }
  147. func messageID(step int) string {
  148. return "msg-" + strconv.Itoa(step)
  149. }
  150. func formatFloat(v float64) string {
  151. return strconv.FormatFloat(v, 'f', 6, 64)
  152. }
  153. func describePublished(topic string, deviceID string, lat, lng, speed, accuracy float64, bpm int, confidence float64, step int) string {
  154. if topic == "telemetry.heart_rate" {
  155. if confidence > 0 {
  156. return "device=" + deviceID + " topic=" + topic + " bpm=" + strconv.Itoa(maxInt(1, bpm)) + " confidence=" + formatFloat(confidence)
  157. }
  158. return "device=" + deviceID + " topic=" + topic + " bpm=" + strconv.Itoa(maxInt(1, bpm))
  159. }
  160. return "device=" + deviceID + " topic=" + topic + " lat=" + formatFloat(currentLat(lat, step)) + " lng=" + formatFloat(currentLng(lng, step)) + " speed=" + formatFloat(speed) + " accuracy=" + formatFloat(accuracy)
  161. }
  162. func maxInt(left int, right int) int {
  163. if left > right {
  164. return left
  165. }
  166. return right
  167. }