import json

from app.core.db import get_connection
from app.schemas.analysis import (
    AnalysisSessionListResponse,
    AnalysisSessionResponse,
    AnalysisSessionSnapshot,
)


class AnalysisRepository:
    """Read and write analysis session snapshots in ``analysis_sessions``.

    Every method obtains its own connection from ``get_connection()`` and
    uses it as a context manager for the duration of a single statement.
    """

    def save_snapshot(self, snapshot: AnalysisSessionSnapshot) -> None:
        """Store *snapshot*, replacing any conflicting existing row.

        The session id, observation id and status are written to their own
        columns; the full snapshot is stored as a JSON string in
        ``payload_json``.

        Args:
            snapshot: The session snapshot to persist.
        """
        # mode="json" converts non-JSON-native field values (datetime, UUID,
        # Enum, ...) to JSON-compatible ones first; a plain model_dump()
        # leaves them as Python objects, which json.dumps rejects with
        # TypeError.
        payload_json = json.dumps(snapshot.model_dump(mode="json"))

        with get_connection() as connection:
            # NOTE(review): INSERT OR REPLACE only overwrites when a
            # uniqueness constraint is hit -- presumably session_id is the
            # primary key; confirm against the table schema.
            connection.execute(
                """
                INSERT OR REPLACE INTO analysis_sessions (
                    session_id,
                    observation_id,
                    status,
                    payload_json
                )
                VALUES (?, ?, ?, ?)
                """,
                (
                    snapshot.session_id,
                    snapshot.observation_id,
                    snapshot.status,
                    payload_json,
                ),
            )
            connection.commit()

    def get_snapshot(self, session_id: str) -> AnalysisSessionSnapshot | None:
        """Load the stored snapshot for *session_id*.

        Args:
            session_id: Identifier of the session to look up.

        Returns:
            The snapshot validated from the stored ``payload_json``, or
            ``None`` when no row matches *session_id*.
        """
        with get_connection() as connection:
            row = connection.execute(
                """
                SELECT payload_json
                FROM analysis_sessions
                WHERE session_id = ?
                """,
                (session_id,),
            ).fetchone()

        if row is None:
            return None

        return AnalysisSessionSnapshot.model_validate_json(row["payload_json"])

    def list_sessions(self) -> AnalysisSessionListResponse:
        """List all stored sessions ordered by ``created_at`` descending.

        Only the summary columns are read; the JSON payload is not loaded.

        Returns:
            A list response with one ``AnalysisSessionResponse`` per row,
            each carrying a fixed "snapshot available" message.
        """
        with get_connection() as connection:
            rows = connection.execute(
                """
                SELECT session_id, observation_id, status
                FROM analysis_sessions
                ORDER BY created_at DESC
                """
            ).fetchall()

        items = [
            AnalysisSessionResponse(
                session_id=row["session_id"],
                observation_id=row["observation_id"],
                status=row["status"],
                message="Session snapshot available.",
            )
            for row in rows
        ]
        return AnalysisSessionListResponse(items=items)