Files
musicseerr/backend/services/navidrome_playback_service.py
Harvey 0f25ebc26d Plex Integration + Music Source Integration Improvements (#37)
* plex integration

* The big one - Full Music Source page rework + Playlist importing + Full Plex Integration + Discovery Options + More Like This/Surprise Me/Instant Mix + More...

* Music source track page - Play all / shuffle fixes

* lint

* format

* fix type checks

* format
2026-04-13 23:39:01 +01:00

83 lines
3.0 KiB
Python

from __future__ import annotations
import logging
import time
from fastapi.responses import Response, StreamingResponse
from infrastructure.cache.memory_cache import CacheInterface
from repositories.navidrome_models import StreamProxyResult
from repositories.protocols import NavidromeRepositoryProtocol
logger = logging.getLogger(__name__)
_playback_start_times: dict[str, float] = {}
class NavidromePlaybackService:
def __init__(
self,
navidrome_repo: NavidromeRepositoryProtocol,
cache: CacheInterface | None = None,
) -> None:
self._navidrome = navidrome_repo
self._cache = cache
def get_stream_url(self, song_id: str) -> str:
return self._navidrome.build_stream_url(song_id)
async def proxy_head(self, item_id: str) -> Response:
"""Proxy a HEAD request to Navidrome and return a FastAPI Response."""
result: StreamProxyResult = await self._navidrome.proxy_head_stream(item_id)
return Response(status_code=200, headers=result.headers)
async def proxy_stream(
self, item_id: str, range_header: str | None = None
) -> StreamingResponse:
"""Proxy a GET stream from Navidrome and return a FastAPI StreamingResponse."""
result: StreamProxyResult = await self._navidrome.proxy_get_stream(
item_id, range_header=range_header
)
return StreamingResponse(
content=result.body_chunks,
status_code=result.status_code,
headers=result.headers,
media_type=result.media_type,
)
async def scrobble(self, song_id: str) -> bool:
time_ms = int(time.time() * 1000)
try:
ok = await self._navidrome.scrobble(song_id, time_ms=time_ms)
_playback_start_times.pop(song_id, None)
if self._cache:
await self._cache.delete("navidrome:now_playing")
return ok
except Exception: # noqa: BLE001
logger.warning("Navidrome scrobble failed for %s", song_id, exc_info=True)
return False
async def report_now_playing(self, song_id: str) -> bool:
try:
ok = await self._navidrome.now_playing(song_id)
_playback_start_times[song_id] = time.time()
if self._cache:
await self._cache.delete("navidrome:now_playing")
return ok
except Exception: # noqa: BLE001
logger.warning("Navidrome now-playing failed for %s", song_id, exc_info=True)
return False
async def clear_now_playing(self, song_id: str) -> None:
"""Stop tracking estimated position for a song (player closed/paused)."""
_playback_start_times.pop(song_id, None)
if self._cache:
await self._cache.delete("navidrome:now_playing")
@staticmethod
def get_estimated_position(song_id: str) -> float | None:
started = _playback_start_times.get(song_id)
if started is None:
return None
return time.time() - started