Files
musicseerr/backend/services/cache_status_service.py
T
Harvey df779c9e6d Mus 19 library sync completing with issues (#29)
* fix: Sync issues - AudioDB warming + automatic sync skips

* progress ui/ux + discovery and album fixes

* artist fixes

* several request level fixes and improvements

* handle request fails + artist refresh + resilience fixes

* fix format

* fix stop sync fail + last.fm mbid issues + failures/validation reworks
2026-04-08 00:29:36 +01:00

411 lines
16 KiB
Python

import asyncio
import logging
import threading
import time
from typing import Optional, TYPE_CHECKING
import msgspec
if TYPE_CHECKING:
    # Needed only for the string annotations in this module; never imported
    # at runtime because TYPE_CHECKING is False outside type checkers.
    from infrastructure.persistence import SyncStateStore

# Module-level logger named after this module.
logger: logging.Logger = logging.getLogger(__name__)
class CacheSyncProgress(msgspec.Struct):
    """Snapshot of a cache sync run, as encoded for SSE subscribers.

    ``total_items`` / ``processed_items`` describe the current phase only
    (they are reset whenever the phase changes), while the artist/album
    counters describe the run as a whole.
    """

    # True while a sync run is active.
    is_syncing: bool
    # Name of the current phase (e.g. 'albums'); None when idle.
    phase: Optional[str]
    # Item counts for the current phase.
    total_items: int
    processed_items: int
    # Label of the item currently being processed, if one was reported.
    current_item: Optional[str]
    # time.time() timestamp at which the run started; None when idle.
    started_at: Optional[float]
    error_message: Optional[str] = None
    total_artists: int = 0
    processed_artists: int = 0
    total_albums: int = 0
    processed_albums: int = 0

    @property
    def progress_percent(self) -> int:
        """Return the current phase's completion as an int percentage.

        The value is truncated toward zero and capped at 100; a phase with
        no items reports 0.
        """
        total = self.total_items
        return 0 if total == 0 else min(100, int(self.processed_items / total * 100))
class CacheStatusService:
    """Process-wide singleton that tracks, broadcasts and persists cache-sync progress.

    The service keeps one in-memory ``CacheSyncProgress`` snapshot and pushes a
    JSON-encoded copy of it to every SSE subscriber queue. When a
    ``SyncStateStore`` is attached, it also mirrors sync state into that store
    so an interrupted sync can be picked up by ``restore_from_persistence``.

    ``start_sync`` and ``cancel_current_sync`` each bump an internal generation
    counter. Mutators accept the generation returned by ``start_sync`` and
    ignore calls that carry a different one, so a superseded or cancelled run
    cannot overwrite newer state. ``generation=0`` (the default) skips the check.
    """

    _instance: Optional['CacheStatusService'] = None
    # Guards singleton creation in __new__.
    _creation_lock = threading.Lock()

    # Minimum interval between non-final progress broadcasts in update_progress.
    _BROADCAST_THROTTLE_SECONDS = 0.3
    # persist_progress writes once this many seconds have passed since the last
    # write...
    _PERSIST_INTERVAL_SECONDS = 5.0
    # ...or once it has been called this many times since the last write.
    _PERSIST_ITEM_INTERVAL = 10

    def __new__(cls, sync_state_store: Optional['SyncStateStore'] = None):
        """Return the shared instance, creating and initialising it on first use.

        A store passed on a later call is attached only if the existing
        instance does not have one yet.
        """
        with cls._creation_lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialize(sync_state_store)
            elif sync_state_store is not None and cls._instance._sync_state_store is None:
                cls._instance._sync_state_store = sync_state_store
            return cls._instance

    @staticmethod
    def _idle_progress(error_message: Optional[str] = None) -> 'CacheSyncProgress':
        """Build the progress snapshot used whenever no sync is running."""
        return CacheSyncProgress(
            is_syncing=False,
            phase=None,
            total_items=0,
            processed_items=0,
            current_item=None,
            started_at=None,
            error_message=error_message,
            total_artists=0,
            processed_artists=0,
            total_albums=0,
            processed_albums=0,
        )

    def _initialize(self, sync_state_store: Optional['SyncStateStore'] = None):
        """Set up per-instance state; invoked once, from ``__new__``."""
        self._sync_state_store = sync_state_store
        self._sync_generation: int = 0
        self._progress = self._idle_progress()
        self._cancel_event = asyncio.Event()
        self._current_task: Optional[asyncio.Task] = None
        # Serialises coroutine-level mutations of _progress / _sync_generation.
        self._state_lock = asyncio.Lock()
        self._sse_subscribers: set[asyncio.Queue] = set()
        # Guards the subscriber set; only held around non-blocking operations.
        self._sse_lock = threading.Lock()
        self._last_persist_time: float = 0.0
        self._last_broadcast_time: float = 0.0
        self._persist_item_counter: int = 0
        self._last_progress_at: float = time.time()

    def set_sync_state_store(self, sync_state_store: 'SyncStateStore'):
        """Attach (or replace) the store used to persist sync state."""
        self._sync_state_store = sync_state_store

    def subscribe_sse(self) -> asyncio.Queue:
        """Register and return a new bounded queue (max 100) that receives progress JSON."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        with self._sse_lock:
            self._sse_subscribers.add(queue)
        return queue

    def unsubscribe_sse(self, queue: asyncio.Queue) -> None:
        """Stop delivering progress to ``queue``; unknown queues are ignored."""
        with self._sse_lock:
            self._sse_subscribers.discard(queue)

    async def broadcast_progress(self) -> None:
        """Push the current snapshot, encoded as JSON text, to every SSE queue.

        A full queue is drained and then given only the newest snapshot, so a
        slow consumer skips intermediate states instead of blocking the sync.
        A queue that still cannot accept the message is unsubscribed.
        """
        progress = self.get_progress()
        data = msgspec.json.encode({
            'is_syncing': progress.is_syncing,
            'phase': progress.phase,
            'total_items': progress.total_items,
            'processed_items': progress.processed_items,
            'progress_percent': progress.progress_percent,
            'current_item': progress.current_item,
            'started_at': progress.started_at,
            'error_message': progress.error_message,
            'total_artists': progress.total_artists,
            'processed_artists': progress.processed_artists,
            'total_albums': progress.total_albums,
            'processed_albums': progress.processed_albums
        }).decode("utf-8")
        with self._sse_lock:
            dead_queues = []
            for queue in self._sse_subscribers:
                try:
                    queue.put_nowait(data)
                except asyncio.QueueFull:
                    try:
                        while not queue.empty():
                            queue.get_nowait()
                        queue.put_nowait(data)
                    except Exception:  # noqa: BLE001
                        dead_queues.append(queue)
            self._sse_subscribers.difference_update(dead_queues)

    async def start_sync(
        self,
        phase: str,
        total_items: int,
        total_artists: int = 0,
        total_albums: int = 0,
    ) -> int:
        """Begin a new sync run and return its generation number.

        Clears any previous cancellation, resets the persist/broadcast
        throttles, persists a 'running' state (best effort) and broadcasts.
        Callers pass the returned generation to later mutators so that calls
        from an older run are ignored.
        """
        async with self._state_lock:
            self._sync_generation += 1
            generation = self._sync_generation
            self._cancel_event.clear()
            self._last_persist_time = 0.0
            self._last_broadcast_time = 0.0
            self._persist_item_counter = 0
            self._last_progress_at = time.time()
            started_at = time.time()
            self._progress = CacheSyncProgress(
                is_syncing=True,
                phase=phase,
                total_items=total_items,
                processed_items=0,
                current_item=None,
                started_at=started_at,
                error_message=None,
                total_artists=total_artists,
                processed_artists=0,
                total_albums=total_albums,
                processed_albums=0
            )
            logger.info("Cache sync started: %s (%s items)", phase, total_items)
            if self._sync_state_store:
                try:
                    await self._sync_state_store.save_sync_state(
                        status='running',
                        phase=phase,
                        total_artists=total_artists,
                        total_albums=total_albums,
                        started_at=started_at
                    )
                except Exception as e:  # noqa: BLE001
                    logger.warning("Failed to persist sync state: %s", e)
            await self.broadcast_progress()
            return generation

    async def update_progress(
        self,
        processed: int,
        current_item: Optional[str] = None,
        processed_artists: Optional[int] = None,
        processed_albums: Optional[int] = None,
        generation: int = 0,
    ):
        """Record progress within the current phase and maybe broadcast it.

        Reports whose ``processed`` count is lower than the one already
        recorded are treated as stale and do not change the snapshot.
        Broadcasts are throttled to one per ``_BROADCAST_THROTTLE_SECONDS``,
        except that the final item of a phase is always broadcast.
        """
        async with self._state_lock:
            if generation and generation != self._sync_generation:
                return
            if processed >= self._progress.processed_items:
                self._progress.processed_items = processed
                self._progress.current_item = current_item
                if processed_artists is not None:
                    self._progress.processed_artists = processed_artists
                if processed_albums is not None:
                    self._progress.processed_albums = processed_albums
                self._last_progress_at = time.time()
            now = time.time()
            is_final = processed >= self._progress.total_items
            if is_final or (now - self._last_broadcast_time) >= self._BROADCAST_THROTTLE_SECONDS:
                self._last_broadcast_time = now
                await self.broadcast_progress()

    async def update_phase(self, phase: str, total_items: int, generation: int = 0):
        """Switch to a new phase with ``total_items`` items and reset per-phase counters.

        While a sync is running, the new phase is persisted (best effort)
        before broadcasting.
        """
        async with self._state_lock:
            if generation and generation != self._sync_generation:
                return
            self._progress.phase = phase
            self._progress.total_items = total_items
            self._progress.processed_items = 0
            self._progress.current_item = None
            self._last_progress_at = time.time()
            if self._sync_state_store and self._progress.is_syncing:
                try:
                    await self._sync_state_store.save_sync_state(
                        status='running',
                        phase=phase,
                        total_artists=self._progress.total_artists,
                        processed_artists=self._progress.processed_artists,
                        # Always the run's album total (never this phase's
                        # item count), matching persist_progress/complete_sync;
                        # restore_from_persistence reads it back as total_albums.
                        total_albums=self._progress.total_albums,
                        processed_albums=self._progress.processed_albums,
                        started_at=self._progress.started_at
                    )
                except Exception as e:  # noqa: BLE001
                    logger.warning("Failed to persist phase update: %s", e)
            await self.broadcast_progress()

    async def skip_phase(self, phase: str, generation: int = 0):
        """Broadcast a phase with 0 items so the frontend sees it as skipped."""
        async with self._state_lock:
            if generation and generation != self._sync_generation:
                return
            self._progress.phase = phase
            self._progress.total_items = 0
            self._progress.processed_items = 0
            self._progress.current_item = None
            # A skipped phase is still forward progress for the run.
            self._last_progress_at = time.time()
            await self.broadcast_progress()
        logger.info("Phase skipped (already cached): %s", phase)
        # Short pause, presumably so subscribers can show the skipped phase
        # before the next phase replaces it. Runs outside the lock so other
        # state updates are not blocked meanwhile.
        await asyncio.sleep(0.5)

    def get_last_progress_at(self) -> float:
        """Return the time.time() timestamp of the last recorded progress or phase change."""
        return self._last_progress_at

    async def persist_progress(self, force: bool = False, generation: int = 0):
        """Checkpoint the running sync to the store, throttled.

        Does nothing when idle, cancelled, or called with a stale generation.
        Otherwise writes when ``force`` is set, when
        ``_PERSIST_INTERVAL_SECONDS`` have passed since the last write, or on
        every ``_PERSIST_ITEM_INTERVAL``-th call. Store errors are logged.
        """
        if not self._progress.is_syncing:
            return
        if self.is_cancelled():
            return
        if generation and generation != self._sync_generation:
            return
        self._persist_item_counter += 1
        now = time.time()
        elapsed = now - self._last_persist_time
        if (
            not force
            and elapsed < self._PERSIST_INTERVAL_SECONDS
            and self._persist_item_counter < self._PERSIST_ITEM_INTERVAL
        ):
            return
        self._persist_item_counter = 0
        self._last_persist_time = now
        if self._sync_state_store:
            try:
                await self._sync_state_store.save_sync_state(
                    status='running',
                    phase=self._progress.phase,
                    total_artists=self._progress.total_artists,
                    processed_artists=self._progress.processed_artists,
                    total_albums=self._progress.total_albums,
                    processed_albums=self._progress.processed_albums,
                    current_item=self._progress.current_item,
                    started_at=self._progress.started_at
                )
            except Exception as e:  # noqa: BLE001
                logger.warning("Failed to persist progress: %s", e)

    async def complete_sync(self, error_message: Optional[str] = None, generation: int = 0):
        """Finish the current run as 'completed' (no error) or 'failed'.

        The final state is persisted. On success the stored sync state and
        processed-items record are then cleared via the store. The snapshot is
        reset to idle, keeping ``error_message``, and broadcast. Calls made
        while idle or with a stale generation are ignored.
        """
        async with self._state_lock:
            if not self._progress.is_syncing:
                return
            if generation and generation != self._sync_generation:
                return
            is_success = error_message is None
            status = 'completed' if is_success else 'failed'
            logger.info("Cache sync %s: %s", status, self._progress.phase)
            if self._sync_state_store:
                try:
                    await self._sync_state_store.save_sync_state(
                        status=status,
                        phase=self._progress.phase,
                        total_artists=self._progress.total_artists,
                        processed_artists=self._progress.processed_artists,
                        total_albums=self._progress.total_albums,
                        processed_albums=self._progress.processed_albums,
                        error_message=error_message,
                        started_at=self._progress.started_at
                    )
                    if is_success:
                        await self._sync_state_store.clear_sync_state()
                        await self._sync_state_store.clear_processed_items()
                except Exception as e:  # noqa: BLE001
                    logger.warning("Failed to persist completion: %s", e)
            self._progress = self._idle_progress(error_message)
            await self.broadcast_progress()

    def get_progress(self) -> CacheSyncProgress:
        """Return the live progress snapshot (not a copy)."""
        return self._progress

    def is_syncing(self) -> bool:
        """Return True while a sync run is active."""
        return self._progress.is_syncing

    async def cancel_current_sync(self):
        """Signal cancellation, persist a 'cancelled' state and reset to idle.

        Sets the event read by ``is_cancelled``. It also bumps the generation,
        so late ``update_*`` / ``skip_phase`` calls from the cancelled run
        (which still carry its old generation) are ignored instead of
        mutating and re-broadcasting the idle snapshot.
        """
        async with self._state_lock:
            logger.warning(
                "Cancelling sync: phase=%s, progress=%s/%s",
                self._progress.phase,
                self._progress.processed_items,
                self._progress.total_items,
            )
            self._cancel_event.set()
            self._sync_generation += 1
            if self._sync_state_store and self._progress.is_syncing:
                try:
                    await self._sync_state_store.save_sync_state(
                        status='cancelled',
                        phase=self._progress.phase,
                        total_artists=self._progress.total_artists,
                        processed_artists=self._progress.processed_artists,
                        total_albums=self._progress.total_albums,
                        processed_albums=self._progress.processed_albums,
                        started_at=self._progress.started_at
                    )
                except Exception as e:  # noqa: BLE001
                    logger.warning("Failed to persist cancellation: %s", e)
            self._progress = self._idle_progress()
            await self.broadcast_progress()

    def is_cancelled(self) -> bool:
        """Return True once cancellation was requested; cleared by the next start_sync."""
        return self._cancel_event.is_set()

    def set_current_task(self, task: Optional[asyncio.Task]):
        """Register the task that ``wait_for_completion`` should wait on."""
        self._current_task = task

    async def wait_for_completion(self):
        """Wait up to 30 s for the registered sync task to finish.

        ``asyncio.wait_for`` already cancels the task on timeout; the explicit
        ``cancel()`` is only a defensive fallback. Other exceptions raised by
        the task are logged, not propagated. ``CancelledError`` is not an
        ``Exception`` subclass, so it is not caught here.
        """
        task = self._current_task
        if task and not task.done():
            try:
                await asyncio.wait_for(task, timeout=30.0)
            except asyncio.TimeoutError:
                logger.warning("Sync task did not complete within 30s timeout, forcing cancellation")
                if not task.done():
                    task.cancel()
            except Exception as e:  # noqa: BLE001
                logger.error("Error waiting for sync completion: %s", e)

    def can_start_sync(self) -> bool:
        """Return True when no sync run is active."""
        return not self._progress.is_syncing

    async def restore_from_persistence(self) -> Optional[dict]:
        """Rebuild the in-memory snapshot from an interrupted ('running') stored sync.

        Returns the stored state dict when one was restored. Returns None when
        there is no store, no running state, or the lookup fails (the failure
        is logged). The per-phase counters are taken from the album counts for
        'albums' and the artist counts for any other phase, and are zero for
        'audiodb_prewarm'.
        """
        if not self._sync_state_store:
            return None
        try:
            state = await self._sync_state_store.get_sync_state()
            if not state or state.get('status') != 'running':
                return None
            logger.info(
                "Found interrupted sync: phase=%s, artists=%s/%s, albums=%s/%s",
                state.get('phase'),
                state.get('processed_artists'),
                state.get('total_artists'),
                state.get('processed_albums'),
                state.get('total_albums'),
            )

            def count(key: str) -> int:
                # Stored counters may be missing or NULL (start_sync does not
                # persist processed_* counts). Treat those as 0 so the restored
                # snapshot never holds None, which would break progress_percent.
                return state.get(key) or 0

            phase = state.get('phase')
            if phase == 'albums':
                total_items = count('total_albums')
                processed_items = count('processed_albums')
            elif phase == 'audiodb_prewarm':
                total_items = 0
                processed_items = 0
            else:
                total_items = count('total_artists')
                processed_items = count('processed_artists')
            self._progress = CacheSyncProgress(
                is_syncing=True,
                phase=phase,
                total_items=total_items,
                processed_items=processed_items,
                current_item=state.get('current_item'),
                started_at=state.get('started_at'),
                error_message=None,
                total_artists=count('total_artists'),
                processed_artists=count('processed_artists'),
                total_albums=count('total_albums'),
                processed_albums=count('processed_albums')
            )
            return state
        except Exception as e:  # noqa: BLE001
            logger.error("Failed to restore from persistence: %s", e)
            return None