"""
return self.metadata and self.metadata.chapters and len(self.metadata.chapters) > 1
+ @property
+ def image(self) -> MediaItemImage | None:
+ """Return (first) image from metadata (prefer album)."""
+ if isinstance(self.album, Album) and self.album.image:
+ return self.album.image
+ return super().image
+
@dataclass(kw_only=True)
class AlbumTrack(Track):
) -> Artist:
"""Add artist to library and return the database item."""
if isinstance(item, ItemMapping):
- metadata_lookup = True
+ metadata_lookup = False
# grab musicbrainz id and additional metadata
if metadata_lookup:
await self.mass.metadata.get_artist_metadata(item)
provider_instance_id_or_domain: str,
) -> list[Track]:
"""Return top tracks for an artist on given provider."""
+ items = []
assert provider_instance_id_or_domain != "library"
artist = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
cache_checksum = artist.metadata.checksum
items = await prov.get_artist_toptracks(item_id)
else:
# fallback implementation using the db
- items = []
if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
item_id,
provider_instance_id_or_domain,
query += (
f" AND tracks.provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
)
- if paged_list := await self.mass.music.tracks.library_items(extra_query=query):
- items = paged_list.items
+ paged_list = await self.mass.music.tracks.library_items(extra_query=query)
+ return paged_list.items
# store (serializable items) in cache
self.mass.create_task(
self.mass.cache.set(cache_key, [x.to_dict() for x in items], checksum=cache_checksum)
provider_instance_id_or_domain: str,
) -> list[Album]:
"""Return albums for an artist on given provider."""
+ items = []
assert provider_instance_id_or_domain != "library"
artist = await self.get_provider_item(item_id, provider_instance_id_or_domain)
cache_checksum = artist.metadata.checksum
f" AND albums.provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
)
paged_list = await self.mass.music.albums.library_items(extra_query=query)
- items = paged_list.items
- else:
- # edge case
- items = []
+ return paged_list.items
# store (serializable items) in cache
self.mass.create_task(
self.mass.cache.set(cache_key, [x.to_dict() for x in items], checksum=cache_checksum)
else:
items = searchresult.radio
# store (serializable items) in cache
- if not prov.domain.startswith("filesystem"): # do not cache filesystem results
+ if prov.is_streaming_provider: # do not cache filesystem results
self.mass.create_task(
self.mass.cache.set(cache_key, [x.to_dict() for x in items], expiration=86400 * 7)
)
"version": db_row_dict["album_version"],
}
db_row_dict["album"] = ItemMapping.from_dict(db_row_dict["album"])
- if not db_row_dict["metadata"]["images"]:
- # copy album image in case the track has no image
- album_metadata = json_loads(db_row_dict["album_metadata"])
- db_row_dict["metadata"]["images"] = album_metadata["images"]
return db_row_dict
# grab additional metadata
if metadata_lookup:
await self.mass.metadata.get_track_metadata(item)
- # fallback track image from album (only if albumtype = single)
+ # allow track image from album (only if albumtype = single)
if (
not item.image
and isinstance(item.album, Album)
cache_key = f"{prov.instance_id}.search.{search_query}.{limit}.{media_types_str}"
cache_key += "".join(x for x in media_types)
- if cache := await self.mass.cache.get(cache_key):
+ if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return SearchResults.from_dict(cache)
# no items in cache - get listing from provider
result = await prov.search(
limit,
)
# store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(cache_key, result.to_dict(), expiration=86400 * 7)
- )
+ if prov.is_streaming_provider:
+ self.mass.create_task(
+ self.mass.cache.set(cache_key, result.to_dict(), expiration=86400 * 7)
+ )
return result
@api_command("music/browse")
cur_index = queue.index_in_buffer or 0
else:
cur_index = queue.current_index or 0
- shuffle = queue.shuffle_enabled and len(queue_items) >= 5
+ shuffle = queue.shuffle_enabled and len(queue_items) > 1
# handle replace: clear all items and replace with the new items
if option == QueueOption.REPLACE:
- self.load(queue_id, queue_items=queue_items, shuffle=shuffle)
+ self.load(
+ queue_id,
+ queue_items=queue_items,
+ keep_remaining=False,
+ keep_played=False,
+ shuffle=shuffle,
+ )
await self.play_index(queue_id, 0)
+ return
# handle next: add item(s) in the index next to the playing/loaded/buffered index
- elif option == QueueOption.NEXT:
+ if option == QueueOption.NEXT:
self.load(
queue_id,
queue_items=queue_items,
insert_at_index=cur_index + 1,
shuffle=shuffle,
)
- elif option == QueueOption.REPLACE_NEXT:
+ return
+ if option == QueueOption.REPLACE_NEXT:
self.load(
queue_id,
queue_items=queue_items,
keep_remaining=False,
shuffle=shuffle,
)
+ return
# handle play: replace current loaded/playing index with new item(s)
- elif option == QueueOption.PLAY:
- if cur_index <= len(self._queue_items[queue_id]) - 1:
- cur_index = 0
+ if option == QueueOption.PLAY:
self.load(
queue_id,
queue_items=queue_items,
- insert_at_index=cur_index,
+ insert_at_index=cur_index + 1,
shuffle=shuffle,
)
- await self.play_index(queue_id, cur_index)
+ next_index = min(cur_index + 1, len(self._queue_items[queue_id]) - 1)
+ await self.play_index(queue_id, next_index)
+ return
# handle add: add/append item(s) to the remaining queue items
- elif option == QueueOption.ADD:
+ if option == QueueOption.ADD:
if queue.shuffle_enabled:
# shuffle the new items with remaining queue items
insert_at_index = cur_index + 1
queue_items: list[QueueItem],
insert_at_index: int = 0,
keep_remaining: bool = True,
+ keep_played: bool = True,
shuffle: bool = False,
) -> None:
"""Load new items at index.
- keep_remaining: keep the remaining items after the insert
- shuffle: (re)shuffle the items after insert index
"""
- # keep previous/played items, append the new ones
- prev_items = self._queue_items[queue_id][:insert_at_index]
+ prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
next_items = queue_items
- # if keep_remaining, append the old previous items
+ # if keep_remaining, append the old 'next' items
if keep_remaining:
next_items += self._queue_items[queue_id][insert_at_index:]
try:
if not library_item and not prov_item.available:
# skip unavailable tracks
- self.logger.debg(
+ self.logger.debug(
"Skipping sync of item %s because it is unavailable", prov_item.uri
)
continue
advanced=True,
),
ConfigEntry.from_dict(
- {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": "pcm", "hidden": True}
+ {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": "flac", "hidden": True}
),
)
# set codecs and sample rate to airplay default
common_elem = xml_root.find("common")
- common_elem.find("codecs").text = "pcm"
+ common_elem.find("codecs").text = "flc,pcm"
common_elem.find("sample_rate").text = "44100"
- common_elem.find("resample").text = "0"
+ common_elem.find("resample").text = "1"
common_elem.find("player_volume").text = "20"
# default values for players
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
# enqueue next item if needed
- if castplayer.player.state == PlayerState.PLAYING and (
- not castplayer.next_url or castplayer.next_url == castplayer.player.current_url
+ if (
+ castplayer.player.state == PlayerState.PLAYING
+ and castplayer.player.active_source == castplayer.player.player_id
+ and castplayer.next_url in (None, castplayer.player.current_url)
):
asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop)
+ # handle end of MA queue - set current item to None
+ if (
+ castplayer.player.state == PlayerState.IDLE
+ and castplayer.player.current_url
+ and (queue := self.mass.player_queues.get(castplayer.player_id))
+ and queue.next_item is None
+ ):
+ castplayer.player.current_url = None
def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None:
"""Handle updated ConnectionStatus."""
"artist": queue_item.media_item.artists[0].name
if queue_item.media_item.artists
else "",
- "title": queue_item.name,
+ "title": queue_item.media_item.name,
"images": [{"url": image_url}] if image_url else None,
}
else:
# enqueue next item if needed
if (
dlna_player.player.state == PlayerState.PLAYING
- and dlna_player.player.player_id in current_url
- and (not dlna_player.next_url or dlna_player.next_url == current_url)
+ and dlna_player.player.active_source == dlna_player.player.player_id
+ and dlna_player.next_url in (None, dlna_player.player.current_url)
# prevent race conditions at start/stop by doing this check
and (time.time() - dlna_player.last_command) > 4
):
await makedirs(self.base_path)
try:
- await self.unmount()
+ # do unmount first to cleanup any unexpected state
+ await self.unmount(ignore_error=True)
await self.mount()
except Exception as err:
raise LoginFailed(f"Connection failed for the given details: {err}") from err
if proc.returncode != 0:
raise LoginFailed(f"SMB mount failed with error: {stderr.decode()}")
- async def unmount(self) -> None:
+ async def unmount(self, ignore_error: bool = False) -> None:
"""Unmount the remote share."""
proc = await asyncio.create_subprocess_shell(
f"umount {self.base_path}",
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await proc.communicate()
- if proc.returncode != 0:
+ if proc.returncode != 0 and not ignore_error:
self.logger.warning("SMB unmount failed with error: %s", stderr.decode())
album.artists.append(
self._get_item_mapping(
- media_type=MediaType.ARTIST,
- url=plex_album.parentKey,
- provider=plex_album.parentTitle,
+ MediaType.ARTIST,
+ plex_album.parentKey,
+ plex_album.parentTitle,
)
)
return album
for player_id in list(self.sonosplayers):
player = self.sonosplayers.pop(player_id)
player.player.available = False
- player.soco.end_direct_control_session()
+ if player.soco.is_coordinator:
+ player.soco.end_direct_control_session()
self.sonosplayers = None
def on_player_config_changed(
self.mass.players.update(sonos_player.player_id)
# enqueue next item if needed
- if sonos_player.player.state == PlayerState.PLAYING and (
- sonos_player.next_url is None
- or sonos_player.next_url == sonos_player.player.current_url
+ if (
+ sonos_player.player.state == PlayerState.PLAYING
+ and sonos_player.player.active_source == sonos_player.player.player_id
+ and sonos_player.next_url in (None, sonos_player.player.current_url)
):
self.mass.create_task(self._enqueue_next_track(sonos_player))
if track:
tracks.append(track)
except InvalidDataError:
- track = await self.get_track(track["videoId"])
- if track:
+ if track := await self.get_track(track["videoId"]):
tracks.append(track)
return tracks
return []
headers=self._headers,
signature_timestamp=self._signature_timestamp,
)
+ if not track_obj:
+ raise MediaNotFoundError(f"Item {item_id} not found")
stream_format = await self._parse_stream_format(track_obj)
url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
if not await self._is_valid_deciphered_url(url=url):
async def get_track(
prov_track_id: str, headers: dict[str, str], signature_timestamp: str
-) -> dict[str, str]:
+) -> dict[str, str] | None:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_song():
ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
track_obj = ytm.get_song(videoId=prov_track_id, signatureTimestamp=signature_timestamp)
track = {}
+ if "videoDetails" not in track_obj:
+ # video that no longer exists
+ return None
track["videoId"] = track_obj["videoDetails"]["videoId"]
track["title"] = track_obj["videoDetails"]["title"]
track["artists"] = [