diff --git a/Makefile b/Makefile index cfb099b..ef04035 100644 --- a/Makefile +++ b/Makefile @@ -103,8 +103,19 @@ backend-test-local-files-fallback: $(BACKEND_VENV_STAMP) ## Run local files stal backend-test-jellyfin-proxy: $(BACKEND_VENV_STAMP) ## Run Jellyfin stream proxy tests cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/routes/test_stream_routes.py -v +backend-test-sync-watchdog: $(BACKEND_VENV_STAMP) ## Run adaptive watchdog timeout tests + cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/test_sync_watchdog.py -v + +backend-test-sync-resume: $(BACKEND_VENV_STAMP) ## Run sync resume-on-failure tests + cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/test_sync_resume.py -v + +backend-test-audiodb-parallel: $(BACKEND_VENV_STAMP) ## Run AudioDB parallel prewarm tests + cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/test_audiodb_parallel.py -v + test-audiodb-all: backend-test-audiodb backend-test-audiodb-prewarm backend-test-audiodb-settings backend-test-coverart-audiodb backend-test-audiodb-phase8 backend-test-audiodb-phase9 frontend-test-audiodb-images ## Run every AudioDB test target +test-sync-all: backend-test-sync-watchdog backend-test-sync-resume backend-test-audiodb-parallel ## Run all sync robustness tests + frontend-install: ## Install frontend npm dependencies cd "$(FRONTEND_DIR)" && $(NPM) install diff --git a/backend/api/v1/routes/library.py b/backend/api/v1/routes/library.py index d6f7e86..9fc72a0 100644 --- a/backend/api/v1/routes/library.py +++ b/backend/api/v1/routes/library.py @@ -1,6 +1,6 @@ import asyncio import logging -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Query from api.v1.schemas.library import ( LibraryResponse, LibraryArtistsResponse, @@ -90,10 +90,11 @@ async def get_recently_added( @router.post("/sync", response_model=SyncLibraryResponse) async def sync_library( + force_full: bool = Query(default=False, description="Clear resume checkpoint and start a full sync from scratch"), library_service: LibraryService = Depends(get_library_service) ): try: - return await library_service.sync_library(is_manual=True) + return await library_service.sync_library(is_manual=True, force_full=force_full) except ExternalServiceError as e: if "cooldown" in str(e).lower(): raise HTTPException(status_code=429, detail="Sync is on cooldown, please wait") diff --git a/backend/api/v1/schemas/advanced_settings.py b/backend/api/v1/schemas/advanced_settings.py index f6e8552..45a4b6c 100644 --- a/backend/api/v1/schemas/advanced_settings.py +++ b/backend/api/v1/schemas/advanced_settings.py @@ -56,13 +56,14 @@ class AdvancedSettings(AppStruct): http_timeout: int = 10 http_connect_timeout: int = 5 http_max_connections: int = 200 - batch_artist_images: int = 5 - batch_albums: int = 3 + batch_artist_images: int = 10 + batch_albums: int = 8 delay_artist: float = 0.5 - delay_albums: float = 1.0 + delay_albums: float = 0.3 artist_discovery_warm_interval: int = 14400 artist_discovery_warm_delay: float = 0.5 artist_discovery_precache_delay: float = 0.3 + artist_discovery_precache_concurrency: int = 3 memory_cache_max_entries: int = 10000 memory_cache_cleanup_interval: int = 300 cover_memory_cache_max_entries: int = 128 @@ -100,6 +101,10 @@ class AdvancedSettings(AppStruct): cache_ttl_audiodb_not_found: int = 86400 cache_ttl_audiodb_library: int = 1209600 cache_ttl_recently_viewed_bytes: int = 172800 + sync_stall_timeout_minutes: int = 10 + sync_max_timeout_hours: int = 8 + audiodb_prewarm_concurrency: int = 4 + audiodb_prewarm_delay: float = 0.3 genre_section_ttl: int = 21600 request_history_retention_days: int = 180 ignored_releases_retention_days: int = 365 @@ -149,6 +154,11 @@ class AdvancedSettings(AppStruct): "recent_covers_max_size_mb": (100, 10000), "persistent_metadata_ttl_hours": (1, 168), "musicbrainz_concurrent_searches": (2, 10), + "artist_discovery_precache_concurrency": (1, 8), + "sync_stall_timeout_minutes": (2, 30), + "sync_max_timeout_hours": (1, 48), + "audiodb_prewarm_concurrency": (1, 8), + "audiodb_prewarm_delay": (0.0, 5.0), "discover_queue_size": (1, 20), "discover_queue_ttl": (3600, 604800), "discover_queue_polling_interval": (1000, 30000), @@ -219,10 +229,10 @@ class AdvancedSettingsFrontend(AppStruct): http_timeout: int = 10 http_connect_timeout: int = 5 http_max_connections: int = 200 - batch_artist_images: int = 5 - batch_albums: int = 3 + batch_artist_images: int = 10 + batch_albums: int = 8 delay_artist: float = 0.5 - delay_albums: float = 1.0 + delay_albums: float = 0.3 artist_discovery_warm_interval: int = 240 artist_discovery_warm_delay: float = 0.5 artist_discovery_precache_delay: float = 0.3 @@ -268,6 +278,11 @@ class AdvancedSettingsFrontend(AppStruct): ignored_releases_retention_days: int = 365 orphan_cover_demote_interval_hours: int = 24 store_prune_interval_hours: int = 6 + sync_stall_timeout_minutes: int = 10 + sync_max_timeout_hours: int = 8 + audiodb_prewarm_concurrency: int = 4 + audiodb_prewarm_delay: float = 0.3 + artist_discovery_precache_concurrency: int = 3 def __post_init__(self) -> None: int_coerce_fields = [ @@ -371,6 +386,11 @@ class AdvancedSettingsFrontend(AppStruct): "ignored_releases_retention_days": (30, 3650), "orphan_cover_demote_interval_hours": (1, 168), "store_prune_interval_hours": (1, 168), + "sync_stall_timeout_minutes": (2, 30), + "sync_max_timeout_hours": (1, 48), + "audiodb_prewarm_concurrency": (1, 8), + "audiodb_prewarm_delay": (0.0, 5.0), + "artist_discovery_precache_concurrency": (1, 8), } for field_name, (minimum, maximum) in ranges.items(): _validate_range(getattr(self, field_name), field_name, minimum, maximum) @@ -450,6 +470,11 @@ class AdvancedSettingsFrontend(AppStruct): ignored_releases_retention_days=settings.ignored_releases_retention_days, orphan_cover_demote_interval_hours=settings.orphan_cover_demote_interval_hours, store_prune_interval_hours=settings.store_prune_interval_hours, + sync_stall_timeout_minutes=settings.sync_stall_timeout_minutes, + sync_max_timeout_hours=settings.sync_max_timeout_hours, + audiodb_prewarm_concurrency=settings.audiodb_prewarm_concurrency, + audiodb_prewarm_delay=settings.audiodb_prewarm_delay, + artist_discovery_precache_concurrency=settings.artist_discovery_precache_concurrency, ) def to_backend(self) -> AdvancedSettings: @@ -526,4 +551,9 @@ class AdvancedSettingsFrontend(AppStruct): ignored_releases_retention_days=self.ignored_releases_retention_days, orphan_cover_demote_interval_hours=self.orphan_cover_demote_interval_hours, store_prune_interval_hours=self.store_prune_interval_hours, + sync_stall_timeout_minutes=self.sync_stall_timeout_minutes, + sync_max_timeout_hours=self.sync_max_timeout_hours, + audiodb_prewarm_concurrency=self.audiodb_prewarm_concurrency, + audiodb_prewarm_delay=self.audiodb_prewarm_delay, + artist_discovery_precache_concurrency=self.artist_discovery_precache_concurrency, ) diff --git a/backend/infrastructure/persistence/library_db.py b/backend/infrastructure/persistence/library_db.py index 8b1beab..693d67d 100644 --- a/backend/infrastructure/persistence/library_db.py +++ b/backend/infrastructure/persistence/library_db.py @@ -1,4 +1,4 @@ -"""Domain 1 — Library state persistence (artists, albums, metadata).""" +"""Domain 1: Library state persistence (artists, albums, metadata).""" import json import logging @@ -27,7 +27,6 @@ def _escape_like(term: str) -> str: _CROSS_DOMAIN_CLEAR_TABLES = ( "artist_genres", "artist_genre_lookup", - "processed_items", ) _FULL_CLEAR_EXTRA_TABLES = ( diff --git a/backend/infrastructure/persistence/sync_state_store.py b/backend/infrastructure/persistence/sync_state_store.py index 54c65ae..9d364f1 100644 --- a/backend/infrastructure/persistence/sync_state_store.py +++ b/backend/infrastructure/persistence/sync_state_store.py @@ -1,4 +1,4 @@ -"""Domain 5 — Sync lifecycle persistence.""" +"""Domain 5: Sync lifecycle persistence.""" import logging import sqlite3 @@ -87,13 +87,15 @@ class SyncStateStore(PersistenceBase): return await self._read(operation) async def mark_items_processed_batch(self, item_type: str, mbids: list[str]) -> None: - normalized = [mbid for mbid in mbids if isinstance(mbid, str) and mbid] + normalized = [(item_type, _normalize(mbid), mbid) for mbid in mbids if isinstance(mbid, str) and mbid] def operation(conn: sqlite3.Connection) -> None: - for mbid in normalized: - conn.execute( - "INSERT OR REPLACE INTO processed_items (item_type, mbid_lower, mbid) VALUES (?, ?, ?)", - (item_type, _normalize(mbid), mbid), - ) + conn.executemany( + "INSERT OR REPLACE INTO processed_items (item_type, mbid_lower, mbid) VALUES (?, ?, ?)", + normalized, + ) await self._write(operation) + + async def clear_processed_items(self) -> None: + await self._write(lambda conn: conn.execute("DELETE FROM processed_items")) diff --git a/backend/services/artist_discovery_service.py b/backend/services/artist_discovery_service.py index b05d3c4..8af71fa 100644 --- a/backend/services/artist_discovery_service.py +++ b/backend/services/artist_discovery_service.py @@ -422,58 +422,88 @@ class ArtistDiscoveryService: cached_count = 0 source_fetches = 0 - for i, mbid in enumerate(artist_mbids): + advanced = self._preferences_service.get_advanced_settings() if self._preferences_service else None + discovery_concurrency = getattr(advanced, 'artist_discovery_precache_concurrency', 3) if advanced else 3 + sem = asyncio.Semaphore(discovery_concurrency) + counter_lock = asyncio.Lock() + progress_counter = 0 + + async def process_artist(idx: int, mbid: str) -> bool: + nonlocal cached_count, source_fetches, progress_counter try: - for source in sources: - similar_key = self._build_cache_key( - "similar", mbid, DEFAULT_SIMILAR_COUNT, source - ) - songs_key = self._build_cache_key( - "top_songs", mbid, DEFAULT_TOP_SONGS_COUNT, source - ) - albums_key = self._build_cache_key( - "top_albums", mbid, DEFAULT_TOP_ALBUMS_COUNT, source - ) + async with sem: + for source in sources: + similar_key = self._build_cache_key( + "similar", mbid, DEFAULT_SIMILAR_COUNT, source + ) + songs_key = self._build_cache_key( + "top_songs", mbid, DEFAULT_TOP_SONGS_COUNT, source + ) + albums_key = self._build_cache_key( + "top_albums", mbid, DEFAULT_TOP_ALBUMS_COUNT, source + ) - has_all = ( - await self._cache.get(similar_key) is not None - and await self._cache.get(songs_key) is not None - and await self._cache.get(albums_key) is not None - ) - if has_all: - continue + has_all = ( + await self._cache.get(similar_key) is not None + and await self._cache.get(songs_key) is not None + and await self._cache.get(albums_key) is not None + ) + if has_all: + continue - results = await asyncio.gather( - self.get_similar_artists( - mbid, count=DEFAULT_SIMILAR_COUNT, source=source - ), - self.get_top_songs( - mbid, count=DEFAULT_TOP_SONGS_COUNT, source=source - ), - self.get_top_albums( - mbid, count=DEFAULT_TOP_ALBUMS_COUNT, source=source - ), - return_exceptions=True, - ) - errors = [r for r in results if isinstance(r, Exception)] - if errors: - logger.debug("Discovery precache errors for %s: %s", mbid[:8], errors) - source_fetches += 1 + results = await asyncio.gather( + self.get_similar_artists( + mbid, count=DEFAULT_SIMILAR_COUNT, source=source + ), + self.get_top_songs( + mbid, count=DEFAULT_TOP_SONGS_COUNT, source=source + ), + self.get_top_albums( + mbid, count=DEFAULT_TOP_ALBUMS_COUNT, source=source + ), + return_exceptions=True, + ) + errors = [r for r in results if isinstance(r, Exception)] + if errors: + logger.debug("Discovery precache errors for %s: %s", mbid[:8], errors) + async with counter_lock: + source_fetches += 1 - cached_count += 1 + if delay > 0: + await asyncio.sleep(delay) + + async with counter_lock: + cached_count += 1 + progress_counter += 1 + local_progress = progress_counter - except Exception as e: # noqa: BLE001 - logger.warning("Failed to precache discovery for %s: %s", mbid[:8], e) - finally: if status_service: artist_name = (mbid_to_name or {}).get(mbid, mbid[:8]) - await status_service.update_progress(i + 1, current_item=artist_name) + await status_service.update_progress(local_progress, current_item=artist_name) - if (i + 1) % 10 == 0: - logger.info("Discovery precache progress: %d/%d artists", i + 1, len(artist_mbids)) + if local_progress % 10 == 0: + logger.info("Discovery precache progress: %d/%d artists", local_progress, len(artist_mbids)) - if delay > 0 and i < len(artist_mbids) - 1: - await asyncio.sleep(delay) + return True + except Exception as e: # noqa: BLE001 + logger.warning("Failed to precache discovery for %s: %s", mbid[:8], e) + async with counter_lock: + progress_counter += 1 + local_progress = progress_counter + if status_service: + artist_name = (mbid_to_name or {}).get(mbid, mbid[:8]) + await status_service.update_progress(local_progress, current_item=artist_name) + return False + + chunk = max(discovery_concurrency * 4, 20) + for i in range(0, len(artist_mbids), chunk): + if status_service and status_service.is_cancelled(): + logger.info("Discovery precache cancelled by user") + break + batch = artist_mbids[i:i + chunk] + batch_tasks = [asyncio.create_task(process_artist(i + j, mbid)) for j, mbid in enumerate(batch)] + if batch_tasks: + await asyncio.gather(*batch_tasks, return_exceptions=True) logger.info( "Discovery precache complete: %d/%d artists refreshed (%d source fetches)", diff --git a/backend/services/cache_status_service.py b/backend/services/cache_status_service.py index e43d539..64cde3a 100644 --- a/backend/services/cache_status_service.py +++ b/backend/services/cache_status_service.py @@ -69,6 +69,7 @@ class CacheStatusService: self._last_persist_time: float = 0.0 self._last_broadcast_time: float = 0.0 self._persist_item_counter: int = 0 + self._last_progress_at: float = time.time() def set_sync_state_store(self, sync_state_store: 'SyncStateStore'): self._sync_state_store = sync_state_store @@ -120,6 +121,7 @@ class CacheStatusService: self._last_persist_time = 0.0 self._last_broadcast_time = 0.0 self._persist_item_counter = 0 + self._last_progress_at = time.time() started_at = time.time() self._progress = CacheSyncProgress( is_syncing=True, @@ -167,6 +169,7 @@ class CacheStatusService: self._progress.processed_artists = processed_artists if processed_albums is not None: self._progress.processed_albums = processed_albums + self._last_progress_at = time.time() now = time.time() is_final = processed >= self._progress.total_items @@ -180,8 +183,9 @@ class CacheStatusService: self._progress.total_items = total_items self._progress.processed_items = 0 self._progress.current_item = None + self._last_progress_at = time.time() - if self._sync_state_store: + if self._sync_state_store and self._progress.is_syncing: try: await self._sync_state_store.save_sync_state( status='running', @@ -208,6 +212,9 @@ class CacheStatusService: logger.info(f"Phase skipped (already cached): {phase}") await asyncio.sleep(0.5) + def get_last_progress_at(self) -> float: + return self._last_progress_at + _PERSIST_INTERVAL_SECONDS = 5.0 _PERSIST_ITEM_INTERVAL = 10 @@ -244,7 +251,10 @@ class CacheStatusService: async def complete_sync(self, error_message: Optional[str] = None): async with self._state_lock: - status = 'failed' if error_message else 'completed' + if not self._progress.is_syncing: + return + is_success = error_message is None + status = 'completed' if is_success else 'failed' logger.info(f"Cache sync {status}: {self._progress.phase}") if self._sync_state_store: @@ -259,7 +269,9 @@ class CacheStatusService: error_message=error_message, started_at=self._progress.started_at ) - await self._sync_state_store.clear_sync_state() + if is_success: + await self._sync_state_store.clear_sync_state() + await self._sync_state_store.clear_processed_items() except Exception as e: # noqa: BLE001 logger.warning(f"Failed to persist completion: {e}") diff --git a/backend/services/library_service.py b/backend/services/library_service.py index 1e80518..609bd2f 100644 --- a/backend/services/library_service.py +++ b/backend/services/library_service.py @@ -71,6 +71,7 @@ class LibraryService: self._local_files_service = local_files_service self._jellyfin_library_service = jellyfin_library_service self._navidrome_library_service = navidrome_library_service + self._sync_state_store = sync_state_store self._can_precache = sync_state_store is not None and genre_index is not None self._precache_service: LibraryPrecacheService | None = None if self._can_precache: @@ -287,11 +288,11 @@ class LibraryService: logger.error(f"Failed to fetch recently added: {e}") raise ExternalServiceError(f"Failed to fetch recently added: {e}") - async def sync_library(self, is_manual: bool = False) -> SyncLibraryResponse: + async def sync_library(self, is_manual: bool = False, force_full: bool = False) -> SyncLibraryResponse: from services.cache_status_service import CacheStatusService if not self._lidarr_repo.is_configured(): - raise ExternalServiceError("Lidarr is not configured — set a Lidarr API key in Settings to sync your library.") + raise ExternalServiceError("Lidarr is not configured. Set a Lidarr API key in Settings to sync your library.") try: status_service = CacheStatusService() @@ -368,13 +369,31 @@ class LibraryService: self._last_manual_sync = now if self._precache_service is None: - logger.warning("Precache skipped — sync_state_store/genre_index not provided") + logger.warning("Precache skipped: sync_state_store/genre_index not provided") self._update_last_sync_timestamp() result = SyncLibraryResponse(status='success', artists=len(artists), albums=len(albums)) self._sync_future.set_result(result) return result - task = asyncio.create_task(self._precache_service.precache_library_resources(artists, albums)) + resume = False + if not force_full and self._sync_state_store: + try: + last_state = await self._sync_state_store.get_sync_state() + if last_state and last_state.get('status') == 'failed': + resume = True + logger.info("Previous sync failed, resuming from checkpoint") + except Exception as e: # noqa: BLE001 + logger.warning("Failed to check sync state for resume: %s", e) + + if force_full and self._sync_state_store: + try: + await self._sync_state_store.clear_processed_items() + await self._sync_state_store.clear_sync_state() + logger.info("Force full sync: cleared previous progress") + except Exception as e: # noqa: BLE001 + logger.warning("Failed to clear sync state for force_full: %s", e) + + task = asyncio.create_task(self._precache_service.precache_library_resources(artists, albums, resume=resume)) def on_task_done(t: asyncio.Task): try: @@ -572,8 +591,6 @@ class LibraryService: except Exception: # noqa: BLE001 logger.warning("Failed to clean up cover images after removal", exc_info=True) - # Track resolution — extracted from routes/library.py - async def _resolve_album_tracks( self, album_mbid: str, diff --git a/backend/services/precache/album_phase.py b/backend/services/precache/album_phase.py index 7488266..d0fad25 100644 --- a/backend/services/precache/album_phase.py +++ b/backend/services/precache/album_phase.py @@ -69,7 +69,7 @@ class AlbumPhase: advanced_settings = self._preferences_service.get_advanced_settings() batch_size = advanced_settings.batch_albums min_batch = max(1, advanced_settings.batch_albums - 2) - max_batch = min(20, advanced_settings.batch_albums + 7) + max_batch = min(20, advanced_settings.batch_albums + 12) metadata_fetched = 0 covers_fetched = 0 consecutive_slow_batches = 0 diff --git a/backend/services/precache/audiodb_phase.py b/backend/services/precache/audiodb_phase.py index 8da0157..d26cd61 100644 --- a/backend/services/precache/audiodb_phase.py +++ b/backend/services/precache/audiodb_phase.py @@ -19,7 +19,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -_AUDIODB_PREWARM_INTER_ITEM_DELAY = 2.0 _AUDIODB_PREWARM_LOG_INTERVAL = 100 @@ -34,6 +33,8 @@ class AudioDBPhase: self._preferences_service = preferences_service self._audiodb_image_service = audiodb_image_service + _CACHE_CHECK_CHUNK = 200 + async def check_cache_needs( self, artists: list[dict], @@ -56,14 +57,21 @@ class AudioDBPhase: cached = await svc.get_cached_album_images(mbid) return None if cached is not None else album - artist_results = await asyncio.gather( - *(check_artist(a) for a in artists), return_exceptions=True - ) - album_results = await asyncio.gather( - *(check_album(a) for a in albums), return_exceptions=True - ) - needed_artists = [r for r in artist_results if r is not None and not isinstance(r, Exception)] - needed_albums = [r for r in album_results if r is not None and not isinstance(r, Exception)] + needed_artists: list[dict] = [] + chunk = self._CACHE_CHECK_CHUNK + for i in range(0, len(artists), chunk): + results = await asyncio.gather( + *(check_artist(a) for a in artists[i:i + chunk]), return_exceptions=True + ) + needed_artists.extend(r for r in results if r is not None and not isinstance(r, Exception)) + + needed_albums: list[Any] = [] + for i in range(0, len(albums), chunk): + results = await asyncio.gather( + *(check_album(a) for a in albums[i:i + chunk]), return_exceptions=True + ) + needed_albums.extend(r for r in results if r is not None and not isinstance(r, Exception)) + return needed_artists, needed_albums async def download_bytes(self, url: str, entity_type: str, mbid: str) -> bool: @@ -182,6 +190,9 @@ class AudioDBPhase: await status_service.skip_phase('audiodb_prewarm') return + concurrency = settings.audiodb_prewarm_concurrency + inter_item_delay = settings.audiodb_prewarm_delay + needed_artists, needed_albums = await self.check_cache_needs(artists, albums) total = len(needed_artists) + len(needed_albums) if total == 0: @@ -190,10 +201,10 @@ class AudioDBPhase: return original_total = len(artists) + len(albums) - hit_rate = ((original_total - total) / original_total * 100) if original_total > 0 else 100 + initial_hit_rate = ((original_total - total) / original_total * 100) if original_total > 0 else 100 logger.info( - "Phase 5 (AudioDB): Pre-warming %d items (%d artists, %d albums) — %.0f%% already cached", - total, len(needed_artists), len(needed_albums), hit_rate, + "Phase 5 (AudioDB): Pre-warming %d items (%d artists, %d albums), %.0f%% already cached, concurrency=%d delay=%.1fs", + total, len(needed_artists), len(needed_albums), initial_hit_rate, concurrency, inter_item_delay, ) await status_service.update_phase('audiodb_prewarm', total) @@ -204,71 +215,102 @@ class AudioDBPhase: bytes_ok = 0 bytes_fail = 0 svc = self._audiodb_image_service + sem = asyncio.Semaphore(concurrency) + counter_lock = asyncio.Lock() - for artist in needed_artists: + async def process_artist(artist: dict) -> None: + nonlocal processed, bytes_ok, bytes_fail if status_service.is_cancelled(): - logger.info("AudioDB pre-warming cancelled during artist phase") - break + return if not self._preferences_service.get_advanced_settings().audiodb_enabled: - logger.info("AudioDB disabled during pre-warming, stopping") - break + return mbid = artist.get('mbid') name = artist.get('name', 'Unknown') - processed += 1 - try: - result = await svc.fetch_and_cache_artist_images(mbid, name, is_monitored=True) - if result and not result.is_negative and result.thumb_url: - if await self.download_bytes(result.thumb_url, "artist", mbid): + + async with sem: + if inter_item_delay > 0: + await asyncio.sleep(inter_item_delay) + try: + result = await svc.fetch_and_cache_artist_images(mbid, name, is_monitored=True) + except Exception as e: # noqa: BLE001 + result = None + logger.warning("audiodb.prewarm action=artist_error mbid=%s error=%s", mbid[:8] if mbid else '?', e) + + if result and not result.is_negative and result.thumb_url: + ok = await self.download_bytes(result.thumb_url, "artist", mbid) + async with counter_lock: + if ok: bytes_ok += 1 else: bytes_fail += 1 - except Exception as e: # noqa: BLE001 - logger.warning("audiodb.prewarm action=artist_error mbid=%s error=%s", mbid[:8] if mbid else '?', e) - await status_service.update_progress(processed, f"AudioDB: {name}") + async with counter_lock: + processed += 1 + local_processed = processed + snap_ok, snap_fail = bytes_ok, bytes_fail + await status_service.update_progress(local_processed, f"AudioDB: {name}") - if processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0: + if local_processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0: logger.info( - "audiodb.prewarm processed=%d total=%d hit_rate=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d", - processed, total, hit_rate, bytes_ok, bytes_fail, total - processed, + "audiodb.prewarm processed=%d total=%d initial_hit=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d", + local_processed, total, initial_hit_rate, snap_ok, snap_fail, total - local_processed, ) - await asyncio.sleep(_AUDIODB_PREWARM_INTER_ITEM_DELAY) - - for album in needed_albums: + async def process_album(album: Any) -> None: + nonlocal processed, bytes_ok, bytes_fail if status_service.is_cancelled(): - logger.info("AudioDB pre-warming cancelled during album phase") - break + return if not self._preferences_service.get_advanced_settings().audiodb_enabled: - logger.info("AudioDB disabled during pre-warming, stopping") - break + return mbid = getattr(album, 'musicbrainz_id', None) if hasattr(album, 'musicbrainz_id') else album.get('mbid') if isinstance(album, dict) else None artist_name = getattr(album, 'artist_name', None) if hasattr(album, 'artist_name') else album.get('artist_name') if isinstance(album, dict) else None album_name = getattr(album, 'title', None) if hasattr(album, 'title') else album.get('title') if isinstance(album, dict) else None - processed += 1 - try: - result = await svc.fetch_and_cache_album_images( - mbid, artist_name=artist_name, album_name=album_name, is_monitored=True, - ) - if result and not result.is_negative and result.album_thumb_url: - if await self.download_bytes(result.album_thumb_url, "album", mbid): + + async with sem: + if inter_item_delay > 0: + await asyncio.sleep(inter_item_delay) + try: + result = await svc.fetch_and_cache_album_images( + mbid, artist_name=artist_name, album_name=album_name, is_monitored=True, + ) + except Exception as e: # noqa: BLE001 + result = None + logger.warning("audiodb.prewarm action=album_error mbid=%s error=%s", mbid[:8] if mbid else '?', e) + + if result and not result.is_negative and result.album_thumb_url: + ok = await self.download_bytes(result.album_thumb_url, "album", mbid) + async with counter_lock: + if ok: bytes_ok += 1 else: bytes_fail += 1 - except Exception as e: # noqa: BLE001 - logger.warning("audiodb.prewarm action=album_error mbid=%s error=%s", mbid[:8] if mbid else '?', e) - await status_service.update_progress(processed, f"AudioDB: {album_name or 'Unknown'}") + async with counter_lock: + processed += 1 + local_processed = processed + snap_ok, snap_fail = bytes_ok, bytes_fail + await status_service.update_progress(local_processed, f"AudioDB: {album_name or 'Unknown'}") - if processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0: + if local_processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0: logger.info( - "audiodb.prewarm processed=%d total=%d hit_rate=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d", - processed, total, hit_rate, bytes_ok, bytes_fail, total - processed, + "audiodb.prewarm processed=%d total=%d initial_hit=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d", + local_processed, total, initial_hit_rate, snap_ok, snap_fail, total - local_processed, ) - await asyncio.sleep(_AUDIODB_PREWARM_INTER_ITEM_DELAY) + chunk = max(concurrency * 4, 20) + for i in range(0, len(needed_artists), chunk): + if status_service.is_cancelled(): + break + batch = needed_artists[i:i + chunk] + await asyncio.gather(*(process_artist(a) for a in batch), return_exceptions=True) + + for i in range(0, len(needed_albums), chunk): + if status_service.is_cancelled(): + break + batch = needed_albums[i:i + chunk] + await asyncio.gather(*(process_album(a) for a in batch), return_exceptions=True) logger.info( "audiodb.prewarm action=complete processed=%d total=%d bytes_ok=%d bytes_fail=%d", diff --git a/backend/services/precache/orchestrator.py b/backend/services/precache/orchestrator.py index 9c789ff..da0a731 100644 --- a/backend/services/precache/orchestrator.py +++ b/backend/services/precache/orchestrator.py @@ -1,9 +1,10 @@ -"""Pre-cache orchestrator — delegates to phase sub-services.""" +"""Pre-cache orchestrator: delegates to phase sub-services.""" from __future__ import annotations import logging import asyncio +import time from typing import Any, TYPE_CHECKING from repositories.protocols import LidarrRepositoryProtocol, CoverArtRepositoryProtocol @@ -63,32 +64,81 @@ class LibraryPrecacheService: async def precache_library_resources(self, artists: list[dict], albums: list[Any], resume: bool = False) -> None: status_service = CacheStatusService(self._sync_state_store) task = None + + advanced_settings = self._preferences_service.get_advanced_settings() + stall_timeout_s = advanced_settings.sync_stall_timeout_minutes * 60 + max_timeout_s = advanced_settings.sync_max_timeout_hours * 3600 + try: task = asyncio.create_task(self._do_precache(artists, albums, status_service, resume)) from core.task_registry import TaskRegistry TaskRegistry.get_instance().register("precache-library", task) - await asyncio.wait_for(task, timeout=1800.0) - except asyncio.TimeoutError: - logger.error("Library pre-cache operation timed out after 30 minutes") - if task and not task.done(): - task.cancel() + + watchdog = asyncio.create_task( + self._watchdog(task, status_service, stall_timeout_s, max_timeout_s) + ) + + done, _ = await asyncio.wait( + {task, watchdog}, return_when=asyncio.FIRST_COMPLETED + ) + + # Always prioritise the main task result; if it completed + # successfully we don't care about a simultaneous watchdog error. + if task in done: + watchdog.cancel() try: - await task + await watchdog except asyncio.CancelledError: - logger.info("Pre-cache task successfully cancelled after timeout") - except Exception as e: # noqa: BLE001 - logger.error(f"Error during task cancellation: {e}") - await status_service.complete_sync("Sync timed out after 30 minutes") - raise ExternalServiceError("Library sync timed out - too many items or slow network") + pass + if task.exception(): + raise task.exception() + elif watchdog in done: + exc = watchdog.exception() if watchdog.done() and not watchdog.cancelled() else None + if exc: + if not task.done(): + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + await status_service.complete_sync(str(exc)) + raise ExternalServiceError(str(exc)) + except asyncio.CancelledError: logger.warning("Pre-cache was cancelled") await status_service.complete_sync() raise + except ExternalServiceError: + raise except Exception as e: logger.error(f"Pre-cache failed: {e}") await status_service.complete_sync(str(e)) raise + async def _watchdog( + self, + task: asyncio.Task, + status_service: CacheStatusService, + stall_timeout_s: float, + max_timeout_s: float, + ) -> None: + start = time.time() + while not task.done(): + await asyncio.sleep(30) + elapsed = time.time() - start + if elapsed > max_timeout_s: + msg = f"Sync exceeded maximum timeout ({max_timeout_s / 3600:.1f}h)" + logger.error(msg) + raise ExternalServiceError(msg) + stall_duration = time.time() - status_service.get_last_progress_at() + if stall_duration > stall_timeout_s: + msg = ( + f"Sync stalled: no progress for {stall_duration / 60:.0f} minutes " + f"during {status_service.get_progress().phase or 'unknown'} phase" + ) + logger.error(msg) + raise ExternalServiceError(msg) + async def _do_precache(self, artists: list[dict], albums: list[Any], status_service: CacheStatusService, resume: bool = False) -> None: from core.dependencies import get_album_service try: @@ -209,12 +259,23 @@ class LibraryPrecacheService: await status_service.skip_phase('albums') if not status_service.is_cancelled(): - try: - await self._audiodb_phase.precache_audiodb_data(artists, albums, status_service) - except Exception as e: # noqa: BLE001 - logger.error(f"AudioDB pre-warming failed (non-fatal): {e}") + await status_service.complete_sync() + logger.info("Library resource pre-caching complete (core phases done)") - logger.info("Library resource pre-caching complete") + try: + audiodb_timeout = self._preferences_service.get_advanced_settings().sync_max_timeout_hours * 3600 + logger.info("Starting AudioDB image prewarm as background enhancement...") + await asyncio.wait_for( + self._audiodb_phase.precache_audiodb_data(artists, albums, status_service), + timeout=audiodb_timeout, + ) + logger.info("AudioDB image prewarm complete") + except asyncio.TimeoutError: + logger.warning("AudioDB pre-warming timed out (non-fatal)") + except Exception as e: # noqa: BLE001 + logger.warning(f"AudioDB pre-warming failed (non-fatal): {e}") + else: + logger.info("Library resource pre-caching complete (cancelled)") except Exception as e: logger.error(f"Error during pre-cache: {e}") raise diff --git a/backend/tests/services/test_audiodb_prewarm.py b/backend/tests/services/test_audiodb_prewarm.py index 9d7a0b7..97f6ce8 100644 --- a/backend/tests/services/test_audiodb_prewarm.py +++ b/backend/tests/services/test_audiodb_prewarm.py @@ -14,6 +14,8 @@ def _make_settings(audiodb_enabled: bool = True, name_search_fallback: bool = Fa s = MagicMock() s.audiodb_enabled = audiodb_enabled s.audiodb_name_search_fallback = name_search_fallback + s.audiodb_prewarm_concurrency = 4 + s.audiodb_prewarm_delay = 0.0 return s diff --git a/backend/tests/test_advanced_settings_roundtrip.py b/backend/tests/test_advanced_settings_roundtrip.py index 5d16edb..3a30b32 100644 --- a/backend/tests/test_advanced_settings_roundtrip.py +++ b/backend/tests/test_advanced_settings_roundtrip.py @@ -185,3 +185,34 @@ class TestAudiodbEnabledRoundTrip: assert frontend.audiodb_enabled is True restored = frontend.to_backend() assert restored.audiodb_enabled is True + + +class TestSyncSettingsRoundTrip: + """Round-trip tests for the sync robustness settings.""" + + @pytest.mark.parametrize("field,backend_val,frontend_val", [ + ("sync_stall_timeout_minutes", 15, 15), + ("sync_max_timeout_hours", 6, 6), + ("audiodb_prewarm_concurrency", 6, 6), + ("audiodb_prewarm_delay", 1.5, 1.5), + ("artist_discovery_precache_concurrency", 5, 5), + ]) + def test_roundtrip_preserves_value(self, field: str, backend_val, frontend_val) -> None: + backend = AdvancedSettings(**{field: backend_val}) + frontend = AdvancedSettingsFrontend.from_backend(backend) + assert getattr(frontend, field) == frontend_val + restored = frontend.to_backend() + assert getattr(restored, field) == backend_val + + @pytest.mark.parametrize("field,default_val", [ + ("sync_stall_timeout_minutes", 10), + ("sync_max_timeout_hours", 8), + ("audiodb_prewarm_concurrency", 4), + ("audiodb_prewarm_delay", 0.3), + ("artist_discovery_precache_concurrency", 3), + ]) + def test_defaults_match(self, field: str, default_val) -> None: + backend = AdvancedSettings() + frontend = AdvancedSettingsFrontend() + assert getattr(backend, field) == default_val + assert getattr(frontend, field) == default_val diff --git a/backend/tests/test_audiodb_parallel.py b/backend/tests/test_audiodb_parallel.py new file mode 100644 index 0000000..cc575c1 --- /dev/null +++ b/backend/tests/test_audiodb_parallel.py @@ -0,0 +1,153 @@ +"""Tests for AudioDB parallel prewarm with semaphore gating.""" + +import asyncio +import tempfile +from pathlib import Path + +import pytest +from unittest.mock import AsyncMock, MagicMock + +from services.precache.audiodb_phase import AudioDBPhase + + +def _make_settings(concurrency=4, delay=0.0, enabled=True): + s = MagicMock() + s.audiodb_enabled = enabled + s.audiodb_name_search_fallback = False + s.audiodb_prewarm_concurrency = concurrency + s.audiodb_prewarm_delay = delay + return s + + +def _make_prefs(settings=None): + if settings is None: + settings = _make_settings() + prefs = MagicMock() + prefs.get_advanced_settings.return_value = settings + return prefs + + +def _make_status_service(): + status = MagicMock() + status.update_phase = AsyncMock() + status.update_progress = AsyncMock() + status.persist_progress = AsyncMock() + status.skip_phase = AsyncMock() + status.is_cancelled.return_value = False + return status + + +def _make_cover_repo(tmpdir): + repo = AsyncMock() + repo.cache_dir = Path(tmpdir) + return repo + + +class TestAudioDBParallel: + @pytest.mark.asyncio + async def test_concurrent_processing(self): + """Multiple artists should be processed concurrently up to the semaphore limit.""" + concurrency = 2 + prefs = _make_prefs(_make_settings(concurrency=concurrency, delay=0.0)) + + audiodb_svc = AsyncMock() + audiodb_svc.get_cached_artist_images = AsyncMock(return_value=None) + audiodb_svc.get_cached_album_images = AsyncMock(return_value=None) + audiodb_svc.fetch_and_cache_artist_images = AsyncMock(return_value=None) + + with tempfile.TemporaryDirectory() as tmpdir: + phase = AudioDBPhase( + cover_repo=_make_cover_repo(tmpdir), + preferences_service=prefs, + audiodb_image_service=audiodb_svc, + ) + + artists = [{"mbid": f"mbid-{i:04d}", "name": f"Artist {i}"} for i in range(6)] + status = _make_status_service() + + await phase.precache_audiodb_data(artists, [], status) + + assert audiodb_svc.fetch_and_cache_artist_images.call_count == 6 + assert status.update_progress.call_count == 6 + assert True + + @pytest.mark.asyncio + async def test_concurrency_respects_setting(self): + """The concurrency semaphore should limit parallel execution.""" + max_concurrent = 0 + current_concurrent = 0 + lock = asyncio.Lock() + + prefs = _make_prefs(_make_settings(concurrency=2, delay=0.0)) + audiodb_svc = AsyncMock() + audiodb_svc.get_cached_artist_images = AsyncMock(return_value=None) + audiodb_svc.get_cached_album_images = AsyncMock(return_value=None) + + async def track_concurrency(*args, **kwargs): + nonlocal max_concurrent, current_concurrent + async with lock: + current_concurrent += 1 + if current_concurrent > max_concurrent: + max_concurrent = current_concurrent + await asyncio.sleep(0.05) + async with lock: + current_concurrent -= 1 + return None + + audiodb_svc.fetch_and_cache_artist_images = AsyncMock(side_effect=track_concurrency) + + with tempfile.TemporaryDirectory() as tmpdir: + phase = AudioDBPhase( + cover_repo=_make_cover_repo(tmpdir), + preferences_service=prefs, + audiodb_image_service=audiodb_svc, + ) + + artists = [{"mbid": f"mbid-{i:04d}", "name": f"Artist {i}"} for i in range(10)] + status = _make_status_service() + + await phase.precache_audiodb_data(artists, [], status) + + assert max_concurrent <= 2 + assert max_concurrent >= 1 + assert True + + @pytest.mark.asyncio + async def test_disabled_audiodb_skips(self): + """When audiodb_enabled is False, phase should be skipped.""" + prefs = _make_prefs(_make_settings(enabled=False)) + + phase = AudioDBPhase( + cover_repo=AsyncMock(), + preferences_service=prefs, + audiodb_image_service=AsyncMock(), + ) + + status = _make_status_service() + await phase.precache_audiodb_data([], [], status) + + status.skip_phase.assert_called_once_with('audiodb_prewarm') + assert True + + @pytest.mark.asyncio + async def test_all_cached_skips(self): + """When all items are cached, phase should be skipped.""" + prefs = _make_prefs(_make_settings()) + + audiodb_svc = AsyncMock() + audiodb_svc.get_cached_artist_images = AsyncMock(return_value={"some": "data"}) + audiodb_svc.get_cached_album_images = AsyncMock(return_value={"some": "data"}) + + phase = AudioDBPhase( + cover_repo=AsyncMock(), + preferences_service=prefs, + audiodb_image_service=audiodb_svc, + ) + + artists = [{"mbid": "mbid-0001", "name": "Artist 1"}] + status = _make_status_service() + + await phase.precache_audiodb_data(artists, [], status) + + status.skip_phase.assert_called_once_with('audiodb_prewarm') + assert True diff --git a/backend/tests/test_phase9_observability.py b/backend/tests/test_phase9_observability.py index 1df9e14..61bc1d6 100644 --- a/backend/tests/test_phase9_observability.py +++ b/backend/tests/test_phase9_observability.py @@ -46,6 +46,8 @@ def _make_settings(enabled=True, name_search_fallback=False): s.cache_ttl_audiodb_found = 604800 s.cache_ttl_audiodb_not_found = 86400 s.cache_ttl_audiodb_library = 1209600 + s.audiodb_prewarm_concurrency = 4 + s.audiodb_prewarm_delay = 0.0 return s @@ -276,6 +278,8 @@ class TestPrewarmLogContract: settings = MagicMock() settings.audiodb_enabled = True settings.audiodb_name_search_fallback = False + settings.audiodb_prewarm_concurrency = 4 + settings.audiodb_prewarm_delay = 0.0 prefs = MagicMock() prefs.get_advanced_settings.return_value = settings return LibraryPrecacheService( diff --git a/backend/tests/test_sync_resume.py b/backend/tests/test_sync_resume.py new file mode 100644 index 0000000..7a4bff5 --- /dev/null +++ b/backend/tests/test_sync_resume.py @@ -0,0 +1,114 @@ +"""Tests for sync resume-on-failure behaviour.""" + +import asyncio +import sqlite3 +import tempfile +import os +import threading + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from infrastructure.persistence.sync_state_store import SyncStateStore +from services.cache_status_service import CacheStatusService + + +@pytest.fixture(autouse=True) +def _reset_singleton(): + """Reset CacheStatusService singleton between tests.""" + CacheStatusService._instance = None + yield + CacheStatusService._instance = None + + +class TestResumeOnFailure: + @pytest.mark.asyncio + async def test_complete_sync_preserves_state_on_failure(self): + """On failure, sync state should be saved but NOT cleared.""" + store = AsyncMock() + store.save_sync_state = AsyncMock() + store.clear_sync_state = AsyncMock() + store.clear_processed_items = AsyncMock() + + svc = CacheStatusService(store) + await svc.start_sync('artists', 100) + await svc.update_progress(50, "some artist") + await svc.complete_sync("Sync stalled: no progress") + + store.save_sync_state.assert_called() + last_call = store.save_sync_state.call_args + assert last_call.kwargs.get('status') == 'failed' + + store.clear_sync_state.assert_not_called() + store.clear_processed_items.assert_not_called() + assert True + + @pytest.mark.asyncio + async def test_complete_sync_clears_state_on_success(self): + """On success, both sync_state and processed_items should be cleared.""" + store = AsyncMock() + store.save_sync_state = AsyncMock() + store.clear_sync_state = AsyncMock() + store.clear_processed_items = AsyncMock() + + svc = CacheStatusService(store) + await svc.start_sync('artists', 10) + await svc.update_progress(10, "done") + await svc.complete_sync(None) + + store.clear_sync_state.assert_called_once() + store.clear_processed_items.assert_called_once() + assert True + + @pytest.mark.asyncio + async def test_last_progress_at_updates_on_progress(self): + """get_last_progress_at should reflect the latest update_progress call.""" + store = AsyncMock() + store.save_sync_state = AsyncMock() + + svc = CacheStatusService(store) + await svc.start_sync('artists', 10) + + t1 = svc.get_last_progress_at() + await asyncio.sleep(0.05) + await svc.update_progress(5, "artist5") + t2 = svc.get_last_progress_at() + + assert t2 > t1 + assert True + + @pytest.mark.asyncio + async def test_last_progress_at_updates_on_phase_change(self): + """get_last_progress_at should refresh when phase changes.""" + store = AsyncMock() + store.save_sync_state = AsyncMock() + + svc = CacheStatusService(store) + await svc.start_sync('artists', 10) + + t1 = svc.get_last_progress_at() + await asyncio.sleep(0.05) + await svc.update_phase('albums', 50) + t2 = svc.get_last_progress_at() + + assert t2 > t1 + assert True + + +class TestSyncStateStoreClear: + @pytest.mark.asyncio + async def test_clear_processed_items_deletes_all(self): + """clear_processed_items should execute a DELETE on processed_items.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = os.path.join(tmpdir, "test.db") + write_lock = threading.Lock() + store = SyncStateStore(db_path, write_lock) + + await store.mark_items_processed_batch("artist", ["mbid1", "mbid2"]) + items = await store.get_processed_items("artist") + assert len(items) == 2 + + await store.clear_processed_items() + items = await store.get_processed_items("artist") + assert len(items) == 0 + assert True diff --git a/backend/tests/test_sync_watchdog.py b/backend/tests/test_sync_watchdog.py new file mode 100644 index 0000000..edd7fb4 --- /dev/null +++ b/backend/tests/test_sync_watchdog.py @@ -0,0 +1,126 @@ +"""Tests for the adaptive watchdog timeout in the orchestrator.""" + +import asyncio +import time + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from services.precache.orchestrator import LibraryPrecacheService +from services.cache_status_service import CacheStatusService +from core.exceptions import ExternalServiceError + + +@pytest.fixture(autouse=True) +def _reset_singleton(): + """Reset CacheStatusService singleton between tests.""" + CacheStatusService._instance = None + yield + CacheStatusService._instance = None + + +def _make_settings(): + s = MagicMock() + s.sync_stall_timeout_minutes = 0.05 # 3 seconds for test speed + s.sync_max_timeout_hours = 0.01 # 36 seconds + s.audiodb_enabled = False + s.audiodb_prewarm_concurrency = 4 + s.audiodb_prewarm_delay = 0.0 + s.batch_artist_images = 10 + s.batch_albums = 8 + s.delay_albums = 0.0 + s.delay_artists = 0.0 + s.artist_discovery_precache_delay = 0.0 + return s + + +def _make_prefs(settings=None): + if settings is None: + settings = _make_settings() + prefs = MagicMock() + prefs.get_advanced_settings.return_value = settings + return prefs + + +def _make_service(prefs=None): + if prefs is None: + prefs = _make_prefs() + return LibraryPrecacheService( + lidarr_repo=AsyncMock(), + cover_repo=AsyncMock(), + preferences_service=prefs, + sync_state_store=AsyncMock(), + genre_index=AsyncMock(), + library_db=AsyncMock(), + ) + + +class TestAdaptiveWatchdog: + @pytest.mark.asyncio + async def test_stall_detection_cancels_sync(self): + """Sync that stops making progress should be cancelled by the watchdog.""" + settings = _make_settings() + settings.sync_stall_timeout_minutes = 0.02 # 1.2 seconds + svc = _make_service(_make_prefs(settings)) + + async def stalling_precache(artists, albums, status_service, resume=False): + await status_service.start_sync('artists', 1) + await asyncio.sleep(30) # Stall forever + + with patch.object(svc, '_do_precache', side_effect=stalling_precache): + with pytest.raises(ExternalServiceError, match="stalled"): + await svc.precache_library_resources([], []) + + assert True + + @pytest.mark.asyncio + async def test_progressing_sync_completes(self): + """A sync that makes steady progress should complete without watchdog interference.""" + settings = _make_settings() + settings.sync_stall_timeout_minutes = 0.1 # 6 seconds + svc = _make_service(_make_prefs(settings)) + + async def fast_precache(artists, albums, status_service, resume=False): + await status_service.start_sync('artists', 2) + await status_service.update_progress(1, "artist1") + await asyncio.sleep(0.1) + await status_service.update_progress(2, "artist2") + + with patch.object(svc, '_do_precache', side_effect=fast_precache): + await svc.precache_library_resources([], []) + + assert True + + @pytest.mark.asyncio + async def test_max_timeout_cancels_even_with_progress(self): + """Max timeout should cancel even if progress is being made.""" + settings = _make_settings() + settings.sync_stall_timeout_minutes = 10 # Very generous stall timeout + settings.sync_max_timeout_hours = 0.0003 # ~1 second + svc = _make_service(_make_prefs(settings)) + + async def slow_but_progressing(artists, albums, status_service, resume=False): + await status_service.start_sync('artists', 100) + for i in range(100): + await status_service.update_progress(i + 1, f"artist{i}") + await asyncio.sleep(0.5) + + with patch.object(svc, '_do_precache', side_effect=slow_but_progressing): + with pytest.raises(ExternalServiceError, match="maximum timeout"): + await svc.precache_library_resources([], []) + + assert True + + @pytest.mark.asyncio + async def test_error_in_precache_propagates(self): + """Errors in the precache task should propagate through the watchdog.""" + svc = _make_service() + + async def failing_precache(artists, albums, status_service, resume=False): + raise ValueError("something broke") + + with patch.object(svc, '_do_precache', side_effect=failing_precache): + with pytest.raises(ValueError, match="something broke"): + await svc.precache_library_resources([], []) + + assert True diff --git a/frontend/src/lib/components/settings/SettingsNetworkBatch.svelte b/frontend/src/lib/components/settings/SettingsNetworkBatch.svelte index 0624d7f..cc8b924 100644 --- a/frontend/src/lib/components/settings/SettingsNetworkBatch.svelte +++ b/frontend/src/lib/components/settings/SettingsNetworkBatch.svelte @@ -101,3 +101,46 @@ max={5} /> +
+

Library Sync

+
+ + + + + +
diff --git a/frontend/src/lib/components/settings/advanced-settings-types.ts b/frontend/src/lib/components/settings/advanced-settings-types.ts index 1455c43..c5ebec7 100644 --- a/frontend/src/lib/components/settings/advanced-settings-types.ts +++ b/frontend/src/lib/components/settings/advanced-settings-types.ts @@ -59,4 +59,9 @@ export interface AdvancedSettingsForm { cache_ttl_audiodb_not_found: number; cache_ttl_audiodb_library: number; cache_ttl_recently_viewed_bytes: number; + sync_stall_timeout_minutes: number; + sync_max_timeout_hours: number; + audiodb_prewarm_concurrency: number; + audiodb_prewarm_delay: number; + artist_discovery_precache_concurrency: number; }