Files
musicseerr/backend/services/artist_discovery_service.py
Harvey a69a26852e Cut down unnecessary logging (#48)
* Cut down unnecessary logging

* fix format etc

* fix checks

* fix tests
2026-04-14 00:02:38 +01:00

699 lines
28 KiB
Python

import asyncio
import logging
from typing import Any, Literal, Optional
from api.v1.schemas.discovery import (
SimilarArtist,
SimilarArtistsResponse,
TopSong,
TopSongsResponse,
TopAlbum,
TopAlbumsResponse,
)
from repositories.protocols import ListenBrainzRepositoryProtocol, LastFmRepositoryProtocol, MusicBrainzRepositoryProtocol, LidarrRepositoryProtocol
from infrastructure.cache.memory_cache import CacheInterface
from infrastructure.persistence import LibraryDB
from infrastructure.resilience.retry import CircuitOpenError
from services.preferences_service import PreferencesService
logger = logging.getLogger(__name__)
DISCOVERY_CACHE_TTL_LIBRARY = 21600
DISCOVERY_CACHE_TTL_NON_LIBRARY = 3600
DISCOVERY_EMPTY_CACHE_TTL = 600
CIRCUIT_OPEN_CACHE_TTL = 30
DEFAULT_SIMILAR_COUNT = 15
DEFAULT_TOP_SONGS_COUNT = 10
DEFAULT_TOP_ALBUMS_COUNT = 10
_DISCOVERY_WORKER_TIMEOUT = 120
# Module-level flag survives singleton cache invalidation / instance recreation
_discovery_precache_running = False
class ArtistDiscoveryService:
def __init__(
self,
listenbrainz_repo: ListenBrainzRepositoryProtocol,
musicbrainz_repo: MusicBrainzRepositoryProtocol,
library_db: LibraryDB,
lidarr_repo: LidarrRepositoryProtocol,
memory_cache: CacheInterface,
lastfm_repo: Optional[LastFmRepositoryProtocol] = None,
preferences_service: Optional[PreferencesService] = None,
):
self._lb_repo = listenbrainz_repo
self._mb_repo = musicbrainz_repo
self._library_db = library_db
self._lidarr_repo = lidarr_repo
self._cache = memory_cache
self._lastfm_repo = lastfm_repo
self._preferences_service = preferences_service
def _resolve_source(
self, source: Literal["listenbrainz", "lastfm"] | None
) -> Literal["listenbrainz", "lastfm"]:
if source in ("listenbrainz", "lastfm"):
resolved: Literal["listenbrainz", "lastfm"] = source
elif self._preferences_service:
preferred = self._preferences_service.get_primary_music_source().source
resolved = preferred if preferred in ("listenbrainz", "lastfm") else "listenbrainz"
else:
resolved = "listenbrainz"
return resolved
def _get_discovery_ttl(self, in_library: bool) -> int:
if self._preferences_service:
try:
advanced_settings = self._preferences_service.get_advanced_settings()
return (
advanced_settings.cache_ttl_artist_discovery_library
if in_library
else advanced_settings.cache_ttl_artist_discovery_non_library
)
except AttributeError:
logger.debug("Artist discovery advanced settings unavailable", exc_info=True)
return DISCOVERY_CACHE_TTL_LIBRARY if in_library else DISCOVERY_CACHE_TTL_NON_LIBRARY
def _get_empty_discovery_ttl(self) -> int:
return DISCOVERY_EMPTY_CACHE_TTL
def _build_cache_key(
self,
category: Literal["similar", "top_songs", "top_albums"],
artist_mbid: str,
count: int,
source: str,
) -> str:
return f"artist_discovery:{category}:{artist_mbid}:{count}:{source}"
async def get_similar_artists(
self,
artist_mbid: str,
count: int = 15,
source: Literal["listenbrainz", "lastfm"] | None = None,
) -> SimilarArtistsResponse:
effective_source = self._resolve_source(source)
cache_key = self._build_cache_key("similar", artist_mbid, count, effective_source)
cached = await self._cache.get(cache_key)
if cached:
return cached
if effective_source == "lastfm":
try:
result = await self._get_similar_artists_lastfm(artist_mbid, count)
except Exception as e: # noqa: BLE001
logger.warning("Failed to get Last.fm similar artists for %s: %s", artist_mbid[:8], e)
result = SimilarArtistsResponse(similar_artists=[], source="lastfm")
elif not self._lb_repo.is_configured():
return SimilarArtistsResponse(configured=False)
else:
try:
similar = await self._lb_repo.get_similar_artists(artist_mbid, max_similar=count)
library_artist_mbids = await self._library_db.get_all_artist_mbids()
artists = [
SimilarArtist(
musicbrainz_id=a.artist_mbid,
name=a.artist_name,
listen_count=a.listen_count,
in_library=a.artist_mbid in library_artist_mbids,
)
for a in similar[:count]
]
result = SimilarArtistsResponse(similar_artists=artists)
except CircuitOpenError:
logger.warning("Circuit open for similar artists %s, using short TTL", artist_mbid[:8])
result = SimilarArtistsResponse(similar_artists=[])
await self._cache.set(cache_key, result, ttl_seconds=CIRCUIT_OPEN_CACHE_TTL)
return result
except Exception as e: # noqa: BLE001
logger.warning("Failed to get similar artists for %s: %s(%s)", artist_mbid[:8], type(e).__name__, e)
result = SimilarArtistsResponse(similar_artists=[])
in_library = await self._is_library_artist(artist_mbid)
ttl = (
self._get_discovery_ttl(in_library)
if result.similar_artists
else self._get_empty_discovery_ttl()
)
await self._cache.set(cache_key, result, ttl_seconds=ttl)
return result
async def get_top_songs(
self,
artist_mbid: str,
count: int = 10,
source: Literal["listenbrainz", "lastfm"] | None = None,
) -> TopSongsResponse:
effective_source = self._resolve_source(source)
cache_key = self._build_cache_key("top_songs", artist_mbid, count, effective_source)
cached = await self._cache.get(cache_key)
if cached:
return cached
if effective_source == "lastfm":
try:
result = await self._get_top_songs_lastfm(artist_mbid, count)
except Exception as e: # noqa: BLE001
logger.warning("Failed to get Last.fm top songs for %s: %s", artist_mbid[:8], e)
result = TopSongsResponse(songs=[], source="lastfm")
elif not self._lb_repo.is_configured():
return TopSongsResponse(configured=False)
else:
try:
recordings = await self._lb_repo.get_artist_top_recordings(artist_mbid, count=count)
release_ids = [r.release_mbid for r in recordings if r.release_mbid]
rg_map = await self._resolve_release_groups(release_ids)
songs = []
for r in recordings[:count]:
disc_number = None
track_number = None
if r.release_mbid and r.recording_mbid:
pos = await self._mb_repo.get_recording_position_on_release(
r.release_mbid, r.recording_mbid
)
if pos:
disc_number, track_number = pos
songs.append(TopSong(
recording_mbid=r.recording_mbid,
title=r.track_name,
artist_name=r.artist_name,
release_group_mbid=rg_map.get(r.release_mbid) if r.release_mbid else None,
original_release_mbid=r.release_mbid,
release_name=r.release_name,
listen_count=r.listen_count,
disc_number=disc_number,
track_number=track_number,
))
result = TopSongsResponse(songs=songs)
except CircuitOpenError:
logger.warning("Circuit open for top songs %s, using short TTL", artist_mbid[:8])
result = TopSongsResponse(songs=[])
await self._cache.set(cache_key, result, ttl_seconds=CIRCUIT_OPEN_CACHE_TTL)
return result
except Exception as e: # noqa: BLE001
logger.warning("Failed to get top songs for %s: %s(%s)", artist_mbid[:8], type(e).__name__, e)
result = TopSongsResponse(songs=[])
in_library = await self._is_library_artist(artist_mbid)
ttl = (
self._get_discovery_ttl(in_library)
if result.songs
else self._get_empty_discovery_ttl()
)
await self._cache.set(cache_key, result, ttl_seconds=ttl)
return result
async def get_top_albums(
self,
artist_mbid: str,
count: int = 10,
source: Literal["listenbrainz", "lastfm"] | None = None,
) -> TopAlbumsResponse:
effective_source = self._resolve_source(source)
cache_key = self._build_cache_key("top_albums", artist_mbid, count, effective_source)
cached = await self._cache.get(cache_key)
if cached:
return cached
if effective_source == "lastfm":
try:
result = await self._get_top_albums_lastfm(artist_mbid, count)
except Exception as e: # noqa: BLE001
logger.warning("Failed to get Last.fm top albums for %s: %s", artist_mbid[:8], e)
result = TopAlbumsResponse(albums=[], source="lastfm")
elif not self._lb_repo.is_configured():
return TopAlbumsResponse(configured=False)
else:
try:
release_groups = await self._lb_repo.get_artist_top_release_groups(artist_mbid, count=count)
if not release_groups:
fallback_albums = await self._get_top_albums_from_recordings_fallback(
artist_mbid, count
)
result = TopAlbumsResponse(albums=fallback_albums)
else:
try:
library_album_mbids, requested_album_mbids = await asyncio.gather(
self._lidarr_repo.get_library_mbids(),
self._lidarr_repo.get_requested_mbids(),
)
except Exception as e: # noqa: BLE001
logger.warning(
"Failed to load Lidarr album MBIDs for %s: %s(%s)",
artist_mbid[:8],
type(e).__name__,
e,
)
library_album_mbids, requested_album_mbids = set(), set()
library_album_mbids = {
mbid.lower() for mbid in library_album_mbids if isinstance(mbid, str)
}
requested_album_mbids = {
mbid.lower() for mbid in requested_album_mbids if isinstance(mbid, str)
}
albums = [
TopAlbum(
release_group_mbid=rg.release_group_mbid,
title=rg.release_group_name,
artist_name=rg.artist_name,
listen_count=rg.listen_count,
in_library=rg.release_group_mbid.lower() in library_album_mbids if rg.release_group_mbid else False,
requested=rg.release_group_mbid.lower() in requested_album_mbids if rg.release_group_mbid else False,
)
for rg in release_groups
]
result = TopAlbumsResponse(albums=albums)
except CircuitOpenError:
logger.warning("Circuit open for top albums %s, using short TTL", artist_mbid[:8])
result = TopAlbumsResponse(albums=[])
await self._cache.set(cache_key, result, ttl_seconds=CIRCUIT_OPEN_CACHE_TTL)
return result
except Exception as e: # noqa: BLE001
logger.warning("Failed to get top albums for %s: %s(%s)", artist_mbid[:8], type(e).__name__, e)
try:
fallback_albums = await self._get_top_albums_from_recordings_fallback(
artist_mbid, count
)
result = TopAlbumsResponse(albums=fallback_albums)
except Exception as fallback_error: # noqa: BLE001
logger.warning(
"Top albums fallback from recordings failed for %s: %s(%s)",
artist_mbid[:8],
type(fallback_error).__name__,
fallback_error,
)
result = TopAlbumsResponse(albums=[])
in_library = await self._is_library_artist(artist_mbid)
empty_ttl = (
60
if effective_source == "listenbrainz"
else self._get_empty_discovery_ttl()
)
ttl = (
self._get_discovery_ttl(in_library)
if result.albums
else empty_ttl
)
await self._cache.set(cache_key, result, ttl_seconds=ttl)
return result
async def _get_top_albums_from_recordings_fallback(
self,
artist_mbid: str,
count: int,
) -> list[TopAlbum]:
recordings = await self._lb_repo.get_artist_top_recordings(
artist_mbid,
count=max(count * 8, 80),
)
if not recordings:
return []
try:
library_album_mbids, requested_album_mbids = await asyncio.gather(
self._lidarr_repo.get_library_mbids(),
self._lidarr_repo.get_requested_mbids(),
)
except Exception as e: # noqa: BLE001
logger.warning(
"Fallback Lidarr album MBID load failed for %s: %s(%s)",
artist_mbid[:8],
type(e).__name__,
e,
)
library_album_mbids, requested_album_mbids = set(), set()
library_album_mbids = {
mbid.lower() for mbid in library_album_mbids if isinstance(mbid, str)
}
requested_album_mbids = {
mbid.lower() for mbid in requested_album_mbids if isinstance(mbid, str)
}
release_ids = [r.release_mbid for r in recordings if r.release_mbid]
rg_map = await self._resolve_release_groups(release_ids) if release_ids else {}
aggregated: dict[str, dict[str, str | int | None]] = {}
for recording in recordings:
release_title = (recording.release_name or "").strip()
raw_release_mbid = (
recording.release_mbid.strip().lower()
if recording.release_mbid and recording.release_mbid.strip()
else None
)
resolved_release_group_mbid = (
rg_map.get(raw_release_mbid, raw_release_mbid) if raw_release_mbid else None
)
key = resolved_release_group_mbid or (f"name:{release_title.lower()}" if release_title else None)
if not key:
continue
if key not in aggregated:
aggregated[key] = {
"title": release_title or "Unknown",
"artist_name": recording.artist_name,
"listen_count": 0,
"release_group_mbid": resolved_release_group_mbid,
}
aggregated[key]["listen_count"] = int(aggregated[key]["listen_count"]) + int(
recording.listen_count
)
sorted_albums = sorted(
aggregated.values(),
key=lambda album: int(album["listen_count"]),
reverse=True,
)[:count]
return [
TopAlbum(
release_group_mbid=album["release_group_mbid"] if isinstance(album["release_group_mbid"], str) else None,
title=str(album["title"]),
artist_name=str(album["artist_name"]),
listen_count=int(album["listen_count"]),
in_library=(
isinstance(album["release_group_mbid"], str)
and album["release_group_mbid"] in library_album_mbids
),
requested=(
isinstance(album["release_group_mbid"], str)
and album["release_group_mbid"] in requested_album_mbids
),
)
for album in sorted_albums
]
async def _is_library_artist(self, artist_mbid: str) -> bool:
try:
library_artist_mbids = await self._library_db.get_all_artist_mbids()
return artist_mbid in library_artist_mbids
except Exception: # noqa: BLE001
return False
async def precache_artist_discovery(
self,
artist_mbids: list[str],
delay: float = 0.5,
status_service: Any = None,
mbid_to_name: dict[str, str] | None = None,
generation: int = 0,
) -> int:
global _discovery_precache_running
if _discovery_precache_running:
return 0
_discovery_precache_running = True
try:
return await self._do_precache_artist_discovery(
artist_mbids, delay=delay,
status_service=status_service, mbid_to_name=mbid_to_name,
generation=generation,
)
finally:
_discovery_precache_running = False
async def _do_precache_artist_discovery(
self,
artist_mbids: list[str],
delay: float = 0.5,
status_service: Any = None,
mbid_to_name: dict[str, str] | None = None,
generation: int = 0,
) -> int:
sources: list[Literal["listenbrainz", "lastfm"]] = []
if self._lb_repo.is_configured():
sources.append("listenbrainz")
if (
self._lastfm_repo
and self._preferences_service
and self._preferences_service.is_lastfm_enabled()
):
sources.append("lastfm")
if not sources:
logger.debug("Skipping discovery pre-cache: no configured source")
return 0
cached_count = 0
source_fetches = 0
advanced = self._preferences_service.get_advanced_settings() if self._preferences_service else None
discovery_concurrency = getattr(advanced, 'artist_discovery_precache_concurrency', 5) if advanced else 5
sem = asyncio.Semaphore(discovery_concurrency)
counter_lock = asyncio.Lock()
progress_counter = 0
counted_workers: set[int] = set()
async def process_artist(idx: int, mbid: str) -> bool:
nonlocal cached_count, source_fetches, progress_counter
try:
async with sem:
for source in sources:
similar_key = self._build_cache_key(
"similar", mbid, DEFAULT_SIMILAR_COUNT, source
)
songs_key = self._build_cache_key(
"top_songs", mbid, DEFAULT_TOP_SONGS_COUNT, source
)
albums_key = self._build_cache_key(
"top_albums", mbid, DEFAULT_TOP_ALBUMS_COUNT, source
)
has_all = (
await self._cache.get(similar_key) is not None
and await self._cache.get(songs_key) is not None
and await self._cache.get(albums_key) is not None
)
if has_all:
continue
results = await asyncio.gather(
self.get_similar_artists(
mbid, count=DEFAULT_SIMILAR_COUNT, source=source
),
self.get_top_songs(
mbid, count=DEFAULT_TOP_SONGS_COUNT, source=source
),
self.get_top_albums(
mbid, count=DEFAULT_TOP_ALBUMS_COUNT, source=source
),
return_exceptions=True,
)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.debug("Discovery precache errors for %s: %s", mbid[:8], errors)
async with counter_lock:
source_fetches += 1
if delay > 0:
await asyncio.sleep(delay)
async with counter_lock:
cached_count += 1
progress_counter += 1
local_progress = progress_counter
counted_workers.add(idx)
if status_service:
artist_name = (mbid_to_name or {}).get(mbid, mbid[:8])
await status_service.update_progress(local_progress, current_item=artist_name, generation=generation)
return True
except Exception as e: # noqa: BLE001
logger.warning("Failed to precache discovery for %s: %s", mbid[:8], e)
async with counter_lock:
progress_counter += 1
local_progress = progress_counter
counted_workers.add(idx)
if status_service:
artist_name = (mbid_to_name or {}).get(mbid, mbid[:8])
await status_service.update_progress(local_progress, current_item=artist_name, generation=generation)
return False
async def process_artist_with_timeout(idx: int, mbid: str) -> bool:
nonlocal progress_counter
try:
return await asyncio.wait_for(
process_artist(idx, mbid), timeout=_DISCOVERY_WORKER_TIMEOUT
)
except asyncio.TimeoutError:
logger.warning("Discovery timed out for %s after %ds", mbid[:8], _DISCOVERY_WORKER_TIMEOUT)
async with counter_lock:
if idx not in counted_workers:
progress_counter += 1
counted_workers.add(idx)
local_progress = progress_counter
if status_service:
artist_name = (mbid_to_name or {}).get(mbid, mbid[:8])
await status_service.update_progress(
local_progress, current_item=f"{artist_name} (timed out)",
generation=generation,
)
return False
chunk = max(discovery_concurrency * 4, 20)
for i in range(0, len(artist_mbids), chunk):
if status_service and status_service.is_cancelled():
break
batch = artist_mbids[i:i + chunk]
batch_tasks = [asyncio.create_task(process_artist_with_timeout(i + j, mbid)) for j, mbid in enumerate(batch)]
if batch_tasks:
await asyncio.gather(*batch_tasks, return_exceptions=True)
return cached_count
async def _resolve_release_groups(self, release_ids: list[str]) -> dict[str, str]:
if not release_ids:
return {}
unique_ids = list(dict.fromkeys(release_ids))
tasks = [self._mb_repo.get_release_group_id_from_release(rid) for rid in unique_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
rg_map = {}
errors = 0
for rid, rg_id in zip(unique_ids, results):
if isinstance(rg_id, Exception):
errors += 1
logger.warning(f"Resolution exception for {rid}: {rg_id}")
elif isinstance(rg_id, str) and rg_id:
rg_map[rid] = rg_id
else:
errors += 1
logger.warning(f"Resolution returned None/empty for {rid}")
return rg_map
async def _get_similar_artists_lastfm(
self, artist_mbid: str, count: int
) -> SimilarArtistsResponse:
if (
not self._lastfm_repo
or not self._preferences_service
or not self._preferences_service.is_lastfm_enabled()
):
return SimilarArtistsResponse(
similar_artists=[], source="lastfm", configured=False
)
try:
similar = await self._lastfm_repo.get_similar_artists(
artist="", mbid=artist_mbid, limit=count
)
library_artist_mbids = await self._library_db.get_all_artist_mbids()
artists = [
SimilarArtist(
musicbrainz_id=a.mbid or "",
name=a.name,
listen_count=0,
in_library=bool(a.mbid and a.mbid in library_artist_mbids),
)
for a in similar[:count]
if a.mbid
]
return SimilarArtistsResponse(
similar_artists=artists, source="lastfm"
)
except Exception as e:
logger.warning(
"Last.fm similar artists API error for %s: %s", artist_mbid[:8], e
)
raise
async def _get_top_songs_lastfm(
self, artist_mbid: str, count: int
) -> TopSongsResponse:
if (
not self._lastfm_repo
or not self._preferences_service
or not self._preferences_service.is_lastfm_enabled()
):
return TopSongsResponse(songs=[], source="lastfm", configured=False)
try:
tracks = await self._lastfm_repo.get_artist_top_tracks(
artist="", mbid=artist_mbid, limit=count
)
trimmed = tracks[:count]
songs = [
TopSong(
recording_mbid=t.mbid,
title=t.name,
artist_name=t.artist_name,
release_group_mbid=None,
original_release_mbid=None,
release_name=None,
listen_count=t.playcount,
)
for t in trimmed
]
return TopSongsResponse(songs=songs, source="lastfm")
except Exception as e:
logger.warning(
"Last.fm top songs API error for %s: %s", artist_mbid[:8], e
)
raise
async def _get_top_albums_lastfm(
self, artist_mbid: str, count: int
) -> TopAlbumsResponse:
if (
not self._lastfm_repo
or not self._preferences_service
or not self._preferences_service.is_lastfm_enabled()
):
return TopAlbumsResponse(albums=[], source="lastfm", configured=False)
try:
lfm_albums = await self._lastfm_repo.get_artist_top_albums(
artist="", mbid=artist_mbid, limit=count
)
library_album_mbids, requested_album_mbids = await asyncio.gather(
self._lidarr_repo.get_library_mbids(),
self._lidarr_repo.get_requested_mbids(),
)
trimmed = lfm_albums[:count]
# Last.fm usually returns release-group MBIDs here, so keep them as-is
# and let the discover queue resolve the rare mismatches.
albums = []
for a in trimmed:
raw_mbid = a.mbid.strip().lower() if a.mbid and a.mbid.strip() else None
albums.append(
TopAlbum(
release_group_mbid=raw_mbid,
title=a.name,
artist_name=a.artist_name,
listen_count=a.playcount,
in_library=(
raw_mbid in library_album_mbids
if raw_mbid
else False
),
requested=(
raw_mbid in requested_album_mbids
if raw_mbid
else False
),
)
)
return TopAlbumsResponse(albums=albums, source="lastfm")
except Exception as e:
logger.warning(
"Last.fm top albums API error for %s: %s", artist_mbid[:8], e
)
raise