| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- import sqlite3
- from contextlib import contextmanager
- from pathlib import Path
- from typing import Iterator
- from app.core.config import get_settings
- def _ensure_parent_dir(db_path: str) -> None:
- Path(db_path).parent.mkdir(parents=True, exist_ok=True)
def initialize_database() -> None:
    """Create the storage directories and the SQLite schema if absent.

    Reads ``sqlite_path`` and ``upload_dir`` from ``get_settings()``, makes
    sure the database's parent directory and the upload directory exist, and
    then creates the ``observations`` and ``analysis_sessions`` tables. Both
    statements use ``CREATE TABLE IF NOT EXISTS``, so calling this function
    again on an initialized database does not recreate the tables.
    """
    settings = get_settings()
    _ensure_parent_dir(settings.sqlite_path)
    Path(settings.upload_dir).mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(settings.sqlite_path)
    try:
        # ``with connection`` only scopes the transaction (commit on success,
        # rollback on error); it does not close the connection, so the
        # ``finally`` clause below releases the handle explicitly.
        with connection:
            connection.execute(
                """
                CREATE TABLE IF NOT EXISTS observations (
                    id TEXT PRIMARY KEY,
                    payload_json TEXT NOT NULL,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            connection.execute(
                """
                CREATE TABLE IF NOT EXISTS analysis_sessions (
                    session_id TEXT PRIMARY KEY,
                    observation_id TEXT NOT NULL,
                    status TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
    finally:
        connection.close()
@contextmanager
def get_connection() -> Iterator[sqlite3.Connection]:
    """Yield a SQLite connection that is closed when the block exits.

    The database path comes from ``get_settings().sqlite_path``; its parent
    directory is created first if needed. Rows are returned as
    ``sqlite3.Row`` objects. The connection is closed in a ``finally``
    clause, so it is released even when the caller's block raises. No commit
    is issued here.
    """
    db_path = get_settings().sqlite_path
    _ensure_parent_dir(db_path)

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()
|