| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- import json
- from app.core.db import get_connection
- from app.schemas.analysis import (
- AnalysisSessionListResponse,
- AnalysisSessionResponse,
- AnalysisSessionSnapshot,
- )
class AnalysisRepository:
    """Store and retrieve analysis session snapshots in ``analysis_sessions``.

    Every method obtains its own connection from ``get_connection()`` and uses
    it as a context manager.
    """

    def save_snapshot(self, snapshot: AnalysisSessionSnapshot) -> None:
        """Write *snapshot* to the table, replacing any conflicting row.

        The whole snapshot is serialised to JSON and stored in ``payload_json``
        next to its ``session_id``, ``observation_id`` and ``status``.

        Args:
            snapshot: Snapshot to persist.
        """
        # NOTE(review): ``json.dumps`` only accepts JSON-native values; if the
        # snapshot model carries e.g. datetimes this raises ``TypeError`` --
        # confirm against the schema.
        serialized = json.dumps(snapshot.model_dump())
        # NOTE(review): under SQLite semantics ``OR REPLACE`` deletes the
        # conflicting row before inserting, so columns not listed here would
        # fall back to their defaults -- verify against the table definition.
        statement = """
            INSERT OR REPLACE INTO analysis_sessions (
                session_id,
                observation_id,
                status,
                payload_json
            )
            VALUES (?, ?, ?, ?)
            """
        with get_connection() as conn:
            params = (
                snapshot.session_id,
                snapshot.observation_id,
                snapshot.status,
                serialized,
            )
            conn.execute(statement, params)
            conn.commit()

    def get_snapshot(self, session_id: str) -> AnalysisSessionSnapshot | None:
        """Load the snapshot stored under *session_id*.

        Args:
            session_id: Identifier of the session to look up.

        Returns:
            The snapshot rebuilt from the stored JSON payload, or ``None`` when
            no row matches *session_id*.
        """
        query = """
            SELECT payload_json
            FROM analysis_sessions
            WHERE session_id = ?
            """
        with get_connection() as conn:
            cursor = conn.execute(query, (session_id,))
            record = cursor.fetchone()

        if record is not None:
            # Rows are read by column name; assumes the connection yields
            # name-addressable rows -- TODO confirm in get_connection().
            payload = record["payload_json"]
            return AnalysisSessionSnapshot.model_validate_json(payload)
        return None

    def list_sessions(self) -> AnalysisSessionListResponse:
        """Return a summary of every stored session.

        Rows are ordered by ``created_at`` descending. Only the identifying
        columns and the status are read; the JSON payload is not loaded.

        Returns:
            A list response holding one ``AnalysisSessionResponse`` per row.
        """
        query = """
            SELECT session_id, observation_id, status
            FROM analysis_sessions
            ORDER BY created_at DESC
            """
        with get_connection() as conn:
            records = conn.execute(query).fetchall()

        summaries = [self._to_response(record) for record in records]
        return AnalysisSessionListResponse(items=summaries)

    @staticmethod
    def _to_response(record) -> AnalysisSessionResponse:
        """Map one ``analysis_sessions`` row onto a session response."""
        return AnalysisSessionResponse(
            session_id=record["session_id"],
            observation_id=record["observation_id"],
            status=record["status"],
            message="Session snapshot available.",
        )
|