"args":[
"--log-level", "debug"
],
- "env": {"PYTHONDEVMODE": "1"}
+ // "env": {"PYTHONDEVMODE": "1"}
}
]
}
# this will return the id's of players synced to this player.
group_childs: list[str] = field(default_factory=list)
- # active_queue: return player_id of the active queue for this player
+ # active_source: return player_id of the active queue for this player
# if the player is grouped and a group is active, this will be set to the group's player_id
# otherwise it will be set to the own player_id
- active_queue: str = ""
+ active_source: str = ""
# can_sync_with: return tuple of player_ids that can be synced to/with this player
# usually this is just a list of all player_ids within the playerprovider
streamdetails: StreamDetails | None = None
media_item: Track | Radio | None = None
image: MediaItemImage | None = None
+ index: int = 0
def __post_init__(self):
"""Set default values."""
import pathlib
from typing import Final
-__version__: Final[str] = "2.0.0b24"
+__version__: Final[str] = "2.0.0b25"
SCHEMA_VERSION: Final[int] = 22
try:
player = self.mass.players.get(config.player_id)
player.enabled = config.enabled
- self.mass.players.update(config.player_id)
+ self.mass.players.update(config.player_id, force_update=True)
except PlayerUnavailableError:
pass
)
return db_item
- async def update(self, item_id: int, update: Album, overwrite: bool = False) -> Album:
+ async def update(self, item_id: str | int, update: Album, overwrite: bool = False) -> Album:
"""Update existing record in the database."""
- return await self._update_db_item(item_id=item_id, item=update, overwrite=overwrite)
+ db_id = int(item_id) # ensure integer
+ return await self._update_db_item(item_id=db_id, item=update, overwrite=overwrite)
- async def delete(self, item_id: int, recursive: bool = False) -> None:
+ async def delete(self, item_id: str | int, recursive: bool = False) -> None:
"""Delete record from the database."""
+ db_id = int(item_id) # ensure integer
# check album tracks
db_rows = await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE albums LIKE '%\"{item_id}\"%'",
+ f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE albums LIKE '%\"{db_id}\"%'",
limit=5000,
)
assert not (db_rows and not recursive), "Tracks attached to album"
return await self.get_db_item(item_id)
async def _update_db_item(
- self, item_id: int, item: Album | ItemMapping, overwrite: bool = False
+ self, item_id: str | int, item: Album | ItemMapping, overwrite: bool = False
) -> Album:
"""Update Album record in the database."""
- cur_item = await self.get_db_item(item_id)
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_db_item(db_id)
metadata = cur_item.metadata.update(getattr(item, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, item, overwrite)
album_artists = await self._get_artist_mappings(cur_item, item, overwrite)
async with self._db_add_lock:
await self.mass.music.database.update(
self.db_table,
- {"item_id": item_id},
+ {"item_id": db_id},
{
"name": item.name if overwrite else cur_item.name,
"sort_name": item.sort_name if overwrite else cur_item.sort_name,
},
)
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, provider_mappings)
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ await self._set_provider_mappings(db_id, provider_mappings)
+ self.logger.debug("updated %s in database: %s", item.name, db_id)
+ return await self.get_db_item(db_id)
async def _get_provider_album_tracks(
self, item_id: str, provider_instance_id_or_domain: str
async def _get_db_album_tracks(
self,
- item_id: str,
+ item_id: str | int,
) -> list[Track]:
"""Return in-database album tracks for the given database album."""
- db_album = await self.get_db_item(item_id)
+ db_id = int(item_id) # ensure integer
+ db_album = await self.get_db_item(db_id)
# simply grab all tracks in the db that are linked to this album
# TODO: adjust to json query instead of text search?
- query = f'SELECT * FROM {DB_TABLE_TRACKS} WHERE albums LIKE \'%"item_id":"{item_id}","provider":"database"%\'' # noqa: E501
+ query = f'SELECT * FROM {DB_TABLE_TRACKS} WHERE albums LIKE \'%"item_id":"{db_id}","provider":"database"%\'' # noqa: E501
result = []
for track in await self.mass.music.tracks.get_db_items_by_query(query):
if album_mapping := next(
)
return db_item
- async def update(self, item_id: int, update: Artist, overwrite: bool = False) -> Artist:
+ async def update(self, item_id: str | int, update: Artist, overwrite: bool = False) -> Artist:
"""Update existing record in the database."""
return await self._update_db_item(item_id=item_id, item=update, overwrite=overwrite)
final_items[key].in_library = True
return list(final_items.values())
- async def delete(self, item_id: int, recursive: bool = False) -> None:
+ async def delete(self, item_id: str | int, recursive: bool = False) -> None:
"""Delete record from the database."""
+ db_id = int(item_id) # ensure integer
# check artist albums
db_rows = await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_ALBUMS} WHERE artists LIKE '%\"{item_id}\"%'",
+ f"SELECT item_id FROM {DB_TABLE_ALBUMS} WHERE artists LIKE '%\"{db_id}\"%'",
limit=5000,
)
assert not (db_rows and not recursive), "Albums attached to artist"
# check artist tracks
db_rows = await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE artists LIKE '%\"{item_id}\"%'",
+ f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE artists LIKE '%\"{db_id}\"%'",
limit=5000,
)
assert not (db_rows and not recursive), "Tracks attached to artist"
await self.mass.music.albums.delete(db_row["item_id"], recursive)
# delete the artist itself from db
- await super().delete(item_id)
+ await super().delete(db_id)
async def match_artist(self, db_artist: Artist):
"""Try to find matching artists on all providers for the provided (database) item_id.
return await self.get_db_item(item_id)
async def _update_db_item(
- self, item_id: int, item: Artist | ItemMapping, overwrite: bool = False
+ self, item_id: str | int, item: Artist | ItemMapping, overwrite: bool = False
) -> Artist:
"""Update Artist record in the database."""
- cur_item = await self.get_db_item(item_id)
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_db_item(db_id)
metadata = cur_item.metadata.update(getattr(item, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, item, overwrite)
async with self._db_add_lock:
await self.mass.music.database.update(
self.db_table,
- {"item_id": item_id},
+ {"item_id": db_id},
{
"name": item.name if overwrite else cur_item.name,
"sort_name": item.sort_name if overwrite else cur_item.sort_name,
},
)
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, provider_mappings)
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ await self._set_provider_mappings(db_id, provider_mappings)
+ self.logger.debug("updated %s in database: %s", item.name, db_id)
+ return await self.get_db_item(db_id)
async def _get_provider_dynamic_tracks(
self,
raise NotImplementedError
@abstractmethod
- async def update(self, item_id: int, update: ItemCls, overwrite: bool = False) -> ItemCls:
+ async def update(self, item_id: str | int, update: ItemCls, overwrite: bool = False) -> ItemCls:
"""Update existing record in the database."""
- async def delete(self, item_id: int, recursive: bool = False) -> None: # noqa: ARG002
+ async def delete(self, item_id: str | int, recursive: bool = False) -> None: # noqa: ARG002
"""Delete record from the database."""
- db_item = await self.get_db_item(item_id)
- assert db_item, f"Item does not exist: {item_id}"
+ db_id = int(item_id) # ensure integer
+ db_item = await self.get_db_item(db_id)
+ assert db_item, f"Item does not exist: {db_id}"
# delete item
await self.mass.music.database.delete(
self.db_table,
- {"item_id": int(item_id)},
+ {"item_id": db_id},
)
# update provider_mappings table
await self.mass.music.database.delete(
DB_TABLE_PROVIDER_MAPPINGS,
- {"media_type": self.media_type.value, "item_id": int(item_id)},
+ {"media_type": self.media_type.value, "item_id": db_id},
)
# NOTE: this does not delete any references to this item in other records,
# this is handled/overridden in the mediatype specific controllers
self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, db_item.uri, db_item)
- self.logger.debug("deleted item with id %s from database", item_id)
+ self.logger.debug("deleted item with id %s from database", db_id)
async def db_items(
self,
async def get_db_item(self, item_id: int | str) -> ItemCls:
"""Get record by id."""
- match = {"item_id": int(item_id)}
+ db_id = int(item_id) # ensure integer
+ match = {"item_id": db_id}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
return self.item_cls.from_db_row(db_row)
- raise MediaNotFoundError(f"Album not found in database: {item_id}")
+ raise MediaNotFoundError(f"Album not found in database: {db_id}")
async def get_db_item_by_prov_id(
self,
break
offset += limit
- async def set_db_library(self, item_id: int, in_library: bool) -> None:
+ async def set_db_library(self, item_id: str | int, in_library: bool) -> None:
"""Set the in-library bool on a database item."""
- match = {"item_id": item_id}
+ db_id = int(item_id) # ensure integer
+ match = {"item_id": db_id}
await self.mass.music.database.update(self.db_table, match, {"in_library": in_library})
- db_item = await self.get_db_item(item_id)
+ db_item = await self.get_db_item(db_id)
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
async def get_provider_item(
"found on provider {provider_instance_id_or_domain}"
)
- async def remove_prov_mapping(self, item_id: int, provider_instance_id: str) -> None:
+ async def remove_prov_mapping(self, item_id: str | int, provider_instance_id: str) -> None:
"""Remove provider id(s) from item."""
+ db_id = int(item_id) # ensure integer
try:
- db_item = await self.get_db_item(item_id)
+ db_item = await self.get_db_item(db_id)
except MediaNotFoundError:
# edge case: already deleted / race condition
return
DB_TABLE_PROVIDER_MAPPINGS,
{
"media_type": self.media_type.value,
- "item_id": int(item_id),
+ "item_id": db_id,
"provider_instance": provider_instance_id,
},
)
db_item.provider_mappings = {
x for x in db_item.provider_mappings if x.provider_instance != provider_instance_id
}
- match = {"item_id": item_id}
+ match = {"item_id": db_id}
if db_item.provider_mappings:
await self.mass.music.database.update(
self.db_table,
match,
{"provider_mappings": serialize_to_json(db_item.provider_mappings)},
)
- self.logger.debug("removed provider %s from item id %s", provider_instance_id, item_id)
+ self.logger.debug("removed provider %s from item id %s", provider_instance_id, db_id)
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
else:
# delete item if it has no more providers
with suppress(AssertionError):
- await self.delete(item_id)
+ await self.delete(db_id)
async def dynamic_tracks(
self,
"""Get dynamic list of tracks for given item, fallback/default implementation."""
async def _set_provider_mappings(
- self, item_id: int, provider_mappings: list[ProviderMapping]
+ self, item_id: str | int, provider_mappings: list[ProviderMapping]
) -> None:
"""Update the provider_items table for the media item."""
+ db_id = int(item_id) # ensure integer
# clear all records first
await self.mass.music.database.delete(
DB_TABLE_PROVIDER_MAPPINGS,
- {"media_type": self.media_type.value, "item_id": int(item_id)},
+ {"media_type": self.media_type.value, "item_id": db_id},
)
# add entries
for provider_mapping in provider_mappings:
DB_TABLE_PROVIDER_MAPPINGS,
{
"media_type": self.media_type.value,
- "item_id": item_id,
+ "item_id": db_id,
"provider_domain": provider_mapping.provider_domain,
"provider_instance": provider_mapping.provider_instance,
"provider_item_id": provider_mapping.item_id,
return await provider.create_playlist(name)
- async def add_playlist_tracks(self, db_playlist_id: str, uris: list[str]) -> None:
+ async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None:
"""Add multiple tracks to playlist. Creates background tasks to process the action."""
- playlist = await self.get_db_item(db_playlist_id)
+ db_id = int(db_playlist_id) # ensure integer
+ playlist = await self.get_db_item(db_id)
if not playlist:
- raise MediaNotFoundError(f"Playlist with id {db_playlist_id} not found")
+ raise MediaNotFoundError(f"Playlist with id {db_id} not found")
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
for uri in uris:
- self.mass.create_task(self.add_playlist_track(db_playlist_id, uri))
+ self.mass.create_task(self.add_playlist_track(db_id, uri))
- async def add_playlist_track(self, db_playlist_id: str, track_uri: str) -> None:
+ async def add_playlist_track(self, db_playlist_id: str | int, track_uri: str) -> None:
"""Add track to playlist - make sure we dont add duplicates."""
+ db_id = int(db_playlist_id) # ensure integer
# we can only edit playlists that are in the database (marked as editable)
- playlist = await self.get_db_item(db_playlist_id)
+ playlist = await self.get_db_item(db_id)
if not playlist:
- raise MediaNotFoundError(f"Playlist with id {db_playlist_id} not found")
+ raise MediaNotFoundError(f"Playlist with id {db_id} not found")
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
# make sure we have recent full track details
provider = self.mass.get_provider(playlist_prov.provider_instance)
await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
# invalidate cache by updating the checksum
- await self.get(db_playlist_id, "database", force_refresh=True)
+ await self.get(db_id, "database", force_refresh=True)
async def remove_playlist_tracks(
- self, db_playlist_id: str, positions_to_remove: tuple[int, ...]
+ self, db_playlist_id: str | int, positions_to_remove: tuple[int, ...]
) -> None:
"""Remove multiple tracks from playlist."""
- playlist = await self.get_db_item(db_playlist_id)
+ db_id = int(db_playlist_id) # ensure integer
+ playlist = await self.get_db_item(db_id)
if not playlist:
- raise MediaNotFoundError(f"Playlist with id {db_playlist_id} not found")
+ raise MediaNotFoundError(f"Playlist with id {db_id} not found")
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
for prov_mapping in playlist.provider_mappings:
continue
await provider.remove_playlist_tracks(prov_mapping.item_id, positions_to_remove)
# invalidate cache by updating the checksum
- await self.get(db_playlist_id, "database", force_refresh=True)
+ await self.get(db_id, "database", force_refresh=True)
async def _add_db_item(self, item: Playlist) -> Playlist:
"""Add a new record to the database."""
return await self.get_db_item(item_id)
async def _update_db_item(
- self, item_id: int, item: Playlist, overwrite: bool = False
+ self, item_id: str | int, item: Playlist, overwrite: bool = False
) -> Playlist:
"""Update Playlist record in the database."""
- cur_item = await self.get_db_item(item_id)
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_db_item(db_id)
metadata = cur_item.metadata.update(getattr(item, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, item, overwrite)
async with self._db_add_lock:
await self.mass.music.database.update(
self.db_table,
- {"item_id": item_id},
+ {"item_id": db_id},
{
# always prefer name/owner from updated item here
"name": item.name or cur_item.name,
},
)
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, provider_mappings)
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ await self._set_provider_mappings(db_id, provider_mappings)
+ self.logger.debug("updated %s in database: %s", item.name, db_id)
+ return await self.get_db_item(db_id)
async def _get_provider_playlist_tracks(
self,
)
return db_item
- async def update(self, item_id: int, update: Radio, overwrite: bool = False) -> Radio:
+ async def update(self, item_id: str | int, update: Radio, overwrite: bool = False) -> Radio:
"""Update existing record in the database."""
return await self._update_db_item(item_id=item_id, item=update, overwrite=overwrite)
# return created object
return await self.get_db_item(item_id)
- async def _update_db_item(self, item_id: int, item: Radio, overwrite: bool = False) -> Radio:
+ async def _update_db_item(
+ self, item_id: str | int, item: Radio, overwrite: bool = False
+ ) -> Radio:
"""Update Radio record in the database."""
- cur_item = await self.get_db_item(item_id)
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_db_item(db_id)
metadata = cur_item.metadata.update(getattr(item, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, item, overwrite)
- match = {"item_id": item_id}
+ match = {"item_id": db_id}
async with self._db_add_lock:
await self.mass.music.database.update(
self.db_table,
},
)
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, provider_mappings)
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ await self._set_provider_mappings(db_id, provider_mappings)
+ self.logger.debug("updated %s in database: %s", item.name, db_id)
+ return await self.get_db_item(db_id)
async def _get_provider_dynamic_tracks(
self,
)
return db_item
- async def update(self, item_id: int, update: Track, overwrite: bool = False) -> Track:
+ async def update(self, item_id: str | int, update: Track, overwrite: bool = False) -> Track:
"""Update existing record in the database."""
return await self._update_db_item(item_id=item_id, item=update, overwrite=overwrite)
return await self.get_db_item(item_id)
async def _update_db_item(
- self, item_id: int, item: Track | ItemMapping, overwrite: bool = False
+ self, item_id: str | int, item: Track | ItemMapping, overwrite: bool = False
) -> Track:
"""Update Track record in the database, merging data."""
- cur_item = await self.get_db_item(item_id)
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_db_item(db_id)
metadata = cur_item.metadata.update(getattr(item, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, item, overwrite)
if getattr(item, "isrc", None):
async with self._db_add_lock:
await self.mass.music.database.update(
self.db_table,
- {"item_id": item_id},
+ {"item_id": db_id},
{
"name": item.name or cur_item.name,
"sort_name": item.sort_name or cur_item.sort_name,
},
)
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, provider_mappings)
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ await self._set_provider_mappings(db_id, provider_mappings)
+ self.logger.debug("updated %s in database: %s", item.name, db_id)
+ return await self.get_db_item(db_id)
async def _get_track_albums(
self,
import logging
import random
import time
+from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
from music_assistant.common.helpers.util import get_changed_keys
return self._queues.get(queue_id)
@api_command("players/queue/items")
- def items(self, queue_id: str) -> list[QueueItem]:
+ async def items(self, queue_id: str) -> AsyncGenerator[QueueItem, None]:
"""Return all QueueItems for given PlayerQueue."""
- return self._queue_items.get(queue_id, [])
+ # because the QueueItems can potentially be a very large list, this is a async generator
+ for index, queue_item in enumerate(self._queue_items.get(queue_id, [])):
+ queue_item.index = index
+ yield queue_item
@api_command("players/queue/get_active_queue")
def get_active_queue(self, player_id: str) -> PlayerQueue:
"""Return the current active/synced queue for a player."""
player = self.mass.players.get(player_id)
- return self.get(player.active_queue)
+ if queue := self.get(player.active_source):
+ return queue
+ return self.get(player_id)
# Queue commands
self.signal_update(queue_id)
@api_command("players/queue/play_media")
- async def play_media( # noqa: PLR0915
+ async def play_media(
self,
queue_id: str,
media: MediaItemType | list[MediaItemType] | str | list[str],
- queue_opt: Which enqueue mode to use.
- radio_mode: Enable radio mode for the given item(s).
"""
- # ruff: noqa: PLR0915
+ # ruff: noqa: PLR0915,PLR0912
queue = self._queues[queue_id]
if queue.announcement_in_progress:
LOGGER.warning("Ignore queue command: An announcement is in progress")
insert_at_index=insert_at_index,
shuffle=queue.shuffle_enabled,
)
+ # handle edgecase, queue is empty and items are only added (not played)
+ # mark first item as new index
+ if queue.current_index is None:
+ queue.current_index = 0
+ queue.current_item = self.get_item(queue_id, 0)
+ self.signal_update(queue_id)
@api_command("players/queue/move_item")
def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
if queue.state not in (PlayerState.IDLE, PlayerState.OFF):
self.mass.create_task(self.stop(queue_id))
queue.current_index = None
+ queue.current_item = None
+ queue.elapsed_time = 0
queue.index_in_buffer = None
self.update_items(queue_id, [])
def on_player_update(self, player: Player, changed_keys: set[str]) -> None:
"""Call when a PlayerQueue needs to be updated (e.g. when player updates)."""
if player.player_id not in self._queues:
- self.mass.create_task(self.on_player_register(player))
+ # race condition
return
queue_id = player.player_id
player = self.players.get(queue_id)
queue = self._queues[queue_id]
- # copy most properties from the player
+ # basic properties
queue.display_name = player.display_name
queue.available = player.available
queue.items = len(self._queue_items[queue_id])
- queue.state = player.state
- queue.elapsed_time = int(player.corrected_elapsed_time)
- queue.elapsed_time_last_updated = time.time()
-
# determine if this queue is currently active for this player
- queue.active = player.active_queue == queue.queue_id
+ queue.active = player.active_source == queue.queue_id
if queue.active:
+ queue.state = player.state
# update current item from player report
player_item_index = self.index_by_id(queue_id, player.current_item_id)
if player_item_index is None:
+ # try grabbing the item id from the url
player_item_index = self._get_player_item_index(queue_id, player.current_url)
- if queue.flow_mode and player_item_index is not None:
- # flow mode active, calculate current item
- (
- queue.current_index,
- queue.elapsed_time,
- ) = self.__get_queue_stream_index(queue, player, player_item_index)
- else:
- queue.current_index = player_item_index
-
- queue.current_item = self.get_item(queue_id, queue.current_index)
- queue.next_item = self.get_next_item(queue_id)
-
- # correct elapsed time when seeking
- if (
- queue.current_item
- and queue.current_item.streamdetails
- and queue.current_item.streamdetails.seconds_skipped
- and not queue.flow_mode
- ):
- queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
-
+ if player_item_index is not None:
+ if queue.flow_mode:
+ # flow mode active, calculate current item
+ current_index, item_time = self.__get_queue_stream_index(
+ queue, player, player_item_index
+ )
+ else:
+ # queue is active and player has one of our tracks loaded, update state
+ current_index = player_item_index
+ item_time = int(player.corrected_elapsed_time)
+ # only update these attributes if the queue is active
+ # and has an item loaded so we are able to resume it
+ queue.current_index = current_index
+ queue.elapsed_time = item_time
+ queue.elapsed_time_last_updated = time.time()
+ queue.current_item = self.get_item(queue_id, queue.current_index)
+ queue.next_item = self.get_next_item(queue_id)
+ # correct elapsed time when seeking
+ if (
+ queue.current_item
+ and queue.current_item.streamdetails
+ and queue.current_item.streamdetails.seconds_skipped
+ and not queue.flow_mode
+ ):
+ queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
+ else:
+ queue.state = PlayerState.IDLE
# basic throttle: do not send state changed events if queue did not actually change
prev_state = self._prev_states.get(queue_id, {})
- new_state = self._queues[queue_id].to_dict()
+ new_state = queue.to_dict()
+ new_state.pop("elapsed_time_last_updated", None)
changed_keys = get_changed_keys(prev_state, new_state)
- self._prev_states[queue_id] = new_state
+ # return early if nothing changed
if len(changed_keys) == 0:
return
-
- if "elapsed_time" in changed_keys:
+ # do not send full updates if only time was updated
+ if changed_keys == {"elapsed_time"}:
self.mass.signal_event(
EventType.QUEUE_TIME_UPDATED,
object_id=queue_id,
data=queue.elapsed_time,
)
- # do not send full updates if only time was updated
- if changed_keys in (
- {"elapsed_time_last_updated"},
- {
- "elapsed_time",
- "elapsed_time_last_updated",
- },
- ):
- # ignore
+ self._prev_states[queue_id] = new_state
return
-
- # only signal queue updated event if other properties than elapsed_time updated
+ # handle player was playing and is now stopped
+ # if player finished playing a track for 90%, mark current item as finished
+ if (
+ prev_state.get("state") == "playing"
+ and queue.state == PlayerState.IDLE
+ and (
+ queue.current_item
+ and queue.current_item.duration
+ and queue.elapsed_time > (queue.current_item.duration * 0.8)
+ )
+ ):
+ queue.current_index += 1
+ queue.current_item = None
+ queue.next_item = None
+ # signal update and store state
self.signal_update(queue_id)
+ self._prev_states[queue_id] = new_state
# watch dynamic radio items refill if needed
if "current_index" in changed_keys:
fill_index = len(self._queue_items[queue_id]) - 5
self._queue_items.pop(player_id, None)
async def player_ready_for_next_track(
- self, queue_or_player_id: str, current_item_id: str | None = None
+ self, queue_or_player_id: str, current_item_id: str
) -> tuple[QueueItem, bool]:
"""Call when a player is ready to load the next track into the buffer.
just like with the play_media call.
"""
queue = self.get_active_queue(queue_or_player_id)
- if current_item_id is None:
- cur_index = queue.current_index
- else:
- cur_index = self.index_by_id(queue.queue_id, current_item_id)
+ cur_index = self.index_by_id(queue.queue_id, current_item_id)
cur_item = self.get_item(queue.queue_id, cur_index)
next_index = self.get_next_index(queue.queue_id, cur_index)
next_item = self.get_item(queue.queue_id, next_index)
UnsupportedFeaturedException,
)
from music_assistant.common.models.player import Player
-from music_assistant.constants import CONF_PLAYERS, ROOT_LOGGER_NAME
+from music_assistant.constants import CONF_HIDE_GROUP_CHILDS, CONF_PLAYERS, ROOT_LOGGER_NAME
from music_assistant.server.helpers.api import api_command
from music_assistant.server.models.player_provider import PlayerProvider
self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
@api_command("players/update")
- def update(self, player_id: str, skip_forward: bool = False) -> None:
+ def update(
+ self, player_id: str, skip_forward: bool = False, force_update: bool = False
+ ) -> None:
"""Update player state."""
if player_id not in self._players:
return
player = self._players[player_id]
- # calculate active_queue
- player.active_queue = self._get_active_queue(player)
+ # calculate active_source
+ player.active_source = self._get_active_source(player)
# calculate group volume
player.group_volume = self._get_group_volume_level(player)
# prefer any overridden name from config
player.state = PlayerState.IDLE
elif not player.powered:
player.state = PlayerState.OFF
+ # handle automatic hiding of group child's feature
+ for group_player in self._get_player_groups(player_id):
+ try:
+ hide_group_childs = self.mass.config.get_player_config_value(
+ group_player.player_id, CONF_HIDE_GROUP_CHILDS
+ ).value
+ except KeyError:
+ continue
+ if hide_group_childs == "always":
+ player.hidden_by.add(group_player.player_id)
+ elif group_player.powered:
+ if hide_group_childs == "active":
+ player.hidden_by.add(group_player.player_id)
+ elif group_player.player_id in player.hidden_by:
+ player.hidden_by.remove(group_player.player_id)
# basic throttle: do not send state changed events if player did not actually change
prev_state = self._prev_states.get(player_id, {})
new_state = self._players[player_id].to_dict()
)
self._prev_states[player_id] = new_state
- if not player.enabled and "enabled" not in changed_keys:
+ if not player.enabled and not force_update:
# ignore updates for disabled players
return
# always signal update to the playerqueue
self.queues.on_player_update(player, changed_keys)
- if len(changed_keys) == 0:
+ if len(changed_keys) == 0 and not force_update:
return
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
for child_player_id in player.group_childs:
if child_player_id == player_id:
continue
- self.update(child_player_id, skip_forward=True)
+ self.update(child_player_id, skip_forward=True, force_update=force_update)
# update group player(s) when child updates
for group_player in self._get_player_groups(player_id):
- self.update(group_player.player_id, skip_forward=True)
+ self.update(group_player.player_id, skip_forward=True, force_update=force_update)
def get_player_provider(self, player_id: str) -> PlayerProvider:
"""Return PlayerProvider for given player."""
"""Return all (player_ids of) any groupplayers the given player belongs to."""
return tuple(x for x in self if player_id in x.group_childs)
- def _get_active_queue(self, player: Player) -> str:
- """Return the active_queue id for given player."""
+ def _get_active_source(self, player: Player) -> str:
+ """Return the active_source id for given player."""
# if player is synced, return master/group leader
if player.synced_to and player.synced_to in self._players:
- return self._get_active_queue(self.get(player.synced_to))
+ return self._get_active_source(self.get(player.synced_to))
# iterate player groups to find out if one is playing
if group_players := self._get_player_groups(player.player_id):
# prefer the first playing (or paused) group parent
for group_player in group_players:
if group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- return group_player.player_id
+ return group_player.active_source
# fallback to the first powered group player
for group_player in group_players:
if group_player.powered:
- return group_player.player_id
+ return group_player.active_source
# defaults to the player's own player id
- return player.player_id
+ if player.current_url:
+ if self.mass.webserver.base_url in player.current_url:
+ return player.player_id
+ elif ":" in player.current_url:
+ # extract source from uri/url
+ return player.current_url.split(":")[0]
+ return player.current_item_id or player.current_url
+ elif not player.powered:
+ # reset active source when player powers off
+ return player.player_id
+ return player.active_source
def _get_group_volume_level(self, player: Player) -> int:
"""Calculate a group volume from the grouped members."""
# if the player is playing, update elapsed time every tick
# to ensure the queue has accurate details
player_playing = (
- player.active_queue == player.player_id and player.state == PlayerState.PLAYING
+ player.active_source == player.player_id and player.state == PlayerState.PLAYING
)
if player_playing:
self.mass.loop.call_soon(self.update, player_id)
self.start()
self.seen_players.add(player_id)
try:
- sub_queue = asyncio.Queue(3)
+ sub_queue = asyncio.Queue(1)
# some checks
assert player_id not in self.subscribers, "No duplicate subscriptions allowed"
break
yield chunk
finally:
- # some delay here to detect misbehaving (reconnecting) players
- await asyncio.sleep(2)
empty_queue(sub_queue)
self.subscribers.pop(player_id)
+ # some delay here to detect misbehaving (reconnecting) players
await asyncio.sleep(2)
# check if this was the last subscriber and we should cancel
if len(self.subscribers) == 0 and self._audio_task and not self.finished:
self._audio_task.cancel()
- async def _put_data(self, data: Any, timeout: float = 1200) -> None:
+ async def _put_data(self, data: Any, timeout: float = 120) -> None:
"""Put chunk of data to all subscribers."""
async with asyncio.timeout(timeout):
+ while len(self.subscribers) == 0:
+ # this may happen with misbehaving clients that do
+ # multiple GET requests for the same audio stream.
+ # they receive the first chunk, disconnect and then
+ # directly reconnect again.
+ if not self._audio_task or self.finished:
+ return
+ await asyncio.sleep(0.1)
async with asyncio.TaskGroup() as tg:
for sub_id in self.subscribers:
sub_queue = self.subscribers[sub_id]
if request.method == "HEAD":
return resp
- # handler workaround for players that do 2 multiple GET requests
+ # handle workaround for players that do 2 multiple GET requests
# for the same audio stream (because of the missing duration/length)
if player_id in self.workaround_players and player_id not in stream_job.seen_players:
stream_job.seen_players.add(player_id)
async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
# feed stdin with pcm audio chunks from origin
async def read_audio():
- async for chunk in stream_job.subscribe(player_id):
- try:
- await ffmpeg_proc.write(chunk)
- except BrokenPipeError:
- break
- ffmpeg_proc.write_eof()
+ try:
+ async for chunk in stream_job.subscribe(player_id):
+ try:
+ await ffmpeg_proc.write(chunk)
+ except BrokenPipeError:
+ break
+ finally:
+ ffmpeg_proc.write_eof()
ffmpeg_proc.attach_task(read_audio())
if enable_icy
else ffmpeg_proc.iter_chunked(128000)
)
-
- bytes_streamed = 0
-
async for chunk in iterator:
try:
await resp.write(chunk)
except (BrokenPipeError, ConnectionResetError):
# race condition
break
- bytes_streamed += len(chunk)
-
- # do not allow the player to prebuffer more than 60 seconds
- seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size)
- if (
- seconds_streamed > 120
- and (seconds_streamed - player.corrected_elapsed_time) > 30
- ):
- await asyncio.sleep(1)
if not enable_icy:
continue
# if icy metadata is enabled, send the icy metadata after the chunk
+ current_item = self.mass.players.queues.get_item(
+ queue.queue_id, queue.index_in_buffer
+ )
if (
- queue
- and queue.current_item
- and queue.current_item.streamdetails
- and queue.current_item.streamdetails.stream_title
+ current_item
+ and current_item.streamdetails
+ and current_item.streamdetails.stream_title
):
- title = queue.current_item.streamdetails.stream_title
- elif queue.current_item and queue.current_item.name:
- title = queue.current_item.name
+ title = current_item.streamdetails.stream_title
+ elif queue and current_item and current_item.name:
+ title = current_item.name
else:
title = "Music Assistant"
metadata = f"StreamTitle='{title}';".encode()
# ruff: noqa: PLR0915
queue_id = stream_job.queue_item.queue_id
queue = self.mass.players.queues.get(queue_id)
+ queue_player = self.mass.players.get(queue_id)
queue_track = None
last_fadeout_part = b""
):
chunk_num += 1
+ # slow down if the player buffers too aggressively
+ seconds_streamed = int(bytes_written / stream_job.pcm_sample_size)
+ if (
+ seconds_streamed > 10
+ and queue_player.corrected_elapsed_time > 10
+ and (seconds_streamed - queue_player.corrected_elapsed_time) > 10
+ ):
+ await asyncio.sleep(1)
+
#### HANDLE FIRST PART OF TRACK
# buffer full for crossfade
controller = self.mass.music.get_controller(media_type)
cur_db_ids = set()
async for prov_item in self._get_library_gen(media_type):
- db_item: MediaItemType = await controller.get_db_item_by_prov_id(
- prov_item.item_id,
- prov_item.provider,
- )
- if not db_item: # noqa: SIM114
+ db_item: MediaItemType
+ if not (
+ db_item := await controller.get_db_item_by_prov_id(
+ prov_item.item_id,
+ prov_item.provider,
+ )
+ ):
# create full db item
db_item = await controller.add(prov_item, skip_metadata_lookup=True)
-
elif (
db_item.metadata.checksum and prov_item.metadata.checksum
) and db_item.metadata.checksum != prov_item.metadata.checksum:
- # item checksum changed
- db_item = await controller.add(prov_item, skip_metadata_lookup=True)
+ # existing dbitem checksum changed
+ db_item = await controller.update(db_item.item_id, prov_item)
cur_db_ids.add(db_item.item_id)
if not db_item.in_library:
await controller.set_db_library(db_item.item_id, True)
from pychromecast.controllers.receiver import CastStatus
from pychromecast.socket_client import ConnectionStatus
- from music_assistant.common.models.config_entries import ProviderConfig
+ from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
)
return entries
+ def on_player_config_changed(
+ self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
+ ) -> None:
+ """Call (by config manager) when the configuration of a player changes."""
+ if "enabled" in changed_keys and config.player_id not in self.castplayers:
+ self.mass.create_task(self.mass.config.reload_provider, self.instance_id)
+
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
castplayer = self.castplayers[player_id]
self.castplayers[player_id] = castplayer
castplayer.status_listener = CastStatusListener(self, castplayer, self.mz_mgr)
- if cast_info.is_audio_group:
+ if cast_info.is_audio_group and not cast_info.is_multichannel_group:
mz_controller = MultizoneController(cast_info.uuid)
castplayer.cc.register_handler(mz_controller)
castplayer.mz_controller = mz_controller
status.volume_level,
)
castplayer.player.name = castplayer.cast_info.friendly_name
+ castplayer.player.volume_level = int(status.volume_level * 100)
+ castplayer.player.volume_muted = status.volume_muted
if castplayer.active_group:
# use mute as power when group is active
castplayer.player.powered = not status.volume_muted
castplayer.cc.app_id is not None
and castplayer.cc.app_id != pychromecast.IDLE_APP_ID
)
- castplayer.player.volume_level = int(status.volume_level * 100)
- castplayer.player.volume_muted = status.volume_muted
-
# handle stereo pairs
if castplayer.cast_info.is_multichannel_group:
castplayer.player.type = PlayerType.STEREO_PAIR
castplayer.player.group_childs = []
# handle cast groups
- elif castplayer.cast_info.is_audio_group:
+ if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group:
castplayer.player.type = PlayerType.GROUP
castplayer.player.group_childs = [
str(UUID(x)) for x in castplayer.mz_controller.members
PlayerFeature.POWER,
PlayerFeature.VOLUME_SET,
)
+
# send update to player manager
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
from pychromecast.const import CAST_TYPE_GROUP
from zeroconf import ServiceInfo
-from music_assistant.constants import CONF_HIDE_GROUP_CHILDS
-
if TYPE_CHECKING:
from pychromecast.controllers.media import MediaStatus
from pychromecast.controllers.multizone import MultizoneManager
self.prov.logger.debug(
"%s is added to multizone: %s", self.castplayer.player.display_name, group_uuid
)
- if group_player := self.prov.castplayers.get(group_uuid):
- hide_group_childs = self.prov.mass.config.get_player_config_value(
- group_player.player_id, CONF_HIDE_GROUP_CHILDS
- ).value
- if hide_group_childs == "always":
- self.castplayer.player.hidden_by.add(group_uuid)
+ self.new_cast_status(self.castplayer.cc.status)
def removed_from_multizone(self, group_uuid):
"""Handle the cast removed from a group."""
self.prov.logger.debug(
"%s is removed from multizone: %s", self.castplayer.player.display_name, group_uuid
)
+ self.new_cast_status(self.castplayer.cc.status)
def multizone_new_cast_status(self, group_uuid, cast_status): # noqa: ARG002
"""Handle reception of a new CastStatus for a group."""
if group_player := self.prov.castplayers.get(group_uuid):
- hide_group_childs = self.prov.mass.config.get_player_config_value(
- group_uuid, CONF_HIDE_GROUP_CHILDS
- ).value
- if hide_group_childs == "always":
- self.castplayer.player.hidden_by.add(group_uuid)
if group_player.cc.media_controller.is_active:
self.castplayer.active_group = group_uuid
- if hide_group_childs == "active":
- self.castplayer.player.hidden_by.add(group_uuid)
+ self.castplayer.player.active_source = group_uuid
elif group_uuid == self.castplayer.active_group:
self.castplayer.active_group = None
- if hide_group_childs != "always" and group_uuid in self.castplayer.player.hidden_by:
- self.castplayer.player.hidden_by.remove(group_uuid)
+ self.castplayer.player.active_source = self.castplayer.player.player_id
self.prov.logger.debug(
"%s got new cast status for group: %s", self.castplayer.player.display_name, group_uuid
)
+ self.new_cast_status(self.castplayer.cc.status)
def multizone_new_media_status(self, group_uuid, media_status): # noqa: ARG002
"""Handle reception of a new MediaStatus for a group."""
async def _parse_track(self, track_obj: dict) -> Track:
"""Parse a YT Track response to a Track model object."""
- if not track_obj["videoId"]:
+ if not track_obj.get("videoId"):
raise InvalidDataError("Track is missing videoId")
track = Track(item_id=track_obj["videoId"], provider=self.domain, name=track_obj["title"])
if "artists" in track_obj:
"python-slugify==8.0.1",
"mashumaro==3.5.0",
"memory-tempfile==2.2.3",
- "music-assistant-frontend==20230402.0",
+ "music-assistant-frontend==20230404.0",
"pillow==9.5.0",
"unidecode==1.3.6",
"xmltodict==0.13.0",
git+https://github.com/pytube/pytube.git@refs/pull/1501/head
mashumaro==3.5.0
memory-tempfile==2.2.3
-music-assistant-frontend==20230402.0
+music-assistant-frontend==20230404.0
orjson==3.8.9
pillow==9.5.0
plexapi==4.13.2