from music_assistant_models.media_items import AudioFormat
from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
from snapcast.control import create_server
-from snapcast.control.client import Snapclient
from zeroconf import NonUniqueNameException
from zeroconf.asyncio import AsyncServiceInfo
create_sample_rates_config_entry,
)
from music_assistant.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
+from music_assistant.helpers.compare import create_safe_string
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import get_ip_pton
from music_assistant.models.player_provider import PlayerProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest
+ from snapcast.control.client import Snapclient
from snapcast.control.group import Snapgroup
from snapcast.control.server import Snapserver
from snapcast.control.stream import Snapstream
player = self.mass.players.get(player_id, raise_unavailable=False)
if not player:
snap_client = cast(
- Snapclient, self._snapserver.client(self._get_snapclient_id(player_id))
+ "Snapclient", self._snapserver.client(self._get_snapclient_id(player_id))
)
player = Player(
player_id=player_id,
async def _get_or_create_stream(self, player_id: str, queue_id: str) -> Snapstream:
"""Create new stream on snapcast server (or return existing one)."""
mass_queue = self.mass.player_queues.get(queue_id)
- stream_name = f"{MASS_STREAM_POSTFIX} - {mass_queue.display_name}"
+ safe_name = create_safe_string(mass_queue.display_name, replace_spaces=True)
+ stream_name = f"{MASS_STREAM_POSTFIX} - {safe_name}"
# cancel any existing clear stream task
self.mass.cancel_timer(f"snapcast_clear_stream_{player_id}")