From: Marcel van der Veldt Date: Wed, 27 Jul 2022 13:38:19 +0000 (+0200) Subject: Various small fixes and enhancements (#440) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=3ffbebde5e9a8ae3153197eaac14be96f09a5709;p=music-assistant-server.git Various small fixes and enhancements (#440) * fix edge case where playlist track has invalid albumdetails * make the audio buffer more dynamic * fix loop thread safety (fixes issues with cast speakers) * simplify some code * simplify player poll task * extend timeout for spotify token retrieval * Fix count of PagedItems queries * Fix relative paths in playlists * Handle some edge cases in cleanup procedure * Fix metadata checksum overwrite * Fix type of mediaitem image * additional guards for library edit usage * Fix local artwork retrieval in artist/album folders * Handle (and log) slow Spotify connection --- diff --git a/music_assistant/controllers/metadata/__init__.py b/music_assistant/controllers/metadata/__init__.py index 51647778..eeb497e8 100755 --- a/music_assistant/controllers/metadata/__init__.py +++ b/music_assistant/controllers/metadata/__init__.py @@ -105,12 +105,12 @@ class MetaDataController: # retrieve genres from tracks # TODO: retrieve style/mood ? playlist.metadata.genres = set() - images = set() + image_urls = set() for track in await self.mass.music.playlists.tracks( playlist.item_id, playlist.provider ): if not playlist.image and track.image: - images.add(track.image) + image_urls.add(track.image.url) if track.media_type != MediaType.TRACK: # filter out radio items continue @@ -119,9 +119,9 @@ class MetaDataController: elif track.album and track.album.metadata.genres: playlist.metadata.genres.update(track.album.metadata.genres) # create collage thumb/fanart from playlist tracks - if images: + if image_urls: fake_path = f"playlist_collage.{playlist.provider.value}.{playlist.item_id}" - collage = await create_collage(self.mass, list(images)) + collage = await create_collage(self.mass, list(image_urls)) match = {"path": fake_path, "size": 0} await self.mass.database.insert( TABLE_THUMBS, {**match, "data": collage}, allow_replace=True diff --git a/music_assistant/controllers/music/artists.py b/music_assistant/controllers/music/artists.py index b973ac90..a33f40b9 100644 --- a/music_assistant/controllers/music/artists.py +++ b/music_assistant/controllers/music/artists.py @@ -211,6 +211,9 @@ class ArtistsController(MediaControllerBase[Artist]): query = f"SELECT * FROM albums WHERE artists LIKE '%\"{db_artist.item_id}\"%'" query += f" AND provider_ids LIKE '%\"{prov_id}\"%'" items = await self.mass.music.albums.get_db_items_by_query(query) + else: + # edge case + items = [] # store (serializable items) in cache self.mass.create_task( self.mass.cache.set( diff --git a/music_assistant/controllers/music/tracks.py b/music_assistant/controllers/music/tracks.py index ee5a46ca..2d78363e 100644 --- a/music_assistant/controllers/music/tracks.py +++ b/music_assistant/controllers/music/tracks.py @@ -17,6 +17,7 @@ from music_assistant.models.enums import ( MusicProviderFeature, ProviderType, ) +from music_assistant.models.errors import MediaNotFoundError from music_assistant.models.event import MassEvent from music_assistant.models.media_controller import MediaControllerBase from music_assistant.models.media_items import ( @@ -40,9 +41,13 @@ class TracksController(MediaControllerBase[Track]): track = await super().get(*args, **kwargs) # append full album details to full track item if track.album: - track.album = await self.mass.music.albums.get( - track.album.item_id, track.album.provider - ) + try: + track.album = await self.mass.music.albums.get( + track.album.item_id, track.album.provider + ) + except MediaNotFoundError: + # edge case where playlist track has invalid albumdetails + self.logger.warning("Unable to fetch album details %s", track.album.uri) # append full artist details to full track item full_artists = [] for artist in track.artists: @@ -105,9 +110,6 @@ class TracksController(MediaControllerBase[Track]): """ if db_track.provider != ProviderType.DATABASE: return # Matching only supported for database items - if isinstance(db_track.album, ItemMapping): - # matching only works if we have a full track object - db_track = await self.get_db_item(db_track.item_id) for provider in self.mass.music.providers: if MusicProviderFeature.SEARCH not in provider.supported_features: continue diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index b7e6f344..3efbe502 100755 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -98,15 +98,14 @@ class PlayerController: for player in self.players: if not player.available: continue - if cur_tick == interval or ( - player.active_queue.active - and player.state - in ( - PlayerState.PLAYING, - PlayerState.PAUSED, - ) + if cur_tick == interval: + self.mass.loop.call_soon(player.update_state) + elif ( + player.active_queue.queue_id == player.player_id + and player.active_queue.active + and player.state == PlayerState.PLAYING ): - player.update_state() + self.mass.loop.call_soon(player.active_queue.on_player_update) if cur_tick == interval: cur_tick = 0 else: diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index ec8edf7b..b660610c 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -134,7 +134,7 @@ class StreamsController: @staticmethod async def serve_silence(request: web.Request): """Serve some nice silence.""" - duration = int(request.query.get("duration", 3600)) + duration = int(request.query.get("duration", 60)) fmt = ContentType.try_parse(request.match_info["fmt"]) resp = web.StreamResponse( @@ -206,7 +206,10 @@ class StreamsController: item_in_buf = queue_stream.queue.get_item(queue_stream.index_in_buffer) if item_in_buf and item_in_buf.name: title = item_in_buf.name - image = item_in_buf.image or "" + if item_in_buf.image and not item_in_buf.image.is_file: + image = item_in_buf.media_item.image.url + else: + image = "" else: title = "Music Assistant" image = "" @@ -274,13 +277,12 @@ class StreamsController: pcm_bit_depth=pcm_bit_depth, pcm_channels=pcm_channels, allow_resample=allow_resample, - autostart=True, ) # cleanup stale previous queue tasks - self.mass.create_task(self.cleanup_stale) + asyncio.create_task(self.cleanup_stale()) return stream - def cleanup_stale(self) -> None: + async def cleanup_stale(self) -> None: """Cleanup stale/done stream tasks.""" stale = set() for stream_id, stream in self.queue_streams.items(): @@ -306,7 +308,6 @@ class QueueStream: pcm_channels: int = 2, pcm_floating_point: bool = False, allow_resample: bool = False, - autostart: bool = False, ): """Init QueueStreamJob instance.""" self.queue = queue @@ -326,7 +327,7 @@ class QueueStream: self.logger = self.queue.logger.getChild("stream") self.expected_clients = 1 self.connected_clients: Dict[str, CoroutineType[bytes]] = {} - self.seconds_streamed = 0 + self.total_seconds_streamed = 0 self.streaming_started = 0 self.done = asyncio.Event() self.all_clients_connected = asyncio.Event() @@ -341,11 +342,12 @@ class QueueStream: self.output_chunksize = get_chunksize( output_format, pcm_sample_rate, pcm_bit_depth ) - if autostart: - self.mass.create_task(self.start()) - - async def start(self) -> None: - """Start running queue stream.""" + self.sample_size_per_second = get_chunksize( + ContentType.from_bit_depth(pcm_bit_depth, pcm_floating_point), + pcm_sample_rate, + pcm_bit_depth, + pcm_channels, + ) self._runner_task = self.mass.create_task(self._queue_stream_runner()) async def stop(self) -> None: @@ -423,7 +425,7 @@ class QueueStream: "0", "-", ] - # get the raw pcm bytes from the queue stream and on the fly encode to wanted format + # get the raw pcm bytes from the queue stream and on-the-fly encode to wanted format # send the compressed/encoded stream to the client(s). async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc: @@ -431,6 +433,9 @@ class QueueStream: """Task that sends the raw pcm audio to the ffmpeg process.""" async for audio_chunk in self._get_queue_stream(): await ffmpeg_proc.write(audio_chunk) + self.total_seconds_streamed += ( + len(audio_chunk) / self.sample_size_per_second + ) # write eof when last packet is received ffmpeg_proc.write_eof() @@ -469,13 +474,6 @@ class QueueStream: ): self.connected_clients.pop(client_id, None) - # complete queue streamed - if self.signal_next is not None and not self.queue.announcement_in_progress: - # the queue stream was aborted (e.g. because of sample rate mismatch) - # tell the queue to load the next track (restart stream) as soon - # as the player finished playing and returns to idle - self.queue.signal_next = self.signal_next - # all queue data has been streamed. Either because the queue is exhausted # or we need to restart the stream due to decoder/sample rate mismatch # set event that this stream task is finished @@ -565,7 +563,7 @@ class QueueStream: if ( use_crossfade and self.queue.settings.crossfade_mode != CrossFadeMode.ALWAYS - and prev_track is not None + and prev_track and prev_track.media_type == MediaType.TRACK and queue_track.media_type == MediaType.TRACK ): @@ -580,23 +578,6 @@ class QueueStream: use_crossfade = False prev_track = queue_track - # calculate sample_size based on PCM params for 1 second of audio - input_format = ContentType.from_bit_depth( - self.pcm_bit_depth, self.pcm_floating_point - ) - sample_size_per_second = get_chunksize( - input_format, - self.pcm_sample_rate, - self.pcm_bit_depth, - self.pcm_channels, - ) - crossfade_duration = self.queue.settings.crossfade_duration - crossfade_size = sample_size_per_second * crossfade_duration - # buffer_duration has some overhead to account for padded silence - buffer_duration = (crossfade_duration or 1) * 2 - # predict total size to expect for this track from duration - stream_duration = (queue_track.duration or 0) - seek_position - self.logger.info( "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s", queue_track.uri, @@ -604,13 +585,27 @@ class QueueStream: self.queue.player.name, use_crossfade, ) + + # set some basic vars + if last_fadeout_part: + crossfade_duration = ( + len(last_fadeout_part) / self.sample_size_per_second + ) + else: + crossfade_duration = self.queue.settings.crossfade_duration + crossfade_size = self.sample_size_per_second * crossfade_duration queue_track.streamdetails.seconds_skipped = seek_position + # predict total size to expect for this track from duration + stream_duration = (queue_track.duration or 0) - seek_position # send signal that we've loaded a new track into the buffer self.index_in_buffer = queue_index self.queue.signal_update() + # precache the streamdetails for the next track + self.mass.create_task(self._precache_next_streamdetails()) + buffer = b"" bytes_written = 0 - seconds_streamed = 0 + chunk_num = 0 # handle incoming audio chunks async for chunk in get_media_stream( self.mass, @@ -619,16 +614,23 @@ class QueueStream: sample_rate=self.pcm_sample_rate, channels=self.pcm_channels, seek_position=seek_position, - chunk_size=sample_size_per_second, + chunk_size=self.sample_size_per_second, ): - seconds_streamed += 1 - self.seconds_streamed += 1 - seconds_in_buffer = len(buffer) / sample_size_per_second - # try to make a rough assumption of how many seconds the player has in buffer - player_in_buffer = self.seconds_streamed - ( - time() - self.streaming_started + chunk_num += 1 + seconds_in_buffer = len(buffer) / self.sample_size_per_second + # try to make a rough assumption of how many seconds is buffered ahead by the player(s) + buffered_ahead = ( + self.total_seconds_streamed - self.queue.player.elapsed_time or 0 ) + # use dynamic buffer size to account for slow connections (or throttling providers, like YT) + # buffer_duration has some overhead to account for padded silence + if use_crossfade and buffered_ahead > (crossfade_duration * 4): + buffer_duration = crossfade_duration + 6 + elif use_crossfade and buffered_ahead > (crossfade_duration * 2): + buffer_duration = crossfade_duration + 4 + else: + buffer_duration = 2 #### HANDLE FIRST PART OF TRACK @@ -638,22 +640,6 @@ class QueueStream: queue_track.streamdetails.seconds_streamed = 0 break - # bypass any processing for radiostreams and announcements - if ( - streamdetails.media_type == MediaType.ANNOUNCEMENT - or not stream_duration - or stream_duration < buffer_duration - or player_in_buffer < buffer_duration - ): - # handle edge case where we have a previous chunk in buffer - # and the next track is too short - if last_fadeout_part: - yield last_fadeout_part - last_fadeout_part = b"" - yield chunk - bytes_written += len(chunk) - continue - # buffer full for crossfade if last_fadeout_part and (seconds_in_buffer >= buffer_duration): # strip silence of start @@ -666,8 +652,7 @@ class QueueStream: crossfade_part = await crossfade_pcm_parts( fadein_part, last_fadeout_part, - crossfade_duration, - pcm_fmt, + self.pcm_bit_depth, self.pcm_sample_rate, ) # send crossfade_part @@ -689,7 +674,7 @@ class QueueStream: continue # last part of track: fill buffer - if buffer or (seconds_streamed >= (stream_duration - buffer_duration)): + if buffer or (chunk_num >= (stream_duration - buffer_duration)): buffer += chunk continue @@ -700,10 +685,11 @@ class QueueStream: #### HANDLE END OF TRACK self.logger.debug( - "end of track reached - seconds_streamed: %s - seconds_in_buffer: %s - stream_duration: %s", - seconds_streamed, + "end of track reached - chunk_num: %s - stream_buffer: %s - stream_duration: %s - player_buffer: %s", + chunk_num, seconds_in_buffer, stream_duration, + buffered_ahead, ) if buffer: @@ -711,16 +697,20 @@ class QueueStream: last_part = await strip_silence( buffer, pcm_fmt, self.pcm_sample_rate, reverse=True ) - # if crossfade is enabled, save fadeout part to pickup for next track - if use_crossfade and len(last_part) > crossfade_size: - # yield remaining bytes from strip action, - # we only need the crossfade_size part - last_fadeout_part = last_part[-crossfade_size:] - remaining_bytes = last_part[:-crossfade_size] - yield remaining_bytes - bytes_written += len(remaining_bytes) - elif use_crossfade: - last_fadeout_part = last_part + if use_crossfade: + # if crossfade is enabled, save fadeout part to pickup for next track + if len(last_part) < crossfade_size <= len(buffer): + # the chunk length is too short after stripping silence, only use first part + last_fadeout_part = buffer[:crossfade_size] + elif use_crossfade and len(last_part) > crossfade_size: + # yield remaining bytes from strip action, + # we only need the crossfade_size part + last_fadeout_part = last_part[-crossfade_size:] + remaining_bytes = last_part[:-crossfade_size] + yield remaining_bytes + bytes_written += len(remaining_bytes) + elif use_crossfade: + last_fadeout_part = last_part else: # no crossfade enabled, just yield the stripped audio data yield last_part @@ -729,7 +719,7 @@ class QueueStream: # end of the track reached - store accurate duration buffer = b"" queue_track.streamdetails.seconds_streamed = ( - bytes_written / sample_size_per_second + bytes_written / self.sample_size_per_second ) self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s", @@ -743,6 +733,16 @@ class QueueStream: # END OF QUEUE STREAM self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name) + async def _precache_next_streamdetails(self) -> None: + """Prefetch the streamdetails for the next track.""" + next_index = self.queue.get_next_index(self.index_in_buffer) + if next_index <= self.index_in_buffer: + return + queue_track = self.queue.get_item(next_index) + if not queue_track: + return + await get_stream_details(self.mass, queue_track, self.queue.queue_id) + async def _check_stop(self) -> bool: """Schedule stop of queue stream.""" # Stop this queue stream when no clients (re)connected within 5 seconds @@ -750,5 +750,5 @@ class QueueStream: if len(self.connected_clients) > 0: return False await asyncio.sleep(0.5) - self.mass.create_task(self.stop()) + asyncio.create_task(self.stop()) return True diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 582b9fb2..58eb98ef 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -34,12 +34,15 @@ LOGGER = logging.getLogger(__name__) async def crossfade_pcm_parts( fade_in_part: bytes, fade_out_part: bytes, - fade_length: int, - fmt: ContentType, + bit_depth: int, sample_rate: int, channels: int = 2, ) -> bytes: """Crossfade two chunks of pcm/raw audio using ffmpeg.""" + sample_size = int(sample_rate * (bit_depth / 8) * channels) + fmt = ContentType.from_bit_depth(bit_depth) + # calculate the fade_length from the smallest chunk + fade_length = min(len(fade_in_part), len(fade_out_part)) / sample_size fadeoutfile = create_tempfile() async with aiofiles.open(fadeoutfile.name, "wb") as outfile: await outfile.write(fade_out_part) @@ -83,15 +86,15 @@ async def crossfade_pcm_parts( crossfade_data, _ = await proc.communicate(fade_in_part) if crossfade_data: LOGGER.debug( - "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s", + "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - fade_length: %s seconds", len(fade_in_part), len(fade_out_part), - len(crossfade_data), + fade_length, ) return crossfade_data # no crossfade_data, return original data instead LOGGER.debug( - "crossfade of pcm chunks failed: not enough data. fade_in_part: %s - fade_out_part: %s", + "crossfade of pcm chunks failed: not enough data? fade_in_part: %s - fade_out_part: %s", len(fade_in_part), len(fade_out_part), ) diff --git a/music_assistant/mass.py b/music_assistant/mass.py index 6463585b..cf355b01 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -2,9 +2,7 @@ from __future__ import annotations import asyncio -import functools import logging -import threading from collections import deque from functools import partial from time import time @@ -71,7 +69,7 @@ class MusicAssistant: async def setup(self) -> None: """Async setup of music assistant.""" # initialize loop - self.loop = asyncio.get_event_loop() + self.loop = asyncio.get_running_loop() # create shared aiohttp ClientSession if not self.http_session: self.http_session = aiohttp.ClientSession( @@ -109,7 +107,10 @@ class MusicAssistant: continue if not (id_filter is None or event.object_id in id_filter): continue - self.create_task(cb_func, event) + if asyncio.iscoroutinefunction(cb_func): + asyncio.run_coroutine_threadsafe(cb_func(event), self.loop) + else: + self.loop.call_soon_threadsafe(cb_func, event) def subscribe( self, @@ -156,7 +157,7 @@ class MusicAssistant: def create_task( self, - target: Callable[..., Any], + target: Coroutine, *args: Any, **kwargs: Any, ) -> Union[asyncio.Task, asyncio.Future]: @@ -168,31 +169,10 @@ class MusicAssistant: if self.closed: return - # Check for partials to properly determine if coroutine function - check_target = target - while isinstance(check_target, functools.partial): - check_target = check_target.func - - async def executor_wrapper(_target: Callable, *_args, **_kwargs): - return await self.loop.run_in_executor(None, _target, *_args, **_kwargs) - - # called from other thread - if threading.current_thread() is not threading.main_thread(): - if asyncio.iscoroutine(check_target): - task = asyncio.run_coroutine_threadsafe(target, self.loop) - elif asyncio.iscoroutinefunction(check_target): - task = asyncio.run_coroutine_threadsafe(target(*args), self.loop) - else: - task = asyncio.run_coroutine_threadsafe( - executor_wrapper(target, *args, **kwargs), self.loop - ) + if asyncio.iscoroutinefunction(target): + task = self.loop.create_task(target(*args, **kwargs)) else: - if asyncio.iscoroutine(check_target): - task = self.loop.create_task(target) - elif asyncio.iscoroutinefunction(check_target): - task = self.loop.create_task(target(*args)) - else: - task = self.loop.create_task(executor_wrapper(target, *args, **kwargs)) + task = self.loop.create_task(target) def task_done_callback(*args, **kwargs): self._tracked_tasks.remove(task) diff --git a/music_assistant/models/media_controller.py b/music_assistant/models/media_controller.py index c36f9066..5234b7ac 100644 --- a/music_assistant/models/media_controller.py +++ b/music_assistant/models/media_controller.py @@ -93,7 +93,7 @@ class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta): sql_query, params, limit=limit, offset=offset ) count = len(items) - if count < limit: + if 0 < count < limit: total = offset + count else: total = await self.mass.database.get_count_from_query(sql_query, params) @@ -250,6 +250,8 @@ class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta): # mark as favorite/library item on provider(s) for prov_id in prov_item.provider_ids: if prov := self.mass.music.get_provider(prov_id.prov_id): + if not prov.library_edit_supported(self.media_type): + continue await prov.library_add(prov_id.item_id, self.media_type) # mark as library item in internal db if db item if prov_item.provider == ProviderType.DATABASE: @@ -276,6 +278,8 @@ class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta): # unmark as favorite/library item on provider(s) for prov_id in prov_item.provider_ids: if prov := self.mass.music.get_provider(prov_id.prov_id): + if not prov.library_edit_supported(self.media_type): + continue await prov.library_remove(prov_id.item_id, self.media_type) # unmark as library item in internal db if db item if prov_item.provider == ProviderType.DATABASE: @@ -401,20 +405,23 @@ class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta): async def remove_prov_mapping(self, item_id: int, prov_id: str) -> None: """Remove provider id(s) from item.""" - if db_item := await self.get_db_item(item_id): - db_item.provider_ids = { - x for x in db_item.provider_ids if x.prov_id != prov_id - } - if not db_item.provider_ids: - # item has no more provider_ids left, it is completely deleted - try: - await self.delete_db_item(db_item.item_id) - except AssertionError: - self.logger.debug( - "Could not delete %s: it has items attached", db_item.item_id - ) - return - await self.update_db_item(db_item.item_id, db_item, overwrite=True) + try: + db_item = await self.get_db_item(item_id) + except MediaNotFoundError: + # edge case: already deleted / race condition + return + + db_item.provider_ids = {x for x in db_item.provider_ids if x.prov_id != prov_id} + if not db_item.provider_ids: + # item has no more provider_ids left, it is completely deleted + try: + await self.delete_db_item(db_item.item_id) + except AssertionError: + self.logger.debug( + "Could not delete %s: it has items attached", db_item.item_id + ) + return + await self.update_db_item(db_item.item_id, db_item, overwrite=True) self.logger.debug("removed provider %s from item id %s", prov_id, item_id) diff --git a/music_assistant/models/media_items.py b/music_assistant/models/media_items.py index b5c753b6..d6804a0d 100755 --- a/music_assistant/models/media_items.py +++ b/music_assistant/models/media_items.py @@ -111,6 +111,9 @@ class MediaItemMetadata(DataClassDictMixin): setattr(self, fld.name, new_val) elif cur_val is None or allow_overwrite: setattr(self, fld.name, new_val) + elif new_val and fld.name in ("checksum", "popularity", "last_refresh"): + # some fields are always allowed to be overwritten (such as checksum and last_refresh) + setattr(self, fld.name, new_val) return self @@ -181,12 +184,12 @@ class MediaItem(DataClassDictMixin): return any(x.available for x in self.provider_ids) @property - def image(self) -> str | None: + def image(self) -> MediaItemImage | None: """Return (first/random) image/thumb from metadata (if any).""" if self.metadata is None or self.metadata.images is None: return None return next( - (x.url for x in self.metadata.images if x.type == ImageType.THUMB), None + (x for x in self.metadata.images if x.type == ImageType.THUMB), None ) def add_provider_id(self, prov_id: MediaItemProviderId) -> None: @@ -302,7 +305,7 @@ class Track(MediaItem): return hash((self.provider, self.item_id)) @property - def image(self) -> str | None: + def image(self) -> MediaItemImage | None: """Return (first/random) image/thumb from metadata (if any).""" if image := super().image: return image diff --git a/music_assistant/models/music_provider.py b/music_assistant/models/music_provider.py index 8d4a61b6..1cece3d9 100644 --- a/music_assistant/models/music_provider.py +++ b/music_assistant/models/music_provider.py @@ -465,6 +465,21 @@ class MusicProvider: if media_type == MediaType.RADIO: return MusicProviderFeature.LIBRARY_RADIOS in self.supported_features + def library_edit_supported(self, media_type: MediaType) -> bool: + """Return if Library add/remove is supported for given MediaType on this provider.""" + if media_type == MediaType.ARTIST: + return MusicProviderFeature.LIBRARY_ARTISTS_EDIT in self.supported_features + if media_type == MediaType.ALBUM: + return MusicProviderFeature.LIBRARY_ALBUMS_EDIT in self.supported_features + if media_type == MediaType.TRACK: + return MusicProviderFeature.LIBRARY_TRACKS_EDIT in self.supported_features + if media_type == MediaType.PLAYLIST: + return ( + MusicProviderFeature.LIBRARY_PLAYLISTS_EDIT in self.supported_features + ) + if media_type == MediaType.RADIO: + return MusicProviderFeature.LIBRARY_RADIOS_EDIT in self.supported_features + def _get_library_gen(self, media_type: MediaType) -> AsyncGenerator[MediaItemType]: """Return library generator for given media_type.""" if media_type == MediaType.ARTIST: diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 35840286..e625e864 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -325,13 +325,13 @@ class Player(ABC): if child_player_id == self.player_id: continue if player := self.mass.players.get_player(child_player_id): - self.mass.create_task( + self.mass.loop.call_soon_threadsafe( player.on_parent_update, self.player_id, changed_keys ) # update group player(s) when child updates for group_player in self.get_group_parents(): - self.mass.create_task( + self.mass.loop.call_soon_threadsafe( group_player.on_child_update, self.player_id, changed_keys ) diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 8fea1673..8449119a 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -9,6 +9,7 @@ from asyncio import TimerHandle from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from music_assistant.helpers.util import try_parse_int from music_assistant.models.enums import ( ContentType, EventType, @@ -292,7 +293,6 @@ class PlayerQueue: def create_announcement(_url: str): return QueueItem( - uri=_url, name="announcement", duration=30, streamdetails=StreamDetails( @@ -304,7 +304,6 @@ class PlayerQueue: gain_correct=4, direct=_url, ), - media_type=MediaType.ANNOUNCEMENT, ) try: @@ -361,7 +360,6 @@ class PlayerQueue: await self.player.play_url(stream.url) # wait for the player to finish playing - await asyncio.sleep(5) await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id) except Exception as err: # pylint: disable=broad-except @@ -786,7 +784,7 @@ class PlayerQueue: # save items self.mass.create_task( self.mass.cache.set( - f"queue.{self.queue_id}.items", + f"queue.items.{self.queue_id}", [x.to_dict() for x in self._items], ) ) @@ -871,7 +869,7 @@ class PlayerQueue: async def _restore_items(self) -> None: """Try to load the saved state from cache.""" - if queue_cache := await self.mass.cache.get(f"queue.{self.queue_id}.items"): + if queue_cache := await self.mass.cache.get(f"queue.items.{self.queue_id}"): try: self._items = [QueueItem.from_dict(x) for x in queue_cache] except (KeyError, AttributeError, TypeError) as err: @@ -884,10 +882,10 @@ class PlayerQueue: # restore state too db_key = f"queue.{self.queue_id}.current_index" if db_value := await self.mass.database.get_setting(db_key): - self._current_index = int(db_value) + self._current_index = try_parse_int(db_value) db_key = f"queue.{self.queue_id}.current_item_elapsed_time" if db_value := await self.mass.database.get_setting(db_key): - self._current_item_elapsed_time = int(db_value) + self._current_item_elapsed_time = try_parse_int(db_value) await self.settings.restore() diff --git a/music_assistant/models/queue_item.py b/music_assistant/models/queue_item.py index 64cce2f8..3c25ae58 100644 --- a/music_assistant/models/queue_item.py +++ b/music_assistant/models/queue_item.py @@ -8,28 +8,33 @@ from uuid import uuid4 from mashumaro import DataClassDictMixin from music_assistant.models.enums import MediaType -from music_assistant.models.media_items import Radio, StreamDetails, Track +from music_assistant.models.media_items import ( + ItemMapping, + MediaItemImage, + Radio, + StreamDetails, + Track, +) @dataclass class QueueItem(DataClassDictMixin): """Representation of a queue item.""" - uri: str name: str = "" duration: Optional[int] = None item_id: str = "" sort_index: int = 0 streamdetails: Optional[StreamDetails] = None - media_type: MediaType = MediaType.UNKNOWN - image: Optional[str] = None - available: bool = True media_item: Union[Track, Radio, None] = None + image: Optional[MediaItemImage] = None def __post_init__(self): """Set default values.""" if not self.item_id: self.item_id = str(uuid4()) + if self.streamdetails and self.streamdetails.stream_title: + self.name = self.streamdetails.stream_title if not self.name: self.name = self.uri @@ -39,11 +44,19 @@ class QueueItem(DataClassDictMixin): d.pop("streamdetails", None) return d - def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]: - """Run actions before serialization.""" - if self.media_type == MediaType.RADIO: - d.pop("duration") - return d + @property + def uri(self) -> str: + """Return uri for this QueueItem (for logging purposes).""" + if self.media_item: + return self.media_item.uri + return self.item_id + + @property + def media_type(self) -> MediaType: + """Return MediaType for this QueueItem (for convenience purposes).""" + if self.media_item: + return self.media_item.media_type + return MediaType.UNKNOWN @classmethod def from_media_item(cls, media_item: Track | Radio): @@ -51,14 +64,16 @@ class QueueItem(DataClassDictMixin): if isinstance(media_item, Track): artists = "/".join((x.name for x in media_item.artists)) name = f"{artists} - {media_item.name}" + # save a lot of data/bandwidth by simplifying nested objects + media_item.artists = [ItemMapping.from_item(x) for x in media_item.artists] + if media_item.album: + media_item.album = ItemMapping.from_item(media_item.album) + media_item.albums = [] else: name = media_item.name return cls( - uri=media_item.uri, name=name, duration=media_item.duration, - media_type=media_item.media_type, media_item=media_item, image=media_item.image, - available=media_item.available, ) diff --git a/music_assistant/music_providers/filesystem.py b/music_assistant/music_providers/filesystem.py index 72580d92..41a71524 100644 --- a/music_assistant/music_providers/filesystem.py +++ b/music_assistant/music_providers/filesystem.py @@ -6,7 +6,6 @@ import logging import os import urllib.parse from contextlib import asynccontextmanager -from pathlib import Path from time import time from typing import AsyncGenerator, List, Optional, Set, Tuple @@ -17,6 +16,7 @@ from aiofiles.threadpool.binary import AsyncFileIO from music_assistant.constants import VARIOUS_ARTISTS, VARIOUS_ARTISTS_ID from music_assistant.helpers.compare import compare_strings +from music_assistant.helpers.playlists import parse_m3u, parse_pls from music_assistant.helpers.tags import parse_tags, split_items from music_assistant.helpers.util import create_safe_string, parse_title_and_version from music_assistant.models.enums import MusicProviderFeature, ProviderType @@ -41,8 +41,9 @@ from music_assistant.models.media_items import ( from music_assistant.models.music_provider import MusicProvider TRACK_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf") -PLAYLIST_EXTENSIONS = ("m3u",) +PLAYLIST_EXTENSIONS = ("m3u", "pls") SUPPORTED_EXTENSIONS = TRACK_EXTENSIONS + PLAYLIST_EXTENSIONS +IMAGE_EXTENSIONS = ("jpg", "jpeg", "JPG", "JPEG", "png", "PNG", "gif", "GIF") SCHEMA_VERSION = 17 LOGGER = logging.getLogger(__name__) @@ -192,6 +193,7 @@ class FileSystemProvider(MusicProvider): subitems.append(db_item) continue if ext in PLAYLIST_EXTENSIONS: + item_id = self._get_item_id(full_path) if db_item := await self.mass.music.playlists.get_db_item_by_prov_id( item_id, provider_id=self.id ): @@ -367,19 +369,26 @@ class FileSystemProvider(MusicProvider): playlist_path = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id) if not await self.exists(playlist_path): raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}") - playlist_base_path = Path(playlist_path).parent + parentdir = os.path.dirname(playlist_path) + _, ext = playlist_path.rsplit(".", 1) try: async with self.open_file(playlist_path, "r") as _file: - for line_no, line in enumerate(await _file.readlines()): - line = urllib.parse.unquote(line.strip()) - if line and not line.startswith("#"): - # TODO: add support for .pls playlist files - if media_item := await self._parse_playlist_line( - line, playlist_base_path - ): - # use the linenumber as position for easier deletions - media_item.position = line_no - result.append(media_item) + playlist_data = await _file.read() + + if ext in ("m3u", "m3u8"): + playlist_lines = await parse_m3u(playlist_data) + else: + playlist_lines = await parse_pls(playlist_data) + + for line_no, playlist_line in enumerate(playlist_lines): + + if media_item := await self._parse_playlist_line( + playlist_line, parentdir + ): + # use the linenumber as position for easier deletions + media_item.position = line_no + result.append(media_item) + except Exception as err: # pylint: disable=broad-except self.logger.warning( "Error while parsing playlist %s", playlist_path, exc_info=err @@ -391,10 +400,13 @@ class FileSystemProvider(MusicProvider): ) -> Track | Radio | None: """Try to parse a track from a playlist line.""" try: - # try to treat uri as filename first - if await self.exists(line): - file_path = await self.resolve(line) - return await self._parse_track(file_path) + # try to treat uri as (relative) filename + if "://" not in line: + for filename in (line, os.path.join(playlist_path, line)): + if not await self.exists(filename): + continue + file_path = await self.resolve(filename) + return await self._parse_track(file_path) # fallback to generic uri parsing return await self.mass.music.get_item_by_uri(line) except MusicAssistantError as err: @@ -544,9 +556,7 @@ class FileSystemProvider(MusicProvider): # cover image - prefer album image, fallback to embedded if track.album and track.album.image: - track.metadata.images = [ - MediaItemImage(ImageType.THUMB, track.album.image, True) - ] + track.metadata.images = [track.album.image] elif tags.has_cover_image: # we do not actually embed the image in the metadata because that would consume too # much space and bandwidth. Instead we set the filename as value so the image can @@ -679,24 +689,7 @@ class FileSystemProvider(MusicProvider): if genre := info.get("genre"): artist.metadata.genres = set(split_items(genre)) # find local images - images = [] - for _path in await self.mass.loop.run_in_executor( - None, os.scandir, artist_path - ): - if "." not in _path.path or _path.is_dir(): - continue - filename, ext = _path.path.rsplit(os.sep, 1)[-1].split(".", 1) - if ext not in ("jpg", "png"): - continue - try: - images.append(MediaItemImage(ImageType(filename), _path.path, True)) - except ValueError: - if "folder" in filename: - images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) - elif "Artist" in filename: - images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) - if images: - artist.metadata.images = images + artist.metadata.images = await self._get_local_images(artist_path) or None return artist @@ -753,22 +746,7 @@ class FileSystemProvider(MusicProvider): album.name, album.version = parse_title_and_version(album.name) # find local images - images = [] - async for _path in scantree(album_path): - if "." not in _path.path or _path.is_dir(): - continue - filename, ext = _path.path.rsplit(os.sep, 1)[-1].split(".", 1) - if ext not in ("jpg", "png"): - continue - try: - images.append(MediaItemImage(ImageType(filename), _path.path, True)) - except ValueError: - if "folder" in filename: - images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) - elif "AlbumArt" in filename: - images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) - if images: - album.metadata.images = images + album.metadata.images = await self._get_local_images(album_path) or None return album @@ -779,10 +757,11 @@ class FileSystemProvider(MusicProvider): if not await self.exists(playlist_path): raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}") - name = playlist_path.split(os.sep)[-1].replace(".m3u", "") + playlist_path_base, ext = playlist_path.rsplit(".", 1) + name = playlist_path_base.split(os.sep)[-1] playlist = Playlist(playlist_item_id, provider=self.type, name=name) - playlist.is_editable = True + playlist.is_editable = ext != "pls" # can only edit m3u playlists playlist.add_provider_id( MediaItemProviderId( @@ -849,3 +828,24 @@ class FileSystemProvider(MusicProvider): def _get_item_id(self, file_path: str) -> str: """Create item id from filename.""" return create_safe_string(file_path.replace(self.config.path, "")) + + async def _get_local_images(self, folder: str) -> List[MediaItemImage]: + """Return local images found in a given folderpath.""" + images = [] + async for _path in scantree(folder): + if "." not in _path.path or _path.is_dir(): + continue + for ext in IMAGE_EXTENSIONS: + if not _path.path.endswith(f".{ext}"): + continue + filename = _path.path.rsplit(os.sep, 1)[-1].replace(f".{ext}", "") + try: + images.append(MediaItemImage(ImageType(filename), _path.path, True)) + except ValueError: + if "folder" in filename: + images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) + elif "AlbumArt" in filename: + images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) + elif "Artist" in filename: + images.append(MediaItemImage(ImageType.THUMB, _path.path, True)) + return images diff --git a/music_assistant/music_providers/spotify.py b/music_assistant/music_providers/spotify.py index 8642f55b..010f68fe 100644 --- a/music_assistant/music_providers/spotify.py +++ b/music_assistant/music_providers/spotify.py @@ -75,21 +75,10 @@ class SpotifyProvider(MusicProvider): """Handle async initialization of the provider.""" if not self.config.enabled: return False - if not self.config.username or not self.config.password: - raise LoginFailed("Invalid login credentials") # try to get a token, raise if that fails self._cache_dir = os.path.join(CACHE_DIR, self.id) - token = await self.get_token() - if not token: - try: - # a spotify free/basic account can be recoognized when - # the username consists of numbers only - check that here - int(self.config.username) - # an integer can be parsed of the username, this is a free account - raise LoginFailed("Only Spotify Premium accounts are supported") - except ValueError: - # pylint: disable=raise-missing-from - raise LoginFailed(f"Login failed for user {self.config.username}") + # try login which will raise if it fails + await self.login() return True async def search( @@ -308,7 +297,7 @@ class SpotifyProvider(MusicProvider): if not track: raise MediaNotFoundError(f"track {item_id} not found") # make sure that the token is still valid by just requesting it - await self.get_token() + await self.login() return StreamDetails( item_id=track.item_id, provider=self.type, @@ -321,7 +310,7 @@ class SpotifyProvider(MusicProvider): ) -> AsyncGenerator[bytes, None]: """Return the audio stream for the provider item.""" # make sure that the token is still valid by just requesting it - await self.get_token() + await self.login() librespot = await self.get_librespot_binary() args = [ librespot, @@ -495,8 +484,8 @@ class SpotifyProvider(MusicProvider): playlist.metadata.checksum = str(playlist_obj["snapshot_id"]) return playlist - async def get_token(self): - """Get auth token on spotify.""" + async def login(self) -> dict: + """Log-in Spotify and return tokeninfo.""" # return existing token if we have one in memory if ( self._auth_token @@ -504,37 +493,49 @@ class SpotifyProvider(MusicProvider): and (self._auth_token["expiresAt"] > int(time.time()) + 20) ): return self._auth_token - tokeninfo = {} + tokeninfo, userinfo = None, self._sp_user if not self.config.username or not self.config.password: - return tokeninfo + raise LoginFailed("Invalid login credentials") # retrieve token with librespot retries = 0 - while retries < 4: + while retries < 20: try: - tokeninfo = await asyncio.wait_for(self._get_token(), 5) - if tokeninfo: + retries += 1 + if not tokeninfo: + tokeninfo = await asyncio.wait_for(self._get_token(), 5) + if tokeninfo and not userinfo: + userinfo = await asyncio.wait_for( + self._get_data("me", tokeninfo=tokeninfo), 5 + ) + if tokeninfo and userinfo: + # we have all info we need! break if retries > 2: # switch to ap workaround after 2 retries self._ap_workaround = True - retries += 1 + except asyncio.exceptions.TimeoutError: await asyncio.sleep(2) - except TimeoutError: - pass - if tokeninfo: + if tokeninfo and userinfo: self._auth_token = tokeninfo - self._sp_user = await self._get_data("me") - self.mass.metadata.preferred_language = self._sp_user["country"] - self.logger.info( - "Succesfully logged in to Spotify as %s", self._sp_user["id"] - ) + self._sp_user = userinfo + self.mass.metadata.preferred_language = userinfo["country"] + self.logger.info("Succesfully logged in to Spotify as %s", userinfo["id"]) self._auth_token = tokeninfo - else: - self.logger.error("Login failed for user %s", self.config.username) - return tokeninfo + return tokeninfo + if tokeninfo and not userinfo: + raise LoginFailed( + "Unable to retrieve userdetails from Spotify API - probably just a temporary error" + ) + if self.config.username.isnumeric(): + # a spotify free/basic account can be recognized when + # the username consists of numbers only - check that here + # an integer can be parsed of the username, this is a free account + raise LoginFailed("Only Spotify Premium accounts are supported") + raise LoginFailed(f"Login failed for user {self.config.username}") async def _get_token(self): """Get spotify auth token with librespot bin.""" + time_start = time.time() # authorize with username and password (NOTE: this can also be Spotify Connect) args = [ await self.get_librespot_binary(), @@ -585,14 +586,20 @@ class SpotifyProvider(MusicProvider): *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) stdout, _ = await librespot.communicate() + duration = round(time.time() - time_start, 2) try: result = json.loads(stdout) except JSONDecodeError: self.logger.warning( - "Error while retrieving Spotify token, details: %s", + "Error while retrieving Spotify token after %s seconds, details: %s", + duration, stdout.decode(), ) return None + self.logger.debug( + "Retrieved Spotify token using librespot in %s seconds", + duration, + ) # transform token info to spotipy compatible format if result and "accessToken" in result: tokeninfo = result @@ -619,38 +626,44 @@ class SpotifyProvider(MusicProvider): break return all_items - async def _get_data(self, endpoint, **kwargs): + async def _get_data(self, endpoint, tokeninfo: Optional[dict] = None, **kwargs): """Get data from api.""" url = f"https://api.spotify.com/v1/{endpoint}" kwargs["market"] = "from_token" kwargs["country"] = "from_token" - token = await self.get_token() - if not token: - return None - headers = {"Authorization": f'Bearer {token["accessToken"]}'} + if tokeninfo is None: + tokeninfo = await self.login() + headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'} async with self._throttler: - async with self.mass.http_session.get( - url, headers=headers, params=kwargs, verify_ssl=False - ) as response: - try: + time_start = time.time() + try: + async with self.mass.http_session.get( + url, headers=headers, params=kwargs, verify_ssl=False, timeout=120 + ) as response: result = await response.json() if "error" in result or ( "status" in result and "error" in result["status"] ): self.logger.error("%s - %s", endpoint, result) return None - except ( - aiohttp.ContentTypeError, - JSONDecodeError, - ) as err: - self.logger.error("%s - %s", endpoint, str(err)) - return None - return result + except ( + aiohttp.ContentTypeError, + JSONDecodeError, + ) as err: + self.logger.error("%s - %s", endpoint, str(err)) + return None + finally: + self.logger.debug( + "Processing GET/%s took %s seconds", + endpoint, + round(time.time() - time_start, 2), + ) + return result async def _delete_data(self, endpoint, data=None, **kwargs): """Delete data from api.""" url = f"https://api.spotify.com/v1/{endpoint}" - token = await self.get_token() + token = await self.login() if not token: return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} @@ -662,7 +675,7 @@ class SpotifyProvider(MusicProvider): async def _put_data(self, endpoint, data=None, **kwargs): """Put data on api.""" url = f"https://api.spotify.com/v1/{endpoint}" - token = await self.get_token() + token = await self.login() if not token: return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} @@ -674,7 +687,7 @@ class SpotifyProvider(MusicProvider): async def _post_data(self, endpoint, data=None, **kwargs): """Post data on api.""" url = f"https://api.spotify.com/v1/{endpoint}" - token = await self.get_token() + token = await self.login() if not token: return None headers = {"Authorization": f'Bearer {token["accessToken"]}'}