Various fixes and code cleanup to the streaming logic (#1216)
author Marcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 10 Apr 2024 19:45:12 +0000 (21:45 +0200)
committer GitHub <noreply@github.com>
Wed, 10 Apr 2024 19:45:12 +0000 (21:45 +0200)
23 files changed:
music_assistant/common/helpers/util.py
music_assistant/common/models/streamdetails.py
music_assistant/constants.py
music_assistant/server/controllers/cache.py
music_assistant/server/controllers/media/artists.py
music_assistant/server/controllers/media/playlists.py
music_assistant/server/controllers/media/tracks.py
music_assistant/server/controllers/metadata.py
music_assistant/server/controllers/music.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/database.py
music_assistant/server/helpers/process.py
music_assistant/server/models/music_provider.py
music_assistant/server/models/player_provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/filesystem_smb/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/spotify/__init__.py

index 9b7bd05597e70542169819df24cba5fe1ad48a4d..989588a73be48aa291e0fe1592db7d021e26142d 100644 (file)
@@ -46,6 +46,19 @@ def try_parse_bool(possible_bool: Any) -> str:
     return possible_bool in ["true", "True", "1", "on", "ON", 1]
 
 
+def try_parse_duration(duration_str: str) -> float:
+    """Try to parse a duration in seconds from a duration (HH:MM:SS) string."""
+    milliseconds = float("0." + duration_str.split(".")[-1]) if "." in duration_str else 0.0
+    duration_parts = duration_str.split(".")[0].split(",")[0].split(":")
+    if len(duration_parts) == 3:
+        seconds = sum(x * int(t) for x, t in zip([3600, 60, 1], duration_parts, strict=False))
+    elif len(duration_parts) == 2:
+        seconds = sum(x * int(t) for x, t in zip([60, 1], duration_parts, strict=False))
+    else:
+        seconds = int(duration_parts[0])
+    return seconds + milliseconds
+
+
 def create_sort_name(input_str: str) -> str:
     """Create sort name/title from string."""
     input_str = input_str.lower().strip()
index 230c5acfece042e2c8ce3ef81854b1e560c643ec..c56b6f88d1676734b1b39ef9606c7e108ebfd2cc 100644 (file)
@@ -19,6 +19,7 @@ class LoudnessMeasurement(DataClassDictMixin):
     true_peak: float
     lra: float
     threshold: float
+    target_offset: float | None = None
 
 
 @dataclass(kw_only=True)
@@ -50,8 +51,6 @@ class StreamDetails(DataClassDictMixin):
     # can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
     can_seek: bool = True
 
-    # stream_type:
-
     # the fields below will be set/controlled by the streamcontroller
     seek_position: int = 0
     fade_in: bool = False
index a2eaa1f0849f1934a4be33f86a0c17c4e0876d67..861d558e93c8f66be438e3532ffbee194ae86764 100644 (file)
@@ -5,7 +5,7 @@ from typing import Final
 
 API_SCHEMA_VERSION: Final[int] = 24
 MIN_SCHEMA_VERSION: Final[int] = 24
-DB_SCHEMA_VERSION: Final[int] = 28
+DB_SCHEMA_VERSION: Final[int] = 29
 
 MASS_LOGGER_NAME: Final[str] = "music_assistant"
 
index 23fca100122187d95298d690e95e3e4e1dd926a8..6eae031060bc7ef39d4dd86296f24828e9195d66 100644 (file)
@@ -164,6 +164,7 @@ class CacheController(CoreController):
             if db_row["expires"] < cur_timestamp:
                 await self.delete(db_row["key"])
                 cleaned_records += 1
+            await asyncio.sleep(0)  # yield to eventloop
         if cleaned_records > 50:
             self.logger.debug("Compacting database...")
             await self.database.vacuum()
index f9c33287c2f7397a3e97eeecf25df489bfe7a120..6d39428a0375fca528fdc4ab1d523bdec96f563c 100644 (file)
@@ -87,6 +87,7 @@ class ArtistsController(MediaControllerBase[Artist]):
                     # overhead of grabbing the musicbrainz id upfront
                     library_item = await self.update_item_in_library(db_item.item_id, item)
                     break
+                await asyncio.sleep(0)  # yield to eventloop
         if not library_item:
             # actually add (or update) the item in the library db
             # use the lock to prevent a race condition of the same item being added twice
index 91eb00adce4c0fbca413014b361621c14dcead28..0e5a131a030f37ee822479c0fd086d8ec05a4ec9 100644 (file)
@@ -18,12 +18,7 @@ from music_assistant.common.models.errors import (
     ProviderUnavailableError,
     UnsupportedFeaturedException,
 )
-from music_assistant.common.models.media_items import (
-    ItemMapping,
-    Playlist,
-    PlaylistTrack,
-    Track,
-)
+from music_assistant.common.models.media_items import ItemMapping, Playlist, PlaylistTrack, Track
 from music_assistant.constants import DB_TABLE_PLAYLISTS
 from music_assistant.server.helpers.compare import compare_strings
 
@@ -93,7 +88,7 @@ class PlaylistController(MediaControllerBase[Playlist]):
                 library_item = await self._add_library_item(item)
         # preload playlist tracks listing (do not load them in the db)
         async for _ in self.tracks(item.item_id, item.provider):
-            pass
+            await asyncio.sleep(0)  # yield to eventloop
         # metadata lookup we need to do after adding it to the db
         if metadata_lookup:
             await self.mass.metadata.get_playlist_metadata(library_item)
@@ -228,6 +223,7 @@ class PlaylistController(MediaControllerBase[Playlist]):
                     if i.provider_instance == playlist_prov.provider_instance
                 }
             )
+            await asyncio.sleep(0)  # yield to eventloop
         # check for duplicates
         for track_prov in track.provider_mappings:
             if (
index b3068511c782449a2ddfa32ab538114f8e82b769..70ef210ebe20d500923b71774bc8657d15988937 100644 (file)
@@ -161,6 +161,7 @@ class TracksController(MediaControllerBase[Track]):
                     # existing item found: update it
                     library_item = await self.update_item_in_library(db_item.item_id, item)
                     break
+                await asyncio.sleep(0)  # yield to eventloop
         if not library_item:
             # actually add a new item in the library db
             # use the lock to prevent a race condition of the same item being added twice
index d52c5fd354d9ca89537783f45a238fac299fd4bd..23ff8b054ee22e7b2a766ddd586792cd1b50b05e 100644 (file)
@@ -198,6 +198,7 @@ class MetaDataController(CoreController):
                     if genre not in playlist_genres:
                         playlist_genres[genre] = 0
                     playlist_genres[genre] += 1
+                await asyncio.sleep(0)  # yield to eventloop
 
             playlist_genres_filtered = {
                 genre for genre, count in playlist_genres.items() if count > 5
index db34f2f672fb206b81644d38443bd219b2f2a562..d5d2ac8f37463ab56b93bdfd4962350faa0c81af 100644 (file)
@@ -478,6 +478,7 @@ class MusicController(CoreController):
                     "true_peak": loudness.true_peak,
                     "lra": loudness.lra,
                     "threshold": loudness.threshold,
+                    "target_offset": loudness.target_offset,
                 },
                 allow_replace=True,
             )
@@ -659,7 +660,7 @@ class MusicController(CoreController):
             await asyncio.to_thread(shutil.copyfile, db_path, db_path_backup)
 
             # handle db migration from previous schema to this one
-            if prev_version == 27:
+            if prev_version in (27, 28):
                 self.logger.info(
                     "Performing database migration from %s to %s",
                     prev_version,
@@ -727,6 +728,7 @@ class MusicController(CoreController):
                     true_peak REAL,
                     lra REAL,
                     threshold REAL,
+                    target_offset REAL,
                     UNIQUE(item_id, provider));"""
         )
         await self.database.execute(
index b662c5600a76579fee7d8234e0a8ed762d885399..7970ee8c5f05edd32113eeca487058238e64291d 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import random
 import time
 from collections.abc import AsyncGenerator
@@ -325,6 +326,7 @@ class PlayerQueuesController(CoreController):
             elif media_item.media_type == MediaType.PLAYLIST:
                 async for playlist_track in ctrl.tracks(media_item.item_id, media_item.provider):
                     tracks.append(playlist_track)
+                    await asyncio.sleep(0)  # yield to eventloop
                 await self.mass.music.mark_item_played(
                     media_item.media_type, media_item.item_id, media_item.provider
                 )
index 1a1a3a18e31fed3578dd270a24d596a437134fbb..b8a1d6e9c4a3a59a0f0352f91a667f6c5edfb5dd 100644 (file)
@@ -19,7 +19,12 @@ from typing import TYPE_CHECKING
 from aiofiles.os import wrap
 from aiohttp import web
 
-from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
+from music_assistant.common.helpers.util import (
+    get_ip,
+    select_free_port,
+    try_parse_bool,
+    try_parse_duration,
+)
 from music_assistant.common.models.config_entries import (
     ConfigEntry,
     ConfigValueOption,
@@ -254,7 +259,7 @@ class StreamsController(CoreController):
             output_format_str=request.match_info["fmt"],
             queue_player=queue_player,
             default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
-            default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
+            default_bit_depth=24,  # always prefer 24 bits to prevent dithering
         )
         # prepare request, add some DLNA/UPNP compatible headers
         headers = {
@@ -279,21 +284,10 @@ class StreamsController(CoreController):
             queue.display_name,
         )
         queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id)
-        pcm_format = AudioFormat(
-            content_type=ContentType.from_bit_depth(
-                queue_item.streamdetails.audio_format.bit_depth
-            ),
-            sample_rate=queue_item.streamdetails.audio_format.sample_rate,
-            bit_depth=queue_item.streamdetails.audio_format.bit_depth,
-        )
-        async for chunk in get_ffmpeg_stream(
-            audio_input=self._get_media_stream(
-                streamdetails=queue_item.streamdetails,
-                pcm_format=pcm_format,
-            ),
-            input_format=pcm_format,
+        async for chunk in self.get_media_stream(
+            streamdetails=queue_item.streamdetails,
             output_format=output_format,
-            filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+            extra_filter_params=get_player_filter_params(self.mass, queue_player.player_id),
         ):
             try:
                 await resp.write(chunk)
@@ -320,7 +314,7 @@ class StreamsController(CoreController):
             output_format_str=request.match_info["fmt"],
             queue_player=queue_player,
             default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
-            default_bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+            default_bit_depth=24,  # always prefer 24 bits to prevent dithering
         )
         # play it safe: only allow icy metadata for mp3 and aac
         enable_icy = request.headers.get(
@@ -545,21 +539,19 @@ class StreamsController(CoreController):
             bytes_written = 0
             buffer = b""
             # handle incoming audio chunks
-            async for chunk in self._get_media_stream(
+            async for chunk in self.get_media_stream(
                 queue_track.streamdetails,
-                pcm_format=pcm_format,
-                # strip silence from begin/end if track is being crossfaded
-                strip_silence_begin=use_crossfade and total_bytes_sent > 0,
-                strip_silence_end=use_crossfade,
+                output_format=pcm_format,
             ):
                 # buffer size needs to be big enough to include the crossfade part
                 # allow it to be a bit smaller when playback just starts
-                if not use_crossfade:
-                    req_buffer_size = pcm_sample_size
-                elif (total_bytes_sent + bytes_written) < crossfade_size:
-                    req_buffer_size = int(crossfade_size / 2)
+                if not use_crossfade or (total_bytes_sent + bytes_written == 0):
+                    req_buffer_size = pcm_sample_size * 2
+                elif (total_bytes_sent + bytes_written) < (crossfade_size * 2):
+                    req_buffer_size = pcm_sample_size * 5
                 else:
-                    req_buffer_size = crossfade_size
+                    # additional 5 seconds to strip silence from last part
+                    req_buffer_size = crossfade_size + pcm_sample_size * 5
 
                 # ALWAYS APPEND CHUNK TO BUFFER
                 buffer += chunk
@@ -570,6 +562,14 @@ class StreamsController(CoreController):
 
                 ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
                 if last_fadeout_part:
+                    # strip silence from last part
+                    buffer = await strip_silence(
+                        self.mass,
+                        buffer,
+                        sample_rate=pcm_format.sample_rate,
+                        bit_depth=pcm_format.bit_depth,
+                        reverse=False,
+                    )
                     # perform crossfade
                     fadein_part = buffer[:crossfade_size]
                     remaining_bytes = buffer[crossfade_size:]
@@ -595,6 +595,7 @@ class StreamsController(CoreController):
                 #### OTHER: enough data in buffer, feed to output
                 while len(buffer) > req_buffer_size:
                     yield buffer[:pcm_sample_size]
+                    await asyncio.sleep(0)  # yield to eventloop
                     bytes_written += pcm_sample_size
                     buffer = buffer[pcm_sample_size:]
 
@@ -605,6 +606,14 @@ class StreamsController(CoreController):
                 bytes_written += len(last_fadeout_part)
                 last_fadeout_part = b""
             if use_crossfade:
+                # strip silence from last part
+                buffer = await strip_silence(
+                    self.mass,
+                    buffer,
+                    sample_rate=pcm_format.sample_rate,
+                    bit_depth=pcm_format.bit_depth,
+                    reverse=True,
+                )
                 # if crossfade is enabled, save fadeout part to pickup for next track
                 last_fadeout_part = buffer[-crossfade_size:]
                 remaining_bytes = buffer[:-crossfade_size]
@@ -660,7 +669,7 @@ class StreamsController(CoreController):
                 "-i",
                 ANNOUNCE_ALERT_FILE,
                 "-filter_complex",
-                "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-2",
+                "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-1.5",
             ]
             filter_params = []
         async for chunk in get_ffmpeg_stream(
@@ -672,52 +681,34 @@ class StreamsController(CoreController):
         ):
             yield chunk
 
-    async def _get_media_stream(
+    async def get_media_stream(
         self,
         streamdetails: StreamDetails,
-        pcm_format: AudioFormat,
-        strip_silence_begin: bool = False,
-        strip_silence_end: bool = False,
+        output_format: AudioFormat,
+        extra_filter_params: list[str] | None = None,
     ) -> AsyncGenerator[tuple[bool, bytes], None]:
-        """
-        Get the (raw PCM) audio stream for the given streamdetails.
-
-        Other than stripping silence at end and beginning and optional
-        volume normalization this is the pure, unaltered audio data as PCM chunks.
-        """
+        """Get the audio stream for the given streamdetails."""
         logger = self.logger.getChild("media_stream")
         is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
         if is_radio:
             streamdetails.seek_position = 0
-            strip_silence_begin = False
-            strip_silence_end = False
-        if streamdetails.seek_position:
-            strip_silence_begin = False
-        if not streamdetails.duration or streamdetails.duration < 30:
-            strip_silence_end = False
-        # pcm_sample_size = chunk size = 1 second of pcm audio
-        pcm_sample_size = pcm_format.pcm_sample_size
-        buffer_size = (
-            pcm_sample_size * 5
-            if (strip_silence_begin or strip_silence_end)
-            # always require a small amount of buffer to prevent livestreams stuttering
-            else pcm_sample_size * 2
-        )
         # collect all arguments for ffmpeg
-        filter_params = []
+        filter_params = extra_filter_params or []
         if streamdetails.target_loudness is not None:
             # add loudnorm filters
-            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2"
+            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11"
             if streamdetails.loudness:
                 filter_rule += f":measured_I={streamdetails.loudness.integrated}"
                 filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
                 filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
                 filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+                if streamdetails.loudness.target_offset is not None:
+                    filter_rule += f":offset={streamdetails.loudness.target_offset}"
+                filter_rule += ":linear=true"
             filter_rule += ":print_format=json"
             filter_params.append(filter_rule)
         if streamdetails.fade_in:
             filter_params.append("afade=type=in:start_time=0:duration=3")
-
         if streamdetails.stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
                 streamdetails,
@@ -732,13 +723,14 @@ class StreamsController(CoreController):
         extra_input_args = []
         if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM:
             extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
-        logger.debug("start media stream for: %s", streamdetails.uri)
-        state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
 
+        logger.debug("start media stream for: %s", streamdetails.uri)
+        bytes_sent = 0
+        finished = False
         async with FFMpeg(
             audio_input=audio_source,
             input_format=streamdetails.audio_format,
-            output_format=pcm_format,
+            output_format=output_format,
             filter_params=filter_params,
             extra_input_args=[
                 *extra_input_args,
@@ -747,61 +739,53 @@ class StreamsController(CoreController):
                 "-filter_threads",
                 "1",
             ],
-            name="ffmpeg_media_stream",
-            stderr_enabled=True,
+            collect_log_history=True,
+            logger=logger,
         ) as ffmpeg_proc:
-
-            async def log_reader():
-                # To prevent stderr locking up, we must keep reading it
-                stderr_data = ""
-                async for line in ffmpeg_proc.iter_stderr():
-                    if "error" in line or "warning" in line:
-                        logger.warning(line)
-                    elif "critical" in line:
-                        logger.critical(line)
-                    elif (
-                        streamdetails.audio_format.content_type == ContentType.UNKNOWN
-                        and line.startswith("Stream #0:0: Audio: ")
-                    ):
-                        # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
-                        streamdetails.audio_format.content_type = ContentType.try_parse(
-                            line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
-                        )
-                    elif stderr_data or "loudnorm" in line:
-                        stderr_data += line
-                    else:
-                        logger.debug(line)
-                    del line
-
-                # if we reach this point, the process is finished (completed or aborted)
-                if ffmpeg_proc.returncode == 0:
-                    await state_data["finished"].wait()
-                finished = state_data["finished"].is_set()
-                bytes_sent = state_data["bytes_sent"]
-                seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-                streamdetails.seconds_streamed = seconds_streamed
-                state_str = "finished" if finished else "aborted"
+            try:
+                async for chunk in ffmpeg_proc.iter_any():
+                    bytes_sent += len(chunk)
+                    yield chunk
+                    del chunk
+                finished = True
+            finally:
+                await ffmpeg_proc.close()
                 logger.debug(
-                    "stream %s for: %s (%s seconds streamed, exitcode %s)",
-                    state_str,
-                    streamdetails.uri,
-                    seconds_streamed,
+                    "stream %s (with code %s) for %s",
+                    "finished" if finished else "aborted",
                     ffmpeg_proc.returncode,
+                    streamdetails.uri,
                 )
+                # try to determine how many seconds we've streamed
+                seconds_streamed = 0
+                if output_format.content_type.is_pcm():
+                    seconds_streamed = (
+                        bytes_sent / output_format.pcm_sample_size if bytes_sent else 0
+                    )
+                elif line := next((x for x in ffmpeg_proc.log_history if "time=" in x), None):
+                    duration_str = line.split("time=")[1].split(" ")[0]
+                    seconds_streamed = try_parse_duration(duration_str)
+
+                if seconds_streamed:
+                    streamdetails.seconds_streamed = seconds_streamed
                 # store accurate duration
-                if finished and not streamdetails.seek_position:
+                if finished and not streamdetails.seek_position and seconds_streamed:
                     streamdetails.duration = seconds_streamed
 
                 # parse loudnorm data if we have that collected
-                if stderr_data and (loudness_details := parse_loudnorm(stderr_data)):
+                if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                     required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
                     if finished or (seconds_streamed >= required_seconds):
                         logger.debug(
-                            "Loudness measurement for %s: %s", streamdetails.uri, loudness_details
+                            "Loudness measurement for %s: %s",
+                            streamdetails.uri,
+                            loudness_details,
                         )
                         streamdetails.loudness = loudness_details
-                        await self.mass.music.set_track_loudness(
-                            streamdetails.item_id, streamdetails.provider, loudness_details
+                        self.mass.create_task(
+                            self.mass.music.set_track_loudness(
+                                streamdetails.item_id, streamdetails.provider, loudness_details
+                            )
                         )
 
                 # report playback
@@ -809,70 +793,15 @@ class StreamsController(CoreController):
                 if finished or seconds_streamed > 30:
                     self.mass.create_task(
                         self.mass.music.mark_item_played(
-                            streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+                            streamdetails.media_type,
+                            streamdetails.item_id,
+                            streamdetails.provider,
                         )
                     )
                     if music_prov := self.mass.get_provider(streamdetails.provider):
                         self.mass.create_task(
                             music_prov.on_streamed(streamdetails, seconds_streamed)
                         )
-                # cleanup
-                del stderr_data
-
-            self.mass.create_task(log_reader())
-
-            # get pcm chunks from stdout
-            # we always stay buffer_size of bytes behind
-            # so we can strip silence at the beginning and end of a track
-            buffer = b""
-            chunk_num = 0
-            async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size):
-                chunk_num += 1
-                buffer += chunk
-                del chunk
-
-                if len(buffer) < buffer_size:
-                    # buffer is not full enough, move on
-                    continue
-
-                if strip_silence_begin and chunk_num == 2:
-                    # first 2 chunks received, strip silence of beginning
-                    stripped_audio = await strip_silence(
-                        self.mass,
-                        buffer,
-                        sample_rate=pcm_format.sample_rate,
-                        bit_depth=pcm_format.bit_depth,
-                    )
-                    yield stripped_audio
-                    state_data["bytes_sent"] += len(stripped_audio)
-                    buffer = b""
-                    del stripped_audio
-                    continue
-
-                #### OTHER: enough data in buffer, feed to output
-                while len(buffer) > buffer_size:
-                    yield buffer[:pcm_sample_size]
-                    state_data["bytes_sent"] += pcm_sample_size
-                    buffer = buffer[pcm_sample_size:]
-
-            # all chunks received, strip silence of last part if needed and yield remaining bytes
-            if strip_silence_end:
-                final_chunk = await strip_silence(
-                    self.mass,
-                    buffer,
-                    sample_rate=pcm_format.sample_rate,
-                    bit_depth=pcm_format.bit_depth,
-                    reverse=True,
-                )
-            else:
-                final_chunk = buffer
-
-            # yield final chunk to output (as one big chunk)
-            yield final_chunk
-            state_data["bytes_sent"] += len(final_chunk)
-            state_data["finished"].set()
-            del final_chunk
-            del buffer
 
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
index cd8013fa2fce88e328f2998dca103ace9b4fe446..8e70a5f2b94e182f7d7309a473193f77d517929e 100644 (file)
@@ -8,7 +8,10 @@ import os
 import re
 import struct
 from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
 from io import BytesIO
+from signal import SIGINT
 from typing import TYPE_CHECKING
 
 import aiofiles
@@ -46,12 +49,10 @@ from music_assistant.server.helpers.playlists import (
 )
 from music_assistant.server.helpers.tags import parse_tags
 
-from .process import AsyncProcess, check_output
+from .process import AsyncProcess, check_output, communicate
 from .util import create_tempfile
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator
-
     from music_assistant.common.models.player_queue import QueueItem
     from music_assistant.server import MusicAssistant
 
@@ -66,7 +67,7 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
 class FFMpeg(AsyncProcess):
     """FFMpeg wrapped as AsyncProcess."""
 
-    def __init__(  # noqa: PLR0913
+    def __init__(
         self,
         audio_input: AsyncGenerator[bytes, None] | str | int,
         input_format: AudioFormat,
@@ -74,10 +75,9 @@ class FFMpeg(AsyncProcess):
         filter_params: list[str] | None = None,
         extra_args: list[str] | None = None,
         extra_input_args: list[str] | None = None,
-        name: str = "ffmpeg",
-        stderr_enabled: bool = False,
         audio_output: str | int = "-",
-        loglevel: str | None = None,
+        collect_log_history: bool = False,
+        logger: logging.Logger | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
         ffmpeg_args = get_ffmpeg_args(
@@ -88,17 +88,92 @@ class FFMpeg(AsyncProcess):
             input_path=audio_input if isinstance(audio_input, str) else "-",
             output_path=audio_output if isinstance(audio_output, str) else "-",
             extra_input_args=extra_input_args or [],
-            loglevel=loglevel or "info"
-            if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
-            else "error",
+            loglevel="info",
         )
+        self.audio_input = audio_input
+        self.input_format = input_format
+        self.collect_log_history = collect_log_history
+        self.log_history: deque[str] = deque(maxlen=100)
+        self._stdin_task: asyncio.Task | None = None
+        self._logger_task: asyncio.Task | None = None
         super().__init__(
             ffmpeg_args,
-            stdin=True if isinstance(audio_input, str) else audio_input,
+            stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
             stdout=True if isinstance(audio_output, str) else audio_output,
-            stderr=stderr_enabled,
-            name=name,
+            stderr=True,
         )
+        self.logger = logger or LOGGER.getChild("ffmpeg")
+
+    async def start(self) -> None:
+        """Perform Async init of process."""
+        await super().start()
+        self._logger_task = asyncio.create_task(self._log_reader_task())
+        if isinstance(self.audio_input, AsyncGenerator):
+            self._stdin_task = asyncio.create_task(self._feed_stdin())
+
+    async def close(self, send_signal: bool = True) -> None:
+        """Close/terminate the process and wait for exit."""
+        if self._stdin_task and not self._stdin_task.done():
+            self._stdin_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._stdin_task
+            # make sure the stdin generator is also properly closed
+            # by propagating a CancelledError within it
+            task = asyncio.create_task(self.audio_input.__anext__())
+            task.cancel()
+        if not self.collect_log_history:
+            await super().close(send_signal)
+            return
+        # override close logic to make sure we catch all logging
+        self._close_called = True
+        if send_signal and self.returncode is None:
+            self.proc.send_signal(SIGINT)
+        if self.proc.stdin and not self.proc.stdin.is_closing():
+            self.proc.stdin.close()
+            await asyncio.sleep(0)  # yield to loop
+        # abort existing readers on stdout first before we send communicate
+        if self.proc.stdout:
+            if self.proc.stdout._waiter is not None:
+                with suppress(asyncio.exceptions.InvalidStateError):
+                    self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
+            # read remaining bytes to unblock pipe
+            await self.read(-1)
+        # wait for log task to complete that reads the remaining data from stderr
+        with suppress(TimeoutError):
+            await asyncio.wait_for(self._logger_task, 5)
+        await super().close(False)
+
+    async def _log_reader_task(self) -> None:
+        """Read ffmpeg log from stderr."""
+        async for line in self.iter_stderr():
+            if self.collect_log_history:
+                self.log_history.append(line)
+            if "error" in line or "warning" in line:
+                self.logger.warning(line)
+            elif "critical" in line:
+                self.logger.critical(line)
+            else:
+                self.logger.log(VERBOSE_LOG_LEVEL, line)
+
+            # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
+            if line.startswith("Stream #0:0: Audio: "):
+                if self.input_format.content_type == ContentType.UNKNOWN:
+                    content_type_raw = line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+                    content_type = ContentType.try_parse(content_type_raw)
+                    self.logger.info(
+                        "Detected (input) content type: %s (%s)", content_type, content_type_raw
+                    )
+                    self.input_format.content_type = content_type
+            del line
+
+    async def _feed_stdin(self) -> None:
+        """Feed stdin with audio chunks from an AsyncGenerator."""
+        if TYPE_CHECKING:
+            self.audio_input: AsyncGenerator[bytes, None]
+        async for chunk in self.audio_input:
+            await self.write(chunk)
+        # write EOF once we've reached the end of the input stream
+        await self.write_eof()
 
 
 async def crossfade_pcm_parts(
@@ -151,26 +226,24 @@ async def crossfade_pcm_parts(
         fmt,
         "-",
     ]
-    async with AsyncProcess(args, stdin=True, stdout=True) as proc:
-        crossfade_data, _ = await proc.communicate(fade_in_part)
-        if crossfade_data:
-            LOGGER.log(
-                5,
-                "crossfaded 2 pcm chunks. fade_in_part: %s - "
-                "fade_out_part: %s - fade_length: %s seconds",
-                len(fade_in_part),
-                len(fade_out_part),
-                fade_length,
-            )
-            return crossfade_data
-        # no crossfade_data, return original data instead
-        LOGGER.debug(
-            "crossfade of pcm chunks failed: not enough data? "
-            "fade_in_part: %s - fade_out_part: %s",
+    _returncode, crossfaded_audio, _stderr = await communicate(args, fade_in_part)
+    if crossfaded_audio:
+        LOGGER.log(
+            5,
+            "crossfaded 2 pcm chunks. fade_in_part: %s - "
+            "fade_out_part: %s - fade_length: %s seconds",
             len(fade_in_part),
             len(fade_out_part),
+            fade_length,
         )
-        return fade_out_part + fade_in_part
+        return crossfaded_audio
+    # no crossfade_data, return original data instead
+    LOGGER.debug(
+        "crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s",
+        len(fade_in_part),
+        len(fade_out_part),
+    )
+    return fade_out_part + fade_in_part
 
 
 async def strip_silence(
@@ -208,8 +281,7 @@ async def strip_silence(
         ]
     # output args
     args += ["-f", fmt, "-"]
-    async with AsyncProcess(args, stdin=True, stdout=True) as proc:
-        stripped_data, _ = await proc.communicate(audio_data)
+    _returncode, stripped_data, _stderr = await communicate(args, audio_data)
 
     # return stripped audio
     bytes_stripped = len(audio_data) - len(stripped_data)
@@ -643,7 +715,7 @@ async def get_ffmpeg_stream(
     extra_args: list[str] | None = None,
     chunk_size: int | None = None,
     extra_input_args: list[str] | None = None,
-    name: str = "ffmpeg",
+    logger: logging.Logger | None = None,
 ) -> AsyncGenerator[bytes, None]:
     """
     Get the ffmpeg audio stream as async generator.
@@ -658,7 +730,7 @@ async def get_ffmpeg_stream(
         filter_params=filter_params,
         extra_args=extra_args,
         extra_input_args=extra_input_args,
-        name=name,
+        logger=logger,
     ) as ffmpeg_proc:
         # read final chunks from stdout
         iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
@@ -895,11 +967,28 @@ def get_ffmpeg_args(
         output_args = ["-f", "wav", output_path]
     else:
         # use explicit format identifier for all other
-        output_args = ["-f", output_format.content_type.value, output_path]
+        output_args = [
+            "-f",
+            output_format.content_type.value,
+            "-ar",
+            str(output_format.sample_rate),
+            output_path,
+        ]
 
-    # prefer libsoxr high quality resampler (if present) for sample rate conversions
-    if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
-        filter_params.append("aresample=resampler=soxr")
+    # determine if we need to do resampling
+    if (
+        input_format.sample_rate != output_format.sample_rate
+        or input_format.bit_depth != output_format.bit_depth
+    ):
+        # prefer resampling with libsoxr due to its high quality
+        resample_filter = f'aresample=resampler={"soxr" if libsoxr_support else "swr"}'
+        if output_format.bit_depth < input_format.bit_depth:
+            # apply dithering when going down to 16 bits
+            resample_filter += ":osf=s16:dither_method=triangular_hp"
+        if not output_format.content_type.is_pcm():
+            # specify sample rate if output format is not pcm
+            resample_filter += f":osr={output_format.sample_rate}"
+        filter_params.append(resample_filter)
 
     if filter_params and "-filter_complex" not in extra_args:
         extra_args += ["-af", ",".join(filter_params)]
@@ -924,4 +1013,5 @@ def parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None:
         true_peak=float(loudness_data["input_tp"]),
         lra=float(loudness_data["input_lra"]),
         threshold=float(loudness_data["input_thresh"]),
+        target_offset=float(loudness_data["target_offset"]),
     )
index a15148d41d8fa4118ab3dd383783b8698aa633a6..9da98c67ded3c9b5be304deadce8c4cf6ddf017b 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import asyncio
 from typing import TYPE_CHECKING, Any
 
 import aiosqlite
@@ -172,6 +173,7 @@ class DatabaseConnection:
                 yield item
             if len(next_items) < limit:
                 break
+            await asyncio.sleep(0)  # yield to eventloop
             offset += limit
 
     async def vacuum(self) -> None:
index ca6e7139b4532a553069e046dbc5b312dce59de6..83cb322681db5a487d283348f98a9c3456d00bcc 100644 (file)
@@ -17,7 +17,7 @@ from collections.abc import AsyncGenerator
 from contextlib import suppress
 from signal import SIGINT
 from types import TracebackType
-from typing import TYPE_CHECKING
+from typing import Self
 
 from music_assistant.constants import MASS_LOGGER_NAME
 
@@ -41,9 +41,9 @@ class AsyncProcess:
     def __init__(
         self,
         args: list[str],
-        stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
+        stdin: bool | int | None = None,
         stdout: bool | int | None = None,
-        stderr: bool | int | None = None,
+        stderr: bool | int | None = False,
         name: str | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
@@ -51,7 +51,6 @@ class AsyncProcess:
         if name is None:
             name = args[0].split(os.sep)[-1]
         self.name = name
-        self.attached_tasks: list[asyncio.Task] = []
         self.logger = LOGGER.getChild(name)
         self._args = args
         self._stdin = None if stdin is False else stdin
@@ -76,7 +75,7 @@ class AsyncProcess:
             self._returncode = ret_code
         return ret_code
 
-    async def __aenter__(self) -> AsyncProcess:
+    async def __aenter__(self) -> Self:
         """Enter context manager."""
         await self.start()
         return self
@@ -96,9 +95,7 @@ class AsyncProcess:
         """Perform Async init of process."""
         self.proc = await asyncio.create_subprocess_exec(
             *self._args,
-            stdin=asyncio.subprocess.PIPE
-            if (self._stdin is True or isinstance(self._stdin, AsyncGenerator))
-            else self._stdin,
+            stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
             stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
             stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
             # because we're exchanging big amounts of (audio) data with pipes
@@ -107,12 +104,10 @@ class AsyncProcess:
             pipesize=1000000,
         )
         self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
-        if isinstance(self._stdin, AsyncGenerator):
-            self.attached_tasks.append(asyncio.create_task(self._feed_stdin()))
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
-        while not self._close_called:
+        while True:
             chunk = await self.readexactly(n)
             if chunk == b"":
                 break
@@ -120,7 +115,7 @@ class AsyncProcess:
 
     async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks as they come in from process stdout."""
-        while not self._close_called:
+        while True:
             chunk = await self.read(n)
             if chunk == b"":
                 break
@@ -145,7 +140,8 @@ class AsyncProcess:
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
         if self._close_called:
-            raise RuntimeError("write called while process already done")
+            self.logger.warning("write called while process already done")
+            return
         self.proc.stdin.write(data)
         with suppress(BrokenPipeError, ConnectionResetError):
             await self.proc.stdin.drain()
@@ -167,34 +163,55 @@ class AsyncProcess:
             # already exited, race condition
             pass
 
-    async def close(self, send_signal: bool = False) -> int:
+    async def read_stderr(self) -> bytes:
+        """Read line from stderr."""
+        try:
+            return await self.proc.stderr.readline()
+        except ValueError as err:
+            # we're waiting for a line (separator found), but the line was too big
+            # this may happen with ffmpeg during a long (radio) stream where progress
+            # gets outputted to the stderr but no newline
+            # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
+            # NOTE: this consumes the line that was too big
+            if "chunk exceed the limit" in str(err):
+                return await self.proc.stderr.readline()
+            # raise for all other (value) errors
+            raise
+
+    async def iter_stderr(self) -> AsyncGenerator[str, None]:
+        """Iterate lines from the stderr stream as string."""
+        while True:
+            line = await self.read_stderr()
+            if line == b"":
+                break
+            line = line.decode().strip()
+            if not line:
+                continue
+            yield line
+
+    async def close(self, send_signal: bool = False) -> None:
         """Close/terminate the process and wait for exit."""
         self._close_called = True
-        # close any/all attached (writer) tasks
-        for task in self.attached_tasks:
-            if not task.done():
-                task.cancel()
         if send_signal and self.returncode is None:
             self.proc.send_signal(SIGINT)
-
+        if self.proc.stdin and not self.proc.stdin.is_closing():
+            self.proc.stdin.close()
+            await asyncio.sleep(0)  # yield to loop
         # abort existing readers on stderr/stdout first before we send communicate
         if self.proc.stdout and self.proc.stdout._waiter is not None:
-            self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
-            self.proc.stdout._waiter = None
+            with suppress(asyncio.exceptions.InvalidStateError):
+                self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
         if self.proc.stderr and self.proc.stderr._waiter is not None:
-            self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
-            self.proc.stderr._waiter = None
+            with suppress(asyncio.exceptions.InvalidStateError):
+                self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
 
         # make sure the process is really cleaned up.
         # especially with pipes this can cause deadlocks if not properly guarded
         # we need to ensure stdout and stderr are flushed and stdin closed
-        while True:
+        while self.returncode is None:
             try:
-                async with asyncio.timeout(5):
-                    # use communicate to flush all pipe buffers
-                    await self.proc.communicate()
-                    if self.returncode is not None:
-                        break
+                # use communicate to flush all pipe buffers
+                await asyncio.wait_for(self.proc.communicate(), 5)
             except TimeoutError:
                 self.logger.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
@@ -208,7 +225,6 @@ class AsyncProcess:
             self.proc.pid,
             self.returncode,
         )
-        return self.returncode
 
     async def wait(self) -> int:
         """Wait for the process and return the returncode."""
@@ -216,59 +232,9 @@ class AsyncProcess:
             self._returncode = await self.proc.wait()
         return self._returncode
 
-    async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
-        """Write bytes to process and read back results."""
-        stdout, stderr = await self.proc.communicate(input_data)
-        self._returncode = self.proc.returncode
-        return (stdout, stderr)
-
-    async def _feed_stdin(self) -> None:
-        """Feed stdin with chunks from an AsyncGenerator."""
-        if TYPE_CHECKING:
-            self._stdin: AsyncGenerator[bytes, None]
-        try:
-            async for chunk in self._stdin:
-                if self._close_called or self.proc.stdin.is_closing():
-                    return
-                await self.write(chunk)
-            await self.write_eof()
-        except Exception as err:
-            if not isinstance(err, asyncio.CancelledError):
-                self.logger.exception(err)
-            # make sure the stdin generator is also properly closed
-            # by propagating a cancellederror within
-            task = asyncio.create_task(self._stdin.__anext__())
-            task.cancel()
-
-    async def read_stderr(self) -> bytes:
-        """Read line from stderr."""
-        try:
-            return await self.proc.stderr.readline()
-        except ValueError as err:
-            # we're waiting for a line (separator found), but the line was too big
-            # this may happen with ffmpeg during a long (radio) stream where progress
-            # gets outputted to the stderr but no newline
-            # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
-            # NOTE: this consumes the line that was too big
-            if "chunk exceed the limit" in str(err):
-                return await self.proc.stderr.readline()
-            # raise for all other (value) errors
-            raise
-
-    async def iter_stderr(self) -> AsyncGenerator[str, None]:
-        """Iterate lines from the stderr stream as string."""
-        while True:
-            line = await self.read_stderr()
-            if line == b"":
-                break
-            line = line.decode().strip()
-            if not line:
-                continue
-            yield line
-
 
 async def check_output(args: str | list[str]) -> tuple[int, bytes]:
-    """Run subprocess and return output."""
+    """Run subprocess and return returncode and output."""
     if isinstance(args, str):
         proc = await asyncio.create_subprocess_shell(
             args,
@@ -283,3 +249,26 @@ async def check_output(args: str | list[str]) -> tuple[int, bytes]:
         )
     stdout, _ = await proc.communicate()
     return (proc.returncode, stdout)
+
+
+async def communicate(
+    args: str | list[str],
+    input: bytes | None = None,  # noqa: A002
+) -> tuple[int, bytes, bytes]:
+    """Communicate with subprocess and return returncode, stdout and stderr output."""
+    if isinstance(args, str):
+        proc = await asyncio.create_subprocess_shell(
+            args,
+            stderr=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stdin=asyncio.subprocess.PIPE if input is not None else None,
+        )
+    else:
+        proc = await asyncio.create_subprocess_exec(
+            *args,
+            stderr=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stdin=asyncio.subprocess.PIPE if input is not None else None,
+        )
+    stdout, stderr = await proc.communicate(input)
+    return (proc.returncode, stdout, stderr)
index e1b981c84fb7734b5cd9b28156d45bb9a9b4c325..04e6a21e38e94d9d8971b5e9d4bbb08e237ea2b3 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import asyncio
 from typing import TYPE_CHECKING
 
 from music_assistant.common.models.enums import MediaType, ProviderFeature
@@ -411,6 +412,7 @@ class MusicProvider(Provider):
                             library_item.item_id, prov_item
                         )
                     cur_db_ids.add(library_item.item_id)
+                    await asyncio.sleep(0)  # yield to eventloop
                 except MusicAssistantError as err:
                     self.logger.warning(
                         "Skipping sync of item %s - error details: %s", prov_item.uri, str(err)
@@ -442,6 +444,7 @@ class MusicProvider(Provider):
                         else:
                             # otherwise: just unmark favorite
                             await controller.set_favorite(db_id, False)
+                await asyncio.sleep(0)  # yield to eventloop
             await self.mass.cache.set(cache_key, list(cur_db_ids))
 
     # DO NOT OVERRIDE BELOW
index c85d0e3527a8fcd9384fc3bed7ba36a0138d7cb1..1147d72965d0c4b09a78cf03595e4130b83fc355 100644 (file)
@@ -13,6 +13,7 @@ from music_assistant.common.models.config_entries import (
     CONF_ENTRY_ANNOUNCE_VOLUME_MIN,
     CONF_ENTRY_ANNOUNCE_VOLUME_STRATEGY,
     CONF_ENTRY_AUTO_PLAY,
+    CONF_ENTRY_FLOW_MODE,
     CONF_ENTRY_HIDE_PLAYER,
     CONF_ENTRY_PLAYER_ICON,
     CONF_ENTRY_PLAYER_ICON_GROUP,
@@ -48,6 +49,7 @@ class PlayerProvider(Provider):
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
         entries = (
             CONF_ENTRY_PLAYER_ICON,
+            CONF_ENTRY_FLOW_MODE,
             CONF_ENTRY_VOLUME_NORMALIZATION,
             CONF_ENTRY_AUTO_PLAY,
             CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
index 0ce6177d2699ba8aeb45c9c077875a1a12f5b5d8..55d0d1fcf004cea0e3a4f582408c90c3ffb24f2f 100644 (file)
@@ -42,11 +42,7 @@ from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import (
-    get_ffmpeg_args,
-    get_ffmpeg_stream,
-    get_player_filter_params,
-)
+from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -204,6 +200,11 @@ class AirplayStream:
         self._started = asyncio.Event()
         self._stopped = False
 
+    @property
+    def running(self) -> bool:
+        """Return boolean if this stream is running."""
+        return not self._stopped and self._started.is_set()
+
     async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
         """Initialize CLIRaop process for a player."""
         extra_args = []
@@ -226,6 +227,23 @@ class AirplayStream:
         elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             extra_args += ["-debug", "10"]
 
+        # create os pipes to pipe ffmpeg to cliraop
+        read, write = await asyncio.to_thread(os.pipe)
+
+        # ffmpeg handles the player specific stream + filters and pipes
+        # audio to the cliraop process
+        self._ffmpeg_proc = FFMpeg(
+            audio_input="-",
+            input_format=self.input_format,
+            output_format=AIRPLAY_PCM_FORMAT,
+            filter_params=get_player_filter_params(self.mass, player_id),
+            audio_output=write,
+            logger=self.airplay_player.logger.getChild("ffmpeg"),
+        )
+        await self._ffmpeg_proc.start()
+        await asyncio.to_thread(os.close, write)
+
+        # cliraop is the binary that handles the actual raop streaming to the player
         cliraop_args = [
             self.prov.cliraop_bin,
             "-ntpstart",
@@ -246,30 +264,9 @@ class AirplayStream:
             self.airplay_player.address,
             "-",
         ]
+        self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop")
         if platform.system() == "Darwin":
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
-
-        # ffmpeg handles the player specific stream + filters and pipes
-        # audio to the cliraop process
-        read, write = await asyncio.to_thread(os.pipe)
-        ffmpeg_args = get_ffmpeg_args(
-            input_format=self.input_format,
-            output_format=AIRPLAY_PCM_FORMAT,
-            filter_params=get_player_filter_params(self.mass, player_id),
-            loglevel="fatal",
-        )
-        self._ffmpeg_proc = AsyncProcess(
-            ffmpeg_args,
-            stdin=True,
-            stdout=write,
-            name="cliraop_ffmpeg",
-        )
-        await self._ffmpeg_proc.start()
-        await asyncio.to_thread(os.close, write)
-
-        self._cliraop_proc = AsyncProcess(
-            cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop"
-        )
         await self._cliraop_proc.start()
         await asyncio.to_thread(os.close, read)
         self._started.set()
@@ -388,7 +385,6 @@ class AirplayStream:
             logger.log(VERBOSE_LOG_LEVEL, line)
 
         # if we reach this point, the process exited
-        self.running = False
         if airplay_player.active_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
@@ -562,6 +558,9 @@ class AirplayProvider(PlayerProvider):
             for airplay_player in self._get_sync_clients(player_id):
                 if airplay_player.active_stream:
                     tg.create_task(airplay_player.active_stream.stop())
+                if mass_player := self.mass.players.get(airplay_player.player_id):
+                    mass_player.state = PlayerState.IDLE
+                    self.mass.players.update(mass_player.player_id)
 
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY (unpause) command to given player.
@@ -685,7 +684,7 @@ class AirplayProvider(PlayerProvider):
         # get current ntp and start cliraop
         _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
         start_ntp = int(stdout.strip())
-        wait_start = 1000 + (500 * len(sync_clients))
+        wait_start = 1250 + (250 * len(sync_clients))
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
                 tg.create_task(airplay_player.active_stream.start(start_ntp, wait_start))
@@ -739,7 +738,9 @@ class AirplayProvider(PlayerProvider):
             # so we debounce the resync a bit here with a timer
             self.mass.call_later(
                 1,
-                self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+                self.mass.player_queues.resume,
+                active_queue.queue_id,
+                fade_in=False,
                 task_id=f"resume_{active_queue.queue_id}",
             )
         else:
@@ -774,14 +775,10 @@ class AirplayProvider(PlayerProvider):
 
         async def check_binary(cliraop_path: str) -> str | None:
             try:
-                cliraop = await asyncio.create_subprocess_exec(
-                    *[cliraop_path, "-check"],
-                    stdout=asyncio.subprocess.PIPE,
-                    stderr=asyncio.subprocess.STDOUT,
+                returncode, output = await check_output(
+                    [cliraop_path, "-check"],
                 )
-                stdout, _ = await cliraop.communicate()
-                stdout = stdout.strip().decode()
-                if cliraop.returncode == 0 and stdout == "cliraop check":
+                if returncode == 0 and output.strip().decode() == "cliraop check":
                     self.cliraop_bin = cliraop_path
                     return cliraop_path
             except OSError:
index fd5511e8da3c43c139ed991564aa6b3a2a3b52b6..564796d71455338950759d79f9b15621623f8734 100644 (file)
@@ -71,7 +71,7 @@ PLAYER_CONFIG_ENTRIES = (
     CONF_ENTRY_CROSSFADE_DURATION,
 )
 
-MASS_APP_ID = "46C1A819"  # use the cast receiver app from philippe44 for now until we get our own
+MASS_APP_ID = "C35B0678"
 
 
 # Monkey patch the Media controller here to store the queue items
@@ -491,11 +491,7 @@ class ChromecastProvider(PlayerProvider):
             castplayer.player.elapsed_time = status.current_time
 
         # active source
-        if (
-            status.content_id
-            and self.mass.streams.base_url in status.content_id
-            and castplayer.player_id in status.content_id
-        ):
+        if castplayer.cc.app_id == MASS_APP_ID:
             castplayer.player.active_source = castplayer.player_id
         else:
             castplayer.player.active_source = castplayer.cc.app_display_name
index ba57c40aaca2aa41c53e8b237510e080490f2cd1..baf575b67ae7db42aaf336091fc12887dfa44997 100644 (file)
@@ -295,6 +295,7 @@ class FileSystemProviderBase(MusicProvider):
                     provider=self.instance_id,
                     name=item.name,
                 )
+            await asyncio.sleep(0)  # yield to eventloop
 
     async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
         """Run library sync for this provider."""
@@ -308,6 +309,7 @@ class FileSystemProviderBase(MusicProvider):
                     if x.provider_instance == self.instance_id
                 )
                 prev_checksums[file_name] = db_item.metadata.checksum
+                await asyncio.sleep(0)  # yield to eventloop
 
         # process all deleted (or renamed) files first
         cur_filenames = set()
@@ -320,6 +322,7 @@ class FileSystemProviderBase(MusicProvider):
                 # unsupported file extension
                 continue
             cur_filenames.add(item.path)
+            await asyncio.sleep(0)  # yield to eventloop
         # work out deletions
         deleted_files = set(prev_checksums.keys()) - cur_filenames
         await self._process_deletions(deleted_files)
index 2ba4ea5d0090ad29705a36d9605ea1383a838514..9178fc0f5e5b6b2a89dfe12276996dcb8b5de15d 100644 (file)
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-import asyncio
 import platform
 from collections.abc import AsyncGenerator
 from typing import TYPE_CHECKING
@@ -14,6 +13,7 @@ from music_assistant.common.models.errors import LoginFailed
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
 from music_assistant.server.helpers.audio import get_file_stream
+from music_assistant.server.helpers.process import check_output
 from music_assistant.server.providers.filesystem_local import (
     CONF_ENTRY_MISSING_ALBUM_ARTIST,
     LocalFileSystemProvider,
@@ -204,21 +204,13 @@ class SMBFileSystemProvider(LocalFileSystemProvider):
         self.logger.info("Mounting //%s/%s%s to %s", server, share, subfolder, self.base_path)
         self.logger.debug("Using mount command: %s", mount_cmd.replace(password, "########"))
 
-        proc = await asyncio.create_subprocess_shell(
-            mount_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
-        )
-        _, stderr = await proc.communicate()
-        if proc.returncode != 0:
-            msg = f"SMB mount failed with error: {stderr.decode()}"
+        returncode, output = await check_output(mount_cmd)
+        if returncode != 0:
+            msg = f"SMB mount failed with error: {output.decode()}"
             raise LoginFailed(msg)
 
     async def unmount(self, ignore_error: bool = False) -> None:
         """Unmount the remote share."""
-        proc = await asyncio.create_subprocess_shell(
-            f"umount {self.base_path}",
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-        _, stderr = await proc.communicate()
-        if proc.returncode != 0 and not ignore_error:
-            self.logger.warning("SMB unmount failed with error: %s", stderr.decode())
+        returncode, output = await check_output(f"umount {self.base_path}")
+        if returncode != 0 and not ignore_error:
+            self.logger.warning("SMB unmount failed with error: %s", output.decode())
index c4d3dacfc9d593f65d220750e7732196cb32cdf6..e15fc6999bf84034036262244ffe03c34a6f7b1f 100644 (file)
@@ -533,7 +533,9 @@ class SlimprotoProvider(PlayerProvider):
             # so we debounce the resync a bit here with a timer
             self.mass.call_later(
                 1,
-                self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+                self.mass.player_queues.resume,
+                active_queue.queue_id,
+                fade_in=False,
                 task_id=f"resume_{active_queue.queue_id}",
             )
         else:
index a180dc9696e1b55d4ad57046acde9921cd331f05..330933f6514d9153b6bea8c057aa7ea3657e488d 100644 (file)
@@ -380,8 +380,8 @@ class SnapCastProvider(PlayerProvider):
                     input_format=input_format,
                     output_format=DEFAULT_SNAPCAST_FORMAT,
                     filter_params=get_player_filter_params(self.mass, player_id),
-                    name="snapcast_ffmpeg",
                     audio_output=f"tcp://{host}:{port}",
+                    logger=self.logger.getChild("ffmpeg"),
                 ) as ffmpeg_proc:
                     await ffmpeg_proc.wait()
                     # we need to wait a bit for the stream status to become idle
@@ -505,7 +505,7 @@ class SnapCastProvider(PlayerProvider):
             "--tcp.enabled=true",
             "--tcp.port=1705",
         ]
-        async with AsyncProcess(args, stdin=False, stdout=True, stderr=False) as snapserver_proc:
+        async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
             # keep reading from stdout until exit
             async for data in snapserver_proc.iter_any():
                 data = data.decode().strip()  # noqa: PLW2901
index 9bf688ae19fa651ea1b2f2d497176a60316922d7..85fff75898eb297f09c298a9d3f668754709b15d 100644 (file)
@@ -47,7 +47,7 @@ from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
 from music_assistant.server.helpers.app_vars import app_var
 
 # pylint: enable=no-name-in-module
-from music_assistant.server.helpers.process import AsyncProcess
+from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.music_provider import MusicProvider
 
 if TYPE_CHECKING:
@@ -430,7 +430,7 @@ class SpotifyProvider(MusicProvider):
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
         bytes_sent = 0
-        async with AsyncProcess(args, stdout=True) as librespot_proc:
+        async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc:
             async for chunk in librespot_proc.iter_any():
                 yield chunk
                 bytes_sent += len(chunk)
@@ -677,9 +677,8 @@ class SpotifyProvider(MusicProvider):
         ]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        async with AsyncProcess(args, stdout=True) as librespot:
-            stdout = await librespot.read(-1)
-        if stdout.decode().strip() != "authorized":
+        _returncode, output = await check_output(args)
+        if _returncode == 0 and output.decode().strip() != "authorized":
             raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
         # get token with (authorized) librespot
         scopes = [
@@ -713,16 +712,15 @@ class SpotifyProvider(MusicProvider):
         ]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        async with AsyncProcess(args, stdout=True) as librespot:
-            stdout = await librespot.read(-1)
+        _returncode, output = await check_output(args)
         duration = round(time.time() - time_start, 2)
         try:
-            result = json.loads(stdout)
+            result = json.loads(output)
         except JSONDecodeError:
             self.logger.warning(
                 "Error while retrieving Spotify token after %s seconds, details: %s",
                 duration,
-                stdout.decode(),
+                output.decode(),
             )
             return None
         self.logger.debug(
@@ -847,15 +845,8 @@ class SpotifyProvider(MusicProvider):
 
         async def check_librespot(librespot_path: str) -> str | None:
             try:
-                librespot = await asyncio.create_subprocess_exec(
-                    *[librespot_path, "--check"], stdout=asyncio.subprocess.PIPE
-                )
-                stdout, _ = await librespot.communicate()
-                if (
-                    librespot.returncode == 0
-                    and b"ok spotty" in stdout
-                    and b"using librespot" in stdout
-                ):
+                returncode, output = await check_output([librespot_path, "--check"])
+                if returncode == 0 and b"ok spotty" in output and b"using librespot" in output:
                     self._librespot_bin = librespot_path
                     return librespot_path
             except OSError: