d24e26fb32
* In library rework + Monitored/Unmonitored statuses * address comments + format
289 lines
10 KiB
Python
289 lines
10 KiB
Python
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from api.v1.schemas.discover import DiscoverQueueItemLight
|
|
from infrastructure.persistence import LibraryDB, MBIDStore
|
|
from repositories.protocols import (
|
|
LidarrRepositoryProtocol,
|
|
ListenBrainzRepositoryProtocol,
|
|
MusicBrainzRepositoryProtocol,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MbidResolutionService:
|
|
def __init__(
|
|
self,
|
|
musicbrainz_repo: MusicBrainzRepositoryProtocol,
|
|
lidarr_repo: LidarrRepositoryProtocol,
|
|
listenbrainz_repo: ListenBrainzRepositoryProtocol,
|
|
library_db: LibraryDB | None = None,
|
|
mbid_store: MBIDStore | None = None,
|
|
) -> None:
|
|
self._mb_repo = musicbrainz_repo
|
|
self._lidarr_repo = lidarr_repo
|
|
self._lb_repo = listenbrainz_repo
|
|
self._library_db = library_db
|
|
self._mbid_store = mbid_store
|
|
|
|
@staticmethod
|
|
def normalize_mbid(mbid: str | None) -> str | None:
|
|
if not mbid:
|
|
return None
|
|
normalized = mbid.strip().lower()
|
|
return normalized or None
|
|
|
|
async def resolve_lastfm_release_group_mbids(
|
|
self,
|
|
album_mbids: list[str],
|
|
*,
|
|
max_lookups: int = 10,
|
|
allow_passthrough: bool = True,
|
|
resolver_cache: dict[str, str | None] | None = None,
|
|
) -> dict[str, str]:
|
|
normalized: list[str] = []
|
|
seen: set[str] = set()
|
|
for mbid in album_mbids:
|
|
mbid_normalized = self.normalize_mbid(mbid)
|
|
if not mbid_normalized or mbid_normalized in seen:
|
|
continue
|
|
normalized.append(mbid_normalized)
|
|
seen.add(mbid_normalized)
|
|
|
|
if not normalized:
|
|
return {}
|
|
|
|
cache = resolver_cache if resolver_cache is not None else {}
|
|
resolved: dict[str, str] = {}
|
|
pending: list[str] = []
|
|
|
|
for mbid in normalized:
|
|
if mbid in cache:
|
|
cached_value = cache[mbid]
|
|
if cached_value:
|
|
resolved[mbid] = cached_value
|
|
elif allow_passthrough:
|
|
resolved[mbid] = mbid
|
|
continue
|
|
pending.append(mbid)
|
|
|
|
if pending and self._mbid_store:
|
|
try:
|
|
persisted = await self._mbid_store.get_mbid_resolution_map(pending)
|
|
still_pending: list[str] = []
|
|
for mbid in pending:
|
|
if mbid in persisted:
|
|
rg_mbid = persisted[mbid]
|
|
cache[mbid] = rg_mbid
|
|
if rg_mbid:
|
|
resolved[mbid] = rg_mbid
|
|
elif allow_passthrough:
|
|
resolved[mbid] = mbid
|
|
else:
|
|
still_pending.append(mbid)
|
|
pending = still_pending
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to load MBID resolution from persistent cache")
|
|
|
|
if not pending:
|
|
return resolved
|
|
|
|
new_resolutions: dict[str, str | None] = {}
|
|
|
|
lookup_mbids = pending[:max_lookups]
|
|
skipped_mbids = pending[max_lookups:]
|
|
for mbid in skipped_mbids:
|
|
if allow_passthrough:
|
|
resolved[mbid] = mbid
|
|
cache[mbid] = mbid
|
|
else:
|
|
cache[mbid] = None
|
|
|
|
release_results = await asyncio.gather(
|
|
*[self._mb_repo.get_release_group_id_from_release(mbid) for mbid in lookup_mbids],
|
|
return_exceptions=True,
|
|
)
|
|
unresolved: list[str] = []
|
|
|
|
for mbid, result in zip(lookup_mbids, release_results):
|
|
if isinstance(result, Exception):
|
|
unresolved.append(mbid)
|
|
continue
|
|
rg_mbid = self.normalize_mbid(result)
|
|
if rg_mbid:
|
|
resolved[mbid] = rg_mbid
|
|
cache[mbid] = rg_mbid
|
|
new_resolutions[mbid] = rg_mbid
|
|
else:
|
|
unresolved.append(mbid)
|
|
|
|
if not unresolved:
|
|
if new_resolutions and self._mbid_store:
|
|
try:
|
|
await self._mbid_store.save_mbid_resolution_map(new_resolutions)
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to persist MBID resolutions")
|
|
return resolved
|
|
|
|
rg_checks = await asyncio.gather(
|
|
*[
|
|
self._mb_repo.get_release_group_by_id(mbid, includes=["artist-credits"])
|
|
for mbid in unresolved
|
|
],
|
|
return_exceptions=True,
|
|
)
|
|
|
|
for mbid, result in zip(unresolved, rg_checks):
|
|
if isinstance(result, Exception):
|
|
if allow_passthrough:
|
|
resolved[mbid] = mbid
|
|
cache[mbid] = mbid
|
|
else:
|
|
cache[mbid] = None
|
|
continue
|
|
if isinstance(result, dict) and result.get("id"):
|
|
resolved[mbid] = mbid
|
|
cache[mbid] = mbid
|
|
new_resolutions[mbid] = mbid
|
|
elif allow_passthrough:
|
|
resolved[mbid] = mbid
|
|
cache[mbid] = mbid
|
|
else:
|
|
cache[mbid] = None
|
|
new_resolutions[mbid] = None
|
|
|
|
if new_resolutions and self._mbid_store:
|
|
try:
|
|
await self._mbid_store.save_mbid_resolution_map(new_resolutions)
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to persist MBID resolutions")
|
|
|
|
return resolved
|
|
|
|
async def lastfm_albums_to_queue_items(
|
|
self,
|
|
artist_albums_pairs: list[tuple[Any, list]],
|
|
*,
|
|
exclude: set[str] | None = None,
|
|
target: int,
|
|
reason: str,
|
|
is_wildcard: bool = False,
|
|
resolver_cache: dict[str, str | None] | None = None,
|
|
use_album_artist_name: bool = True,
|
|
) -> list[DiscoverQueueItemLight]:
|
|
all_album_mbids: list[str] = []
|
|
for _, albums in artist_albums_pairs:
|
|
all_album_mbids.extend(a.mbid for a in albums if a.mbid)
|
|
rg_mbid_map = await self.resolve_lastfm_release_group_mbids(
|
|
all_album_mbids, resolver_cache=resolver_cache,
|
|
)
|
|
items: list[DiscoverQueueItemLight] = []
|
|
seen_rg_mbids: set[str] = {mbid.lower() for mbid in (exclude or set())}
|
|
for artist, albums in artist_albums_pairs:
|
|
if len(items) >= target:
|
|
break
|
|
artist_mbid = self.normalize_mbid(artist.mbid)
|
|
for album in albums:
|
|
if len(items) >= target:
|
|
break
|
|
raw_album_mbid = self.normalize_mbid(album.mbid)
|
|
if not raw_album_mbid:
|
|
continue
|
|
rg_mbid = rg_mbid_map.get(raw_album_mbid)
|
|
if not rg_mbid:
|
|
continue
|
|
rg_mbid_lower = rg_mbid.lower()
|
|
if rg_mbid_lower in seen_rg_mbids:
|
|
continue
|
|
artist_name = (album.artist_name or artist.name) if use_album_artist_name else artist.name
|
|
items.append(DiscoverQueueItemLight(
|
|
release_group_mbid=rg_mbid,
|
|
album_name=album.name,
|
|
artist_name=artist_name,
|
|
artist_mbid=artist_mbid or "",
|
|
cover_url=f"/api/v1/covers/release-group/{rg_mbid}?size=500",
|
|
recommendation_reason=reason,
|
|
is_wildcard=is_wildcard,
|
|
in_library=False,
|
|
))
|
|
seen_rg_mbids.add(rg_mbid_lower)
|
|
return items
|
|
|
|
async def resolve_release_mbids(
|
|
self,
|
|
release_ids: list[str],
|
|
) -> dict[str, str]:
|
|
return await self.resolve_lastfm_release_group_mbids(
|
|
release_ids, allow_passthrough=False,
|
|
)
|
|
|
|
async def get_library_artist_mbids(self, lidarr_configured: bool) -> set[str]:
|
|
if not lidarr_configured:
|
|
return set()
|
|
try:
|
|
artists = await self._lidarr_repo.get_artists_from_library(include_unmonitored=True)
|
|
return {a.get("mbid", "").lower() for a in artists if a.get("mbid")}
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to fetch library artists from Lidarr")
|
|
return set()
|
|
|
|
async def get_library_album_mbids(self, lidarr_configured: bool) -> set[str]:
|
|
if not lidarr_configured:
|
|
if self._library_db:
|
|
try:
|
|
return await self._library_db.get_all_album_mbids()
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to fetch album MBIDs from library cache")
|
|
return set()
|
|
try:
|
|
return await self._lidarr_repo.get_library_mbids(include_release_ids=False)
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to fetch library album MBIDs from Lidarr")
|
|
return set()
|
|
|
|
async def get_user_listened_release_group_mbids(
|
|
self,
|
|
lb_enabled: bool,
|
|
username: str | None,
|
|
resolved_source: str,
|
|
) -> set[str]:
|
|
if resolved_source != "listenbrainz" or not lb_enabled or not username:
|
|
return set()
|
|
try:
|
|
listened = await self._lb_repo.get_user_top_release_groups(
|
|
username=username,
|
|
range_="all_time",
|
|
count=100,
|
|
)
|
|
except Exception: # noqa: BLE001
|
|
logger.warning("Failed to fetch user listened release groups from ListenBrainz")
|
|
return set()
|
|
return {
|
|
rg.release_group_mbid.lower()
|
|
for rg in listened
|
|
if getattr(rg, "release_group_mbid", None)
|
|
}
|
|
|
|
def make_queue_item(
|
|
self,
|
|
*,
|
|
release_group_mbid: str,
|
|
album_name: str,
|
|
artist_name: str,
|
|
artist_mbid: str,
|
|
reason: str,
|
|
is_wildcard: bool = False,
|
|
) -> DiscoverQueueItemLight:
|
|
return DiscoverQueueItemLight(
|
|
release_group_mbid=release_group_mbid,
|
|
album_name=album_name,
|
|
artist_name=artist_name,
|
|
artist_mbid=artist_mbid,
|
|
cover_url=f"/api/v1/covers/release-group/{release_group_mbid}?size=500",
|
|
recommendation_reason=reason,
|
|
is_wildcard=is_wildcard,
|
|
in_library=False,
|
|
)
|