logger.info("shutdown requested!")
loop.run_until_complete(mass.async_stop())
- # TODO: uvloop is temporary disabled due to a bug with subprocesses
- # https://github.com/MagicStack/uvloop/issues/317
run(
mass.async_start(),
- use_uvloop=False,
+ use_uvloop=True,
shutdown_callback=on_shutdown,
executor_workers=64,
)
"""All constants for Music Assistant."""
-__version__ = "0.0.51"
+__version__ = "0.0.52"
REQUIRED_PYTHON_VER = "3.7"
# configuration keys/attributes
--- /dev/null
+"""
+Implementation of a (truly) non blocking subprocess.
+
+The subprocess implementation in asyncio can (still) sometimes cause deadlocks,
+even when properly handling reading/writes from different tasks.
+Besides that, when using multiple asyncio subprocesses, together with uvloop
+things go very wrong: https://github.com/MagicStack/uvloop/issues/317
+
+As we rely a lot on moving chunks around through subprocesses (mainly sox),
+this custom implementation can be seen as a temporary solution until the main issue
+in uvloop is resolved.
+"""
+
+import asyncio
+import logging
+import subprocess
+import threading
+import time
+from typing import AsyncGenerator, List, Optional
+
LOGGER = logging.getLogger("AsyncProcess")


class AsyncProcess(object):
    """Implementation of a (truly) non blocking subprocess.

    All blocking work (process startup, pipe reads/writes, teardown) is
    delegated to the default executor so the event loop never stalls on
    the subprocess.
    """

    def __init__(
        self,
        process_args: List,
        chunksize=512000,
        enable_write: bool = False,
        enable_shell=False,
    ):
        """Initialize.

        :param process_args: arguments passed to subprocess.Popen.
        :param chunksize: amount of bytes to read from stdout per chunk.
        :param enable_write: attach a stdin pipe so data can be written.
        :param enable_shell: execute the process through the shell.
        """
        self._process_args = process_args
        self._chunksize = chunksize
        self._enable_write = enable_write
        self._enable_shell = enable_shell
        self._exit = False
        self._proc = None
        self._id = int(time.time())  # some identifier for logging

    async def __aenter__(self) -> "AsyncProcess":
        """Enter context manager, start running the process."""
        LOGGER.debug("[%s] Entered context manager", self._id)

        def start_proc():
            # Popen can block on fork/exec, so start it from the executor too.
            return subprocess.Popen(
                self._process_args,
                shell=self._enable_shell,
                stdout=subprocess.PIPE,
                stdin=subprocess.PIPE if self._enable_write else None,
            )

        self._proc = await asyncio.get_running_loop().run_in_executor(
            None, start_proc
        )
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
        """Exit context manager, kill the process if it is still running.

        NOTE(review): returning True suppresses any exception raised inside
        the context (e.g. GeneratorExit on an aborted stream); this matches
        the broad-except handling this class replaces — confirm intended.
        """
        if exc_type:
            LOGGER.debug(
                "[%s] Context manager exit with exception %s (%s)",
                self._id,
                exc_type,
                str(exc_value),
            )
        else:
            LOGGER.debug("[%s] Context manager exit", self._id)
        # signal that we must exit
        self._exit = True

        def close_proc():
            if self._proc and self._proc.poll() is None:
                # there is no real clean way to do this with all the blocking pipes
                self._proc.kill()

        await asyncio.get_running_loop().run_in_executor(None, close_proc)
        LOGGER.debug("[%s] Cleanup finished", self._id)
        return True

    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
        """Yield chunks from the process stdout. Generator.

        A chunk shorter than chunksize marks the end of the stream.
        """
        LOGGER.debug("[%s] start reading from generator", self._id)
        while not self._exit:
            chunk = await self.read()
            yield chunk
            if len(chunk) < self._chunksize:
                break
        LOGGER.debug("[%s] finished reading from generator", self._id)

    async def read(self) -> bytes:
        """Read a single chunk (at most chunksize bytes) from stdout."""

        def try_read():
            try:
                return self._proc.stdout.read(self._chunksize)
            except BrokenPipeError:
                return b""
            except Exception as exc:  # pylint: disable=broad-except
                LOGGER.exception(exc)
                return b""

        return await asyncio.get_running_loop().run_in_executor(None, try_read)

    async def write(self, data: bytes) -> None:
        """Write data to the process stdin."""

        def try_write(_data):
            try:
                self._proc.stdin.write(_data)
            except BrokenPipeError:
                pass
            except Exception as exc:  # pylint: disable=broad-except
                LOGGER.exception(exc)

        await asyncio.get_running_loop().run_in_executor(None, try_write, data)

    async def write_eof(self) -> None:
        """Write eof (close stdin) to the process."""

        def try_write():
            try:
                self._proc.stdin.close()
            except BrokenPipeError:
                pass
            except Exception as exc:  # pylint: disable=broad-except
                LOGGER.exception(exc)

        await asyncio.get_running_loop().run_in_executor(None, try_write)

    async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
        """Write bytes to the process and read back the complete output.

        :param input_data: optional bytes to feed to stdin first.
        :raises RuntimeError: when input_data is given but write is disabled.
        """
        if not self._enable_write and input_data:
            raise RuntimeError("Write is disabled")
        if input_data:
            await self.write(input_data)
        if self._enable_write:
            # BUGFIX: close stdin so the child sees EOF and can finish;
            # without this, readers like sox/cat never terminate and the
            # stdout read below deadlocks. (The queue-based reference
            # implementation already did this.)
            await self.write_eof()
        output = b""
        async for chunk in self.iterate_chunks():
            output += chunk
        return output
+
+
# first attempt with queues, too complicated
# left here as reference
class AsyncProcessWithQueues(object):
    """Implementation of a (truly) non blocking subprocess (queue based).

    NOTE: unused reference implementation, superseded by AsyncProcess.
    Bridges the blocking subprocess pipes to asyncio through janus queues.
    """

    def __init__(
        self,
        process_args: List,
        chunksize=512000,
        enable_write: bool = False,
        enable_shell=False,
    ):
        """Initialize.

        :param process_args: arguments passed to subprocess.Popen.
        :param chunksize: amount of bytes to read from stdout per chunk.
        :param enable_write: attach a stdin pipe so data can be written.
        :param enable_shell: execute the process through the shell.
        """
        self._process_args = process_args
        self._chunksize = chunksize
        self._enable_write = enable_write
        self._enable_shell = enable_shell
        # we have large chunks, limit the queue size a bit.
        import janus

        self.__queue_in = janus.Queue(8)
        self.__queue_out = janus.Queue(4)
        self.__proc_task = None
        self._exit = threading.Event()
        self._id = int(time.time())  # some identifier for logging

    async def __aenter__(self) -> "AsyncProcessWithQueues":
        """Enter context manager, start running the process in executor."""
        LOGGER.debug("[%s] Entered context manager", self._id)
        self.__proc_task = asyncio.get_running_loop().run_in_executor(None, self._run)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
        """Exit context manager.

        NOTE(review): returning True suppresses any exception raised inside
        the context — confirm intended.
        """
        if exc_type:
            LOGGER.debug(
                "[%s] Context manager exit with exception %s (%s)",
                self._id,
                exc_type,
                str(exc_value),
            )
        else:
            LOGGER.debug("[%s] Context manager exit", self._id)
        # signal that we must exit
        self._exit.set()
        # close both queues to unblock the executor thread(s)
        self.__queue_out.close()
        self.__queue_in.close()
        await self.__queue_out.wait_closed()
        await self.__queue_in.wait_closed()
        # cancel the executor job; there is no clean way to await it here
        self.__proc_task.cancel()
        LOGGER.debug("[%s] Cleanup finished", self._id)
        return True

    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
        """Yield chunks from the output Queue. Generator.

        An empty chunk marks the end of the stream.
        """
        LOGGER.debug("[%s] start reading from generator", self._id)
        while not self._exit.is_set():
            chunk = await self.__queue_out.async_q.get()
            self.__queue_out.async_q.task_done()
            if not chunk:
                break
            yield chunk
        LOGGER.debug("[%s] finished reading from generator", self._id)

    async def read(self) -> bytes:
        """Read a single chunk from the output Queue."""
        chunk = await self.__queue_out.async_q.get()
        self.__queue_out.async_q.task_done()
        return chunk

    async def write(self, data: Optional[bytes] = None) -> None:
        """Write data to the process stdin (through the input Queue)."""
        if not self._exit.is_set():
            await self.__queue_in.async_q.put(data)

    async def write_eof(self) -> None:
        """Write eof to process stdin."""
        await self.write(b"")

    async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
        """Write bytes to the process and read back the complete output.

        :param input_data: optional bytes to feed to stdin first.
        :raises RuntimeError: when input_data is given but write is disabled.
        """
        if not self._enable_write and input_data:
            raise RuntimeError("Write is disabled")
        if input_data:
            await self.write(input_data)
        await self.write_eof()
        output = b""
        async for chunk in self.iterate_chunks():
            output += chunk
        return output

    def _run(self):
        """Run the actual process, bridging its pipes to the queues.

        Executed in an executor thread; never call directly on the loop.
        """
        LOGGER.info(
            "[%s] Starting process with args: %s", self._id, str(self._process_args)
        )
        proc = subprocess.Popen(
            self._process_args,
            shell=self._enable_shell,
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE if self._enable_write else None,
        )

        # start fill buffer task in (yet another) background thread
        def fill_buffer():
            LOGGER.debug("[%s] start fill buffer", self._id)
            try:
                while not self._exit.is_set() and not self.__queue_in.closed:
                    chunk = self.__queue_in.sync_q.get()
                    if not chunk:
                        break
                    proc.stdin.write(chunk)
            except Exception as exc:  # pylint: disable=broad-except
                LOGGER.debug("[%s], fill buffer aborted (%s)", self._id, str(exc))
            else:
                LOGGER.debug("[%s] fill buffer finished", self._id)

        if self._enable_write:
            fill_buffer_thread = threading.Thread(
                target=fill_buffer, name=f"AsyncProcess_{self._id}"
            )
            fill_buffer_thread.start()

        # consume bytes from stdout
        try:
            while not self._exit.is_set() and not self.__queue_out.closed:
                chunk = proc.stdout.read(self._chunksize)
                self.__queue_out.sync_q.put(chunk)
                if len(chunk) < self._chunksize:
                    LOGGER.debug("[%s] last chunk received on stdout", self._id)
                    break
            if self._enable_write:
                fill_buffer_thread.join()
            # write empty chunk to out queue to indicate end of stream just in case
            self.__queue_out.sync_q.put(b"")
        finally:
            LOGGER.info("[%s] wait for process exit", self._id)
            # pickup remaining bytes if process is still running
            if proc.poll() is None:
                proc.communicate()
Either by sending tracks one by one or send one continuous stream
of music with crossfade/gapless support (queue stream).
+
+All audio is processed by the SoX executable, using various subprocess streams.
"""
import asyncio
import gc
async_decrypt_string,
encrypt_bytes,
)
+from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistantType
from music_assistant.helpers.util import (
async_yield_chunks,
output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
resample: Optional[int] = None,
gain_db_adjust: Optional[float] = None,
- chunk_size: int = 5000000,
+ chunk_size: int = 1024000,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the sox manipulated audio data for the given streamdetails."""
# collect all args for sox
args += ["vol", str(gain_db_adjust), "dB"]
if resample:
args += ["rate", "-v", str(resample)]
- if not chunk_size:
- chunk_size = int(
- streamdetails.sample_rate * (streamdetails.bit_depth / 8) * 2 * 10
- )
+
LOGGER.debug(
"[async_get_sox_stream] [%s/%s] started using args: %s",
streamdetails.provider,
streamdetails.item_id,
" ".join(args),
)
- # init the process with stdin/out pipes
- sox_proc = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- limit=chunk_size * 5,
- )
+ async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
- async def fill_buffer():
- """Forward audio chunks to sox stdin."""
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] fill_buffer started",
- streamdetails.provider,
- streamdetails.item_id,
- )
- # feed audio data into sox stdin for processing
- async for chunk in self.async_get_media_stream(streamdetails):
- if self.mass.exit:
- break
- sox_proc.stdin.write(chunk)
- await sox_proc.stdin.drain()
- # send eof when last chunk received
- sox_proc.stdin.write_eof()
- await sox_proc.stdin.drain()
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] fill_buffer finished",
- streamdetails.provider,
- streamdetails.item_id,
- )
+ async def fill_buffer():
+ """Forward audio chunks to sox stdin."""
+ LOGGER.debug(
+ "[async_get_sox_stream] [%s/%s] fill_buffer started",
+ streamdetails.provider,
+ streamdetails.item_id,
+ )
+ # feed audio data into sox stdin for processing
+ async for chunk in self.async_get_media_stream(streamdetails):
+ await sox_proc.write(chunk)
+ await sox_proc.write_eof()
+ LOGGER.debug(
+ "[async_get_sox_stream] [%s/%s] fill_buffer finished",
+ streamdetails.provider,
+ streamdetails.item_id,
+ )
- fill_buffer_task = self.mass.loop.create_task(fill_buffer())
- try:
+ fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
prev_chunk = b""
- while True:
- # read exactly chunksize of data
- try:
- chunk = await sox_proc.stdout.readexactly(chunk_size)
- except asyncio.IncompleteReadError as exc:
- chunk = exc.partial
+ async for chunk in sox_proc.iterate_chunks():
if len(chunk) < chunk_size:
# last chunk
yield (True, prev_chunk + chunk)
await asyncio.wait([fill_buffer_task])
- except (GeneratorExit, Exception): # pylint: disable=broad-except
- LOGGER.warning(
- "[async_get_sox_stream] [%s/%s] aborted",
- streamdetails.provider,
- streamdetails.item_id,
- )
- if fill_buffer_task and not fill_buffer_task.cancelled():
- fill_buffer_task.cancel()
- await sox_proc.communicate()
- if sox_proc and sox_proc.returncode is None:
- sox_proc.terminate()
- await sox_proc.wait()
- else:
LOGGER.debug(
"[async_get_sox_stream] [%s/%s] finished",
streamdetails.provider,
chunk_size = 571392 # 74,7% of pcm
args = ["sox", "-t", "s32", "-c", "2", "-r", "96000", "-", "-t", "flac", "-"]
- sox_proc = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- limit=chunk_size,
- )
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] started using args: %s",
- player_id,
- " ".join(args),
- )
+ async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
- # feed stdin with pcm samples
- async def fill_buffer():
- """Feed audio data into sox stdin for processing."""
LOGGER.debug(
- "[async_queue_stream_flac] [%s] fill buffer started", player_id
- )
- async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
- if self.mass.exit:
- return
- sox_proc.stdin.write(chunk)
- await sox_proc.stdin.drain()
- # write eof when no more data
- sox_proc.stdin.write_eof()
- await sox_proc.stdin.drain()
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] fill buffer finished", player_id
+ "[async_queue_stream_flac] [%s] started using args: %s",
+ player_id,
+ " ".join(args),
)
- fill_buffer_task = self.mass.loop.create_task(fill_buffer())
- try:
- # yield flac chunks from stdout
- while True:
- try:
- chunk = await sox_proc.stdout.readexactly(chunk_size)
- yield chunk
- except asyncio.IncompleteReadError as exc:
- chunk = exc.partial
- yield chunk
- break
- except (GeneratorExit, Exception): # pylint: disable=broad-except
- LOGGER.debug("[async_queue_stream_flac] [%s] aborted", player_id)
- if fill_buffer_task and not fill_buffer_task.cancelled():
- fill_buffer_task.cancel()
- await sox_proc.communicate()
- if sox_proc and sox_proc.returncode is None:
- sox_proc.terminate()
- await sox_proc.wait()
- else:
+ # feed stdin with pcm samples
+ async def fill_buffer():
+ """Feed audio data into sox stdin for processing."""
+ LOGGER.debug(
+ "[async_queue_stream_flac] [%s] fill buffer started", player_id
+ )
+ async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
+ if self.mass.exit:
+ return
+ await sox_proc.write(chunk)
+ # write eof when no more data
+ await sox_proc.write_eof()
+ LOGGER.debug(
+ "[async_queue_stream_flac] [%s] fill buffer finished", player_id
+ )
+
+ fill_buffer_task = self.mass.loop.create_task(fill_buffer())
+ # start yielding audio chunks
+ async for chunk in sox_proc.iterate_chunks():
+ yield chunk
+ await asyncio.wait([fill_buffer_task])
LOGGER.debug(
"[async_queue_stream_flac] [%s] finished",
player_id,
stream_path = await async_decrypt_string(streamdetails.path)
stream_type = StreamType(streamdetails.type)
audio_data = b""
+ chunk_size = 512000
# Handle (optional) caching of audio data
cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
)
if stream_type == StreamType.CACHE:
- async for chunk in async_yield_chunks(audio_data, 512000):
+ async for chunk in async_yield_chunks(audio_data, chunk_size):
yield chunk
elif stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
- async for chunk in response.content.iter_any():
+ while True:
+ chunk = await response.content.read(chunk_size)
+ if not chunk:
+ break
yield chunk
if len(audio_data) < 100000000:
audio_data += chunk
elif stream_type == StreamType.FILE:
async with AIOFile(stream_path) as afp:
- async for chunk in Reader(afp):
+ async for chunk in Reader(afp, chunk_size=chunk_size):
yield chunk
if len(audio_data) < 100000000:
audio_data += chunk
elif stream_type == StreamType.EXECUTABLE:
args = shlex.split(stream_path)
- chunk_size = 512000
- process = await asyncio.create_subprocess_exec(
- *args, stdout=asyncio.subprocess.PIPE, limit=chunk_size
- )
- try:
- while True:
- # read exactly chunksize of data
- try:
- chunk = await process.stdout.readexactly(chunk_size)
- except asyncio.IncompleteReadError as exc:
- chunk = exc.partial
+ async with AsyncProcess(args, chunk_size, False) as process:
+ async for chunk in process.iterate_chunks():
yield chunk
if len(audio_data) < 100000000:
audio_data += chunk
- if len(chunk) < chunk_size:
- # last chunk
- break
- except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except
- LOGGER.warning(
- "[async_get_media_stream] [%s/%s] Aborted: %s",
- streamdetails.provider,
- streamdetails.item_id,
- str(exc),
- )
- # read remaining bytes
- process.terminate()
- await process.communicate()
- if process and process.returncode is None:
- process.terminate()
- await process.wait()
- raise GeneratorExit from exc
# signal end of stream event
self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
fadeinfile = create_tempfile()
args = ["sox", "--ignore-length", "-t"] + pcm_args
args += ["-", "-t"] + pcm_args + [fadeinfile.name, "fade", "t", str(fade_length)]
- process = await asyncio.create_subprocess_exec(
- *args, stdin=asyncio.subprocess.PIPE, limit=10000000
- )
- await process.communicate(fade_in_part)
+ async with AsyncProcess(args, enable_write=True) as sox_proc:
+ await sox_proc.communicate(fade_in_part)
# create fade-out part
fadeoutfile = create_tempfile()
args = ["sox", "--ignore-length", "-t"] + pcm_args + ["-", "-t"] + pcm_args
args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"]
- process = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- limit=10000000,
- )
- await process.communicate(fade_out_part)
+ async with AsyncProcess(args, enable_write=True) as sox_proc:
+ await sox_proc.communicate(fade_out_part)
# create crossfade using sox and some temp files
# TODO: figure out how to make this less complex and without the tempfiles
args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"]
args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"]
- process = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- limit=10000000,
- )
- crossfade_part, _ = await process.communicate()
+ async with AsyncProcess(args, enable_write=False) as sox_proc:
+ crossfade_part = await sox_proc.communicate()
fadeinfile.close()
fadeoutfile.close()
del fadeinfile
args += ["silence", "1", "0.1", "1%"]
if reverse:
args.append("reverse")
- process = await asyncio.create_subprocess_exec(
- *args,
- stdin=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- limit=10000000,
- )
- stripped_data, _ = await process.communicate(audio_data)
+ async with AsyncProcess(args, enable_write=True) as sox_proc:
+ stripped_data = await sox_proc.communicate(audio_data)
return stripped_data
def set_cast_info(self, cast_info: ChromecastInfo) -> None:
"""Set the cast information and set up the chromecast object."""
self._cast_info = cast_info
- if self._chromecast is not None:
+ if self._chromecast and not self._chromecast.socket_client.is_connected:
+ self.disconnect()
+ elif self._chromecast is not None:
return
LOGGER.debug(
"[%s] Connecting to cast device by service %s",
self._available = new_available
self.update_state()
if self._cast_info.is_audio_group and new_available:
- self.mass.add_job(self._chromecast.mz_controller.update_members)
+ self.try_chromecast_command(
+ self._chromecast.mz_controller.update_members
+ )
async def async_on_update(self) -> None:
"""Call when player is periodically polled by the player manager (should_poll=True)."""
"group_player"
):
# the group player wants very accurate elapsed_time state so we request it very often
- self.mass.add_job(self._chromecast.media_controller.update_status)
+ self.try_chromecast_command(self._chromecast.media_controller.update_status)
self.update_state()
# ========== Service Calls ==========
async def async_cmd_stop(self) -> None:
"""Send stop command to player."""
if self._chromecast and self._chromecast.media_controller:
- self.mass.add_job(self._chromecast.media_controller.stop)
+ self.try_chromecast_command(self._chromecast.media_controller.stop)
async def async_cmd_play(self) -> None:
"""Send play command to player."""
if self._chromecast.media_controller:
- self.mass.add_job(self._chromecast.media_controller.play)
+ self.try_chromecast_command(self._chromecast.media_controller.play)
async def async_cmd_pause(self) -> None:
"""Send pause command to player."""
if self._chromecast.media_controller:
- self.mass.add_job(self._chromecast.media_controller.pause)
+ self.try_chromecast_command(self._chromecast.media_controller.pause)
async def async_cmd_next(self) -> None:
"""Send next track command to player."""
if self._chromecast.media_controller:
- self.mass.add_job(self._chromecast.media_controller.queue_next)
+ self.try_chromecast_command(self._chromecast.media_controller.queue_next)
async def async_cmd_previous(self) -> None:
"""Send previous track command to player."""
if self._chromecast.media_controller:
- self.mass.add_job(self._chromecast.media_controller.queue_prev)
+ self.try_chromecast_command(self._chromecast.media_controller.queue_prev)
async def async_cmd_power_on(self) -> None:
"""Send power ON command to player."""
- self.mass.add_job(self._chromecast.set_volume_muted, False)
+ self.try_chromecast_command(self._chromecast.set_volume_muted, False)
async def async_cmd_power_off(self) -> None:
"""Send power OFF command to player."""
or self.media_status.player_is_paused
or self.media_status.player_is_idle
):
- self.mass.add_job(self._chromecast.media_controller.stop)
+ self.try_chromecast_command(self._chromecast.media_controller.stop)
# chromecast has no real poweroff so we send mute instead
- self.mass.add_job(self._chromecast.set_volume_muted, True)
+ self.try_chromecast_command(self._chromecast.set_volume_muted, True)
async def async_cmd_volume_set(self, volume_level: int) -> None:
"""Send new volume level command to player."""
- self.mass.add_job(self._chromecast.set_volume, volume_level / 100)
+ self.try_chromecast_command(self._chromecast.set_volume, volume_level / 100)
async def async_cmd_volume_mute(self, is_muted: bool = False) -> None:
"""Send mute command to player."""
- self.mass.add_job(self._chromecast.set_volume_muted, is_muted)
+ self.try_chromecast_command(self._chromecast.set_volume_muted, is_muted)
async def async_cmd_play_uri(self, uri: str) -> None:
"""Play single uri on player."""
queue_item.name = "Music Assistant"
queue_item.uri = uri
return await self.async_cmd_queue_load([queue_item, queue_item])
- self.mass.add_job(self._chromecast.play_media, uri, "audio/flac")
+ self.try_chromecast_command(self._chromecast.play_media, uri, "audio/flac")
async def async_cmd_queue_load(self, queue_items: List[QueueItem]) -> None:
"""Load (overwrite) queue with new items."""
"startIndex": 0, # Item index to play after this request or keep same item if undefined
"items": cc_queue_items, # only load 50 tracks at once or the socket will crash
}
- self.mass.add_job(self.__send_player_queue, queuedata)
+ self.try_chromecast_command(self.__send_player_queue, queuedata)
if len(queue_items) > 50:
await self.async_cmd_queue_append(queue_items[51:])
"insertBefore": None,
"items": chunk,
}
- self.mass.add_job(self.__send_player_queue, queuedata)
+ self.try_chromecast_command(self.__send_player_queue, queuedata)
def __create_queue_items(self, tracks) -> None:
"""Create list of CC queue items from tracks."""
)
else:
send_queue()
+
def try_chromecast_command(self, func, *args, **kwargs):
    """Try to execute a Chromecast command via the mass job queue.

    Failures are logged (not raised); the command is silently skipped
    when the cast socket client is not connected.
    """

    def handle_command(func, *args, **kwargs):
        # skip silently when the socket client lost its connection
        if not self._chromecast.socket_client.is_connected:
            return
        try:
            # BUGFIX: signature was ``**kwarg`` (typo) and the body relied on
            # the closed-over args/kwargs instead of its own parameters.
            return func(*args, **kwargs)
        except Exception as exc:  # pylint: disable=broad-except
            LOGGER.error(
                "Error while executing command on player %s: %s",
                self.name,
                str(exc),
            )

    self.mass.add_job(handle_command, func, *args, **kwargs)
async def subscribe_stream_client(self, child_player_id):
"""Handle streaming to all players of a group. Highly experimental."""
- # each connected client gets its own sox process to convert incoming pcm samples
- # to flac (which is streamed to the player).
- args = [
- "sox",
- "-t",
- "s32",
- "-c",
- "2",
- "-r",
- "96000",
- "-",
- "-t",
- "flac",
- "-C",
- "0",
- "-",
- ]
- sox_proc = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- )
- chunk_size = 2880000 # roughly 5 seconds of flac @ 96000/32
+ # each connected client gets its own Queue to which audio chunks (flac) are sent
try:
# report this client as connected
- self.connected_clients[child_player_id] = sox_proc.stdin
+ queue = asyncio.Queue()
+ self.connected_clients[child_player_id] = queue
LOGGER.debug(
"[%s] child player connected: %s",
self.player_id,
)
# yield flac chunks from stdout to the http streamresponse
while True:
- try:
- chunk = await sox_proc.stdout.readexactly(chunk_size)
- yield chunk
- except asyncio.IncompleteReadError as exc:
- chunk = exc.partial
- yield chunk
+ chunk = await queue.get()
+ yield chunk
+ queue.task_done()
+ if not chunk:
break
except (GeneratorExit, Exception): # pylint: disable=broad-except
LOGGER.warning(
"[%s] child player aborted stream: %s", self.player_id, child_player_id
)
self.connected_clients.pop(child_player_id, None)
- await sox_proc.communicate()
- if sox_proc and sox_proc.returncode is None:
- sox_proc.terminate()
- await sox_proc.wait()
else:
self.connected_clients.pop(child_player_id, None)
LOGGER.debug(
)
self.sync_task = asyncio.create_task(self.__synchronize_players())
- async for audio_chunk in self.mass.streams.async_queue_stream_pcm(
- self.player_id, sample_rate=96000, bit_depth=32
+ async for audio_chunk in self.mass.streams.async_queue_stream_flac(
+ self.player_id
):
# make sure we still have clients connected
# send the audio chunk to all connected players
tasks = []
- for _writer in self.connected_clients.values():
- tasks.append(self.mass.add_job(_writer.write, audio_chunk))
- tasks.append(self.mass.add_job(_writer.drain()))
+ for _queue in self.connected_clients.values():
+ tasks.append(self.mass.add_job(_queue.put(audio_chunk)))
# wait for clients to consume the data
await asyncio.wait(tasks)
"""Process incoming stat STMu message: Buffer underrun: Normal end of playback."""
# pylint: disable=unused-argument
LOGGER.debug("STMu received - end of playback.")
- self.state = STATE_STOPPED
+ self._state = STATE_STOPPED
self.signal_event(SqueezeEvent.STATE_UPDATED)
def _process_resp(self, data):
resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": f"audio/{content_type}"}
)
- resp.enable_chunked_encoding()
await resp.prepare(request)
# stream track
resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
- resp.enable_chunked_encoding()
await resp.prepare(request)
async for audio_chunk in request.app["mass"].streams.async_stream_queue_item(
resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
- resp.enable_chunked_encoding()
await resp.prepare(request)
# stream queue