from __future__ import annotations
from dataclasses import dataclass
-from typing import Any
from uuid import uuid4
from mashumaro import DataClassDictMixin
if not self.name:
self.name = self.uri
- @classmethod
- def __pre_deserialize__(cls, d: dict[Any, Any]) -> dict[Any, Any]:
- """Run actions before deserialization."""
- d.pop("streamdetails", None)
- return d
-
@property
def uri(self) -> str:
"""Return uri for this QueueItem (for logging purposes)."""
PlayerConfig,
ProviderConfig,
)
-from music_assistant.common.models.enums import EventType, ProviderType
+from music_assistant.common.models.enums import EventType, PlayerState, ProviderType
from music_assistant.common.models.errors import InvalidDataError, PlayerUnavailableError
from music_assistant.constants import (
CONF_CORE,
data=config,
)
# signal update to the player manager
+ player = self.mass.players.get(config.player_id)
with suppress(PlayerUnavailableError, AttributeError, KeyError):
- player = self.mass.players.get(config.player_id)
if config.enabled:
player_prov = self.mass.players.get_player_provider(player_id)
await player_prov.poll_player(player_id)
with suppress(PlayerUnavailableError):
if provider := self.mass.get_provider(config.provider):
provider.on_player_config_changed(config, changed_keys)
+ # if the player was playing, restart playback
+ if player and player.state == PlayerState.PLAYING:
+ self.mass.create_task(self.mass.player_queues.resume(player.active_source))
# return full player config (just in case)
return await self.get_player_config(player_id)
if not existing:
raise KeyError(f"Player {player_id} does not exist")
self.remove(conf_key)
+ if (player := self.mass.players.get(player_id)) and player.available:
+ player.enabled = False
+ self.mass.players.update(player_id, force_update=True)
if provider := self.mass.get_provider(existing["provider"]):
assert isinstance(provider, PlayerProvider)
provider.on_player_config_removed(player_id)
if child_player.state == PlayerState.PLAYING:
await self.cmd_stop(player_id)
# all checks passed, forward command to the player provider
- child_player.hidden_by.add(target_player)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_sync(player_id, target_player)
+ child_player.hidden_by.add(target_player)
+ # optimistically update the player to update the UI as fast as possible
+ player_provider.poll_player(player_id)
@api_command("players/cmd/unsync")
@log_player_command
if not player.synced_to:
LOGGER.info(
"Ignoring command to unsync player %s "
- "because it is currently not part of a (sync)group."
+ "because it is currently not synced to another player.",
+ player.display_name,
)
return
player.hidden_by.remove(player.synced_to)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_unsync(player_id)
+ # optimistically update the player to update the UI as fast as possible
+ player_provider.poll_player(player_id)
def _check_redirect(self, player_id: str) -> str:
"""Check if playback related command should be redirected."""
input_file = streamdetails.direct or "-"
proc_args = [
"ffmpeg",
+ "-protocol_whitelist",
+ "file,http,https,tcp,tls,crypto,pipe,fd",
"-t",
"300", # limit to 5 minutes to prevent OOM
"-i",
"warning" if LOGGER.isEnabledFor(logging.DEBUG) else "quiet",
"-ignore_unknown",
"-protocol_whitelist",
- "file,http,https,tcp,tls,crypto,pipe,fd", # support nested protocols (e.g. within playlist)
+ "file,http,https,tcp,tls,crypto,pipe,data,fd",
]
# collect input args
input_args = [
id: str
number: str
title: str
- length: int
+ length: int | None = None
@dataclass
class MusicBrainzMedia(DataClassDictMixin):
"""Model for a (basic) Media object from MusicBrainz."""
- position: int
format: str
track: list[MusicBrainzTrack]
- track_count: int
- track_offset: int
+ position: int = 0
+ track_count: int = 0
+ track_offset: int = 0
@dataclass
status: str
artist_credit: list[MusicBrainzArtistCredit]
release_group: MusicBrainzReleaseGroup
- track_count: int
+ track_count: int = 0
# optional fields
media: list[MusicBrainzMedia] = field(default_factory=list)
id: str
title: str
- length: int | None
- first_release_date: str | None
- artist_credit: list[MusicBrainzArtistCredit]
+ artist_credit: list[MusicBrainzArtistCredit] = field(default_factory=list)
# optional fields
+ length: int | None = None
+ first_release_date: str | None = None
isrcs: list[str] | None = None
tags: list[MusicBrainzTag] | None = None
disambiguation: str | None = None # version (e.g. live, karaoke etc.)
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import LinkType, ProviderFeature
+from music_assistant.common.models.errors import InvalidDataError
from music_assistant.common.models.media_items import (
AudioFormat,
BrowseFolder,
StreamDetails,
)
from music_assistant.server.helpers.audio import get_radio_stream
+from music_assistant.server.helpers.playlists import fetch_playlist
from music_assistant.server.models.music_provider import MusicProvider
SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
stream = await self.radios.station(uuid=item_id)
url_resolved = stream.url_resolved
await self.radios.station_click(uuid=item_id)
+ direct = None
+ if ".m3u" in url_resolved or ".pls" in url_resolved:
+ # url is playlist, try to figure out how to handle it
+ # if it is an mpeg-dash stream, let ffmpeg handle that
+ try:
+ playlist = await fetch_playlist(self.mass, url_resolved)
+ if len(playlist) > 1 or ".m3u" in playlist[0] or ".pls" in playlist[0]:
+ direct = playlist[0]
+ elif playlist:
+ url_resolved = playlist[0]
+ except (InvalidDataError, IndexError):
+ # empty playlist ?!
+ direct = url_resolved
return StreamDetails(
provider=self.domain,
item_id=item_id,
),
media_type=MediaType.RADIO,
data=url_resolved,
+ direct=direct,
expires=time() + 24 * 3600,
)
# check if the radio stream is not a playlist
url = stream["url"]
direct = None
- if stream.get("playlist_type"): # noqa: SIM102
- if playlist := await fetch_playlist(self.mass, url):
+ direct = None
+ if ".m3u" in url or ".pls" in url or stream.get("playlist_type"):
+ # url is playlist, try to figure out how to handle it
+ # if it is an mpeg-dash stream, let ffmpeg handle that
+ try:
+ playlist = await fetch_playlist(self.mass, url)
if len(playlist) > 1 or ".m3u" in playlist[0] or ".pls" in playlist[0]:
- # this is most likely an mpeg-dash stream, let ffmpeg handle that
direct = playlist[0]
- url = playlist[0]
+ elif playlist:
+ url_resolved = playlist[0]
+ except (InvalidDataError, IndexError):
+ # empty playlist ?!
+ direct = url_resolved
return StreamDetails(
provider=self.domain,
item_id=item_id,