import gc
import io
import logging
-import os
import shlex
from enum import Enum
from typing import AsyncGenerator, List, Optional, Tuple
EVENT_STREAM_ENDED,
EVENT_STREAM_STARTED,
)
-from music_assistant.helpers.encryption import (
- async_decrypt_bytes,
- async_decrypt_string,
- encrypt_bytes,
-)
+from music_assistant.helpers.encryption import async_decrypt_string
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.util import (
- async_yield_chunks,
- create_tempfile,
- get_ip,
- try_parse_int,
-)
+from music_assistant.helpers.util import create_tempfile, get_ip, try_parse_int
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
LOGGER = logging.getLogger("stream_manager")
await asyncio.wait([fill_buffer_task])
- except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except
+ except (
+ GeneratorExit,
+ asyncio.CancelledError,
+ Exception,
+ ) as exc: # pylint: disable=broad-except
cancelled = True
fill_buffer_task.cancel()
LOGGER.debug(
async for chunk in sox_proc.iterate_chunks():
yield chunk
await asyncio.wait([fill_buffer_task])
- except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except
+ except (
+ GeneratorExit,
+ asyncio.CancelledError,
+ Exception,
+ ) as exc: # pylint: disable=broad-except
cancelled = True
fill_buffer_task.cancel()
LOGGER.debug(
audio_data = b""
chunk_size = 512000
- # Handle (optional) caching of audio data
- cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
- cache_file = os.path.join(self.mass.config.data_path, ".audio_cache", cache_id)
- if os.path.isfile(cache_file):
- async with AIOFile(cache_file, "rb") as afp:
- audio_data = await afp.read()
- audio_data = await async_decrypt_bytes(audio_data)
- if audio_data:
- stream_type = StreamType.CACHE
-
# support for AAC/MPEG created with ffmpeg in between
if streamdetails.content_type in [ContentType.AAC, ContentType.MPEG]:
stream_type = StreamType.EXECUTABLE
# signal start of stream event
self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)
- if stream_type == StreamType.CACHE:
- async for chunk in async_yield_chunks(audio_data, chunk_size):
- yield chunk
- elif stream_type == StreamType.URL:
+ if stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
while True:
chunk = await response.content.read(chunk_size)
self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
# send analyze job to background worker
- if not stream_type == StreamType.CACHE:
- self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
+ self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
def __get_player_sox_options(
self, player_id: str, streamdetails: StreamDetails
return # prevent multiple analyze jobs for same track
self.analyze_jobs[item_key] = True
- # Save cache file to disk
- cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
- cache_dir = os.path.join(self.mass.config.data_path, ".audio_cache")
- if not os.path.isdir(cache_dir):
- os.mkdir(cache_dir)
- cache_file = os.path.join(cache_dir, cache_id)
- if not os.path.isfile(cache_file) and len(audio_data) < 100000000:
- with open(cache_file, "wb") as _file:
- _file.write(encrypt_bytes(audio_data))
-
# get track loudness
track_loudness = self.mass.add_job(
self.mass.database.async_get_track_loudness(