# observation_service.py (2.5 KB)
from contextlib import suppress
from pathlib import Path
from uuid import uuid4

from app.core.config import get_settings
from app.repositories.observation_repository import ObservationRepository
from app.schemas.common import ListResponse
from app.schemas.observation import ObservationCreateRequest, ObservationSummary
  7. class ObservationService:
  8. def __init__(self) -> None:
  9. self.repository = ObservationRepository()
  10. def create_observation(
  11. self,
  12. payload: ObservationCreateRequest,
  13. ) -> ObservationSummary:
  14. observation = ObservationSummary(
  15. id=str(uuid4()),
  16. modality=payload.modality,
  17. duration_ms=payload.duration_ms,
  18. sample_rate=payload.sample_rate,
  19. channels=payload.channels,
  20. tags=payload.tags,
  21. capture_metadata=payload.capture_metadata,
  22. )
  23. self.repository.save(observation)
  24. return observation
  25. def create_observation_from_upload(
  26. self,
  27. *,
  28. filename: str,
  29. content: bytes,
  30. duration_ms: int,
  31. sample_rate: int,
  32. channels: int,
  33. tags: list[str],
  34. capture_metadata: dict[str, object],
  35. ) -> ObservationSummary:
  36. observation_id = str(uuid4())
  37. suffix = Path(filename).suffix or ".bin"
  38. upload_name = f"{observation_id}{suffix}"
  39. settings = get_settings()
  40. upload_dir = Path(settings.upload_dir)
  41. upload_dir.mkdir(parents=True, exist_ok=True)
  42. stored_path = upload_dir / upload_name
  43. stored_path.write_bytes(content)
  44. metadata = dict(capture_metadata)
  45. metadata.update(
  46. {
  47. "source": metadata.get("source", "upload"),
  48. "original_filename": filename,
  49. "stored_filename": upload_name,
  50. "stored_path": str(stored_path),
  51. "file_size_bytes": len(content),
  52. }
  53. )
  54. observation = ObservationSummary(
  55. id=observation_id,
  56. modality="audio",
  57. duration_ms=duration_ms,
  58. sample_rate=sample_rate,
  59. channels=channels,
  60. tags=tags,
  61. capture_metadata=metadata,
  62. )
  63. self.repository.save(observation)
  64. return observation
  65. def get_observation(self, observation_id: str) -> ObservationSummary | None:
  66. return self.repository.get(observation_id)
  67. def list_observations(self) -> ListResponse[ObservationSummary]:
  68. return self.repository.list()