main.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "syscall"
  12. "github.com/coder/websocket"
  13. "github.com/coder/websocket/wsjson"
  14. "realtime-gateway/internal/model"
  15. )
  16. type consumerConfig struct {
  17. URL string `json:"url"`
  18. Token string `json:"token"`
  19. ChannelID string `json:"channelId"`
  20. DeviceID string `json:"deviceId"`
  21. GroupID string `json:"groupId"`
  22. Topic string `json:"topic"`
  23. Topics []string `json:"topics"`
  24. Subscriptions []model.Subscription `json:"subscriptions"`
  25. Snapshot *bool `json:"snapshot"`
  26. }
  27. func main() {
  28. configPath := findConfigPath(os.Args[1:])
  29. fileConfig, err := loadConsumerConfig(configPath)
  30. if err != nil {
  31. log.Fatalf("load config: %v", err)
  32. }
  33. flag.StringVar(&configPath, "config", configPath, "path to consumer config file")
  34. url := flag.String("url", valueOr(fileConfig.URL, "ws://127.0.0.1:8080/ws"), "gateway websocket url")
  35. token := flag.String("token", fileConfig.Token, "consumer token, leave empty if anonymous consumers are allowed")
  36. channelID := flag.String("channel-id", fileConfig.ChannelID, "channel id to join before subscribe")
  37. deviceID := flag.String("device-id", valueOr(fileConfig.DeviceID, "child-001"), "device id to subscribe")
  38. groupID := flag.String("group-id", fileConfig.GroupID, "group id to subscribe")
  39. topic := flag.String("topic", valueOr(fileConfig.Topic, "telemetry.location"), "single topic to subscribe")
  40. topics := flag.String("topics", strings.Join(fileConfig.Topics, ","), "comma-separated topics to subscribe, overrides -topic when set")
  41. snapshot := flag.Bool("snapshot", boolValue(fileConfig.Snapshot, true), "request latest snapshot after subscribe")
  42. flag.Parse()
  43. subscriptions := resolveSubscriptions(fileConfig, *deviceID, *groupID, *topic, *topics)
  44. if len(subscriptions) == 0 {
  45. log.Fatalf("no subscriptions configured")
  46. }
  47. ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  48. defer stop()
  49. conn, _, err := websocket.Dial(ctx, *url, nil)
  50. if err != nil {
  51. log.Fatalf("dial gateway: %v", err)
  52. }
  53. defer conn.Close(websocket.StatusNormalClosure, "consumer closed")
  54. var welcome model.ServerMessage
  55. if err := wsjson.Read(ctx, conn, &welcome); err != nil {
  56. log.Fatalf("read welcome: %v", err)
  57. }
  58. log.Printf("connected: session=%s", welcome.SessionID)
  59. if *token != "" || *channelID != "" {
  60. authReq := model.ClientMessage{
  61. Type: "authenticate",
  62. Role: model.RoleConsumer,
  63. Token: *token,
  64. }
  65. if *channelID != "" {
  66. authReq.Type = "join_channel"
  67. authReq.ChannelID = *channelID
  68. }
  69. if err := wsjson.Write(ctx, conn, authReq); err != nil {
  70. log.Fatalf("send authenticate: %v", err)
  71. }
  72. var authResp model.ServerMessage
  73. if err := wsjson.Read(ctx, conn, &authResp); err != nil {
  74. log.Fatalf("read authenticate response: %v", err)
  75. }
  76. if authResp.Type == "error" {
  77. log.Fatalf("authenticate failed: %s", authResp.Error)
  78. }
  79. log.Printf("authenticated: session=%s", authResp.SessionID)
  80. }
  81. subReq := model.ClientMessage{
  82. Type: "subscribe",
  83. Subscriptions: subscriptions,
  84. }
  85. if err := wsjson.Write(ctx, conn, subReq); err != nil {
  86. log.Fatalf("send subscribe: %v", err)
  87. }
  88. var subResp model.ServerMessage
  89. if err := wsjson.Read(ctx, conn, &subResp); err != nil {
  90. log.Fatalf("read subscribe response: %v", err)
  91. }
  92. if subResp.Type == "error" {
  93. log.Fatalf("subscribe failed: %s", subResp.Error)
  94. }
  95. log.Printf("subscribed: %s", describeSubscriptions(subscriptions))
  96. if *snapshot {
  97. req := model.ClientMessage{
  98. Type: "snapshot",
  99. Subscriptions: []model.Subscription{
  100. {DeviceID: firstSnapshotDeviceID(subscriptions, *deviceID)},
  101. },
  102. }
  103. if err := wsjson.Write(ctx, conn, req); err != nil {
  104. log.Fatalf("send snapshot: %v", err)
  105. }
  106. var resp model.ServerMessage
  107. if err := wsjson.Read(ctx, conn, &resp); err != nil {
  108. log.Fatalf("read snapshot response: %v", err)
  109. }
  110. if resp.Type == "snapshot" {
  111. log.Printf("snapshot: %s", compactJSON(resp.State))
  112. } else if resp.Type == "error" {
  113. log.Printf("snapshot unavailable: %s", resp.Error)
  114. }
  115. }
  116. for {
  117. var message model.ServerMessage
  118. if err := wsjson.Read(ctx, conn, &message); err != nil {
  119. log.Fatalf("read event: %v", err)
  120. }
  121. switch message.Type {
  122. case "event":
  123. log.Printf("event: %s", describeEnvelope(message.Envelope))
  124. case "error":
  125. log.Printf("server error: %s", message.Error)
  126. default:
  127. log.Printf("message: type=%s", message.Type)
  128. }
  129. }
  130. }
  131. func loadConsumerConfig(path string) (consumerConfig, error) {
  132. if strings.TrimSpace(path) == "" {
  133. return consumerConfig{}, nil
  134. }
  135. data, err := os.ReadFile(path)
  136. if err != nil {
  137. return consumerConfig{}, fmt.Errorf("read %s: %w", path, err)
  138. }
  139. var cfg consumerConfig
  140. if err := json.Unmarshal(data, &cfg); err != nil {
  141. return consumerConfig{}, fmt.Errorf("parse %s: %w", path, err)
  142. }
  143. return cfg, nil
  144. }
  145. func findConfigPath(args []string) string {
  146. for index := 0; index < len(args); index++ {
  147. arg := args[index]
  148. if strings.HasPrefix(arg, "-config=") {
  149. return strings.TrimSpace(strings.TrimPrefix(arg, "-config="))
  150. }
  151. if arg == "-config" && index+1 < len(args) {
  152. return strings.TrimSpace(args[index+1])
  153. }
  154. if strings.HasPrefix(arg, "--config=") {
  155. return strings.TrimSpace(strings.TrimPrefix(arg, "--config="))
  156. }
  157. if arg == "--config" && index+1 < len(args) {
  158. return strings.TrimSpace(args[index+1])
  159. }
  160. }
  161. return ""
  162. }
  163. func valueOr(value string, fallback string) string {
  164. if strings.TrimSpace(value) == "" {
  165. return fallback
  166. }
  167. return value
  168. }
  169. func boolValue(value *bool, fallback bool) bool {
  170. if value == nil {
  171. return fallback
  172. }
  173. return *value
  174. }
  175. func resolveSubscriptions(fileConfig consumerConfig, deviceID string, groupID string, topic string, topicsCSV string) []model.Subscription {
  176. if len(fileConfig.Subscriptions) > 0 && strings.TrimSpace(topicsCSV) == "" && strings.TrimSpace(topic) == valueOr(fileConfig.Topic, "telemetry.location") {
  177. return filterValidSubscriptions(fileConfig.Subscriptions)
  178. }
  179. topics := parseTopics(topicsCSV)
  180. if len(topics) == 0 {
  181. topics = []string{topic}
  182. }
  183. subscriptions := make([]model.Subscription, 0, len(topics))
  184. for _, entry := range topics {
  185. subscriptions = append(subscriptions, model.Subscription{
  186. DeviceID: deviceID,
  187. GroupID: groupID,
  188. Topic: entry,
  189. })
  190. }
  191. return filterValidSubscriptions(subscriptions)
  192. }
  193. func parseTopics(value string) []string {
  194. if strings.TrimSpace(value) == "" {
  195. return nil
  196. }
  197. parts := strings.Split(value, ",")
  198. topics := make([]string, 0, len(parts))
  199. for _, part := range parts {
  200. topic := strings.TrimSpace(part)
  201. if topic == "" {
  202. continue
  203. }
  204. topics = append(topics, topic)
  205. }
  206. return topics
  207. }
  208. func filterValidSubscriptions(items []model.Subscription) []model.Subscription {
  209. subscriptions := make([]model.Subscription, 0, len(items))
  210. for _, item := range items {
  211. if strings.TrimSpace(item.ChannelID) == "" && strings.TrimSpace(item.DeviceID) == "" && strings.TrimSpace(item.GroupID) == "" {
  212. continue
  213. }
  214. subscriptions = append(subscriptions, model.Subscription{
  215. ChannelID: strings.TrimSpace(item.ChannelID),
  216. DeviceID: strings.TrimSpace(item.DeviceID),
  217. GroupID: strings.TrimSpace(item.GroupID),
  218. Topic: strings.TrimSpace(item.Topic),
  219. })
  220. }
  221. return subscriptions
  222. }
  223. func describeSubscriptions(items []model.Subscription) string {
  224. parts := make([]string, 0, len(items))
  225. for _, item := range items {
  226. scope := item.DeviceID
  227. if scope == "" {
  228. if item.GroupID != "" {
  229. scope = "group:" + item.GroupID
  230. } else {
  231. scope = "all"
  232. }
  233. }
  234. if item.ChannelID != "" {
  235. scope = "channel:" + item.ChannelID + " / " + scope
  236. }
  237. if item.Topic != "" {
  238. scope += "/" + item.Topic
  239. }
  240. parts = append(parts, scope)
  241. }
  242. return strings.Join(parts, ", ")
  243. }
  244. func firstSnapshotDeviceID(items []model.Subscription, fallback string) string {
  245. for _, item := range items {
  246. if strings.TrimSpace(item.DeviceID) != "" {
  247. return item.DeviceID
  248. }
  249. }
  250. return fallback
  251. }
  252. func describeEnvelope(envelope *model.Envelope) string {
  253. if envelope == nil {
  254. return "{}"
  255. }
  256. switch envelope.Topic {
  257. case "telemetry.location":
  258. return describeLocationEnvelope(envelope)
  259. case "telemetry.heart_rate":
  260. return describeHeartRateEnvelope(envelope)
  261. default:
  262. return compactEnvelope(envelope)
  263. }
  264. }
  265. func describeLocationEnvelope(envelope *model.Envelope) string {
  266. var payload struct {
  267. Lat float64 `json:"lat"`
  268. Lng float64 `json:"lng"`
  269. Speed float64 `json:"speed"`
  270. Bearing float64 `json:"bearing"`
  271. Accuracy float64 `json:"accuracy"`
  272. CoordSystem string `json:"coordSystem"`
  273. }
  274. if err := json.Unmarshal(envelope.Payload, &payload); err != nil {
  275. return compactEnvelope(envelope)
  276. }
  277. return fmt.Sprintf(
  278. "gps device=%s lat=%.6f lng=%.6f speed=%.2f bearing=%.1f accuracy=%.1f coord=%s",
  279. envelope.Target.DeviceID,
  280. payload.Lat,
  281. payload.Lng,
  282. payload.Speed,
  283. payload.Bearing,
  284. payload.Accuracy,
  285. payload.CoordSystem,
  286. )
  287. }
  288. func describeHeartRateEnvelope(envelope *model.Envelope) string {
  289. var payload struct {
  290. BPM int `json:"bpm"`
  291. Confidence float64 `json:"confidence"`
  292. }
  293. if err := json.Unmarshal(envelope.Payload, &payload); err != nil {
  294. return compactEnvelope(envelope)
  295. }
  296. if payload.Confidence > 0 {
  297. return fmt.Sprintf("heart_rate device=%s bpm=%d confidence=%.2f", envelope.Target.DeviceID, payload.BPM, payload.Confidence)
  298. }
  299. return fmt.Sprintf("heart_rate device=%s bpm=%d", envelope.Target.DeviceID, payload.BPM)
  300. }
  301. func compactEnvelope(envelope *model.Envelope) string {
  302. if envelope == nil {
  303. return "{}"
  304. }
  305. data, err := json.Marshal(envelope)
  306. if err != nil {
  307. return "{}"
  308. }
  309. return compactJSON(data)
  310. }
  311. func compactJSON(data []byte) string {
  312. var value any
  313. if err := json.Unmarshal(data, &value); err != nil {
  314. return string(data)
  315. }
  316. compact, err := json.Marshal(value)
  317. if err != nil {
  318. return string(data)
  319. }
  320. return string(compact)
  321. }