observation_repository.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. import json
  2. from app.core.db import get_connection
  3. from app.schemas.common import ListResponse
  4. from app.schemas.observation import ObservationSummary
  5. class ObservationRepository:
  6. def save(self, observation: ObservationSummary) -> None:
  7. payload_json = json.dumps(observation.model_dump())
  8. with get_connection() as connection:
  9. connection.execute(
  10. """
  11. INSERT OR REPLACE INTO observations (id, payload_json)
  12. VALUES (?, ?)
  13. """,
  14. (observation.id, payload_json),
  15. )
  16. connection.commit()
  17. def get(self, observation_id: str) -> ObservationSummary | None:
  18. with get_connection() as connection:
  19. row = connection.execute(
  20. """
  21. SELECT payload_json
  22. FROM observations
  23. WHERE id = ?
  24. """,
  25. (observation_id,),
  26. ).fetchone()
  27. if row is None:
  28. return None
  29. return ObservationSummary.model_validate_json(row["payload_json"])
  30. def list(self) -> ListResponse[ObservationSummary]:
  31. with get_connection() as connection:
  32. rows = connection.execute(
  33. """
  34. SELECT payload_json
  35. FROM observations
  36. ORDER BY created_at DESC
  37. """
  38. ).fetchall()
  39. items = [
  40. ObservationSummary.model_validate_json(row["payload_json"]) for row in rows
  41. ]
  42. return ListResponse(items=items)