session_store.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package postgres
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/jackc/pgx/v5"
  8. )
  9. type Session struct {
  10. ID string
  11. SessionPublicID string
  12. UserID string
  13. EventID string
  14. EventReleaseID string
  15. ReleasePublicID *string
  16. ConfigLabel *string
  17. ManifestURL *string
  18. ManifestChecksum *string
  19. DeviceKey string
  20. ClientType string
  21. AssignmentMode *string
  22. VariantID *string
  23. VariantName *string
  24. RouteCode *string
  25. Status string
  26. SessionTokenHash string
  27. SessionTokenExpiresAt time.Time
  28. LaunchedAt time.Time
  29. StartedAt *time.Time
  30. EndedAt *time.Time
  31. EventPublicID *string
  32. EventDisplayName *string
  33. }
  34. type FinishSessionParams struct {
  35. SessionID string
  36. Status string
  37. }
  38. func (s *Store) GetSessionByPublicID(ctx context.Context, sessionPublicID string) (*Session, error) {
  39. row := s.pool.QueryRow(ctx, `
  40. SELECT
  41. gs.id,
  42. gs.session_public_id,
  43. gs.user_id,
  44. gs.event_id,
  45. gs.event_release_id,
  46. er.release_public_id,
  47. er.config_label,
  48. er.manifest_url,
  49. er.manifest_checksum_sha256,
  50. gs.device_key,
  51. gs.client_type,
  52. gs.assignment_mode,
  53. gs.variant_id,
  54. gs.variant_name,
  55. gs.route_code,
  56. gs.status,
  57. gs.session_token_hash,
  58. gs.session_token_expires_at,
  59. gs.launched_at,
  60. gs.started_at,
  61. gs.ended_at,
  62. e.event_public_id,
  63. e.display_name
  64. FROM game_sessions gs
  65. JOIN events e ON e.id = gs.event_id
  66. JOIN event_releases er ON er.id = gs.event_release_id
  67. WHERE gs.session_public_id = $1
  68. LIMIT 1
  69. `, sessionPublicID)
  70. return scanSession(row)
  71. }
  72. func (s *Store) GetSessionByPublicIDForUpdate(ctx context.Context, tx Tx, sessionPublicID string) (*Session, error) {
  73. row := tx.QueryRow(ctx, `
  74. SELECT
  75. gs.id,
  76. gs.session_public_id,
  77. gs.user_id,
  78. gs.event_id,
  79. gs.event_release_id,
  80. er.release_public_id,
  81. er.config_label,
  82. er.manifest_url,
  83. er.manifest_checksum_sha256,
  84. gs.device_key,
  85. gs.client_type,
  86. gs.assignment_mode,
  87. gs.variant_id,
  88. gs.variant_name,
  89. gs.route_code,
  90. gs.status,
  91. gs.session_token_hash,
  92. gs.session_token_expires_at,
  93. gs.launched_at,
  94. gs.started_at,
  95. gs.ended_at,
  96. e.event_public_id,
  97. e.display_name
  98. FROM game_sessions gs
  99. JOIN events e ON e.id = gs.event_id
  100. JOIN event_releases er ON er.id = gs.event_release_id
  101. WHERE gs.session_public_id = $1
  102. FOR UPDATE
  103. `, sessionPublicID)
  104. return scanSession(row)
  105. }
  106. func (s *Store) ListSessionsByUserID(ctx context.Context, userID string, limit int) ([]Session, error) {
  107. if limit <= 0 || limit > 100 {
  108. limit = 20
  109. }
  110. rows, err := s.pool.Query(ctx, `
  111. SELECT
  112. gs.id,
  113. gs.session_public_id,
  114. gs.user_id,
  115. gs.event_id,
  116. gs.event_release_id,
  117. er.release_public_id,
  118. er.config_label,
  119. er.manifest_url,
  120. er.manifest_checksum_sha256,
  121. gs.device_key,
  122. gs.client_type,
  123. gs.assignment_mode,
  124. gs.variant_id,
  125. gs.variant_name,
  126. gs.route_code,
  127. gs.status,
  128. gs.session_token_hash,
  129. gs.session_token_expires_at,
  130. gs.launched_at,
  131. gs.started_at,
  132. gs.ended_at,
  133. e.event_public_id,
  134. e.display_name
  135. FROM game_sessions gs
  136. JOIN events e ON e.id = gs.event_id
  137. JOIN event_releases er ON er.id = gs.event_release_id
  138. WHERE gs.user_id = $1
  139. ORDER BY gs.created_at DESC
  140. LIMIT $2
  141. `, userID, limit)
  142. if err != nil {
  143. return nil, fmt.Errorf("list sessions by user id: %w", err)
  144. }
  145. defer rows.Close()
  146. var sessions []Session
  147. for rows.Next() {
  148. session, err := scanSessionFromRows(rows)
  149. if err != nil {
  150. return nil, err
  151. }
  152. sessions = append(sessions, *session)
  153. }
  154. if err := rows.Err(); err != nil {
  155. return nil, fmt.Errorf("iterate sessions by user id: %w", err)
  156. }
  157. return sessions, nil
  158. }
  159. func (s *Store) ListSessionsByUserAndEvent(ctx context.Context, userID, eventID string, limit int) ([]Session, error) {
  160. if limit <= 0 || limit > 100 {
  161. limit = 20
  162. }
  163. rows, err := s.pool.Query(ctx, `
  164. SELECT
  165. gs.id,
  166. gs.session_public_id,
  167. gs.user_id,
  168. gs.event_id,
  169. gs.event_release_id,
  170. er.release_public_id,
  171. er.config_label,
  172. er.manifest_url,
  173. er.manifest_checksum_sha256,
  174. gs.device_key,
  175. gs.client_type,
  176. gs.assignment_mode,
  177. gs.variant_id,
  178. gs.variant_name,
  179. gs.route_code,
  180. gs.status,
  181. gs.session_token_hash,
  182. gs.session_token_expires_at,
  183. gs.launched_at,
  184. gs.started_at,
  185. gs.ended_at,
  186. e.event_public_id,
  187. e.display_name
  188. FROM game_sessions gs
  189. JOIN events e ON e.id = gs.event_id
  190. JOIN event_releases er ON er.id = gs.event_release_id
  191. WHERE gs.user_id = $1
  192. AND gs.event_id = $2
  193. ORDER BY gs.created_at DESC
  194. LIMIT $3
  195. `, userID, eventID, limit)
  196. if err != nil {
  197. return nil, fmt.Errorf("list sessions by user and event: %w", err)
  198. }
  199. defer rows.Close()
  200. var sessions []Session
  201. for rows.Next() {
  202. session, err := scanSessionFromRows(rows)
  203. if err != nil {
  204. return nil, err
  205. }
  206. sessions = append(sessions, *session)
  207. }
  208. if err := rows.Err(); err != nil {
  209. return nil, fmt.Errorf("iterate sessions by user and event: %w", err)
  210. }
  211. return sessions, nil
  212. }
  213. func (s *Store) StartSession(ctx context.Context, tx Tx, sessionID string) error {
  214. _, err := tx.Exec(ctx, `
  215. UPDATE game_sessions
  216. SET status = CASE WHEN status = 'launched' THEN 'running' ELSE status END,
  217. started_at = COALESCE(started_at, NOW())
  218. WHERE id = $1
  219. `, sessionID)
  220. if err != nil {
  221. return fmt.Errorf("start session: %w", err)
  222. }
  223. return nil
  224. }
  225. func (s *Store) FinishSession(ctx context.Context, tx Tx, params FinishSessionParams) error {
  226. _, err := tx.Exec(ctx, `
  227. UPDATE game_sessions
  228. SET status = $2,
  229. started_at = COALESCE(started_at, NOW()),
  230. ended_at = COALESCE(ended_at, NOW())
  231. WHERE id = $1
  232. `, params.SessionID, params.Status)
  233. if err != nil {
  234. return fmt.Errorf("finish session: %w", err)
  235. }
  236. return nil
  237. }
  238. func scanSession(row pgx.Row) (*Session, error) {
  239. var session Session
  240. err := row.Scan(
  241. &session.ID,
  242. &session.SessionPublicID,
  243. &session.UserID,
  244. &session.EventID,
  245. &session.EventReleaseID,
  246. &session.ReleasePublicID,
  247. &session.ConfigLabel,
  248. &session.ManifestURL,
  249. &session.ManifestChecksum,
  250. &session.DeviceKey,
  251. &session.ClientType,
  252. &session.AssignmentMode,
  253. &session.VariantID,
  254. &session.VariantName,
  255. &session.RouteCode,
  256. &session.Status,
  257. &session.SessionTokenHash,
  258. &session.SessionTokenExpiresAt,
  259. &session.LaunchedAt,
  260. &session.StartedAt,
  261. &session.EndedAt,
  262. &session.EventPublicID,
  263. &session.EventDisplayName,
  264. )
  265. if errors.Is(err, pgx.ErrNoRows) {
  266. return nil, nil
  267. }
  268. if err != nil {
  269. return nil, fmt.Errorf("scan session: %w", err)
  270. }
  271. return &session, nil
  272. }
  273. func scanSessionFromRows(rows pgx.Rows) (*Session, error) {
  274. var session Session
  275. err := rows.Scan(
  276. &session.ID,
  277. &session.SessionPublicID,
  278. &session.UserID,
  279. &session.EventID,
  280. &session.EventReleaseID,
  281. &session.ReleasePublicID,
  282. &session.ConfigLabel,
  283. &session.ManifestURL,
  284. &session.ManifestChecksum,
  285. &session.DeviceKey,
  286. &session.ClientType,
  287. &session.AssignmentMode,
  288. &session.VariantID,
  289. &session.VariantName,
  290. &session.RouteCode,
  291. &session.Status,
  292. &session.SessionTokenHash,
  293. &session.SessionTokenExpiresAt,
  294. &session.LaunchedAt,
  295. &session.StartedAt,
  296. &session.EndedAt,
  297. &session.EventPublicID,
  298. &session.EventDisplayName,
  299. )
  300. if err != nil {
  301. return nil, fmt.Errorf("scan session row: %w", err)
  302. }
  303. return &session, nil
  304. }