Update http_streamer.py
authormarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Thu, 17 Oct 2019 16:01:26 +0000 (18:01 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Thu, 17 Oct 2019 16:01:26 +0000 (18:01 +0200)
music_assistant/http_streamer.py

index 4db3aae2ca5078efdf2131fe3bff8b11682df8b4..9f5b6fd461a573b6ff1dddfc4283dddece26ab51 100755 (executable)
@@ -19,7 +19,6 @@ from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run
 from .models.media_types import TrackQuality, MediaType
 from .models.playerstate import PlayerState
 
-
 class HTTPStreamer():
     ''' Built-in streamer using sox and webserver '''
     
@@ -27,11 +26,27 @@ class HTTPStreamer():
         self.mass = mass
         self.local_ip = get_ip()
         self.analyze_jobs = {}
+        self.stream_clients = []
 
     async def setup(self):
         ''' async initialize of module '''
-        # TODO: cleanup of cache files etc.
         pass
+        # self.mass.event_loop.create_task(
+        #         asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093))
+    
+    async def webplayer(self, http_request):
+        ''' 
+            start stream for a player
+        '''
+        from .models import Player
+        player_id = http_request.match_info.get('player_id')
+        player = Player(self.mass, player_id, 'web')
+        player.name = player_id
+        await self.mass.players.add_player(player)
+        # wait for queue
+        while not player.queue.items:
+            await asyncio.sleep(0.2)
+        return await self.stream(http_request)    
     
     async def stream(self, http_request):
         ''' 
@@ -40,44 +55,46 @@ class HTTPStreamer():
         # make sure we have a valid player
         player_id = http_request.match_info.get('player_id','')
         player = await self.mass.players.get_player(player_id)
-        if not player:
-            LOGGER.error("Received stream request for non-existing player %s" %(player_id))
-            return
-        queue_item_id = http_request.match_info.get('queue_item_id')
-        queue_item = await player.queue.by_item_id(queue_item_id)
+        assert(player)
         # prepare headers as audio/flac content
-        resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'})
+        resp = web.StreamResponse(status=200, reason='OK', 
+                headers={'Content-Type': 'audio/flac'})
         await resp.prepare(http_request)
         # send content only on GET request
-        if http_request.method.upper() != 'HEAD':
-            # stream audio
-            buf_queue = asyncio.Queue()
-            cancelled = threading.Event()
-            if queue_item:
-                # single stream requested, run stream in executor
-                bg_task = run_async_background_task(
-                    None, 
-                    self.__stream_single, player, queue_item, buf_queue, cancelled)
-            else:
-                # no item is given, start queue stream, run stream in executor
-                bg_task = run_async_background_task(
-                    None, 
-                    self.__stream_queue, player, buf_queue, cancelled)
-            try:
-                while True:
-                    chunk = await buf_queue.get()
-                    if not chunk:
-                        buf_queue.task_done()
-                        break
-                    await resp.write(chunk)
+        if http_request.method.upper() != 'GET':
+            return resp
+        # stream audio
+        buf_queue = asyncio.Queue()
+        cancelled = threading.Event()
+        if player.queue.use_queue_stream:
+            # use queue stream
+            bg_task = run_async_background_task(
+                None, 
+                self.__stream_queue, player, buf_queue, cancelled)
+        else:
+            # single track stream
+            queue_item_id = http_request.match_info.get('queue_item_id')
+            queue_item = await player.queue.by_item_id(queue_item_id)
+            assert(queue_item)
+            bg_task = run_async_background_task(
+                None, 
+                self.__stream_single, player, queue_item, buf_queue, cancelled)
+        # let the streaming begin!
+        try:
+            while True:
+                chunk = await buf_queue.get()
+                if not chunk:
                     buf_queue.task_done()
-            except (asyncio.CancelledError, asyncio.TimeoutError):
-                LOGGER.debug("stream interrupted")
-                cancelled.set()
-                # wait for bg_task
-                await asyncio.gather(bg_task)
-                del buf_queue
-                raise asyncio.CancelledError()
+                    break
+                await resp.write(chunk)
+                buf_queue.task_done()
+        except (asyncio.CancelledError, asyncio.TimeoutError):
+            LOGGER.debug("stream interrupted")
+            cancelled.set()
+            # wait for bg_task to finish
+            await asyncio.gather(bg_task)
+            del buf_queue
+            raise asyncio.CancelledError()
         return resp
     
     async def __stream_single(self, player, queue_item, buffer, cancelled):
@@ -337,7 +354,8 @@ class HTTPStreamer():
         asyncio.run_coroutine_threadsafe(
                 self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop)
         # send task to background to analyse the audio
-        asyncio.ensure_future(self.__analyze_audio(queue_item), loop=self.mass.event_loop)
+        self.mass.event_loop.call_soon_threadsafe(
+                asyncio.ensure_future, self.__analyze_audio(queue_item))
 
     async def __get_player_sox_options(self, player, queue_item):
         ''' get player specific sox effect options '''
@@ -421,3 +439,56 @@ class HTTPStreamer():
         fadeinfile.close()
         fadeoutfile.close()
         return crossfade_part
+
+    async def start_stream(self, clients_needed):
+        # wait for clients
+        print("wait for clients...")
+        track = asyncio.run_coroutine_threadsafe(
+                self.mass.music.track('2741', provider='database'), 
+                self.mass.event_loop).result()
+        player_id = '1523403a-4cc4-f151-29d1-758822807128'
+        player = self.mass.players._players[player_id]
+        cancelled = threading.Event()
+        # wait for clients
+        while len(self.stream_clients) < clients_needed:
+            await asyncio.sleep(0.1)
+        # start streaming
+        while self.stream_clients:
+            audio_stream = self.__get_audio_stream(player, track, cancelled)
+            async for is_last_chunk, audio_chunk in audio_stream:
+                for client in self.stream_clients:
+                    try:
+                        client.write(audio_chunk)
+                        await client.drain()
+                    except ConnectionResetError:
+                        print('client disconnected')
+                        client.close()
+                        self.stream_clients.remove(client)
+            await asyncio.sleep(1)
+        print("all clients disconnected")
+        return        
+
+    async def add_client(self, client_writer, client_msg):
+        print("new client connected!")
+        for line in client_msg.decode().split('\r\n'):
+            print(line)
+        msg = 'HTTP/1.0 200 OK\r\n'
+        msg += "Content-Type: audio/flac\r\n"
+        msg += "Transfer-Encoding: chunked\r\n\r\n"
+        client_writer.write(msg.encode())
+        await client_writer.drain()
+        self.stream_clients.append(client_writer)
+        if len(self.stream_clients) == 1:
+            bg_task = run_async_background_task(
+                    None, 
+                    self.start_stream, 2)
+
+    async def sockets_streamer(self, reader, writer):
+        while True:
+            request = await reader.read(1024)
+            if request:
+                await self.add_client(writer, request)
+            else:
+                print("client lost")
+                break
+