Files
Harvey 0f25ebc26d Plex Integration + Music Source Integration Improvements (#37)
* plex integration

* The big one - Full Music Source page rework + Playlist importing + Full Plex Integration + Discovery Options + More Like This/Surprise Me/Instant Mix + More...

* Music source track page - Play all / shuffle fixes

* lint

* format

* fix type checks

* format
2026-04-13 23:39:01 +01:00

351 lines
15 KiB
Python

"""Domain 4: MBID resolution and external-service index persistence."""
import logging
import sqlite3
import time
from typing import Any
from infrastructure.persistence._database import PersistenceBase, _normalize
logger = logging.getLogger(__name__)
class MBIDStore(PersistenceBase):
"""Owns tables: ``mbid_resolution_map``, ``ignored_releases``,
``jellyfin_mbid_index``, ``navidrome_album_mbid_index``,
``navidrome_artist_mbid_index``.
"""
def _ensure_tables(self) -> None:
conn = self._connect()
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS mbid_resolution_map (
source_mbid_lower TEXT PRIMARY KEY,
source_mbid TEXT NOT NULL,
release_group_mbid TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS ignored_releases (
release_group_mbid_lower TEXT PRIMARY KEY,
release_group_mbid TEXT NOT NULL,
artist_mbid TEXT NOT NULL,
release_name TEXT NOT NULL,
artist_name TEXT NOT NULL,
ignored_at REAL NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS jellyfin_mbid_index (
mbid_lower TEXT PRIMARY KEY,
mbid TEXT NOT NULL,
item_id TEXT NOT NULL,
saved_at REAL NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS navidrome_album_mbid_index (
cache_key TEXT PRIMARY KEY,
mbid TEXT,
saved_at REAL NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS navidrome_artist_mbid_index (
cache_key TEXT PRIMARY KEY,
mbid TEXT,
saved_at REAL NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS plex_album_mbid_index (
cache_key TEXT PRIMARY KEY,
mbid TEXT,
saved_at REAL NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS plex_artist_mbid_index (
cache_key TEXT PRIMARY KEY,
mbid TEXT,
saved_at REAL NOT NULL
)
"""
)
conn.commit()
finally:
conn.close()
async def save_mbid_resolution_map(self, mapping: dict[str, str | None]) -> None:
normalized = {
source_mbid: value
for source_mbid, value in mapping.items()
if isinstance(source_mbid, str) and source_mbid
}
def operation(conn: sqlite3.Connection) -> None:
for source_mbid, resolved_mbid in normalized.items():
conn.execute(
"""
INSERT INTO mbid_resolution_map (source_mbid_lower, source_mbid, release_group_mbid)
VALUES (?, ?, ?)
ON CONFLICT(source_mbid_lower) DO UPDATE SET
source_mbid = excluded.source_mbid,
release_group_mbid = excluded.release_group_mbid
""",
(_normalize(source_mbid), source_mbid, resolved_mbid),
)
await self._write(operation)
async def get_mbid_resolution_map(self, source_mbids: list[str]) -> dict[str, str | None]:
normalized_mbids = [_normalize(mbid) for mbid in source_mbids if isinstance(mbid, str) and mbid]
if not normalized_mbids:
return {}
def operation(conn: sqlite3.Connection) -> dict[str, str | None]:
placeholders = ",".join("?" for _ in normalized_mbids)
rows = conn.execute(
f"SELECT source_mbid_lower, release_group_mbid FROM mbid_resolution_map WHERE source_mbid_lower IN ({placeholders})",
tuple(normalized_mbids),
).fetchall()
return {str(row["source_mbid_lower"]): row["release_group_mbid"] for row in rows}
return await self._read(operation)
async def add_ignored_release(
self,
release_group_mbid: str,
artist_mbid: str,
release_name: str,
artist_name: str,
) -> None:
ignored_at = time.time()
def operation(conn: sqlite3.Connection) -> None:
conn.execute(
"""
INSERT INTO ignored_releases (
release_group_mbid_lower, release_group_mbid, artist_mbid,
release_name, artist_name, ignored_at
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(release_group_mbid_lower) DO UPDATE SET
release_group_mbid = excluded.release_group_mbid,
artist_mbid = excluded.artist_mbid,
release_name = excluded.release_name,
artist_name = excluded.artist_name,
ignored_at = excluded.ignored_at
""",
(
_normalize(release_group_mbid),
release_group_mbid,
artist_mbid,
release_name,
artist_name,
ignored_at,
),
)
await self._write(operation)
async def get_ignored_release_mbids(self) -> set[str]:
def operation(conn: sqlite3.Connection) -> set[str]:
rows = conn.execute("SELECT release_group_mbid_lower FROM ignored_releases").fetchall()
return {str(row["release_group_mbid_lower"]) for row in rows if row["release_group_mbid_lower"]}
return await self._read(operation)
async def get_ignored_releases(self) -> list[dict[str, Any]]:
def operation(conn: sqlite3.Connection) -> list[dict[str, Any]]:
rows = conn.execute(
"SELECT release_group_mbid, artist_mbid, release_name, artist_name, ignored_at FROM ignored_releases ORDER BY ignored_at DESC"
).fetchall()
return [
{
"release_group_mbid": row["release_group_mbid"],
"artist_mbid": row["artist_mbid"],
"release_name": row["release_name"],
"artist_name": row["artist_name"],
"ignored_at": row["ignored_at"],
}
for row in rows
]
return await self._read(operation)
async def save_jellyfin_mbid_index(self, index: dict[str, str]) -> None:
saved_at = time.time()
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM jellyfin_mbid_index")
for mbid, item_id in index.items():
if not isinstance(mbid, str) or not mbid or not isinstance(item_id, str) or not item_id:
continue
conn.execute(
"INSERT INTO jellyfin_mbid_index (mbid_lower, mbid, item_id, saved_at) VALUES (?, ?, ?, ?)",
(_normalize(mbid), mbid, item_id, saved_at),
)
await self._write(operation)
async def load_jellyfin_mbid_index(self, max_age_seconds: int = 3600) -> dict[str, str]:
def operation(conn: sqlite3.Connection) -> dict[str, str]:
row = conn.execute("SELECT MAX(saved_at) AS saved_at FROM jellyfin_mbid_index").fetchone()
if row is None or row["saved_at"] is None:
return {}
if time.time() - float(row["saved_at"]) > max(max_age_seconds, 1):
return {}
rows = conn.execute("SELECT mbid, item_id FROM jellyfin_mbid_index").fetchall()
return {str(r["mbid"]): str(r["item_id"]) for r in rows if r["mbid"] and r["item_id"]}
return await self._read(operation)
async def clear_jellyfin_mbid_index(self) -> None:
await self._write(lambda conn: conn.execute("DELETE FROM jellyfin_mbid_index"))
async def save_navidrome_album_mbid_index(self, index: dict[str, str | None]) -> None:
saved_at = time.time()
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM navidrome_album_mbid_index")
for cache_key, mbid in index.items():
if not isinstance(cache_key, str) or not cache_key:
continue
conn.execute(
"INSERT OR REPLACE INTO navidrome_album_mbid_index (cache_key, mbid, saved_at) VALUES (?, ?, ?)",
(cache_key, mbid, saved_at),
)
await self._write(operation)
async def load_navidrome_album_mbid_index(self, max_age_seconds: int = 86400) -> dict[str, str | None]:
def operation(conn: sqlite3.Connection) -> dict[str, str | None]:
row = conn.execute("SELECT MAX(saved_at) AS saved_at FROM navidrome_album_mbid_index").fetchone()
if row is None or row["saved_at"] is None:
return {}
if time.time() - float(row["saved_at"]) > max(max_age_seconds, 1):
return {}
rows = conn.execute("SELECT cache_key, mbid FROM navidrome_album_mbid_index").fetchall()
return {str(r["cache_key"]): (str(r["mbid"]) if r["mbid"] else None) for r in rows if r["cache_key"]}
return await self._read(operation)
async def save_navidrome_artist_mbid_index(self, index: dict[str, str | None]) -> None:
saved_at = time.time()
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM navidrome_artist_mbid_index")
for cache_key, mbid in index.items():
if not isinstance(cache_key, str) or not cache_key:
continue
conn.execute(
"INSERT OR REPLACE INTO navidrome_artist_mbid_index (cache_key, mbid, saved_at) VALUES (?, ?, ?)",
(cache_key, mbid, saved_at),
)
await self._write(operation)
async def load_navidrome_artist_mbid_index(self, max_age_seconds: int = 86400) -> dict[str, str | None]:
def operation(conn: sqlite3.Connection) -> dict[str, str | None]:
row = conn.execute("SELECT MAX(saved_at) AS saved_at FROM navidrome_artist_mbid_index").fetchone()
if row is None or row["saved_at"] is None:
return {}
if time.time() - float(row["saved_at"]) > max(max_age_seconds, 1):
return {}
rows = conn.execute("SELECT cache_key, mbid FROM navidrome_artist_mbid_index").fetchall()
return {str(r["cache_key"]): (str(r["mbid"]) if r["mbid"] else None) for r in rows if r["cache_key"]}
return await self._read(operation)
async def clear_navidrome_mbid_indexes(self) -> None:
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM navidrome_album_mbid_index")
conn.execute("DELETE FROM navidrome_artist_mbid_index")
await self._write(operation)
async def save_plex_album_mbid_index(self, index: dict[str, str | None]) -> None:
saved_at = time.time()
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM plex_album_mbid_index")
for cache_key, mbid in index.items():
if cache_key:
conn.execute(
"INSERT OR REPLACE INTO plex_album_mbid_index (cache_key, mbid, saved_at) VALUES (?, ?, ?)",
(cache_key, mbid, saved_at),
)
await self._write(operation)
async def load_plex_album_mbid_index(self, max_age_seconds: int = 86400) -> dict[str, str | None]:
def operation(conn: sqlite3.Connection) -> dict[str, str | None]:
row = conn.execute("SELECT MAX(saved_at) AS saved_at FROM plex_album_mbid_index").fetchone()
if row is None or row["saved_at"] is None:
return {}
if time.time() - float(row["saved_at"]) > max(max_age_seconds, 1):
return {}
rows = conn.execute("SELECT cache_key, mbid FROM plex_album_mbid_index").fetchall()
return {str(r["cache_key"]): (str(r["mbid"]) if r["mbid"] else None) for r in rows if r["cache_key"]}
return await self._read(operation)
async def save_plex_artist_mbid_index(self, index: dict[str, str | None]) -> None:
saved_at = time.time()
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM plex_artist_mbid_index")
for cache_key, mbid in index.items():
if cache_key:
conn.execute(
"INSERT OR REPLACE INTO plex_artist_mbid_index (cache_key, mbid, saved_at) VALUES (?, ?, ?)",
(cache_key, mbid, saved_at),
)
await self._write(operation)
async def load_plex_artist_mbid_index(self, max_age_seconds: int = 86400) -> dict[str, str | None]:
def operation(conn: sqlite3.Connection) -> dict[str, str | None]:
row = conn.execute("SELECT MAX(saved_at) AS saved_at FROM plex_artist_mbid_index").fetchone()
if row is None or row["saved_at"] is None:
return {}
if time.time() - float(row["saved_at"]) > max(max_age_seconds, 1):
return {}
rows = conn.execute("SELECT cache_key, mbid FROM plex_artist_mbid_index").fetchall()
return {str(r["cache_key"]): (str(r["mbid"]) if r["mbid"] else None) for r in rows if r["cache_key"]}
return await self._read(operation)
async def clear_plex_mbid_indexes(self) -> None:
def operation(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM plex_album_mbid_index")
conn.execute("DELETE FROM plex_artist_mbid_index")
await self._write(operation)
async def prune_old_ignored_releases(self, days: int) -> int:
"""Delete ignored releases older than `days` days."""
import time as _time
cutoff = _time.time() - days * 86400
def operation(conn: sqlite3.Connection) -> int:
cursor = conn.execute(
"DELETE FROM ignored_releases WHERE ignored_at < ?",
(cutoff,),
)
return cursor.rowcount
return await self._write(operation)