Files
Harvey a69a26852e Cut down unnecessary logging (#48)
* Cut down unnecessary logging

* fix format etc

* fix checks

* fix tests
2026-04-14 00:02:38 +01:00

747 lines
27 KiB
Python

from __future__ import annotations

import hashlib
import logging
import secrets
from collections.abc import AsyncIterator
from typing import Any
from urllib.parse import urlencode

import httpx
import msgspec

from core.exceptions import ExternalServiceError, NavidromeApiError, NavidromeAuthError, NavidromeSubsonicError
from infrastructure.cache.cache_keys import NAVIDROME_PREFIX
from infrastructure.cache.memory_cache import CacheInterface
from infrastructure.resilience.retry import with_retry, CircuitBreaker
from repositories.navidrome_models import (
    StreamProxyResult as StreamProxyResult,
    SubsonicAlbum,
    SubsonicAlbumInfo,
    SubsonicArtist,
    SubsonicArtistIndex,
    SubsonicArtistInfo,
    SubsonicGenre,
    SubsonicLyrics,
    SubsonicMusicFolder,
    SubsonicNowPlayingEntry,
    SubsonicPlaylist,
    SubsonicSearchResult,
    SubsonicSong,
    parse_album,
    parse_album_info,
    parse_artist,
    parse_artist_info,
    parse_genre,
    parse_lyrics,
    parse_now_playing_entries,
    parse_similar_songs,
    parse_song,
    parse_subsonic_response,
    parse_top_songs,
)
from infrastructure.degradation import try_get_degradation_context
from infrastructure.integration_result import IntegrationResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Source label attached to every degradation result recorded by this module.
_SOURCE = "navidrome"
def _record_degradation(msg: str) -> None:
    """Record a Navidrome failure on the current degradation context, if any.

    Does nothing when ``try_get_degradation_context()`` returns ``None``.

    Args:
        msg: Description of the failure, stored on the recorded error result.
    """
    degradation_ctx = try_get_degradation_context()
    if degradation_ctx is None:
        return
    failure = IntegrationResult.error(source=_SOURCE, msg=msg)
    degradation_ctx.record(failure)
# Module-level circuit breaker shared by every NavidromeRepository instance.
# It is handed to the with_retry decorator on NavidromeRepository._request and
# can be reset through NavidromeRepository.reset_circuit_breaker().
# NOTE(review): threshold/timeout semantics are defined by CircuitBreaker;
# timeout is presumably in seconds — confirm against its implementation.
_navidrome_circuit_breaker = CircuitBreaker(
failure_threshold=5,
success_threshold=2,
timeout=60.0,
name="navidrome",
)
# Default cache TTLs, passed as the third argument to the cache's set() call.
# Each can be overridden per instance via configure_cache_ttls().
# NOTE(review): units are presumably seconds — confirm against CacheInterface.
_DEFAULT_TTL_LIST = 300
_DEFAULT_TTL_SEARCH = 120
_DEFAULT_TTL_GENRES = 3600
_DEFAULT_TTL_DETAIL = 300
class NavidromeRepository:
    """Client for a Navidrome server's ``/rest`` (Subsonic-style) HTTP API.

    JSON endpoints go through :meth:`_request`, which adds the auth query
    parameters, applies retry / circuit-breaker handling and unwraps the
    response via ``parse_subsonic_response``. Most read methods cache their
    parsed result in the injected cache under keys starting with
    ``NAVIDROME_PREFIX`` so that :meth:`clear_cache` removes all of them.

    The repository is unusable until :meth:`configure` has been called with a
    non-empty URL, username and password.
    """

    def __init__(
        self,
        http_client: httpx.AsyncClient,
        cache: CacheInterface,
    ) -> None:
        """Store collaborators and start in the unconfigured state.

        Args:
            http_client: Shared client used for JSON and cover-art requests.
            cache: Cache used to store parsed responses.
        """
        self._client = http_client
        self._cache = cache
        self._url: str = ""
        self._username: str = ""
        self._password: str = ""
        self._configured: bool = False
        self._ttl_list: int = _DEFAULT_TTL_LIST
        self._ttl_search: int = _DEFAULT_TTL_SEARCH
        self._ttl_genres: int = _DEFAULT_TTL_GENRES
        self._ttl_detail: int = _DEFAULT_TTL_DETAIL

    def configure(self, url: str, username: str, password: str) -> None:
        """Set the server URL and credentials.

        Trailing slashes are stripped from ``url``. The repository counts as
        configured only when all three values are non-empty.
        """
        self._url = url.rstrip("/") if url else ""
        self._username = username
        self._password = password
        self._configured = bool(self._url and self._username and self._password)

    def is_configured(self) -> bool:
        """Return whether URL, username and password have all been set."""
        return self._configured

    def configure_cache_ttls(
        self,
        *,
        list_ttl: int | None = None,
        search_ttl: int | None = None,
        genres_ttl: int | None = None,
        detail_ttl: int | None = None,
    ) -> None:
        """Override individual cache TTLs; ``None`` leaves a TTL unchanged."""
        if list_ttl is not None:
            self._ttl_list = list_ttl
        if search_ttl is not None:
            self._ttl_search = search_ttl
        if genres_ttl is not None:
            self._ttl_genres = genres_ttl
        if detail_ttl is not None:
            self._ttl_detail = detail_ttl

    @staticmethod
    def reset_circuit_breaker() -> None:
        """Reset the module-level Navidrome circuit breaker."""
        _navidrome_circuit_breaker.reset()

    def _build_auth_params(self) -> dict[str, str]:
        """Build the auth/query parameters sent with every request.

        A fresh random salt is generated per call and the token is
        ``md5(password + salt)``; MD5 is what the token scheme requires here,
        it is not a local design choice.
        """
        salt = secrets.token_hex(3)
        salted_password = (self._password + salt).encode("utf-8")
        token = hashlib.md5(salted_password).hexdigest()
        return {
            "u": self._username,
            "t": token,
            "s": salt,
            "v": "1.16.1",
            "c": "musicseerr",
            "f": "json",
        }

    @staticmethod
    def _pick_forward_headers(upstream_headers: Any) -> dict[str, str]:
        """Return the non-empty ``_PROXY_FORWARD_HEADERS`` found upstream."""
        forwarded: dict[str, str] = {}
        for header_name in _PROXY_FORWARD_HEADERS:
            value = upstream_headers.get(header_name)
            if value:
                forwarded[header_name] = value
        return forwarded

    def build_stream_url(self, song_id: str) -> str:
        """Build a full stream URL for a song, including auth params.

        Raises:
            ValueError: If the repository has not been configured.
        """
        if not self._configured:
            raise ValueError("Navidrome is not configured")
        params = self._build_auth_params()
        params["id"] = song_id
        return f"{self._url}/rest/stream?{urlencode(params)}"

    async def proxy_head_stream(self, song_id: str) -> StreamProxyResult:
        """HEAD proxy to Navidrome stream endpoint. Returns filtered headers.

        Raises:
            ValueError: If the repository has not been configured.
            ExternalServiceError: If the HEAD request fails at transport level.
        """
        stream_url = self.build_stream_url(song_id)
        timeout = httpx.Timeout(connect=10, read=10, write=10, pool=10)
        async with httpx.AsyncClient(timeout=timeout) as client:
            try:
                resp = await client.head(stream_url)
            except httpx.HTTPError as exc:
                raise ExternalServiceError("Failed to reach Navidrome") from exc
            headers = self._pick_forward_headers(resp.headers)
            return StreamProxyResult(
                status_code=resp.status_code,
                headers=headers,
                media_type=headers.get("Content-Type", "audio/mpeg"),
            )

    async def proxy_get_stream(
        self, song_id: str, range_header: str | None = None
    ) -> StreamProxyResult:
        """GET streaming proxy to Navidrome. Returns chunked body iterator with cleanup.

        A dedicated client is opened for the stream. It and the upstream
        response are closed when the returned ``body_chunks`` iterator
        finishes or is closed, or immediately if this method fails.

        NOTE: if the caller never iterates or closes ``body_chunks``, the
        cleanup in the generator's ``finally`` never runs.

        Args:
            song_id: Id of the song to stream.
            range_header: Optional ``Range`` header value forwarded upstream.

        Raises:
            ValueError: If the repository has not been configured.
            ExternalServiceError: If upstream answers with status >= 400.
        """
        stream_url = self.build_stream_url(song_id)
        upstream_headers: dict[str, str] = {}
        if range_header:
            upstream_headers["Range"] = range_header

        client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=10, read=120, write=30, pool=10)
        )
        upstream_resp = None

        async def _close_upstream() -> None:
            # Close the client even when closing the response itself fails.
            try:
                if upstream_resp is not None:
                    await upstream_resp.aclose()
            finally:
                await client.aclose()

        try:
            upstream_resp = await client.send(
                client.build_request("GET", stream_url, headers=upstream_headers),
                stream=True,
            )
            if upstream_resp.status_code == 416:
                raise ExternalServiceError("416 Range not satisfiable")
            if upstream_resp.status_code >= 400:
                logger.error(
                    "Navidrome upstream returned %d for %s",
                    upstream_resp.status_code, song_id,
                )
                raise ExternalServiceError("Navidrome returned an error")

            resp_headers = self._pick_forward_headers(upstream_resp.headers)
            # Only partial-content is passed through; anything else maps to 200.
            status_code = 206 if upstream_resp.status_code == 206 else 200

            async def _stream_body() -> AsyncIterator[bytes]:
                try:
                    async for chunk in upstream_resp.aiter_bytes(
                        chunk_size=_STREAM_CHUNK_SIZE
                    ):
                        yield chunk
                finally:
                    await _close_upstream()

            return StreamProxyResult(
                status_code=status_code,
                headers=resp_headers,
                media_type=resp_headers.get("Content-Type", "audio/mpeg"),
                body_chunks=_stream_body(),
            )
        except BaseException:
            # BaseException rather than Exception: task cancellation must not
            # leak the dedicated client or the open upstream response.
            await _close_upstream()
            raise

    @with_retry(
        max_attempts=3,
        base_delay=1.0,
        max_delay=5.0,
        circuit_breaker=_navidrome_circuit_breaker,
        retriable_exceptions=(httpx.HTTPError, ExternalServiceError),
        non_breaking_exceptions=(NavidromeSubsonicError,),
    )
    async def _request(
        self,
        endpoint: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """GET a JSON endpoint and return the parsed response payload.

        Args:
            endpoint: Path appended to the configured base URL, e.g. ``/rest/ping``.
            params: Extra query parameters; they override auth params on clash.

        Returns:
            Whatever ``parse_subsonic_response`` returns for the decoded body.

        Raises:
            ExternalServiceError: Not configured, timeout, or transport failure.
            NavidromeAuthError: HTTP 401 or 403.
            NavidromeApiError: Any other non-200 status, or an undecodable body.
        """
        if not self._configured:
            raise ExternalServiceError("Navidrome not configured")

        merged = self._build_auth_params()
        if params:
            merged.update(params)
        url = f"{self._url}{endpoint}"

        try:
            response = await self._client.get(url, params=merged, timeout=15.0)
        except httpx.TimeoutException as exc:
            raise ExternalServiceError(f"Navidrome request timed out: {exc}") from exc
        except httpx.HTTPError as exc:
            raise ExternalServiceError(f"Navidrome request failed: {exc}") from exc

        if response.status_code in (401, 403):
            raise NavidromeAuthError(
                f"Navidrome authentication failed ({response.status_code})"
            )
        if response.status_code != 200:
            raise NavidromeApiError(
                f"Navidrome request failed ({response.status_code})",
            )

        try:
            data: dict[str, Any] = response.json()
        except Exception as exc:
            raise NavidromeApiError(f"Navidrome returned invalid JSON for {endpoint}") from exc
        return parse_subsonic_response(data)

    async def ping(self) -> bool:
        """Return ``True`` if ``/rest/ping`` succeeds; record a degradation otherwise."""
        try:
            await self._request("/rest/ping")
            return True
        except Exception:  # noqa: BLE001
            _record_degradation("Navidrome ping failed")
            return False

    async def get_album_list(
        self,
        type: str,
        size: int = 20,
        offset: int = 0,
        genre: str | None = None,
        from_year: int | None = None,
        to_year: int | None = None,
    ) -> list[SubsonicAlbum]:
        """Return a page of albums from ``getAlbumList2`` (cached).

        ``genre`` is only sent when ``type == "byGenre"``. For
        ``type == "byYear"`` missing bounds default to 0 and 9999.
        """
        cache_key = f"{NAVIDROME_PREFIX}albums:{type}:{size}:{offset}:{genre or ''}:{from_year}:{to_year}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached

        params: dict[str, Any] = {"type": type, "size": size, "offset": offset}
        if genre and type == "byGenre":
            params["genre"] = genre
        if type == "byYear":
            params["fromYear"] = from_year if from_year is not None else 0
            params["toYear"] = to_year if to_year is not None else 9999

        resp = await self._request(
            "/rest/getAlbumList2",
            params,
        )
        raw = resp.get("albumList2", {}).get("album", [])
        albums = [parse_album(a) for a in raw]
        await self._cache.set(cache_key, albums, self._ttl_list)
        return albums

    async def get_album(self, id: str) -> SubsonicAlbum:
        """Return one album from ``getAlbum`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}album:{id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getAlbum", {"id": id})
        album = parse_album(resp.get("album", {}))
        await self._cache.set(cache_key, album, self._ttl_detail)
        return album

    async def get_artists(self) -> list[SubsonicArtist]:
        """Return all artists from ``getArtists`` as one flat list (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}artists"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getArtists")
        artists: list[SubsonicArtist] = [
            parse_artist(raw_artist)
            for index in resp.get("artists", {}).get("index", [])
            for raw_artist in index.get("artist", [])
        ]
        await self._cache.set(cache_key, artists, self._ttl_list)
        return artists

    async def get_artist(self, id: str) -> SubsonicArtist:
        """Return one artist from ``getArtist`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}artist:{id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getArtist", {"id": id})
        artist = parse_artist(resp.get("artist", {}))
        await self._cache.set(cache_key, artist, self._ttl_detail)
        return artist

    async def get_song(self, id: str) -> SubsonicSong:
        """Return one song from ``getSong`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}song:{id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getSong", {"id": id})
        song = parse_song(resp.get("song", {}))
        await self._cache.set(cache_key, song, self._ttl_detail)
        return song

    async def search(
        self,
        query: str,
        artist_count: int = 20,
        album_count: int = 20,
        song_count: int = 20,
    ) -> SubsonicSearchResult:
        """Search artists, albums and songs via ``search3`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}search:{query}:{artist_count}:{album_count}:{song_count}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request(
            "/rest/search3",
            {
                "query": query,
                "artistCount": artist_count,
                "albumCount": album_count,
                "songCount": song_count,
            },
        )
        sr = resp.get("searchResult3", {})
        result = SubsonicSearchResult(
            artist=[parse_artist(a) for a in sr.get("artist", [])],
            album=[parse_album(a) for a in sr.get("album", [])],
            song=[parse_song(s) for s in sr.get("song", [])],
        )
        await self._cache.set(cache_key, result, self._ttl_search)
        return result

    async def get_starred(self) -> SubsonicSearchResult:
        """Return starred artists, albums and songs from ``getStarred2`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}starred"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getStarred2")
        sr = resp.get("starred2", {})
        result = SubsonicSearchResult(
            artist=[parse_artist(a) for a in sr.get("artist", [])],
            album=[parse_album(a) for a in sr.get("album", [])],
            song=[parse_song(s) for s in sr.get("song", [])],
        )
        await self._cache.set(cache_key, result, self._ttl_list)
        return result

    async def get_genres(self) -> list[SubsonicGenre]:
        """Return all genres from ``getGenres`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}genres"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getGenres")
        raw = resp.get("genres", {}).get("genre", [])
        genres = [parse_genre(g) for g in raw]
        await self._cache.set(cache_key, genres, self._ttl_genres)
        return genres

    async def get_artists_index(self) -> list[SubsonicArtistIndex]:
        """Return artists from ``getArtists`` grouped by index entry (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}artists_index"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getArtists")
        index_data: list[SubsonicArtistIndex] = [
            SubsonicArtistIndex(
                name=idx.get("name", ""),
                artists=[parse_artist(a) for a in idx.get("artist", [])],
            )
            for idx in resp.get("artists", {}).get("index", [])
        ]
        await self._cache.set(cache_key, index_data, self._ttl_list)
        return index_data

    async def get_songs_by_genre(
        self, genre: str, count: int = 50, offset: int = 0
    ) -> list[SubsonicSong]:
        """Return a page of songs for a genre from ``getSongsByGenre`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}songs_by_genre:{genre}:{count}:{offset}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request(
            "/rest/getSongsByGenre",
            {"genre": genre, "count": count, "offset": offset},
        )
        raw = resp.get("songsByGenre", {}).get("song", [])
        songs = [parse_song(s) for s in raw]
        await self._cache.set(cache_key, songs, self._ttl_list)
        return songs

    async def search_songs(
        self, query: str = "", count: int = 50, offset: int = 0
    ) -> list[SubsonicSong]:
        """Return a page of songs via ``search3`` with artists/albums disabled (cached).

        An empty ``query`` is sent as the literal two-character string ``""``.
        """
        cache_key = f"{NAVIDROME_PREFIX}songs_browse:{query}:{count}:{offset}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request(
            "/rest/search3",
            {
                "query": query or '""',
                "artistCount": 0,
                "albumCount": 0,
                "songCount": count,
                "songOffset": offset,
            },
        )
        sr = resp.get("searchResult3", {})
        songs = [parse_song(s) for s in sr.get("song", [])]
        await self._cache.set(cache_key, songs, self._ttl_list)
        return songs

    async def get_music_folders(self) -> list[SubsonicMusicFolder]:
        """Return the music folders from ``getMusicFolders`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}music_folders"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getMusicFolders")
        raw = resp.get("musicFolders", {}).get("musicFolder", [])
        folders = [
            SubsonicMusicFolder(id=str(f.get("id", "")), name=f.get("name", ""))
            for f in raw
        ]
        await self._cache.set(cache_key, folders, self._ttl_list)
        return folders

    async def get_playlists(self) -> list[SubsonicPlaylist]:
        """Return playlist summaries (no entries) from ``getPlaylists`` (cached)."""
        cache_key = f"{NAVIDROME_PREFIX}playlists"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getPlaylists")
        raw = resp.get("playlists", {}).get("playlist", [])
        playlists: list[SubsonicPlaylist] = [
            SubsonicPlaylist(
                id=p.get("id", ""),
                name=p.get("name", ""),
                songCount=p.get("songCount", 0),
                duration=p.get("duration", 0),
            )
            for p in raw
        ]
        await self._cache.set(cache_key, playlists, self._ttl_list)
        return playlists

    async def get_playlist(self, id: str) -> SubsonicPlaylist:
        """Return one playlist from ``getPlaylist`` (cached).

        ``entry`` is ``None`` when the playlist has no entries.
        """
        cache_key = f"{NAVIDROME_PREFIX}playlist:{id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        resp = await self._request("/rest/getPlaylist", {"id": id})
        raw = resp.get("playlist", {})
        entries = raw.get("entry", [])
        playlist = SubsonicPlaylist(
            id=raw.get("id", ""),
            name=raw.get("name", ""),
            songCount=raw.get("songCount", 0),
            duration=raw.get("duration", 0),
            entry=[parse_song(e) for e in entries] if entries else None,
        )
        await self._cache.set(cache_key, playlist, self._ttl_detail)
        return playlist

    async def get_random_songs(
        self,
        size: int = 20,
        genre: str | None = None,
    ) -> list[SubsonicSong]:
        """Return songs from ``getRandomSongs``; never cached."""
        params: dict[str, Any] = {"size": size}
        if genre:
            params["genre"] = genre
        resp = await self._request("/rest/getRandomSongs", params)
        raw = resp.get("randomSongs", {}).get("song", [])
        return [parse_song(s) for s in raw]

    async def scrobble(
        self,
        id: str,
        time_ms: int | None = None,
    ) -> bool:
        """Submit a scrobble; return ``False`` and record a degradation on failure."""
        params: dict[str, Any] = {"id": id}
        if time_ms is not None:
            params["time"] = time_ms
        try:
            await self._request("/rest/scrobble", params)
            return True
        except Exception:  # noqa: BLE001
            _record_degradation("Navidrome scrobble failed")
            return False

    async def now_playing(self, id: str) -> bool:
        """Call ``scrobble`` with ``submission=false``; ``False`` on failure."""
        params: dict[str, Any] = {"id": id, "submission": "false"}
        try:
            await self._request("/rest/scrobble", params)
            return True
        except Exception:  # noqa: BLE001
            _record_degradation("Navidrome now-playing report failed")
            return False

    async def get_now_playing(self) -> list[SubsonicNowPlayingEntry]:
        """Return ``getNowPlaying`` entries; ``[]`` when unconfigured or on failure."""
        if not self._configured:
            return []
        cache_key = f"{NAVIDROME_PREFIX}now_playing"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            data = await self._request("/rest/getNowPlaying")
            entries = parse_now_playing_entries(data)
            # Very short TTL compared with the other cached calls.
            await self._cache.set(cache_key, entries, 2)
            return entries
        except Exception:  # noqa: BLE001
            logger.warning("Failed to fetch Navidrome now-playing", exc_info=True)
            _record_degradation("Navidrome getNowPlaying failed")
            return []

    async def get_top_songs(
        self,
        artist_name: str,
        count: int = 20,
    ) -> list[SubsonicSong]:
        """Return ``getTopSongs`` for an artist; ``[]`` when unconfigured or on failure."""
        if not self._configured:
            return []
        cache_key = f"{NAVIDROME_PREFIX}top_songs:{artist_name}:{count}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            data = await self._request("/rest/getTopSongs", {"artist": artist_name, "count": count})
            songs = parse_top_songs(data)
            await self._cache.set(cache_key, songs, 900)
            return songs
        except Exception:  # noqa: BLE001
            logger.debug("Navidrome getTopSongs returned empty for %s (Last.fm may not be configured)", artist_name)
            _record_degradation("Navidrome getTopSongs failed")
            return []

    async def get_similar_songs(
        self,
        song_id: str,
        count: int = 20,
    ) -> list[SubsonicSong]:
        """Return ``getSimilarSongs2`` for a song; ``[]`` when unconfigured or on failure."""
        if not self._configured:
            return []
        cache_key = f"{NAVIDROME_PREFIX}similar_songs:{song_id}:{count}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            data = await self._request("/rest/getSimilarSongs2", {"id": song_id, "count": count})
            songs = parse_similar_songs(data)
            await self._cache.set(cache_key, songs, 900)
            return songs
        except Exception:  # noqa: BLE001
            logger.debug("Navidrome getSimilarSongs2 returned empty for %s", song_id)
            _record_degradation("Navidrome getSimilarSongs2 failed")
            return []

    async def get_artist_info(self, artist_id: str) -> SubsonicArtistInfo | None:
        """Return ``getArtistInfo2`` data; ``None`` when unconfigured or on failure."""
        if not self._configured:
            return None
        cache_key = f"{NAVIDROME_PREFIX}artist_info:{artist_id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            data = await self._request("/rest/getArtistInfo2", {"id": artist_id})
            info = parse_artist_info(data)
            await self._cache.set(cache_key, info, 1800)
            return info
        except Exception:  # noqa: BLE001
            logger.debug("Navidrome getArtistInfo2 returned empty for %s", artist_id)
            _record_degradation("Navidrome getArtistInfo2 failed")
            return None

    async def get_album_info(self, album_id: str) -> SubsonicAlbumInfo | None:
        """Return ``getAlbumInfo2`` data; ``None`` when unconfigured or on failure."""
        if not self._configured:
            return None
        cache_key = f"{NAVIDROME_PREFIX}album_info:{album_id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            data = await self._request("/rest/getAlbumInfo2", {"id": album_id})
            info = parse_album_info(data)
            await self._cache.set(cache_key, info, 1800)
            return info
        except Exception:  # noqa: BLE001
            logger.debug("Navidrome getAlbumInfo2 returned empty for %s", album_id)
            _record_degradation("Navidrome getAlbumInfo2 failed")
            return None

    async def get_lyrics(self, artist: str, title: str) -> SubsonicLyrics | None:
        """Return ``getLyrics`` for artist/title; ``None`` when unconfigured or on failure.

        Only truthy results are cached. Failures are logged at debug level and
        are not recorded as degradations.
        """
        if not self._configured:
            return None
        cache_key = f"{NAVIDROME_PREFIX}lyrics:{artist}:{title}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            data = await self._request("/rest/getLyrics", {"artist": artist, "title": title})
            lyrics = parse_lyrics(data)
            if lyrics:
                await self._cache.set(cache_key, lyrics, 3600)
            return lyrics
        except Exception:  # noqa: BLE001
            logger.debug("Navidrome getLyrics returned empty for %s - %s", artist, title)
            return None

    async def get_lyrics_by_song_id(self, song_id: str) -> SubsonicLyrics | None:
        """Return lyrics from ``getLyricsBySongId`` using the first structured entry.

        Line start times are kept only when that entry is flagged ``synced``.
        Returns ``None`` when unconfigured, when there is no entry, when the
        entry has neither text nor timing, or when the request fails.
        """
        if not self._configured:
            return None
        cache_key = f"{NAVIDROME_PREFIX}lyrics_by_id:{song_id}"
        cached = await self._cache.get(cache_key)
        if cached is not None:
            return cached
        try:
            from repositories.navidrome_models import SubsonicLyricLine

            data = await self._request("/rest/getLyricsBySongId", {"id": song_id})
            lyric_list = data.get("lyricsList", {}).get("structuredLyrics", [])
            if not lyric_list:
                return None

            best = lyric_list[0]
            raw_lines = best.get("line", [])
            synced = best.get("synced", False)
            lines = [
                SubsonicLyricLine(
                    value=raw_line.get("value", ""),
                    start=raw_line.get("start") if synced else None,
                )
                for raw_line in raw_lines
            ]

            has_text = any(line.value.strip() for line in lines)
            has_timing = any(line.start is not None for line in lines)
            if not has_text and not has_timing:
                return None

            text = "\n".join(line.value for line in lines)
            lyrics = SubsonicLyrics(value=text, lines=lines, is_synced=synced)
            await self._cache.set(cache_key, lyrics, 3600)
            return lyrics
        except Exception:  # noqa: BLE001
            logger.debug("Navidrome getLyricsBySongId not supported or empty for %s", song_id)
            return None

    async def validate_connection(self) -> tuple[bool, str]:
        """Ping the server and return ``(ok, human-readable message)``.

        Never raises; every failure is mapped to ``(False, message)``.
        """
        if not self._configured:
            return False, "Navidrome URL, username, or password not configured"
        try:
            resp = await self._request("/rest/ping")
            version = resp.get("version", "unknown")
            return True, f"Connected to Navidrome (API v{version})"
        except NavidromeAuthError as exc:
            return False, f"Authentication failed: {exc.message}"
        except ExternalServiceError as exc:
            msg = str(exc)
            lowered = msg.lower()
            if "timed out" in lowered:
                return False, "Connection timed out - check URL"
            if "connect" in lowered or "refused" in lowered:
                return False, "Could not connect - check URL and ensure server is running"
            return False, f"Connection failed: {msg}"
        except Exception as exc:  # noqa: BLE001
            return False, f"Connection failed: {exc}"

    async def clear_cache(self) -> None:
        """Remove every cache entry whose key starts with ``NAVIDROME_PREFIX``."""
        await self._cache.clear_prefix(NAVIDROME_PREFIX)

    async def get_cover_art(self, cover_art_id: str, size: int = 500) -> tuple[bytes, str]:
        """Fetch cover art and return ``(image bytes, content type)``.

        This bypasses :meth:`_request`, so it is neither retried nor cached.
        The content type falls back to ``image/jpeg`` when the header is absent.

        Raises:
            ExternalServiceError: Not configured, timeout, transport failure,
                or a non-200 response.
        """
        if not self._configured:
            raise ExternalServiceError("Navidrome not configured")
        params = self._build_auth_params()
        params["id"] = cover_art_id
        params["size"] = str(size)
        url = f"{self._url}/rest/getCoverArt"
        try:
            response = await self._client.get(url, params=params, timeout=15.0)
        except httpx.TimeoutException as exc:
            raise ExternalServiceError("Navidrome cover art request timed out") from exc
        except httpx.HTTPError as exc:
            raise ExternalServiceError("Navidrome cover art request failed") from exc
        if response.status_code != 200:
            raise ExternalServiceError(
                f"Navidrome cover art failed ({response.status_code})"
            )
        content_type = response.headers.get("content-type", "image/jpeg")
        return response.content, content_type
# Upstream response headers that the stream proxy methods copy into their
# result; any other upstream header is dropped.
_PROXY_FORWARD_HEADERS = {"Content-Type", "Content-Length", "Content-Range", "Accept-Ranges"}
# chunk_size passed to aiter_bytes() when relaying the stream body (64 KiB).
_STREAM_CHUNK_SIZE = 64 * 1024