Plex: added remote control feature (#2608)
authoranatosun <33899455+anatosun@users.noreply.github.com>
Tue, 25 Nov 2025 08:20:07 +0000 (09:20 +0100)
committerGitHub <noreply@github.com>
Tue, 25 Nov 2025 08:20:07 +0000 (09:20 +0100)
* Plex: moved scrobbling and playback reporting logic to plugin

* Plex connect: introducing new plugin

* Plex Connect: implemented copilot's suggestions

* Plex connect: corrected link to documentation

Co-authored-by: OzGav <gavnosp@hotmail.com>
---------

Co-authored-by: OzGav <gavnosp@hotmail.com>
music_assistant/providers/plex/__init__.py
music_assistant/providers/plex_connect/__init__.py [new file with mode: 0644]
music_assistant/providers/plex_connect/gdm.py [new file with mode: 0644]
music_assistant/providers/plex_connect/icon.svg [new file with mode: 0644]
music_assistant/providers/plex_connect/manifest.json [new file with mode: 0644]
music_assistant/providers/plex_connect/player_remote.py [new file with mode: 0644]

index aaa3317b75daff975330a8327f2cbca0285f444d..21bca99360c9e1b9599a4891ae7183121415e7c3 100644 (file)
@@ -37,7 +37,6 @@ from music_assistant_models.media_items import (
     ItemMapping,
     MediaItem,
     MediaItemImage,
-    MediaItemType,
     Playlist,
     ProviderMapping,
     RecommendationFolder,
@@ -1296,167 +1295,6 @@ class PlexProvider(MusicProvider):
 
         return stream_details
 
-    async def on_streamed(
-        self,
-        streamdetails: StreamDetails,
-    ) -> None:
-        """Handle callback when an item completed streaming."""
-
-        def mark_played() -> None:
-            """Mark the item as played in Plex."""
-            try:
-                item = streamdetails.data
-                if not item:
-                    self.logger.warning("No Plex item data in streamdetails, cannot scrobble")
-                    return
-
-                if not hasattr(item, "ratingKey"):
-                    self.logger.warning(
-                        "Streamdetails data is not a Plex item (missing ratingKey), cannot scrobble"
-                    )
-                    return
-
-                params = {
-                    "key": str(item.ratingKey),
-                    "identifier": "com.plexapp.plugins.library",
-                }
-                self.logger.debug(
-                    "Scrobbling track %s (ratingKey: %s) to Plex",
-                    streamdetails.uri,
-                    item.ratingKey,
-                )
-                self._plex_server.query("/:/scrobble", params=params)
-                self.logger.info("Successfully scrobbled track %s to Plex", streamdetails.uri)
-            except Exception as err:
-                self.logger.exception(
-                    "Failed to scrobble track %s to Plex: %s",
-                    streamdetails.uri,
-                    err,
-                )
-
-        await asyncio.to_thread(mark_played)
-
-    async def on_played(
-        self,
-        media_type: MediaType,
-        prov_item_id: str,
-        fully_played: bool,
-        position: int,
-        media_item: MediaItemType,
-        is_playing: bool = False,
-    ) -> None:
-        """
-        Handle callback when a media item has been played.
-
-        This is called periodically (every 30s) during playback and when playback stops.
-        We use this to send timeline/progress updates to Plex.
-        """
-        if media_type != MediaType.TRACK:
-            # Only handle tracks for now
-            return
-
-        def update_timeline() -> None:
-            """Update Plex timeline with current playback progress."""
-            try:
-                self.logger.debug(
-                    "on_played: prov_item_id=%s, pos=%s, fully_played=%s, is_playing=%s",
-                    prov_item_id,
-                    position,
-                    fully_played,
-                    is_playing,
-                )
-
-                # Extract ratingKey from the key path (e.g., "/library/metadata/12345" -> "12345")
-                # The prov_item_id is the Plex key path, we need the ratingKey for API calls
-                try:
-                    rating_key = prov_item_id.split("/")[-1]
-                    self.logger.debug(
-                        "Extracted ratingKey %s from path %s", rating_key, prov_item_id
-                    )
-                except Exception as e:
-                    self.logger.error("Failed to extract ratingKey from %s: %s", prov_item_id, e)
-                    return
-
-                # Fetch the track directly from server using ratingKey to avoid ambiguity
-                # Using server.fetchItem() instead of library.fetchItem() is more reliable
-                plex_track = self._plex_server.fetchItem(int(rating_key))
-                if not plex_track:
-                    self.logger.warning("Cannot find Plex item with ratingKey %s", rating_key)
-                    return
-
-                self.logger.debug(
-                    "Found Plex item: '%s' by '%s' (type: %s, ratingKey: %s)",
-                    plex_track.title if hasattr(plex_track, "title") else "unknown",
-                    plex_track.grandparentTitle
-                    if hasattr(plex_track, "grandparentTitle")
-                    else "unknown",
-                    plex_track.type if hasattr(plex_track, "type") else "unknown",
-                    plex_track.ratingKey if hasattr(plex_track, "ratingKey") else "unknown",
-                )
-
-                # Verify this is actually a track, not a collection or other item
-                if not hasattr(plex_track, "type") or plex_track.type != "track":
-                    self.logger.warning(
-                        "Item %s is not a track (type: %s), cannot update timeline",
-                        rating_key,
-                        plex_track.type if hasattr(plex_track, "type") else "unknown",
-                    )
-                    return
-
-                # Convert position to milliseconds (Plex expects ms)
-                position_ms = position * 1000
-
-                # Determine playback state
-                if fully_played:
-                    state = "stopped"
-                elif is_playing:
-                    state = "playing"
-                else:
-                    state = "paused"
-
-                # Send timeline update to Plex with current state
-                # Client identification is set globally on the session headers
-                params = {
-                    "ratingKey": str(plex_track.ratingKey),
-                    "key": prov_item_id,
-                    "state": state,
-                    "time": str(position_ms),
-                    "duration": str(plex_track.duration)
-                    if hasattr(plex_track, "duration")
-                    else "0",
-                }
-                self.logger.debug("Sending Plex timeline update (state=%s): %s", state, params)
-                self._plex_server.query("/:/timeline", params=params)
-
-                # If fully played, also scrobble
-                if fully_played:
-                    scrobble_params = {
-                        "key": str(plex_track.ratingKey),
-                        "identifier": "com.plexapp.plugins.library",
-                    }
-                    self.logger.debug("Scrobbling track to Plex: %s", scrobble_params)
-                    self._plex_server.query("/:/scrobble", params=scrobble_params)
-                    self.logger.info("Track %s marked as played in Plex", prov_item_id)
-
-                # If position is 0 and not playing, mark as unplayed
-                if position == 0 and not is_playing and not fully_played:
-                    unscrobble_params = {
-                        "key": str(plex_track.ratingKey),
-                        "identifier": "com.plexapp.plugins.library",
-                    }
-                    self.logger.debug("Unscrobbling track in Plex: %s", unscrobble_params)
-                    self._plex_server.query("/:/unscrobble", params=unscrobble_params)
-                    self.logger.info("Track %s marked as unplayed in Plex", prov_item_id)
-
-            except Exception as err:
-                self.logger.exception(
-                    "Failed to update Plex timeline for track %s: %s",
-                    prov_item_id,
-                    err,
-                )
-
-        await asyncio.to_thread(update_timeline)
-
     async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount:
         """Get a MyPlexAccount object and refresh the token if needed."""
         if auth_token == AUTH_TOKEN_UNAUTH:
diff --git a/music_assistant/providers/plex_connect/__init__.py b/music_assistant/providers/plex_connect/__init__.py
new file mode 100644 (file)
index 0000000..b123291
--- /dev/null
@@ -0,0 +1,317 @@
+"""
+Plex Connect plugin for Music Assistant.
+
+This plugin allows Music Assistant players to appear as controllable devices
+in the official Plex apps (Plexamp, web player, etc.). Each plugin instance
+links a single MA player to Plex, making it available for remote control.
+
+Multiple instances can be created to expose multiple MA players to Plex.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import socket
+from collections.abc import Callable
+from typing import TYPE_CHECKING, cast
+
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
+from music_assistant_models.enums import ConfigEntryType, EventType, ProviderFeature
+
+from music_assistant.models.plugin import PluginProvider
+
+from .player_remote import PlayerRemoteInstance
+
+if TYPE_CHECKING:
+    from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
+    from music_assistant_models.event import MassEvent
+    from music_assistant_models.provider import ProviderManifest
+
+    from music_assistant.mass import MusicAssistant
+    from music_assistant.models import ProviderInstanceType
+    from music_assistant.providers.plex import PlexProvider
+
+CONF_MASS_PLAYER_ID = "mass_player_id"
+CONF_PLEX_PROVIDER_ID = "plex_provider_id"
+CONF_PLAYER_NAME = "player_name"
+CONF_DEVICE_CLASS = "device_class"
+
+# No special features needed for this plugin
+SUPPORTED_FEATURES: set[ProviderFeature] = set()
+
+
+async def setup(
+    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+    """Initialize provider(instance) with given configuration."""
+    return PlexConnectProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+    mass: MusicAssistant,
+    instance_id: str | None = None,  # noqa: ARG001
+    action: str | None = None,  # noqa: ARG001
+    values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+    """
+    Return Config entries to setup this provider.
+
+    :param mass: MusicAssistant instance.
+    :param instance_id: id of an existing provider instance (None if new instance setup).
+    :param action: [optional] action key called from config entries UI.
+    :param values: the (intermediate) raw values for config entries sent with the action.
+    """
+    # Get available Plex music providers
+    plex_providers = [
+        provider
+        for provider in mass.get_providers()
+        if provider.domain == "plex" and provider.type.value == "music"
+    ]
+
+    # Get player name default if player is selected
+    player_name_default = None
+    if values and values.get(CONF_MASS_PLAYER_ID):
+        player_id = str(values.get(CONF_MASS_PLAYER_ID))
+        if player := mass.players.get(player_id):
+            player_name_default = player.display_name
+
+    return (
+        ConfigEntry(
+            key=CONF_PLEX_PROVIDER_ID,
+            type=ConfigEntryType.STRING,
+            label="Plex Music Provider",
+            description="Select the Plex music provider to use for this connection.",
+            required=True,
+            options=[
+                ConfigValueOption(provider.name, provider.instance_id)
+                for provider in plex_providers
+            ],
+        ),
+        ConfigEntry(
+            key=CONF_MASS_PLAYER_ID,
+            type=ConfigEntryType.STRING,
+            label="Music Assistant Player",
+            description="Select the MA player to advertise as a Plex remote client.",
+            required=True,
+            options=[
+                ConfigValueOption(x.display_name, x.player_id)
+                for x in sorted(
+                    mass.players.all(False, False), key=lambda p: p.display_name.lower()
+                )
+            ],
+        ),
+        ConfigEntry(
+            key=CONF_PLAYER_NAME,
+            type=ConfigEntryType.STRING,
+            label="Player Name in Plex",
+            description=(
+                "Custom name for this player as it appears in Plex apps. "
+                "Leave empty to use the player's name."
+            ),
+            required=False,
+            default_value=player_name_default,
+        ),
+        ConfigEntry(
+            key=CONF_DEVICE_CLASS,
+            type=ConfigEntryType.STRING,
+            label="Device Class",
+            description="How this player appears in Plex apps.",
+            required=False,
+            default_value="speaker",
+            options=[
+                ConfigValueOption("Speaker", "speaker"),
+                ConfigValueOption("Phone", "phone"),
+                ConfigValueOption("Tablet", "tablet"),
+                ConfigValueOption("Set-Top Box", "stb"),
+                ConfigValueOption("TV", "tv"),
+                ConfigValueOption("PC", "pc"),
+                ConfigValueOption("Cloud", "cloud"),
+            ],
+        ),
+    )
+
+
+class PlexConnectProvider(PluginProvider):
+    """Plex Connect plugin provider implementation."""
+
+    def __init__(
+        self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+    ) -> None:
+        """Initialize the plugin provider.
+
+        :param mass: MusicAssistant instance.
+        :param manifest: Provider manifest.
+        :param config: Provider configuration.
+        """
+        super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
+        self.mass_player_id = cast("str", self.config.get_value(CONF_MASS_PLAYER_ID))
+        self.plex_provider_id = cast("str", self.config.get_value(CONF_PLEX_PROVIDER_ID))
+        self.custom_player_name = cast("str | None", self.config.get_value(CONF_PLAYER_NAME))
+        self.device_class = cast("str", self.config.get_value(CONF_DEVICE_CLASS)) or "speaker"
+
+        self._plex_provider: PlexProvider | None = None
+        self._player_instance: PlayerRemoteInstance | None = None
+        self._allocated_port: int | None = None
+        self._stop_called: bool = False
+        self._on_unload_callbacks: list[Callable[..., None]] = []
+
+    async def handle_async_init(self) -> None:
+        """Handle async initialization of the provider."""
+        # Wait for the Plex provider to be available (with timeout)
+        max_retries = 30  # 15 seconds total
+        retry_delay = 0.5
+        for attempt in range(max_retries):
+            self._plex_provider = self.mass.get_provider(self.plex_provider_id)  # type: ignore[assignment]
+            if self._plex_provider:
+                break
+            if attempt == 0:
+                self.logger.info(
+                    f"Waiting for Plex provider {self.plex_provider_id} to become available..."
+                )
+            await asyncio.sleep(retry_delay)
+        else:
+            timeout_seconds = max_retries * retry_delay
+            self.logger.error(
+                f"Plex provider {self.plex_provider_id} not found after {timeout_seconds}s"
+            )
+            return
+
+        self.logger.debug(f"Plex provider {self.plex_provider_id} is ready")
+
+        # Subscribe to player events first
+        self._on_unload_callbacks.append(
+            self.mass.subscribe(
+                self._on_mass_player_event,
+                (EventType.PLAYER_ADDED, EventType.PLAYER_REMOVED),
+                id_filter=self.mass_player_id,
+            )
+        )
+
+        # Now try to setup the player instance
+        player = self.mass.players.get(self.mass_player_id)
+        if not player:
+            self.logger.info(
+                f"Player {self.mass_player_id} not found yet, waiting for PLAYER_ADDED event"
+            )
+        else:
+            # Setup the player instance immediately
+            await self._setup_player_instance()
+
+    async def unload(self, is_removed: bool = False) -> None:
+        """Handle close/cleanup of the provider.
+
+        :param is_removed: Whether the provider is being removed.
+        """
+        self._stop_called = True
+
+        # Stop player instance
+        if self._player_instance:
+            await self._player_instance.stop()
+            self._player_instance = None
+
+        # Unsubscribe from events
+        for callback in self._on_unload_callbacks:
+            callback()
+        self._on_unload_callbacks.clear()
+
+    def _is_port_available(self, port: int) -> bool:
+        """Check if a port is available by attempting to bind to it.
+
+        :param port: Port number to check.
+        :return: True if port is available, False otherwise.
+        """
+        try:
+            # Try to bind to the port on all interfaces
+            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                sock.bind(("", port))
+                return True
+        except OSError:
+            return False
+
+    def _find_available_port(self) -> int:
+        """Find the first available port starting from 32500.
+
+        :return: First available port number.
+        """
+        port = 32500
+        max_attempts = 100  # Prevent infinite loop
+        attempts = 0
+
+        while attempts < max_attempts:
+            if self._is_port_available(port):
+                return port
+            port += 1
+            attempts += 1
+
+        # Fallback - should rarely happen
+        msg = f"Could not find available port in range 32500-{32500 + max_attempts}"
+        raise RuntimeError(msg)
+
+    async def _setup_player_instance(self) -> None:
+        """Set up the Plex remote control instance for the player."""
+        # Don't create duplicate instances
+        if self._player_instance:
+            self.logger.debug("Player instance already exists, skipping setup")
+            return
+
+        if not self._plex_provider:
+            self.logger.error("Cannot setup player instance: Plex provider not available")
+            return
+
+        player = self.mass.players.get(self.mass_player_id)
+        if not player:
+            self.logger.warning(f"Player {self.mass_player_id} not found")
+            return
+
+        # Allocate a port if we haven't already
+        if not self._allocated_port:
+            self._allocated_port = self._find_available_port()
+
+        # Use custom name if provided, otherwise use player's display name
+        player_name = self.custom_player_name or player.display_name
+
+        # Create remote control instance
+        self._player_instance = PlayerRemoteInstance(
+            plex_provider=self._plex_provider,
+            ma_player_id=self.mass_player_id,
+            player_name=player_name,
+            port=self._allocated_port,
+            device_class=self.device_class,
+            remote_control=True,
+        )
+
+        try:
+            await self._player_instance.start()
+            self.logger.info(
+                f"Plex Connect ready: '{player_name}' is now available in Plex apps "
+                f"on port {self._allocated_port}"
+            )
+        except Exception as e:
+            self.logger.exception(f"Failed to start Plex remote control: {e}")
+            self._player_instance = None
+
+    async def _teardown_player_instance(self) -> None:
+        """Tear down the Plex remote control instance."""
+        if self._player_instance:
+            await self._player_instance.stop()
+            self._player_instance = None
+
+    def _on_mass_player_event(self, event: MassEvent) -> None:
+        """Handle player added/removed events.
+
+        :param event: The event that occurred.
+        """
+        if event.object_id != self.mass_player_id:
+            return
+
+        if event.event == EventType.PLAYER_REMOVED:
+            # Player was removed - stop the instance
+            self.logger.info(f"Player {self.mass_player_id} removed, stopping Plex Connect")
+            self.mass.create_task(self._teardown_player_instance())
+
+        elif event.event == EventType.PLAYER_ADDED:
+            # Player was added - start the instance (if not already running)
+            if not self._player_instance:
+                self.logger.info(f"Player {self.mass_player_id} added, starting Plex Connect")
+                self.mass.create_task(self._setup_player_instance())
diff --git a/music_assistant/providers/plex_connect/gdm.py b/music_assistant/providers/plex_connect/gdm.py
new file mode 100644 (file)
index 0000000..f238d99
--- /dev/null
@@ -0,0 +1,221 @@
+"""GDM (Plex Good Day Mate) advertising for player discovery."""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import logging
+import socket
+
+LOGGER = logging.getLogger(__name__)
+
+# GDM broadcast and listen ports (matching test-client.py)
+GDM_BROADCAST_PORT = 32414  # Send HELLO broadcasts here
+GDM_LISTEN_PORT = 32412  # Listen for M-SEARCH queries here
+GDM_BROADCAST_ADDR = "255.255.255.255"  # Broadcast address
+
+
+class PlexGDMAdvertiser:
+    """Advertise Music Assistant as a Plex player via GDM."""
+
+    def __init__(
+        self,
+        instance_id: str,
+        port: int,
+        publish_ip: str,
+        name: str = "Music Assistant",
+        product: str = "Music Assistant",
+        version: str = "1.0.0",
+    ) -> None:
+        """Initialize GDM advertiser.
+
+        :param instance_id: Unique identifier for this instance.
+        :param port: Port number for the server.
+        :param publish_ip: IP address to advertise for this server.
+        :param name: Display name for the device.
+        :param product: Product name.
+        :param version: Version string.
+        """
+        self.instance_id = instance_id
+        self.port = port
+        self.name = name
+        self.product = product
+        self.version = version
+        self._running = False
+        self._broadcast_task: asyncio.Task[None] | None = None
+        self._listener_task: asyncio.Task[None] | None = None
+
+        # Pre-build GDM messages (they're static)
+        self._hello_message = self._build_hello_message()
+        self._response_message = self._build_response_message()
+
+        # Sockets for reuse
+        self._broadcast_socket: socket.socket | None = None
+        self._response_socket: socket.socket | None = None
+
+        # Cached publish IP
+        self._local_ip = publish_ip
+
+    def _build_hello_message(self) -> bytes:
+        """Build HELLO broadcast message (static, built once)."""
+        message_lines = [
+            "HELLO * HTTP/1.0",
+            f"Name: {self.name}",
+            f"Port: {self.port}",
+            f"Product: {self.product}",
+            f"Version: {self.version}",
+            "Protocol: plex",
+            "Protocol-Version: 1",
+            "Protocol-Capabilities: timeline,playback,navigation,playqueues",
+            "Device-Class: pc",
+            f"Resource-Identifier: {self.instance_id}",
+            "Content-Type: plex/media-player",
+            "Provides: client,player,pubsub-player",
+        ]
+        return "\r\n".join(message_lines).encode("utf-8")
+
+    def _build_response_message(self) -> bytes:
+        """Build M-SEARCH response message (static, built once)."""
+        message_lines = [
+            "HTTP/1.0 200 OK",
+            f"Name: {self.name}",
+            f"Port: {self.port}",
+            f"Product: {self.product}",
+            f"Version: {self.version}",
+            "Protocol: plex",
+            "Protocol-Version: 1",
+            "Protocol-Capabilities: timeline,playback,navigation,playqueues",
+            "Device-Class: pc",
+            f"Resource-Identifier: {self.instance_id}",
+            "Content-Type: plex/media-player",
+            "Provides: client,player,pubsub-player",
+        ]
+        return "\r\n".join(message_lines).encode("utf-8")
+
+    def start(self) -> None:
+        """Start GDM advertising and listening."""
+        if self._running:
+            return
+        self._running = True
+
+        # Create reusable broadcast socket
+        self._broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self._broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+
+        # Create reusable response socket
+        self._response_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+        # Start broadcast task
+        self._broadcast_task = asyncio.create_task(self._advertise_loop())
+
+        # Start listener task
+        self._listener_task = asyncio.create_task(self._listen_loop())
+
+        LOGGER.info(f"Started GDM advertising and listening at {self._local_ip}:{self.port}")
+
+    async def stop(self) -> None:
+        """Stop GDM advertising and listening."""
+        self._running = False
+
+        if self._broadcast_task:
+            self._broadcast_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._broadcast_task
+
+        if self._listener_task:
+            self._listener_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._listener_task
+
+        # Close reusable sockets
+        if self._broadcast_socket:
+            self._broadcast_socket.close()
+            self._broadcast_socket = None
+
+        if self._response_socket:
+            self._response_socket.close()
+            self._response_socket = None
+
+        LOGGER.info("Stopped GDM advertising")
+
+    async def _advertise_loop(self) -> None:
+        """Continuously advertise via GDM every 30 seconds."""
+        # Send initial announcement immediately
+        await self._send_announcement()
+
+        while self._running:
+            try:
+                await asyncio.sleep(30)
+                await self._send_announcement()
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                LOGGER.exception(f"Error sending GDM announcement: {e}")
+                await asyncio.sleep(30)
+
+    async def _listen_loop(self) -> None:
+        """Listen for GDM discovery requests and respond (matching test-client.py)."""
+
+        def listen() -> None:
+            try:
+                # Create UDP socket
+                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+                # Bind to GDM listen port (like test-client.py)
+                sock.bind(("", GDM_LISTEN_PORT))
+
+                sock.settimeout(1.0)  # 1 second timeout for checking _running
+
+                while self._running:
+                    try:
+                        data, addr = sock.recvfrom(1024)
+                        message = data.decode("utf-8", errors="ignore")
+
+                        # Check if this is a discovery request (M-SEARCH) not our own HELLO
+                        if "M-SEARCH" in message:
+                            # Send response - addr contains the actual client's IP and port
+                            self._send_discovery_response(addr)
+
+                    except socket.timeout:  # noqa: UP041
+                        continue
+                    except Exception as e:
+                        if self._running:
+                            LOGGER.debug(f"Error receiving GDM request: {e}")
+
+                sock.close()
+
+            except Exception as e:
+                LOGGER.exception(f"Failed to start GDM listener: {e}")
+
+        await asyncio.to_thread(listen)
+
+    def _send_discovery_response(self, addr: tuple[str, int]) -> None:
+        """Send GDM response to a discovery request."""
+        if not self._response_socket:
+            LOGGER.warning("Response socket not available")
+            return
+
+        try:
+            self._response_socket.sendto(self._response_message, addr)
+
+        except Exception as e:
+            LOGGER.warning(f"Failed to send GDM response to {addr}: {e}")
+
+    async def _send_announcement(self) -> None:
+        """Send a GDM announcement broadcast (uses pre-built message)."""
+        await asyncio.get_event_loop().run_in_executor(None, self._send_udp)
+
+    def _send_udp(self) -> None:
+        """Send UDP broadcast message (uses cached socket and message)."""
+        if not self._broadcast_socket:
+            LOGGER.warning("Broadcast socket not available")
+            return
+
+        try:
+            self._broadcast_socket.sendto(
+                self._hello_message, (GDM_BROADCAST_ADDR, GDM_BROADCAST_PORT)
+            )
+
+        except Exception as e:
+            LOGGER.exception(f"Failed to send GDM announcement: {e}")
diff --git a/music_assistant/providers/plex_connect/icon.svg b/music_assistant/providers/plex_connect/icon.svg
new file mode 100644 (file)
index 0000000..7d30cd1
--- /dev/null
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="utf-8"?>
+<svg version="1.1" xmlns="http://www.w3.org/2000/svg"
+     xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
+     viewBox="0 0 122.88 122.88" xml:space="preserve">
+  <title>Plex Ã— Music Assistant</title>
+
+  <style type="text/css"><![CDATA[
+    .bg{fill:#282A2D;}
+    .bar{fill:#18B6D2;}
+    .bar-alt{fill:#E5A00D;}
+  ]]></style>
+
+  <!-- Background -->
+  <path class="bg" d="M18.43,0h86.02c10.18,0,18.43,8.25,18.43,18.43v86.02c0,10.18-8.25,18.43-18.43,18.43H18.43
+    C8.25,122.88,0,114.63,0,104.45l0-86.02C0,8.25,8.25,0,18.43,0L18.43,0z"/>
+
+  <!-- Left vertical bars (MA style) -->
+  <rect x="20" y="30" width="3.5" height="62" class="bar"/>
+  <rect x="28" y="30" width="3.5" height="62" class="bar"/>
+  <rect x="36" y="30" width="3.5" height="62" class="bar"/>
+  <rect x="44" y="30" width="3.5" height="62" class="bar"/>
+
+  <!-- Center: Plex chevron as a diagonal bar (matching Plex angle) -->
+  <path class="bar-alt" d="M55,30 L61,30 L76.3,61.44 L61,92 L55,92 L70.3,61.44 Z"/>
+
+  <!-- Right diagonal slashes (MA style) -->
+  <path class="bar" d="M72,30 L86,92 L89.5,92 L75.5,30 Z"/>
+  <path class="bar" d="M80,30 L94,92 L97.5,92 L83.5,30 Z"/>
+  <path class="bar" d="M88,30 L102,92 L105.5,92 L91.5,30 Z"/>
+</svg>
diff --git a/music_assistant/providers/plex_connect/manifest.json b/music_assistant/providers/plex_connect/manifest.json
new file mode 100644 (file)
index 0000000..c25ed03
--- /dev/null
@@ -0,0 +1,13 @@
+{
+  "type": "plugin",
+  "stage": "alpha",
+  "domain": "plex_connect",
+  "name": "Plex Connect Plugin",
+  "description": "Makes a Music Assistant player appear as a device in the official Plex apps.",
+  "codeowners": ["@anatosun"],
+  "requirements": ["plexapi==4.17.1"],
+  "icon": "mdi:plex",
+  "documentation": "https://music-assistant.io/plugins/plex-connect/",
+  "depends_on": "plex",
+  "multi_instance": true
+}
diff --git a/music_assistant/providers/plex_connect/player_remote.py b/music_assistant/providers/plex_connect/player_remote.py
new file mode 100644 (file)
index 0000000..40152f9
--- /dev/null
@@ -0,0 +1,1753 @@
+"""Per-player Plex remote control instances."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import platform
+import re
+import time
+import uuid
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlparse
+
+from aiohttp import ClientTimeout, web
+from music_assistant_models.enums import EventType, QueueOption, RepeatMode
+from plexapi.playqueue import PlayQueue
+
+from .gdm import PlexGDMAdvertiser
+
+if TYPE_CHECKING:
+    from music_assistant_models.event import MassEvent
+
+    from music_assistant.providers.plex import PlexProvider
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class PlayerRemoteInstance:
+    """Single remote control instance for one MA player."""
+
+    def __init__(
+        self,
+        plex_provider: PlexProvider,
+        ma_player_id: str,
+        player_name: str,
+        port: int,
+        device_class: str = "speaker",
+        remote_control: bool = False,
+    ) -> None:
+        """Initialize player remote instance.
+
+        :param plex_provider: Plex provider instance.
+        :param ma_player_id: Music Assistant player ID.
+        :param player_name: Display name for the player.
+        :param port: Port for the remote control server.
+        :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud).
+        :param remote_control: Whether to enable remote control.
+        """
+        self.plex_provider = plex_provider
+        self.plex_server = plex_provider._plex_server
+        self.ma_player_id = ma_player_id
+        self.player_name = player_name
+        self.port = port
+        self.device_class = device_class
+        self.remote_control = remote_control
+
+        self.client_id = str(
+            uuid.uuid5(
+                uuid.NAMESPACE_DNS,
+                f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}",
+            )
+        )
+
+        if self.remote_control:
+            # Remote control server
+            self.server: PlexRemoteControlServer | None = None
+            # GDM advertiser
+            self.gdm: PlexGDMAdvertiser | None = None
+
+    async def start(self) -> None:
+        """Start this player's remote control."""
+        if self.remote_control:
+            # Create player-specific PlexServer instance with unique client identification
+            LOGGER.info(
+                f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}"
+            )
+
+            self.server = PlexRemoteControlServer(
+                plex_provider=self.plex_provider,
+                port=self.port,
+                client_id=self.client_id,
+                ma_player_id=self.ma_player_id,
+                device_class=self.device_class,
+            )
+            LOGGER.info(
+                f"Remote control server for '{self.player_name}' bound to MA player: "
+                f"{self.ma_player_id}"
+            )
+
+            await self.server.start()
+
+            # Step 4: Start GDM broadcasting
+            self.gdm = PlexGDMAdvertiser(
+                instance_id=self.client_id,
+                port=self.port,
+                publish_ip=str(self.plex_provider.mass.streams.publish_ip),
+                name=self.player_name,
+                product="Music Assistant",
+                version=self.plex_provider.mass.version
+                if self.plex_provider.mass.version != "0.0.0"
+                else "1.0.0",
+            )
+            self.gdm.start()
+
+            LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}")
+
+    async def stop(self) -> None:
+        """Stop this player's remote control."""
+        if self.remote_control:
+            if self.gdm:
+                await self.gdm.stop()
+
+            if self.server:
+                await self.server.stop()
+
+            LOGGER.info(f"Stopped remote control for player '{self.player_name}'")
+
+
+class PlexRemoteControlServer:
+    """HTTP server to receive Plex remote control commands."""
+
+    def __init__(
+        self,
+        plex_provider: PlexProvider,
+        port: int = 32500,
+        client_id: str | None = None,
+        ma_player_id: str | None = None,
+        device_class: str = "speaker",
+    ) -> None:
+        """Initialize remote control server.
+
+        :param plex_provider: Plex provider instance.
+        :param port: Port for the HTTP server.
+        :param client_id: Unique client identifier.
+        :param ma_player_id: Music Assistant player ID.
+        :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud).
+        """
+        self.provider = plex_provider
+        self.plex_server = plex_provider._plex_server
+        self.port = port
+        self.client_id = client_id or plex_provider.instance_id
+        self.device_class = device_class
+        self.app = web.Application()
+        self.subscriptions: dict[str, dict[str, object]] = {}
+        self.runner: web.AppRunner | None = None
+        self.http_site: web.TCPSite | None = None
+
+        # Play queue tracking (Plex-specific state that doesn't exist in MA)
+        self.play_queue_id: str | None = None
+        self.play_queue_version: int = 1
+        # Map queue index to item ID
+        self.play_queue_item_ids: dict[int, int] = {}
+
+        # Track MA queue state to detect when we need to sync to Plex
+        self._last_synced_ma_queue_length: int = 0
+        self._last_synced_ma_queue_keys: list[str] = []
+
+        # Specific MA player this server controls (set by PlayerRemoteInstance)
+        self._ma_player_id = ma_player_id
+
+        # Store unsubscribe callbacks
+        self._unsub_callbacks: list[Callable[..., None]] = []
+
+        # Flag to prevent circular updates when we modify the queue ourselves
+        self._updating_from_plex = False
+
+        self.player = self.provider.mass.players.get(self._ma_player_id)  # type: ignore[arg-type]
+
+        self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant"
+
+        self.headers = {
+            "X-Plex-Device-Name": self.device_name,
+            "X-Plex-Session-Identifier": self.client_id,
+            "X-Plex-Client-Identifier": self.client_id,
+            "X-Plex-Product": "Music Assistant",
+            "X-Plex-Platform": "Music Assistant",
+            "X-Plex-Platform-Version": platform.release(),
+        }
+
+        self._setup_routes()
+
+    def _setup_routes(self) -> None:
+        """Set up all required endpoints."""
+        # Root endpoint
+        self.app.router.add_get("/", self.handle_root)
+
+        # Subscription management
+        self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe)
+        self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe)
+        self.app.router.add_get("/player/timeline/poll", self.handle_poll)
+
+        # Playback commands
+        self.app.router.add_get("/player/playback/playMedia", self.handle_play_media)
+        self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue)
+        self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue)
+        self.app.router.add_get("/player/playback/pause", self.handle_pause)
+        self.app.router.add_get("/player/playback/play", self.handle_play)
+        self.app.router.add_get("/player/playback/stop", self.handle_stop)
+        self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next)
+        self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous)
+        self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward)
+        self.app.router.add_get("/player/playback/stepBack", self.handle_step_back)
+        self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to)
+        self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters)
+        self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to)
+
+        # Resources endpoint
+        self.app.router.add_get("/resources", self.handle_resources)
+
+        # CORS OPTIONS handler (for all routes)
+        self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options)
+
+        # --- Catch-all fallback for debugging purposes ---
+        # self.app.router.add_route("*", "/{path_info:.*}", self.handle_unknown)
+
+    async def start(self) -> None:
+        """Start HTTP server and GDM advertising."""
+        self.runner = web.AppRunner(self.app)
+        await self.runner.setup()
+
+        # Start HTTP server
+        self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port)
+        await self.http_site.start()
+        LOGGER.info(f"Plex remote control server started on HTTP port {self.port}")
+
+        # Note: GDM advertising is handled by PlexProvider in __init__.py
+        # to avoid duplicate broadcasts
+
+        # Subscribe to player and queue events for state synchronization
+        if self._ma_player_id:
+            self._unsub_callbacks.append(
+                self.provider.mass.subscribe(
+                    self._handle_player_event,
+                    EventType.PLAYER_UPDATED,
+                    id_filter=self._ma_player_id,
+                )
+            )
+            self._unsub_callbacks.append(
+                self.provider.mass.subscribe(
+                    self._handle_queue_event,
+                    EventType.QUEUE_UPDATED,
+                    id_filter=self._ma_player_id,
+                )
+            )
+            self._unsub_callbacks.append(
+                self.provider.mass.subscribe(
+                    self._handle_queue_event,
+                    EventType.QUEUE_TIME_UPDATED,
+                    id_filter=self._ma_player_id,
+                )
+            )
+            self._unsub_callbacks.append(
+                self.provider.mass.subscribe(
+                    self._handle_queue_items_updated,
+                    EventType.QUEUE_ITEMS_UPDATED,
+                    id_filter=self._ma_player_id,
+                )
+            )
+
+    async def stop(self) -> None:
+        """Stop the HTTP server."""
+        # Unsubscribe from events
+        for unsub in self._unsub_callbacks:
+            unsub()
+        self._unsub_callbacks.clear()
+
+        # Stop HTTP server
+        if self.http_site:
+            await self.http_site.stop()
+        if self.runner:
+            await self.runner.cleanup()
+        LOGGER.info("Plex remote control server stopped")
+
+    async def handle_root(self, request: web.Request) -> web.Response:
+        """Handle root endpoint - return basic player info."""
+        # Get player name
+        player_name = "Music Assistant"
+        if self._ma_player_id:
+            player = self.provider.mass.players.get(self._ma_player_id)
+            if player:
+                player_name = player.display_name
+
+        xml = f"""<?xml version="1.0" encoding="UTF-8"?>
+<MediaContainer machineIdentifier="{self.client_id}" version="1.0">
+    <Player title="{player_name}" machineIdentifier="{self.client_id}"/>
+</MediaContainer>"""
+        return web.Response(
+            text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"}
+        )
+
+    async def handle_subscribe(self, request: web.Request) -> web.Response:
+        """Handle timeline subscription from controller."""
+        client_id = request.headers.get("X-Plex-Client-Identifier")
+        protocol = request.query.get("protocol", "http")
+        port = request.query.get("port")
+        command_id = int(request.query.get("commandID", 0))
+
+        if not client_id or not port:
+            return web.Response(status=400)
+
+        self.subscriptions[client_id] = {
+            "url": f"{protocol}://{request.remote}:{port}",
+            "command_id": command_id,
+            "last_update": time.time(),
+        }
+
+        LOGGER.info(f"Controller {client_id} subscribed for timeline updates")
+        await self._send_timeline(client_id)
+        return web.Response(status=200)
+
+    async def handle_unsubscribe(self, request: web.Request) -> web.Response:
+        """Handle unsubscribe request."""
+        client_id = request.headers.get("X-Plex-Client-Identifier")
+        if client_id in self.subscriptions:
+            del self.subscriptions[client_id]
+            LOGGER.info(f"Controller {client_id} unsubscribed")
+        return web.Response(status=200)
+
+    async def handle_poll(self, request: web.Request) -> web.Response:
+        """Handle timeline poll request."""
+        # Extract parameters
+        include_metadata = request.query.get("includeMetadata", "0") == "1"
+        command_id = request.query.get("commandID", "0")
+
+        # Update subscription timestamp if this client is subscribed
+        client_id = request.headers.get("X-Plex-Client-Identifier")
+        if client_id and client_id in self.subscriptions:
+            self.subscriptions[client_id]["last_update"] = time.time()
+
+        # Build timeline from current MA player state
+        timeline_xml = await self._build_timeline_xml(
+            include_metadata=include_metadata, command_id=command_id
+        )
+        return web.Response(
+            text=timeline_xml,
+            content_type="text/xml",
+            headers={
+                "X-Plex-Client-Identifier": self.client_id,
+                "Access-Control-Expose-Headers": "X-Plex-Client-Identifier",
+                "Access-Control-Allow-Origin": "*",
+            },
+        )
+
+    async def handle_play_media(self, request: web.Request) -> web.Response:
+        """
+        Handle playMedia command from Plex controller.
+
+        Plexamp sends various parameters:
+        - key: The item to play (track, album, playlist, etc.)
+        - containerKey: The container context (play queue)
+        - offset: Starting position in milliseconds
+        - shuffle: Whether to shuffle
+        - repeat: Repeat mode
+        """
+        # Set flag to prevent circular updates
+        self._updating_from_plex = True
+        try:
+            key = request.query.get("key")
+            container_key = request.query.get("containerKey")
+            offset = int(request.query.get("offset", 0))
+            shuffle = request.query.get("shuffle", "0") == "1"
+
+            if not key:
+                return web.Response(
+                    status=400, text="Missing required 'key' parameter for playMedia command"
+                )
+
+            LOGGER.info(
+                f"Received playMedia command - key: {key}, "
+                f"containerKey: {container_key}, offset: {offset}ms"
+            )
+
+            # Use the assigned player for this server instance
+            player_id = self._ma_player_id
+            if not player_id:
+                return web.Response(status=500, text="No player assigned to this server")
+
+            if container_key and "/playQueues/" in container_key:
+                # Extract play queue ID from container key
+                queue_id_match = re.search(r"/playQueues/(\d+)", container_key)
+                if queue_id_match:
+                    self.play_queue_id = queue_id_match.group(1)
+                    self.play_queue_version = 1
+                    LOGGER.info(f"Playing from queue: {container_key} starting at {key}")
+
+                    await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset)
+                else:
+                    # Reset queue tracking if no valid queue ID found
+                    self.play_queue_id = None
+                    self.play_queue_item_ids = {}
+                    # Fall back to single track
+                    media = await self._resolve_plex_item(key)
+                    await self.provider.mass.player_queues.play_media(
+                        queue_id=player_id,
+                        media=media,  # type: ignore[arg-type]
+                        option=QueueOption.REPLACE,
+                    )
+            elif container_key:
+                # Playing from a regular container (album, playlist, artist) not a play queue
+                # Reset queue tracking
+                self.play_queue_id = None
+                self.play_queue_item_ids = {}
+
+                # The key is the specific track, containerKey is the collection
+                media_to_play = await self._resolve_plex_item(container_key)
+
+                # Queue the entire container
+                await self.provider.mass.player_queues.play_media(
+                    queue_id=player_id,
+                    media=media_to_play,  # type: ignore[arg-type]
+                    option=QueueOption.REPLACE,
+                )
+
+            else:
+                # Playing a single item, reset queue tracking
+                self.play_queue_id = None
+                self.play_queue_item_ids = {}
+
+                media = await self._resolve_plex_item(key)
+
+                # Replace the queue with this media
+                await self.provider.mass.player_queues.play_media(
+                    queue_id=player_id,
+                    media=media,  # type: ignore[arg-type]
+                    option=QueueOption.REPLACE,
+                )
+
+            # Set shuffle if requested
+            if shuffle:
+                self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
+
+            # Seek to offset if specified
+            if offset > 0:
+                await self._seek_to_offset_after_playback(player_id, offset)
+
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+
+        except Exception as e:
+            LOGGER.exception(f"Error handling playMedia: {e}")
+            return web.Response(status=500, text=str(e))
+        finally:
+            # Clear flag after processing
+            self._updating_from_plex = False
+
+    def _reorder_tracks_for_playback(
+        self, tracks: list[Any], start_index: int
+    ) -> tuple[list[Any], dict[int, int]]:
+        """Reorder tracks to start from a specific index and update item ID mappings.
+
+        :param tracks: List of tracks to reorder.
+        :param start_index: Index of the track to start from.
+        :return: Tuple of (reordered tracks, updated item ID mappings).
+        """
+        if start_index <= 0 or start_index >= len(tracks):
+            # No reordering needed
+            return tracks, self.play_queue_item_ids
+
+        # Reorder: [selected track, tracks after it, tracks before it]
+        reordered_tracks = (
+            tracks[start_index:]  # From selected to end
+            + tracks[:start_index]  # From start to selected
+        )
+
+        # Update play queue item ID mappings to reflect new order
+        new_item_ids = {}
+        for new_idx, old_idx in enumerate(
+            list(range(start_index, len(tracks))) + list(range(start_index))
+        ):
+            if old_idx in self.play_queue_item_ids:
+                new_item_ids[new_idx] = self.play_queue_item_ids[old_idx]
+
+        LOGGER.info(f"Started playback from offset {start_index} (reordered queue)")
+        return reordered_tracks, new_item_ids
+
+    async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None:
+        """Seek to the specified offset after playback starts.
+
+        :param player_id: The player ID to seek on.
+        :param offset: The offset in milliseconds.
+        """
+        # Wait for the queue to have items loaded before seeking
+        for _ in range(10):  # Try up to 10 times (5 seconds total)
+            await asyncio.sleep(0.5)
+            queue = self.provider.mass.player_queues.get(player_id)
+            if queue and queue.current_item:
+                try:
+                    await self.provider.mass.players.cmd_seek(player_id, offset // 1000)
+                    # Wait briefly for player state to update
+                    await asyncio.sleep(0.1)
+                    break
+                except Exception as e:
+                    LOGGER.debug(f"Could not seek to offset {offset}ms: {e}")
+                    break
+        else:
+            LOGGER.warning("Queue not ready for seeking after timeout")
+
+    async def _play_from_plex_queue(  # noqa: PLR0915
+        self,
+        player_id: str,
+        container_key: str,
+        starting_key: str | None,
+        shuffle: bool,
+        offset: int,
+    ) -> None:
+        """Fetch play queue from Plex and load tracks."""
+        try:
+            LOGGER.info(f"Fetching play queue: {container_key}")
+
+            # Extract queue ID from container_key (e.g., "/playQueues/123" -> "123")
+            queue_id_match = re.search(r"/playQueues/(\d+)", container_key)
+            if not queue_id_match:
+                raise ValueError(f"Invalid container_key format: {container_key}")
+
+            queue_id = queue_id_match.group(1)
+
+            # Use plexapi to fetch the play queue
+            def fetch_queue() -> PlayQueue:
+                return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id)
+
+            playqueue = await asyncio.to_thread(fetch_queue)
+
+            if playqueue and playqueue.items:
+                # Get selected item offset from PlayQueue - this tells us which track to start from
+                selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0)
+                LOGGER.info(f"PlayQueue selected item offset: {selected_offset}")
+
+                # Track play queue item IDs
+                self.play_queue_item_ids = {}
+                tracks_to_queue: list[object] = []
+                start_index = None
+
+                for plex_idx, item in enumerate(playqueue.items):
+                    track_key = item.key if hasattr(item, "key") else None
+                    play_queue_item_id = (
+                        item.playQueueItemID if hasattr(item, "playQueueItemID") else None
+                    )
+
+                    if track_key:
+                        try:
+                            # Fetch track from MA
+                            track = await self.provider.get_track(track_key)
+                            ma_idx = len(tracks_to_queue)
+                            tracks_to_queue.append(track)
+
+                            # Store play queue item ID mapping
+                            if play_queue_item_id:
+                                self.play_queue_item_ids[ma_idx] = play_queue_item_id
+
+                            # Check if this is the track at the selected offset
+                            if plex_idx == selected_offset:
+                                start_index = ma_idx
+                                LOGGER.info(
+                                    f"Start track at offset {selected_offset}: {track.name}"
+                                )
+                        except Exception as e:
+                            LOGGER.debug(f"Could not fetch track {track_key}: {e}")
+                            continue
+
+                if tracks_to_queue:
+                    LOGGER.info(
+                        f"Loaded queue with {len(tracks_to_queue)} tracks, "
+                        f"starting at offset {selected_offset} (MA index {start_index})"
+                    )
+
+                    # Reorder tracks if not starting from the first track
+                    if start_index is not None and start_index > 0:
+                        tracks_to_queue, self.play_queue_item_ids = (
+                            self._reorder_tracks_for_playback(tracks_to_queue, start_index)
+                        )
+
+                    # Queue all tracks
+                    await self.provider.mass.player_queues.play_media(
+                        queue_id=player_id,
+                        media=tracks_to_queue,  # type: ignore[arg-type]
+                        option=QueueOption.REPLACE,
+                    )
+
+                    # Update tracked state to prevent sync loop
+                    # Store the keys in the order they're in MA queue (after reordering)
+                    synced_keys = []
+                    for track in tracks_to_queue:  # type: ignore[assignment]
+                        for mapping in track.provider_mappings:
+                            if mapping.provider_instance == self.provider.instance_id:
+                                synced_keys.append(mapping.item_id)
+                                break
+                    self._last_synced_ma_queue_length = len(synced_keys)
+                    self._last_synced_ma_queue_keys = synced_keys
+
+                    # Apply shuffle if requested
+                    if shuffle:
+                        self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
+
+                    # Seek to offset if specified
+                    if offset > 0:
+                        await self._seek_to_offset_after_playback(player_id, offset)
+                else:
+                    LOGGER.error("No valid tracks in play queue")
+                    # Fall back to single track
+                    if starting_key:
+                        track = await self.provider.get_track(starting_key)
+                        await self.provider.mass.player_queues.play_media(
+                            queue_id=player_id,
+                            media=track,
+                            option=QueueOption.REPLACE,
+                        )
+            else:
+                LOGGER.error("Play queue is empty or could not be fetched")
+                # Fall back to single track
+                if starting_key:
+                    track = await self.provider.get_track(starting_key)
+                    await self.provider.mass.player_queues.play_media(
+                        queue_id=player_id,
+                        media=track,
+                        option=QueueOption.REPLACE,
+                    )
+
+        except Exception as e:
+            LOGGER.exception(f"Error playing from queue: {e}")
+            # Fall back to single track
+            if starting_key:
+                track = await self.provider.get_track(starting_key)
+                await self.provider.mass.player_queues.play_media(
+                    queue_id=player_id,
+                    media=track,
+                    option=QueueOption.REPLACE,
+                )
+
+    async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None:
+        """Replace the entire queue when nothing is currently playing.
+
+        :param player_id: The Music Assistant player ID.
+        :param playqueue: The Plex play queue to load.
+        """
+        all_tracks = []
+        self.play_queue_item_ids = {}
+
+        for i, item in enumerate(playqueue.items):
+            track_key = item.key if hasattr(item, "key") else None
+            play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
+
+            if track_key:
+                try:
+                    track = await self.provider.get_track(track_key)
+                    all_tracks.append(track)
+
+                    if play_queue_item_id:
+                        self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id
+                except Exception as e:
+                    LOGGER.debug(f"Could not fetch track {track_key}: {e}")
+                    continue
+
+        if all_tracks:
+            await self.provider.mass.player_queues.play_media(
+                queue_id=player_id,
+                media=all_tracks,  # type: ignore[arg-type]
+                option=QueueOption.REPLACE,
+            )
+            LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks")
+
+    async def _replace_remaining_queue(
+        self, player_id: str, playqueue: PlayQueue, current_index: int
+    ) -> None:
+        """Replace only items after the current track.
+
+        :param player_id: The Music Assistant player ID.
+        :param playqueue: The Plex play queue to load.
+        :param current_index: The current track index in the MA queue.
+        """
+        # Fetch tracks that come AFTER the current track in the Plex queue
+        remaining_tracks = []
+        new_item_mappings = {}
+
+        # Start from the track after current_index
+        for i in range(current_index + 1, len(playqueue.items)):
+            item = playqueue.items[i]
+            track_key = item.key if hasattr(item, "key") else None
+            play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
+
+            if track_key:
+                try:
+                    track = await self.provider.get_track(track_key)
+                    remaining_tracks.append(track)
+
+                    # Map relative to the current position
+                    if play_queue_item_id:
+                        new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = (
+                            play_queue_item_id
+                        )
+                except Exception as e:
+                    LOGGER.debug(f"Could not fetch track {track_key}: {e}")
+                    continue
+
+        # Replace items after current track
+        if remaining_tracks:
+            await self.provider.mass.player_queues.play_media(
+                queue_id=player_id,
+                media=remaining_tracks,  # type: ignore[arg-type]
+                option=QueueOption.REPLACE_NEXT,  # Replace everything after current
+            )
+            # Update mappings for the new items
+            self.play_queue_item_ids.update(new_item_mappings)
+
+            LOGGER.info(
+                f"Replaced {len(remaining_tracks)} tracks after current track "
+                f"(index {current_index})"
+            )
+        else:
+            # No tracks after current - clear remaining queue
+            LOGGER.debug("No tracks after current track in Plex queue")
+
+        # Rebuild complete item ID mappings from Plex queue
+        # Keep mappings for tracks from index 0 to current_index unchanged
+        for i, item in enumerate(playqueue.items):
+            play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
+            if play_queue_item_id:
+                self.play_queue_item_ids[i] = play_queue_item_id
+
+    async def handle_refresh_play_queue(self, request: web.Request) -> web.Response:
+        """
+        Handle refreshPlayQueue command from Plex controller.
+
+        This is called when the play queue is modified (items added, removed, reordered).
+        We need to sync the entire updated queue state to MA while preserving playback.
+        """
+        try:
+            play_queue_id = request.query.get("playQueueID")
+
+            if not play_queue_id:
+                return web.Response(status=400, text="Missing 'playQueueID' parameter")
+
+            # Log all query parameters to understand what Plex sends
+            LOGGER.info(
+                f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, "
+                f"params: {dict(request.query)}"
+            )
+
+            # Verify this is our active play queue
+            if self.play_queue_id != play_queue_id:
+                LOGGER.warning(
+                    f"Refresh requested for queue {play_queue_id} but active queue is "
+                    f"{self.play_queue_id}"
+                )
+                return web.Response(
+                    status=409,
+                    text=(
+                        f"Requested playQueueID {play_queue_id} does not match "
+                        f"active queue {self.play_queue_id}"
+                    ),
+                )
+
+            # Update the play queue version (increments on each refresh)
+            self.play_queue_version += 1
+
+            # Use plexapi to fetch the updated play queue
+            def fetch_queue() -> PlayQueue:
+                return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id)
+
+            playqueue = await asyncio.to_thread(fetch_queue)
+
+            if not playqueue or not playqueue.items:
+                LOGGER.error("Failed to refresh play queue - queue is empty or not found")
+                return web.Response(status=404, text="Play queue not found")
+
+            # Get current MA queue state
+            player_id = self._ma_player_id
+            if not player_id:
+                LOGGER.error("No player assigned to this server")
+                return web.Response(status=500, text="No player assigned")
+
+            # disable shuffle to avoid infinite loop
+            self.provider.mass.player_queues.set_shuffle(player_id, False)
+            ma_queue = self.provider.mass.player_queues.get(player_id)
+            if not ma_queue:
+                LOGGER.error(f"MA queue not found for player {player_id}")
+                return web.Response(status=500, text="MA queue not found")
+
+            # Get current playback state
+            current_index = ma_queue.current_index
+
+            # Get MA queue item count
+            ma_queue_items = self.provider.mass.player_queues.items(player_id)
+            ma_queue_count = len(ma_queue_items) if ma_queue_items else 0
+
+            LOGGER.debug(
+                f"Queue refresh: Current index={current_index}, "
+                f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items"
+            )
+
+            # If nothing is playing, replace the entire queue
+            if current_index is None:
+                LOGGER.debug("No track currently playing, replacing entire queue")
+                await self._replace_entire_queue(player_id, playqueue)
+            else:
+                # Something is playing - update only the remaining queue items
+                LOGGER.debug(
+                    f"Track at index {current_index} is playing, "
+                    f"replacing only items after current track"
+                )
+                await self._replace_remaining_queue(player_id, playqueue, current_index)
+
+            LOGGER.info(
+                f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items"
+            )
+
+            # Update tracked state to prevent sync loop
+            # Get what's actually in MA queue after the refresh
+            queue_items_after = self.provider.mass.player_queues.items(player_id)
+            synced_keys = []
+            for item in queue_items_after:
+                if item.media_item:
+                    for mapping in item.media_item.provider_mappings:
+                        if mapping.provider_instance == self.provider.instance_id:
+                            synced_keys.append(mapping.item_id)
+                            break
+            self._last_synced_ma_queue_length = len(synced_keys)
+            self._last_synced_ma_queue_keys = synced_keys
+
+            return web.Response(status=200)
+
+        except Exception as e:
+            LOGGER.exception(f"Error handling refreshPlayQueue: {e}")
+            return web.Response(status=500, text=str(e))
+
+    async def handle_create_play_queue(self, request: web.Request) -> web.Response:
+        """
+        Handle createPlayQueue command from Plex controller.
+
+        Creates a new play queue from a URI (album, playlist, artist tracks, etc.)
+        and optionally applies shuffle.
+        """
+        try:
+            uri = request.query.get("uri")
+            shuffle = request.query.get("shuffle", "0") == "1"
+            continuous = request.query.get("continuous", "0") == "1"
+
+            if not uri:
+                return web.Response(status=400, text="Missing 'uri' parameter")
+
+            LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}")
+
+            # Use the assigned player for this server instance
+            player_id = self._ma_player_id
+            if not player_id:
+                return web.Response(status=500, text="No player assigned to this server")
+
+            # Use plexapi to create play queue
+            def create_queue() -> PlayQueue:
+                # Fetch the item from URI first
+                item = self.provider._plex_server.fetchItem(uri)
+                # Create play queue from the item
+                return PlayQueue.create(
+                    self.provider._plex_server,
+                    item,
+                    shuffle=1 if shuffle else 0,
+                    continuous=1 if continuous else 0,
+                )
+
+            playqueue = await asyncio.to_thread(create_queue)
+
+            if playqueue and playqueue.items:
+                # Extract play queue ID from response
+                self.play_queue_id = str(playqueue.playQueueID)
+                self.play_queue_version = 1
+
+                LOGGER.info(
+                    f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items"
+                )
+
+                # Load tracks from the created queue
+                self.play_queue_item_ids = {}
+                tracks_to_queue = []
+
+                for i, item in enumerate(playqueue.items):
+                    track_key = item.key if hasattr(item, "key") else None
+                    play_queue_item_id = (
+                        item.playQueueItemID if hasattr(item, "playQueueItemID") else None
+                    )
+
+                    if track_key:
+                        try:
+                            # Fetch track from MA
+                            track = await self.provider.get_track(track_key)
+                            tracks_to_queue.append(track)
+
+                            # Store play queue item ID mapping
+                            if play_queue_item_id:
+                                self.play_queue_item_ids[len(tracks_to_queue) - 1] = (
+                                    play_queue_item_id
+                                )
+                        except Exception as e:
+                            LOGGER.debug(f"Could not fetch track {track_key}: {e}")
+                            continue
+
+                if tracks_to_queue:
+                    # Queue all tracks
+                    await self.provider.mass.player_queues.play_media(
+                        queue_id=player_id,
+                        media=tracks_to_queue,  # type: ignore[arg-type]
+                        option=QueueOption.REPLACE,
+                    )
+
+                    # Apply shuffle if requested (Plex may have already shuffled server-side)
+                    if shuffle:
+                        self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
+                else:
+                    LOGGER.error("No valid tracks in created play queue")
+                    return web.Response(status=500, text="Failed to load tracks from play queue")
+            else:
+                LOGGER.error("Failed to create play queue or queue is empty")
+                return web.Response(status=500, text="Failed to create play queue")
+
+            # Broadcast timeline update
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+
+        except Exception as e:
+            LOGGER.exception(f"Error handling createPlayQueue: {e}")
+            return web.Response(status=500, text=str(e))
+
+    async def _resolve_plex_item(self, key: str) -> object:
+        """Resolve a Plex key to a Music Assistant media item."""
+        # Determine item type from the key format
+        if "/library/metadata/" in key:
+            # Could be track, album, or artist
+            # Try to fetch as track first
+            try:
+                return await self.provider.get_track(key)
+            except Exception as exc:
+                LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}")
+
+            # Try as album
+            try:
+                return await self.provider.get_album(key)
+            except Exception as exc:
+                LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}")
+
+            # Try as artist
+            try:
+                return await self.provider.get_artist(key)
+            except Exception:
+                raise ValueError(f"Could not resolve Plex item: {key}") from None
+
+        elif "/playlists/" in key:
+            return await self.provider.get_playlist(key)
+        else:
+            raise ValueError(f"Unknown Plex key format: {key}")
+
+    async def handle_pause(self, request: web.Request) -> web.Response:
+        """Handle pause command (test-client.py line 98-101)."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                await self.provider.mass.players.cmd_pause(self._ma_player_id)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_play(self, request: web.Request) -> web.Response:
+        """Handle play/resume command (test-client.py line 103-106)."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                await self.provider.mass.players.cmd_play(self._ma_player_id)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_stop(self, request: web.Request) -> web.Response:
+        """Handle stop command - stops playback and clears the queue."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                # Clear the queue (which also stops playback)
+                self.provider.mass.player_queues.clear(self._ma_player_id)
+
+                # Reset play queue tracking since the queue is now cleared
+                self.play_queue_id = None
+                self.play_queue_item_ids = {}
+
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_skip_next(self, request: web.Request) -> web.Response:
+        """Handle skip next command."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                await self.provider.mass.player_queues.next(self._ma_player_id)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_skip_previous(self, request: web.Request) -> web.Response:
+        """Handle skip previous command."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                await self.provider.mass.player_queues.previous(self._ma_player_id)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_step_forward(self, request: web.Request) -> web.Response:
+        """Handle step forward command (small skip forward)."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                # Step forward 30 seconds
+                player = self.provider.mass.players.get(self._ma_player_id)
+                queue = self.provider.mass.player_queues.get(self._ma_player_id)
+                if player and player.corrected_elapsed_time is not None:
+                    max_duration = player.corrected_elapsed_time + 30
+                    if queue and queue.current_item and queue.current_item.media_item:
+                        max_duration = min(
+                            player.corrected_elapsed_time + 30,
+                            queue.current_item.media_item.duration
+                            or (player.corrected_elapsed_time + 30),
+                        )
+                    await self.provider.mass.players.cmd_seek(self._ma_player_id, int(max_duration))
+                    # Wait briefly for player state to update
+                    await asyncio.sleep(0.1)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_step_back(self, request: web.Request) -> web.Response:
+        """Handle step back command (small skip backward)."""
+        self._updating_from_plex = True
+        try:
+            if self._ma_player_id:
+                # Step back 10 seconds
+                player = self.provider.mass.players.get(self._ma_player_id)
+                if player and player.corrected_elapsed_time is not None:
+                    new_position = max(0, player.corrected_elapsed_time - 10)
+                    await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position))
+                    # Wait briefly for player state to update
+                    await asyncio.sleep(0.1)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_skip_to(self, request: web.Request) -> web.Response:
+        """Handle skip to specific queue item."""
+        key = request.query.get("key")
+        if not self._ma_player_id or not key:
+            return web.Response(status=400, text="Missing player ID or key")
+
+        self._updating_from_plex = True
+        try:
+            ma_index = None
+
+            # Check if key is a play queue item ID (numeric) or a library path
+            if key.isdigit():
+                # Key is a play queue item ID
+                play_queue_item_id = int(key)
+
+                # Find the MA queue index for this play queue item ID
+                for idx, pq_item_id in self.play_queue_item_ids.items():
+                    if pq_item_id == play_queue_item_id:
+                        ma_index = idx
+                        break
+
+                if ma_index is None:
+                    LOGGER.warning(
+                        f"Could not find MA queue index for play queue item ID: "
+                        f"{play_queue_item_id}"
+                    )
+                    return web.Response(status=404, text="Queue item not found")
+
+                LOGGER.info(
+                    f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})"
+                )
+            else:
+                # Key is a library path (e.g., "/library/metadata/856761")
+                # Find the track in the MA queue by matching the Plex key
+                queue_items = self.provider.mass.player_queues.items(self._ma_player_id)
+                if not queue_items:
+                    return web.Response(status=404, text="Queue is empty")
+
+                for idx, item in enumerate(queue_items):
+                    if not item.media_item:
+                        continue
+
+                    # Find Plex mapping for this track
+                    for mapping in item.media_item.provider_mappings:
+                        if (
+                            mapping.provider_instance == self.provider.instance_id
+                            and mapping.item_id == key
+                        ):
+                            ma_index = idx
+                            break
+
+                    if ma_index is not None:
+                        break
+
+                if ma_index is None:
+                    LOGGER.warning(f"Could not find track with key {key} in MA queue")
+                    return web.Response(status=404, text="Track not found in queue")
+
+                LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})")
+
+            # Skip to this index in the MA queue
+            await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index)
+
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+
+        except Exception as e:
+            LOGGER.exception(f"Error handling skipTo: {e}")
+            return web.Response(status=500, text=str(e))
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_seek_to(self, request: web.Request) -> web.Response:
+        """Handle seek command."""
+        self._updating_from_plex = True
+        try:
+            offset_ms = int(request.query.get("offset", 0))
+            if self._ma_player_id:
+                await self.provider.mass.players.cmd_seek(self._ma_player_id, int(offset_ms / 1000))
+                # Wait briefly for player state to update
+                await asyncio.sleep(0.1)
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_set_parameters(self, request: web.Request) -> web.Response:
+        """Handle parameter changes (volume, shuffle, repeat)."""
+        if not self._ma_player_id:
+            return web.Response(status=200)
+
+        self._updating_from_plex = True
+        try:
+            if "volume" in request.query:
+                volume = int(request.query["volume"])
+                await self.provider.mass.players.cmd_volume_set(self._ma_player_id, volume)
+
+            if "shuffle" in request.query:
+                # Plex sends shuffle as "0" or "1"
+                shuffle = request.query["shuffle"] == "1"
+                self.provider.mass.player_queues.set_shuffle(self._ma_player_id, shuffle)
+
+            if "repeat" in request.query:
+                # Plex repeat: 0=off, 1=repeat one, 2=repeat all
+                repeat_value = int(request.query["repeat"])
+
+                # Map Plex repeat to MA repeat mode
+                if repeat_value == 0:
+                    # Repeat off
+                    self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.OFF)
+                elif repeat_value == 1:
+                    # Repeat one track
+                    self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ONE)
+                elif repeat_value == 2:
+                    # Repeat all
+                    self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ALL)
+
+            await self._broadcast_timeline()
+            return web.Response(status=200)
+        finally:
+            self._updating_from_plex = False
+
+    async def handle_options(self, request: web.Request) -> web.Response:
+        """Handle OPTIONS requests for CORS (like test-client.py)."""
+        return web.Response(
+            status=200,
+            headers={
+                "Access-Control-Allow-Origin": "*",
+                "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
+                "Access-Control-Allow-Headers": "*",
+            },
+        )
+
+    async def handle_resources(self, request: web.Request) -> web.Response:
+        """Return player information (matching test-client.py format exactly)."""
+        # Get player name
+        player_name = "Music Assistant"
+        if self._ma_player_id:
+            player = self.provider.mass.players.get(self._ma_player_id)
+            if player:
+                player_name = player.display_name
+
+        # Get player state
+        state = "stopped"
+        if self._ma_player_id:
+            player = self.provider.mass.players.get(self._ma_player_id)
+            if player and player.state:
+                state_value = (
+                    player.state.value if hasattr(player.state, "value") else str(player.state)
+                )
+                if state_value in ["playing", "paused"]:
+                    state = state_value
+
+        local_ip = self.provider.mass.streams.publish_ip
+        version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0"
+
+        # Match test-client.py format exactly
+        xml = f"""<?xml version="1.0" encoding="UTF-8"?>
+<MediaContainer>
+    <Player title="{player_name}"
+            protocol="plex"
+            protocolVersion="1"
+            protocolCapabilities="timeline,playback,navigation,playqueues"
+            machineIdentifier="{self.client_id}"
+            product="Music Assistant"
+            platform="{platform.system()}"
+            platformVersion="{platform.release()}"
+            deviceClass="{self.device_class}"
+            state="{state}"
+            address="{local_ip}"
+            port="{self.port}"
+            version="{version}"
+            provides="client,player,pubsub-player">
+        <Connection protocol="http" address="{local_ip}" port="{self.port}"
+                    uri="http://{local_ip}:{self.port}" local="1"/>
+    </Player>
+</MediaContainer>"""
+        return web.Response(
+            text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"}
+        )
+
+    def _build_timeline_attributes(
+        self,
+        track: Any,
+        state: str,
+        duration: int,
+        time: int,
+        volume: int,
+        shuffle: int,
+        repeat: int,
+        controllable: str,
+        queue: Any | None,
+    ) -> list[str]:
+        """Build timeline attributes for a playing track.
+
+        :param track: The current track media item.
+        :param state: Playback state (playing, paused, etc.).
+        :param duration: Track duration in milliseconds.
+        :param time: Current playback time in milliseconds.
+        :param volume: Volume level (0-100).
+        :param shuffle: Shuffle state (0 or 1).
+        :param repeat: Repeat mode (0=off, 1=one, 2=all).
+        :param controllable: Controllable features string.
+        :param queue: The MA queue object.
+        :return: List of timeline attribute strings.
+        """
+        # Get Plex key and ratingKey
+        key = None
+        rating_key = None
+        for mapping in track.provider_mappings:
+            if mapping.provider_instance == self.provider.instance_id:
+                key = mapping.item_id
+                rating_key = key.split("/")[-1]
+                break
+
+        if not key:
+            return []
+
+        # Server identification
+        plex_url = urlparse(self.provider._baseurl)
+        machine_identifier = self.provider._plex_server.machineIdentifier
+        address = plex_url.hostname
+        port = plex_url.port or (443 if plex_url.scheme == "https" else 32400)
+        protocol = plex_url.scheme
+
+        # Build timeline attributes
+        attrs = [
+            f'state="{state}"',
+            f'duration="{duration}"',
+            f'time="{time}"',
+            f'ratingKey="{rating_key}"',
+            f'key="{key}"',
+        ]
+
+        # Add play queue info if available
+        if self.play_queue_id and queue:
+            if queue.current_index is not None:
+                play_queue_item_id = self.play_queue_item_ids.get(
+                    queue.current_index, queue.current_index + 1
+                )
+                attrs.append(f'playQueueItemID="{play_queue_item_id}"')
+            attrs.append(f'playQueueID="{self.play_queue_id}"')
+            attrs.append(f'playQueueVersion="{self.play_queue_version}"')
+            attrs.append(f'containerKey="/playQueues/{self.play_queue_id}"')
+
+        # Add standard attributes
+        attrs.extend(
+            [
+                'type="music"',
+                f'volume="{volume}"',
+                f'shuffle="{shuffle}"',
+                f'repeat="{repeat}"',
+                f'controllable="{controllable}"',
+                f'machineIdentifier="{machine_identifier}"',
+                f'address="{address}"',
+                f'port="{port}"',
+                f'protocol="{protocol}"',
+            ]
+        )
+
+        return attrs
+
+    async def _build_timeline_xml(
+        self, include_metadata: bool = False, command_id: str = "0"
+    ) -> str:
+        """Build timeline XML from current Music Assistant player state."""
+        player_id = self._ma_player_id
+
+        # Get MA player and queue
+        player = self.provider.mass.players.get(player_id) if player_id else None
+        queue = self.provider.mass.player_queues.get(player_id) if player_id else None
+
+        # Controllable features for music
+        controllable = (
+            "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext"
+        )
+
+        # Map MA playback state to Plex state (stopped, paused, playing, buffering, error)
+        state = "stopped"
+        if player and player.playback_state:
+            state_value = (
+                player.playback_state.value
+                if hasattr(player.playback_state, "value")
+                else str(player.playback_state)
+            )
+
+            # Map MA states to Plex states
+            if state_value == "playing":
+                state = "playing"
+            elif state_value == "paused":
+                state = "paused"
+            elif state_value == "buffering":
+                state = "buffering"
+            elif state_value == "idle":
+                # Idle with a current track = paused, idle without track = stopped
+                state = (
+                    "paused"
+                    if queue and queue.current_item and queue.current_item.media_item
+                    else "stopped"
+                )
+            else:
+                state = "stopped"
+
+        # Get volume (0-100)
+        volume = int(player.volume_level) if player and player.volume_level else 100
+
+        # Get shuffle (0/1) and repeat (0=off, 1=one, 2=all)
+        shuffle = 0
+        repeat = 0
+        if queue:
+            shuffle = 1 if queue.shuffle_enabled else 0
+            if hasattr(queue, "repeat_mode"):
+                repeat_mode = queue.repeat_mode
+                if hasattr(repeat_mode, "value"):
+                    repeat_value = repeat_mode.value
+                    if repeat_value == "one":
+                        repeat = 1
+                    elif repeat_value == "all":
+                        repeat = 2
+
+        # Build music timeline
+        if (
+            state in ["playing", "paused"]
+            and queue
+            and queue.current_item
+            and queue.current_item.media_item
+        ):
+            track = queue.current_item.media_item
+
+            # Duration in milliseconds
+            duration = round(track.duration * 1000) if track.duration else 0
+
+            # Current playback time in milliseconds
+            time = (
+                round(player.corrected_elapsed_time * 1000)
+                if player and player.corrected_elapsed_time is not None
+                else 0
+            )
+
+            # Build timeline attributes
+            attrs = self._build_timeline_attributes(
+                track, state, duration, time, volume, shuffle, repeat, controllable, queue
+            )
+
+            if attrs:
+                music_timeline = f"<Timeline {' '.join(attrs)}/>"
+            else:
+                # No Plex mapping, send basic timeline with actual state
+                music_timeline = (
+                    f'<Timeline state="{state}" time="{time}" type="music" volume="{volume}" '
+                    f'shuffle="{shuffle}" repeat="{repeat}" controllable="{controllable}"/>'
+                )
+        else:
+            # No current track - use actual state
+            time = (
+                round(player.corrected_elapsed_time * 1000)
+                if player and player.corrected_elapsed_time is not None
+                else 0
+            )
+            music_timeline = (
+                f'<Timeline state="{state}" time="{time}" type="music" volume="{volume}" '
+                f'shuffle="{shuffle}" repeat="{repeat}" controllable="{controllable}"/>'
+            )
+
+        # Video and photo timelines (always stopped for music player)
+        video_timeline = '<Timeline type="video" state="stopped"/>'
+        photo_timeline = '<Timeline type="photo" state="stopped"/>'
+
+        # Combine all timelines
+        return (
+            f'<MediaContainer commandID="{command_id}">'
+            f"{music_timeline}{video_timeline}{photo_timeline}"
+            f"</MediaContainer>"
+        )
+
+    async def _handle_player_event(self, event: MassEvent) -> None:
+        """Handle player state change events."""
+        if not self._ma_player_id or event.object_id != self._ma_player_id:
+            return
+
+        # Skip if we're the ones making the changes
+        if self._updating_from_plex:
+            return
+
+        try:
+            # Send timeline to Plex server (for activity tracking)
+            await self._send_timeline_to_server()
+
+            # Broadcast timeline to subscribed controllers
+            # Timeline will be built from current MA player state
+            await self._broadcast_timeline()
+        except Exception as e:
+            LOGGER.debug(f"Error handling player event: {e}")
+
+    async def _handle_queue_event(self, event: MassEvent) -> None:
+        """Handle queue change events."""
+        if not self._ma_player_id or event.object_id != self._ma_player_id:
+            return
+
+        # Skip if we're the ones making the changes
+        if self._updating_from_plex:
+            return
+
+        try:
+            # Send timeline to Plex server (for activity tracking)
+            await self._send_timeline_to_server()
+
+            # Broadcast timeline to subscribed controllers
+            # Timeline will be built from current MA player state
+            await self._broadcast_timeline()
+        except Exception as e:
+            LOGGER.debug(f"Error handling queue event: {e}")
+
+    async def _handle_queue_items_updated(self, event: MassEvent) -> None:
+        """Handle queue items being added/removed/reordered."""
+        if not self._ma_player_id or event.object_id != self._ma_player_id:
+            return
+
+        # Skip if we're the ones making the changes
+        if self._updating_from_plex:
+            return
+
+        # Get current MA queue state
+        queue_items = self.provider.mass.player_queues.items(self._ma_player_id)
+        if not queue_items:
+            return
+
+        current_keys = []
+        for item in queue_items:
+            if not item.media_item:
+                continue
+            # Find Plex mapping
+            for mapping in item.media_item.provider_mappings:
+                if mapping.provider_instance == self.provider.instance_id:
+                    current_keys.append(mapping.item_id)
+                    break
+
+        # Check if queue actually changed from what we last synced FROM Plex
+        if (
+            len(current_keys) == self._last_synced_ma_queue_length
+            and current_keys == self._last_synced_ma_queue_keys
+        ):
+            # Queue hasn't changed from last sync, skip
+            LOGGER.debug("MA queue matches last synced state, skipping Plex sync")
+            return
+
+        LOGGER.info(
+            f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items"
+        )
+
+        # (Re)create Plex PlayQueue from MA queue
+        try:
+            await self._create_plex_playqueue_from_ma()
+            # Update tracked state
+            self._last_synced_ma_queue_length = len(current_keys)
+            self._last_synced_ma_queue_keys = current_keys
+        except Exception as e:
+            LOGGER.debug(f"Error creating Plex PlayQueue: {e}")
+
+        # Broadcast timeline update
+        try:
+            await self._broadcast_timeline()
+        except Exception as e:
+            LOGGER.debug(f"Error broadcasting timeline: {e}")
+
+    async def _create_plex_playqueue_from_ma(self) -> None:
+        """Create a new Plex PlayQueue from current MA queue."""
+        ma_queue = self.provider.mass.player_queues.get(self._ma_player_id)  # type: ignore[arg-type]
+        queue_items = self.provider.mass.player_queues.items(self._ma_player_id)  # type: ignore[arg-type]
+
+        if not ma_queue or not queue_items:
+            return
+
+        # Fetch Plex items for all tracks in MA queue
+        async def fetch_plex_item(plex_key: str) -> object | None:
+            """Fetch a single Plex item."""
+            try:
+
+                def fetch_item() -> object:
+                    return self.plex_server.fetchItem(plex_key)
+
+                return await asyncio.to_thread(fetch_item)
+            except Exception as e:
+                LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}")
+                return None
+
+        # Collect all fetch tasks
+        fetch_tasks = []
+        for item in queue_items:
+            if not item.media_item:
+                continue
+
+            # Find Plex mapping
+            plex_key = None
+            for mapping in item.media_item.provider_mappings:
+                if mapping.provider_instance == self.provider.instance_id:
+                    plex_key = mapping.item_id
+                    break
+
+            if plex_key:
+                fetch_tasks.append(fetch_plex_item(plex_key))
+
+        # Fetch all items concurrently
+        plex_items = []
+        if fetch_tasks:
+            fetched_items = await asyncio.gather(*fetch_tasks, return_exceptions=True)
+            plex_items = [item for item in fetched_items if item is not None]
+
+        if not plex_items:
+            LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation")
+            return
+
+        # Determine which track should be selected (currently playing)
+        start_item = None
+        if ma_queue.current_index is not None and ma_queue.current_index < len(plex_items):
+            start_item = plex_items[ma_queue.current_index]
+
+        # Create Plex PlayQueue - don't pass shuffle since MA queue is already in desired order
+        def create_queue() -> PlayQueue:
+            return PlayQueue.create(
+                self.plex_server,
+                items=plex_items,
+                startItem=start_item,
+                shuffle=0,  # Don't shuffle, plex_items is already in MA queue order
+                continuous=1,
+            )
+
+        try:
+            playqueue = await asyncio.to_thread(create_queue)
+
+            if playqueue:
+                self.play_queue_id = str(playqueue.playQueueID)
+                self.play_queue_version = playqueue.playQueueVersion
+
+                # Build item ID mappings
+                self.play_queue_item_ids = {}
+                for i, item in enumerate(playqueue.items):
+                    if hasattr(item, "playQueueItemID"):
+                        self.play_queue_item_ids[i] = item.playQueueItemID
+
+                LOGGER.info(
+                    f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks"
+                )
+        except Exception as e:
+            LOGGER.exception(f"Error creating Plex PlayQueue: {e}")
+
+    async def _send_timeline(self, client_id: str) -> None:
+        """Send timeline update to specific controller."""
+        subscription = self.subscriptions.get(client_id)
+        if not subscription:
+            return
+
+        timeline_xml = await self._build_timeline_xml()
+
+        try:
+            await self.provider.mass.http_session.post(
+                f"{subscription['url']}/:/timeline",
+                data=timeline_xml,
+                headers={
+                    "X-Plex-Client-Identifier": self.client_id,
+                    "Content-Type": "text/xml",
+                },
+                timeout=ClientTimeout(total=5),
+            )
+            # Update last_update timestamp on successful send
+            subscription["last_update"] = time.time()
+        except Exception as e:
+            LOGGER.debug(f"Failed to send timeline to {client_id}: {e}")
+
+    async def _send_timeline_to_server(self) -> None:
+        """Send timeline update to Plex server for activity tracking."""
+        if not self._ma_player_id:
+            return
+
+        try:
+            player = self.provider.mass.players.get(self._ma_player_id)
+            queue = self.provider.mass.player_queues.get(self._ma_player_id)
+
+            if (
+                not player
+                or not queue
+                or not queue.current_item
+                or not queue.current_item.media_item
+            ):
+                return
+
+            track = queue.current_item.media_item
+
+            # Find Plex mapping
+            plex_key = None
+            for mapping in track.provider_mappings:
+                if mapping.provider_instance == self.provider.instance_id:
+                    plex_key = mapping.item_id
+                    break
+
+            if not plex_key:
+                return
+
+            # Extract rating key from plex_key (e.g., "/library/metadata/12345" -> "12345")
+            rating_key = plex_key.split("/")[-1]
+
+            # Get playback state
+            state_value = (
+                player.playback_state.value
+                if hasattr(player.playback_state, "value")
+                else str(player.playback_state)
+            )
+
+            # Map to Plex state
+            if state_value == "playing":
+                plex_state = "playing"
+            elif state_value == "paused":
+                plex_state = "paused"
+            else:
+                plex_state = "stopped"
+
+            # Get position in milliseconds
+            position_ms = (
+                round(player.corrected_elapsed_time * 1000)
+                if player.corrected_elapsed_time is not None
+                else 0
+            )
+            duration_ms = round(track.duration * 1000) if track.duration else 0
+
+            # Get play queue info if available
+            container_key = ""
+            play_queue_item_id = ""
+            if self.play_queue_id:
+                container_key = f"/playQueues/{self.play_queue_id}"
+                if queue.current_index is not None:
+                    play_queue_item_id = str(
+                        self.play_queue_item_ids.get(queue.current_index, queue.current_index + 1)
+                    )
+
+            # Build timeline params (only Plex timeline data)
+            params = {
+                "ratingKey": rating_key,
+                "key": plex_key,
+                "state": plex_state,
+                "time": str(position_ms),
+                "duration": str(duration_ms),
+            }
+
+            # Add play queue info if available
+            if container_key:
+                params["containerKey"] = container_key
+            if play_queue_item_id:
+                params["playQueueItemID"] = play_queue_item_id
+
+            def send_timeline() -> None:
+                # Pass session headers to identify this specific player instance
+                self.plex_server.query("/:/timeline", params=params, headers=self.headers)
+
+            await asyncio.to_thread(send_timeline)
+
+        except Exception as e:
+            LOGGER.debug(f"Failed to send timeline to Plex server: {e}")
+
+    async def _broadcast_timeline(self) -> None:
+        """Send timeline to all subscribed controllers."""
+        current_time = time.time()
+        stale_clients = []
+        for client_id, sub in self.subscriptions.items():
+            try:
+                last_update = float(sub["last_update"])  # type: ignore[arg-type]
+                if current_time - last_update > 90:
+                    stale_clients.append(client_id)
+            except (ValueError, TypeError):
+                # If conversion fails, treat client as stale
+                LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale")
+                stale_clients.append(client_id)
+
+        for client_id in stale_clients:
+            del self.subscriptions[client_id]
+
+        await asyncio.gather(
+            *(self._send_timeline(client_id) for client_id in list(self.subscriptions.keys())),
+            return_exceptions=True,  # Don't fail all if one fails
+        )
+
+    # for debugging purposes only
+    # async def handle_unknown(self, request: web.Request) -> web.Response:
+    #     """Catch-all handler for unexpected or unsupported paths."""
+    #     LOGGER.debug(
+    #         "Unhandled request: %s %s from %s",
+    #         request.method,
+    #         request.path,
+    #         request.remote,
+    #     )
+    #
+    #     # You can log query/body if needed (be careful not to leak tokens)
+    #     if request.query:
+    #         LOGGER.debug("Query params for %s: %s", request.path, dict(request.query))
+    #     try:
+    #         data = await request.text()
+    #         if data:
+    #             LOGGER.debug("Body for %s: %s", request.path, data)
+    #     except Exception as e:
+    #         LOGGER.debug("Could not read request body: %s", e)
+    #
+    #     return web.Response(status=404, text=f"Unhandled path: {request.path}")