db.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import sqlite3
  2. from contextlib import contextmanager
  3. from pathlib import Path
  4. from typing import Iterator
  5. from app.core.config import get_settings
  6. def _ensure_parent_dir(db_path: str) -> None:
  7. Path(db_path).parent.mkdir(parents=True, exist_ok=True)
  8. def initialize_database() -> None:
  9. settings = get_settings()
  10. _ensure_parent_dir(settings.sqlite_path)
  11. Path(settings.upload_dir).mkdir(parents=True, exist_ok=True)
  12. with sqlite3.connect(settings.sqlite_path) as connection:
  13. connection.execute(
  14. """
  15. CREATE TABLE IF NOT EXISTS observations (
  16. id TEXT PRIMARY KEY,
  17. payload_json TEXT NOT NULL,
  18. created_at TEXT DEFAULT CURRENT_TIMESTAMP
  19. )
  20. """
  21. )
  22. connection.execute(
  23. """
  24. CREATE TABLE IF NOT EXISTS analysis_sessions (
  25. session_id TEXT PRIMARY KEY,
  26. observation_id TEXT NOT NULL,
  27. status TEXT NOT NULL,
  28. payload_json TEXT NOT NULL,
  29. created_at TEXT DEFAULT CURRENT_TIMESTAMP
  30. )
  31. """
  32. )
  33. connection.commit()
  34. @contextmanager
  35. def get_connection() -> Iterator[sqlite3.Connection]:
  36. settings = get_settings()
  37. _ensure_parent_dir(settings.sqlite_path)
  38. connection = sqlite3.connect(settings.sqlite_path)
  39. connection.row_factory = sqlite3.Row
  40. try:
  41. yield connection
  42. finally:
  43. connection.close()