analysis_repository.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import json
  2. from app.core.db import get_connection
  3. from app.schemas.analysis import (
  4. AnalysisSessionListResponse,
  5. AnalysisSessionResponse,
  6. AnalysisSessionSnapshot,
  7. )
  8. class AnalysisRepository:
  9. def save_snapshot(self, snapshot: AnalysisSessionSnapshot) -> None:
  10. payload_json = json.dumps(snapshot.model_dump())
  11. with get_connection() as connection:
  12. connection.execute(
  13. """
  14. INSERT OR REPLACE INTO analysis_sessions (
  15. session_id,
  16. observation_id,
  17. status,
  18. payload_json
  19. )
  20. VALUES (?, ?, ?, ?)
  21. """,
  22. (
  23. snapshot.session_id,
  24. snapshot.observation_id,
  25. snapshot.status,
  26. payload_json,
  27. ),
  28. )
  29. connection.commit()
  30. def get_snapshot(self, session_id: str) -> AnalysisSessionSnapshot | None:
  31. with get_connection() as connection:
  32. row = connection.execute(
  33. """
  34. SELECT payload_json
  35. FROM analysis_sessions
  36. WHERE session_id = ?
  37. """,
  38. (session_id,),
  39. ).fetchone()
  40. if row is None:
  41. return None
  42. return AnalysisSessionSnapshot.model_validate_json(row["payload_json"])
  43. def list_sessions(self) -> AnalysisSessionListResponse:
  44. with get_connection() as connection:
  45. rows = connection.execute(
  46. """
  47. SELECT session_id, observation_id, status
  48. FROM analysis_sessions
  49. ORDER BY created_at DESC
  50. """
  51. ).fetchall()
  52. items = [
  53. AnalysisSessionResponse(
  54. session_id=row["session_id"],
  55. observation_id=row["observation_id"],
  56. status=row["status"],
  57. message="Session snapshot available.",
  58. )
  59. for row in rows
  60. ]
  61. return AnalysisSessionListResponse(items=items)