From: marcelveldt Date: Thu, 17 Oct 2019 16:01:26 +0000 (+0200) Subject: Update http_streamer.py X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=1b2cc9de52fe1557460c1b8b1989858881a8fdf9;p=music-assistant-server.git Update http_streamer.py --- diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 4db3aae2..9f5b6fd4 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -19,7 +19,6 @@ from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run from .models.media_types import TrackQuality, MediaType from .models.playerstate import PlayerState - class HTTPStreamer(): ''' Built-in streamer using sox and webserver ''' @@ -27,11 +26,27 @@ class HTTPStreamer(): self.mass = mass self.local_ip = get_ip() self.analyze_jobs = {} + self.stream_clients = [] async def setup(self): ''' async initialize of module ''' - # TODO: cleanup of cache files etc. pass + # self.mass.event_loop.create_task( + # asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093)) + + async def webplayer(self, http_request): + ''' + start stream for a player + ''' + from .models import Player + player_id = http_request.match_info.get('player_id') + player = Player(self.mass, player_id, 'web') + player.name = player_id + await self.mass.players.add_player(player) + # wait for queue + while not player.queue.items: + await asyncio.sleep(0.2) + return await self.stream(http_request) async def stream(self, http_request): ''' @@ -40,44 +55,46 @@ class HTTPStreamer(): # make sure we have a valid player player_id = http_request.match_info.get('player_id','') player = await self.mass.players.get_player(player_id) - if not player: - LOGGER.error("Received stream request for non-existing player %s" %(player_id)) - return - queue_item_id = http_request.match_info.get('queue_item_id') - queue_item = await player.queue.by_item_id(queue_item_id) + assert(player) # prepare headers as audio/flac content - resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'}) + resp = web.StreamResponse(status=200, reason='OK', + headers={'Content-Type': 'audio/flac'}) await resp.prepare(http_request) # send content only on GET request - if http_request.method.upper() != 'HEAD': - # stream audio - buf_queue = asyncio.Queue() - cancelled = threading.Event() - if queue_item: - # single stream requested, run stream in executor - bg_task = run_async_background_task( - None, - self.__stream_single, player, queue_item, buf_queue, cancelled) - else: - # no item is given, start queue stream, run stream in executor - bg_task = run_async_background_task( - None, - self.__stream_queue, player, buf_queue, cancelled) - try: - while True: - chunk = await buf_queue.get() - if not chunk: - buf_queue.task_done() - break - await resp.write(chunk) + if http_request.method.upper() != 'GET': + return resp + # stream audio + buf_queue = asyncio.Queue() + cancelled = threading.Event() + if player.queue.use_queue_stream: + # use queue stream + bg_task = run_async_background_task( + None, + self.__stream_queue, player, buf_queue, cancelled) + else: + # single track stream + queue_item_id = http_request.match_info.get('queue_item_id') + queue_item = await player.queue.by_item_id(queue_item_id) + assert(queue_item) + bg_task = run_async_background_task( + None, + self.__stream_single, player, queue_item, buf_queue, cancelled) + # let the streaming begin! + try: + while True: + chunk = await buf_queue.get() + if not chunk: buf_queue.task_done() - except (asyncio.CancelledError, asyncio.TimeoutError): - LOGGER.debug("stream interrupted") - cancelled.set() - # wait for bg_task - await asyncio.gather(bg_task) - del buf_queue - raise asyncio.CancelledError() + break + await resp.write(chunk) + buf_queue.task_done() + except (asyncio.CancelledError, asyncio.TimeoutError): + LOGGER.debug("stream interrupted") + cancelled.set() + # wait for bg_task to finish + await asyncio.gather(bg_task) + del buf_queue + raise asyncio.CancelledError() return resp async def __stream_single(self, player, queue_item, buffer, cancelled): @@ -337,7 +354,8 @@ class HTTPStreamer(): asyncio.run_coroutine_threadsafe( self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop) # send task to background to analyse the audio - asyncio.ensure_future(self.__analyze_audio(queue_item), loop=self.mass.event_loop) + self.mass.event_loop.call_soon_threadsafe( + asyncio.ensure_future, self.__analyze_audio(queue_item)) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' @@ -421,3 +439,56 @@ class HTTPStreamer(): fadeinfile.close() fadeoutfile.close() return crossfade_part + + async def start_stream(self, clients_needed): + # wait for clients + print("wait for clients...") + track = asyncio.run_coroutine_threadsafe( + self.mass.music.track('2741', provider='database'), + self.mass.event_loop).result() + player_id = '1523403a-4cc4-f151-29d1-758822807128' + player = self.mass.players._players[player_id] + cancelled = threading.Event() + # wait for clients + while len(self.stream_clients) < clients_needed: + await asyncio.sleep(0.1) + # start streaming + while self.stream_clients: + audio_stream = self.__get_audio_stream(player, track, cancelled) + async for is_last_chunk, audio_chunk in audio_stream: + for client in self.stream_clients: + try: + client.write(audio_chunk) + await client.drain() + except ConnectionResetError: + print('client disconnected') + client.close() + self.stream_clients.remove(client) + await asyncio.sleep(1) + print("all clients disconnected") + return + + async def add_client(self, client_writer, client_msg): + print("new client connected!") + for line in client_msg.decode().split('\r\n'): + print(line) + msg = 'HTTP/1.0 200 OK\r\n' + msg += "Content-Type: audio/flac\r\n" + msg += "Transfer-Encoding: chunked\r\n\r\n" + client_writer.write(msg.encode()) + await client_writer.drain() + self.stream_clients.append(client_writer) + if len(self.stream_clients) == 1: + bg_task = run_async_background_task( + None, + self.start_stream, 2) + + async def sockets_streamer(self, reader, writer): + while True: + request = await reader.read(1024) + if request: + await self.add_client(writer, request) + else: + print("client lost") + break +