| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
- import json
- from app.core.db import get_connection
- from app.schemas.common import ListResponse
- from app.schemas.observation import ObservationSummary
- class ObservationRepository:
- def save(self, observation: ObservationSummary) -> None:
- payload_json = json.dumps(observation.model_dump())
- with get_connection() as connection:
- connection.execute(
- """
- INSERT OR REPLACE INTO observations (id, payload_json)
- VALUES (?, ?)
- """,
- (observation.id, payload_json),
- )
- connection.commit()
- def get(self, observation_id: str) -> ObservationSummary | None:
- with get_connection() as connection:
- row = connection.execute(
- """
- SELECT payload_json
- FROM observations
- WHERE id = ?
- """,
- (observation_id,),
- ).fetchone()
- if row is None:
- return None
- return ObservationSummary.model_validate_json(row["payload_json"])
- def list(self) -> ListResponse[ObservationSummary]:
- with get_connection() as connection:
- rows = connection.execute(
- """
- SELECT payload_json
- FROM observations
- ORDER BY created_at DESC
- """
- ).fetchall()
- items = [
- ObservationSummary.model_validate_json(row["payload_json"]) for row in rows
- ]
- return ListResponse(items=items)
|