Wrap Song in a broader Playback state object with stuff like volume and repeat mode

This commit is contained in:
Danielle McLean 2024-07-26 09:53:17 +10:00
parent 085bca7974
commit 68609f3d07
Signed by: 00dani
GPG key ID: 6854781A0488421C
24 changed files with 765 additions and 245 deletions

View file

@ -0,0 +1,37 @@
from ...config.model import MpdConfig
from ...playback import Playback
from ...playback.queue import Queue
from ...playback.settings import Settings, to_oneshot
from ...tools.types import option_fmap
from ..types import MpdState
from .to_song import to_song
def to_queue(mpd: MpdState) -> Queue:
return Queue(
current=int(mpd.current["pos"]),
next=int(mpd.status["nextsong"]),
length=int(mpd.status["playlistlength"]),
)
def to_settings(mpd: MpdState) -> Settings:
return Settings(
volume=option_fmap(int, mpd.status.get("volume")),
repeat=mpd.status["repeat"] == "1",
random=mpd.status["random"] == "1",
single=to_oneshot(mpd.status["single"]),
consume=to_oneshot(mpd.status["consume"]),
)
def to_playback(config: MpdConfig, mpd: MpdState) -> Playback:
partition = mpd.status["partition"]
queue = to_queue(mpd)
settings = to_settings(mpd)
return Playback(
partition=partition,
queue=queue,
settings=settings,
song=to_song(config, mpd),
)

View file

@ -0,0 +1,51 @@
from pathlib import Path
from yarl import URL
from ...config.model import MpdConfig
from ...playback.state import PlaybackState
from ...song import Song, Stopped, to_artwork, to_brainz
from ...tools.types import option_fmap, un_maybe_plural
from ..types import MpdState
def file_to_url(config: MpdConfig, file: str) -> URL | None:
url = URL(file)
if url.scheme != "":
# We already got an absolute URL - probably a stream? - so we can just return it.
return url
if not config.music_directory:
# We have a relative song URI, but we can't make it absolute since no music directory is configured.
return None
# Prepend the configured music directory, then turn the whole path into a file:// URL.
abs_file = config.music_directory / file
return URL(abs_file.as_uri())
def to_song(config: MpdConfig, mpd: MpdState) -> Song | Stopped:
state = PlaybackState(mpd.status["state"])
if state == PlaybackState.stop:
return Stopped()
file = mpd.current["file"]
url = file_to_url(config, file)
return Song(
state=state,
file=Path(file),
url=url,
title=mpd.current.get("title"),
artist=un_maybe_plural(mpd.current.get("artist")),
album=un_maybe_plural(mpd.current.get("album")),
album_artist=un_maybe_plural(mpd.current.get("albumartist")),
composer=un_maybe_plural(mpd.current.get("composer")),
genre=un_maybe_plural(mpd.current.get("genre")),
track=option_fmap(int, mpd.current.get("track")),
disc=option_fmap(int, mpd.current.get("disc")),
duration=option_fmap(float, mpd.status.get("duration")),
elapsed=float(mpd.status["elapsed"]),
musicbrainz=to_brainz(mpd.current),
art=to_artwork(mpd.art),
)

View file

@ -1,6 +1,5 @@
import asyncio
from collections.abc import Iterable
from pathlib import Path
from mpd.asyncio import MPDClient
from mpd.base import CommandError
@ -8,54 +7,15 @@ from rich import print as rprint
from yarl import URL
from ..config.model import MpdConfig
from ..playback import Playback
from ..playback.state import PlaybackState
from ..player import Player
from ..song import PlaybackState, Song, to_artwork, to_brainz
from ..song_receiver import Receiver
from ..tools.types import option_fmap, un_maybe_plural
from .artwork_cache import MpdArtworkCache
from .convert.to_playback import to_playback
from .types import MpdState
def mpd_file_to_uri(config: MpdConfig, file: str) -> URL | None:
url = URL(file)
if url.scheme != "":
# We already got an absolute URL - probably a stream? - so we can just return it.
return url
if not config.music_directory:
# We have a relative song URI, but we can't make it absolute since no music directory is configured.
return None
# Prepend the configured music directory, then turn the whole path into a file:// URL.
abs_file = config.music_directory / file
return URL(abs_file.as_uri())
def mpd_state_to_song(config: MpdConfig, mpd: MpdState) -> Song:
file = mpd.current["file"]
url = mpd_file_to_uri(config, file)
return Song(
state=PlaybackState(mpd.status["state"]),
queue_index=int(mpd.current["pos"]),
queue_length=int(mpd.status["playlistlength"]),
file=Path(file),
url=url,
title=mpd.current.get("title"),
artist=un_maybe_plural(mpd.current.get("artist")),
album=un_maybe_plural(mpd.current.get("album")),
album_artist=un_maybe_plural(mpd.current.get("albumartist")),
composer=un_maybe_plural(mpd.current.get("composer")),
genre=un_maybe_plural(mpd.current.get("genre")),
track=option_fmap(int, mpd.current.get("track")),
disc=option_fmap(int, mpd.current.get("disc")),
duration=option_fmap(float, mpd.status.get("duration")),
elapsed=float(mpd.status["elapsed"]),
musicbrainz=to_brainz(mpd.current),
art=to_artwork(mpd.art),
)
class MpdStateListener(Player):
config: MpdConfig
client: MPDClient
@ -101,22 +61,19 @@ class MpdStateListener(Player):
if starting_idle_count != self.idle_count:
return
if status["state"] == "stop":
print("Nothing playing")
await self.update(None)
return
art = await self.art_cache.get_cached_artwork(current)
if starting_idle_count != self.idle_count:
return
art = None
if status["state"] != "stop":
art = await self.art_cache.get_cached_artwork(current)
if starting_idle_count != self.idle_count:
return
state = MpdState(status, current, art)
song = mpd_state_to_song(self.config, state)
rprint(song)
await self.update(song)
pb = to_playback(self.config, state)
rprint(pb)
await self.update(pb)
async def update(self, song: Song | None) -> None:
await asyncio.gather(*(r.update(song) for r in self.receivers))
async def update(self, playback: Playback) -> None:
await asyncio.gather(*(r.update(playback) for r in self.receivers))
async def get_art(self, file: str) -> bytes | None:
picture = await self.readpicture(file)

View file

@ -0,0 +1,3 @@
from .playback import Playback
__all__ = ("Playback",)

View file

@ -0,0 +1,36 @@
from dataclasses import dataclass
from pydantic import Field
from ..song.song import Song
from ..song.stopped import Stopped
from ..tools.schema.define import schema
from .queue import Queue
from .settings import Settings
@schema("https://static.00dani.me/m/schemata/mpd-now-playable/playback-v1.json")
@dataclass(slots=True, kw_only=True)
class Playback:
#: The MPD partition this playback information came from. Essentially, MPD
#: can act as multiple music player servers simultaneously, distinguished
#: by name. For most users, this will always be "default".
partition: str
#: Stats about MPD's song queue, including the current song and next song's
#: indices in it.
queue: Queue
#: Playback settings such as volume and repeat mode.
settings: Settings
#: Information about the current song itself. MPD provides none of this
#: information if its playback is currently stopped, so mpd-now-playable
#: doesn't either and will give you a Stopped instead in that case.
song: Song | Stopped = Field(discriminator="state")
@property
def active_song(self) -> Song | None:
if isinstance(self.song, Song):
return self.song
return None

View file

@ -0,0 +1,13 @@
from dataclasses import dataclass
@dataclass(slots=True)
class Queue:
#: The zero-based index of the current song in MPD's queue.
current: int
#: The index of the next song to be played, taking into account random and
#: repeat playback settings.
next: int
#: The total length of MPD's queue - the last song in the queue will have
#: the index one less than this, since queue indices are zero-based.
length: int

View file

@ -0,0 +1,48 @@
from dataclasses import dataclass
from typing import Literal
OneShotFlag = bool | Literal["oneshot"]
def to_oneshot(value: str) -> OneShotFlag:
match value:
case "1":
return True
case "0":
return False
case "oneshot":
return "oneshot"
return False
@dataclass(slots=True, kw_only=True)
class Settings:
#: The playback volume ranging from 0 to 100 - it will only be available if
#: MPD has a volume mixer configured.
volume: int | None
#: Repeat playback of the queued songs. This setting normally means the
#: entire queue will be played on repeat, but its behaviour can be
#: influenced by the other playback mode flags.
repeat: bool
#: Play the queued songs in random order. This is distinct from shuffling
#: the queue, which randomises the queue's order once when you send the
#: shuffle command and will then play the queue in that new order
#: repeatedly if asked. If MPD is asked to both repeat and randomise, the
#: queue is effectively shuffled each time it loops.
random: bool
#: Play only a single song. If MPD is asked to repeat, then the current
#: song will be played repeatedly. Otherwise, when the current song ends
#: MPD will simply stop playback. Like the consume flag, the single flag
#: can also be set to "oneshot", which will cause the single flag to be
#: switched off after it takes effect once (either the current song will
#: repeat just once, or playback will stop but the single flag will be
#: switched off).
single: OneShotFlag
#: Remove songs from the queue as they're played. This flag can also be set
#: to "oneshot", which means the currently playing song will be consumed,
#: and then the flag will automatically be switched off.
consume: OneShotFlag

View file

@ -0,0 +1,7 @@
from enum import StrEnum
class PlaybackState(StrEnum):
play = "play"
pause = "pause"
stop = "stop"

View file

@ -1,26 +1,19 @@
from typing import Protocol
from .song import PlaybackState
from .playback.state import PlaybackState
class Player(Protocol):
async def on_play_pause(self) -> PlaybackState:
...
async def on_play_pause(self) -> PlaybackState: ...
async def on_play(self) -> PlaybackState:
...
async def on_play(self) -> PlaybackState: ...
async def on_pause(self) -> PlaybackState:
...
async def on_pause(self) -> PlaybackState: ...
async def on_stop(self) -> PlaybackState:
...
async def on_stop(self) -> PlaybackState: ...
async def on_next(self) -> None:
...
async def on_next(self) -> None: ...
async def on_prev(self) -> None:
...
async def on_prev(self) -> None: ...
async def on_seek(self, position: float) -> None:
...
async def on_seek(self, position: float) -> None: ...

View file

@ -0,0 +1,33 @@
from Foundation import NSMutableDictionary
from MediaPlayer import (
MPMediaItemPropertyArtwork,
MPMediaItemPropertyTitle,
MPNowPlayingInfoMediaTypeNone,
MPNowPlayingInfoPropertyMediaType,
MPNowPlayingInfoPropertyPlaybackQueueCount,
MPNowPlayingInfoPropertyPlaybackQueueIndex,
MPNowPlayingInfoPropertyPlaybackRate,
)
from ....playback import Playback
from .song_to_media_item import song_to_media_item
from .to_nsimage import MPD_LOGO
def playback_to_media_item(playback: Playback) -> NSMutableDictionary:
nowplaying_info = nothing_to_media_item()
if song := playback.active_song:
nowplaying_info = song_to_media_item(song)
nowplaying_info[MPNowPlayingInfoPropertyPlaybackQueueCount] = playback.queue.length
nowplaying_info[MPNowPlayingInfoPropertyPlaybackQueueIndex] = playback.queue.current
return nowplaying_info
def nothing_to_media_item() -> NSMutableDictionary:
nowplaying_info = NSMutableDictionary.dictionary()
nowplaying_info[MPNowPlayingInfoPropertyMediaType] = MPNowPlayingInfoMediaTypeNone
nowplaying_info[MPMediaItemPropertyArtwork] = MPD_LOGO
nowplaying_info[MPMediaItemPropertyTitle] = "MPD (stopped)"
nowplaying_info[MPNowPlayingInfoPropertyPlaybackRate] = 0.0
return nowplaying_info

View file

@ -0,0 +1,61 @@
from Foundation import NSMutableDictionary
from MediaPlayer import (
MPMediaItemPropertyAlbumTitle,
MPMediaItemPropertyAlbumTrackNumber,
MPMediaItemPropertyArtist,
MPMediaItemPropertyArtwork,
MPMediaItemPropertyComposer,
MPMediaItemPropertyDiscNumber,
MPMediaItemPropertyGenre,
MPMediaItemPropertyPersistentID,
MPMediaItemPropertyPlaybackDuration,
MPMediaItemPropertyTitle,
MPNowPlayingInfoMediaTypeAudio,
MPNowPlayingInfoPropertyAssetURL,
MPNowPlayingInfoPropertyElapsedPlaybackTime,
MPNowPlayingInfoPropertyExternalContentIdentifier,
MPNowPlayingInfoPropertyMediaType,
MPNowPlayingInfoPropertyPlaybackRate,
)
from ....playback.state import PlaybackState
from ....song import Song
from ..persistent_id import song_to_persistent_id
from .to_nsimage import data_to_media_item_artwork
def join_plural_field(field: list[str]) -> str | None:
if field:
return ", ".join(field)
return None
def song_to_media_item(song: Song) -> NSMutableDictionary:
nowplaying_info = NSMutableDictionary.dictionary()
nowplaying_info[MPNowPlayingInfoPropertyMediaType] = MPNowPlayingInfoMediaTypeAudio
nowplaying_info[MPNowPlayingInfoPropertyElapsedPlaybackTime] = song.elapsed
nowplaying_info[MPNowPlayingInfoPropertyExternalContentIdentifier] = str(song.file)
nowplaying_info[MPMediaItemPropertyPersistentID] = song_to_persistent_id(song)
nowplaying_info[MPMediaItemPropertyTitle] = song.title
nowplaying_info[MPMediaItemPropertyArtist] = join_plural_field(song.artist)
nowplaying_info[MPMediaItemPropertyAlbumTitle] = join_plural_field(song.album)
nowplaying_info[MPMediaItemPropertyAlbumTrackNumber] = song.track
nowplaying_info[MPMediaItemPropertyDiscNumber] = song.disc
nowplaying_info[MPMediaItemPropertyGenre] = join_plural_field(song.genre)
nowplaying_info[MPMediaItemPropertyComposer] = join_plural_field(song.composer)
nowplaying_info[MPMediaItemPropertyPlaybackDuration] = song.duration
if song.url is not None:
nowplaying_info[MPNowPlayingInfoPropertyAssetURL] = song.url.human_repr()
# MPD can't play back music at different rates, so we just want to set it
# to 1.0 if the song is playing. (Set it to 0.0 if the song is paused.)
rate = 1.0 if song.state == PlaybackState.play else 0.0
nowplaying_info[MPNowPlayingInfoPropertyPlaybackRate] = rate
if song.art:
artwork = data_to_media_item_artwork(song.art.data)
nowplaying_info[MPMediaItemPropertyArtwork] = artwork
return nowplaying_info

View file

@ -0,0 +1,40 @@
from pathlib import Path
from AppKit import NSCompositingOperationCopy, NSImage, NSMakeRect
from Foundation import CGSize
from MediaPlayer import MPMediaItemArtwork
def logo_to_ns_image() -> NSImage:
return NSImage.alloc().initByReferencingFile_(
str(Path(__file__).parents[3] / "mpd/logo.svg")
)
def data_to_ns_image(data: bytes) -> NSImage:
return NSImage.alloc().initWithData_(data)
def data_to_media_item_artwork(data: bytes) -> MPMediaItemArtwork:
return ns_image_to_media_item_artwork(data_to_ns_image(data))
def ns_image_to_media_item_artwork(img: NSImage) -> MPMediaItemArtwork:
def resize(size: CGSize) -> NSImage:
new = NSImage.alloc().initWithSize_(size)
new.lockFocus()
img.drawInRect_fromRect_operation_fraction_(
NSMakeRect(0, 0, size.width, size.height),
NSMakeRect(0, 0, img.size().width, img.size().height),
NSCompositingOperationCopy,
1.0,
)
new.unlockFocus()
return new
return MPMediaItemArtwork.alloc().initWithBoundsSize_requestHandler_(
img.size(), resize
)
MPD_LOGO = logo_to_ns_image()

View file

@ -0,0 +1,19 @@
from MediaPlayer import (
MPMusicPlaybackState,
MPMusicPlaybackStatePaused,
MPMusicPlaybackStatePlaying,
MPMusicPlaybackStateStopped,
)
from ....playback.state import PlaybackState
__all__ = ("playback_state_to_cocoa",)
def playback_state_to_cocoa(state: PlaybackState) -> MPMusicPlaybackState:
mapping: dict[PlaybackState, MPMusicPlaybackState] = {
PlaybackState.play: MPMusicPlaybackStatePlaying,
PlaybackState.pause: MPMusicPlaybackStatePaused,
PlaybackState.stop: MPMusicPlaybackStateStopped,
}
return mapping[state]

View file

@ -1,37 +1,11 @@
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Literal
from AppKit import NSCompositingOperationCopy, NSImage, NSMakeRect
from corefoundationasyncio import CoreFoundationEventLoop
from Foundation import CGSize, NSMutableDictionary
from MediaPlayer import (
MPChangePlaybackPositionCommandEvent,
MPMediaItemArtwork,
MPMediaItemPropertyAlbumTitle,
MPMediaItemPropertyAlbumTrackNumber,
MPMediaItemPropertyArtist,
MPMediaItemPropertyArtwork,
MPMediaItemPropertyComposer,
MPMediaItemPropertyDiscNumber,
MPMediaItemPropertyGenre,
MPMediaItemPropertyPersistentID,
MPMediaItemPropertyPlaybackDuration,
MPMediaItemPropertyTitle,
MPMusicPlaybackState,
MPMusicPlaybackStatePaused,
MPMusicPlaybackStatePlaying,
MPMusicPlaybackStateStopped,
MPNowPlayingInfoCenter,
MPNowPlayingInfoMediaTypeAudio,
MPNowPlayingInfoMediaTypeNone,
MPNowPlayingInfoPropertyAssetURL,
MPNowPlayingInfoPropertyElapsedPlaybackTime,
MPNowPlayingInfoPropertyExternalContentIdentifier,
MPNowPlayingInfoPropertyMediaType,
MPNowPlayingInfoPropertyPlaybackQueueCount,
MPNowPlayingInfoPropertyPlaybackQueueIndex,
MPNowPlayingInfoPropertyPlaybackRate,
MPRemoteCommandCenter,
MPRemoteCommandEvent,
MPRemoteCommandHandlerStatus,
@ -39,100 +13,13 @@ from MediaPlayer import (
)
from ...config.model import CocoaReceiverConfig
from ...playback import Playback
from ...playback.state import PlaybackState
from ...player import Player
from ...song import PlaybackState, Song
from ...song_receiver import LoopFactory, Receiver
from ...tools.asyncio import run_background_task
from .persistent_id import song_to_persistent_id
def logo_to_ns_image() -> NSImage:
return NSImage.alloc().initByReferencingFile_(
str(Path(__file__).parent.parent.parent / "mpd/logo.svg")
)
def data_to_ns_image(data: bytes) -> NSImage:
return NSImage.alloc().initWithData_(data)
def ns_image_to_media_item_artwork(img: NSImage) -> MPMediaItemArtwork:
def resize(size: CGSize) -> NSImage:
new = NSImage.alloc().initWithSize_(size)
new.lockFocus()
img.drawInRect_fromRect_operation_fraction_(
NSMakeRect(0, 0, size.width, size.height),
NSMakeRect(0, 0, img.size().width, img.size().height),
NSCompositingOperationCopy,
1.0,
)
new.unlockFocus()
return new
return MPMediaItemArtwork.alloc().initWithBoundsSize_requestHandler_(
img.size(), resize
)
def playback_state_to_cocoa(state: PlaybackState) -> MPMusicPlaybackState:
mapping: dict[PlaybackState, MPMusicPlaybackState] = {
PlaybackState.play: MPMusicPlaybackStatePlaying,
PlaybackState.pause: MPMusicPlaybackStatePaused,
PlaybackState.stop: MPMusicPlaybackStateStopped,
}
return mapping[state]
def join_plural_field(field: list[str]) -> str | None:
if field:
return ", ".join(field)
return None
def song_to_media_item(song: Song) -> NSMutableDictionary:
nowplaying_info = nothing_to_media_item()
nowplaying_info[MPNowPlayingInfoPropertyMediaType] = MPNowPlayingInfoMediaTypeAudio
nowplaying_info[MPNowPlayingInfoPropertyElapsedPlaybackTime] = song.elapsed
nowplaying_info[MPNowPlayingInfoPropertyExternalContentIdentifier] = str(song.file)
nowplaying_info[MPNowPlayingInfoPropertyPlaybackQueueCount] = song.queue_length
nowplaying_info[MPNowPlayingInfoPropertyPlaybackQueueIndex] = song.queue_index
nowplaying_info[MPMediaItemPropertyPersistentID] = song_to_persistent_id(song)
nowplaying_info[MPMediaItemPropertyTitle] = song.title
nowplaying_info[MPMediaItemPropertyArtist] = join_plural_field(song.artist)
nowplaying_info[MPMediaItemPropertyAlbumTitle] = join_plural_field(song.album)
nowplaying_info[MPMediaItemPropertyAlbumTrackNumber] = song.track
nowplaying_info[MPMediaItemPropertyDiscNumber] = song.disc
nowplaying_info[MPMediaItemPropertyGenre] = join_plural_field(song.genre)
nowplaying_info[MPMediaItemPropertyComposer] = join_plural_field(song.composer)
nowplaying_info[MPMediaItemPropertyPlaybackDuration] = song.duration
if song.url is not None:
nowplaying_info[MPNowPlayingInfoPropertyAssetURL] = song.url.human_repr()
# MPD can't play back music at different rates, so we just want to set it
# to 1.0 if the song is playing. (Leave it at 0.0 if the song is paused.)
if song.state == PlaybackState.play:
nowplaying_info[MPNowPlayingInfoPropertyPlaybackRate] = 1.0
if song.art:
nowplaying_info[MPMediaItemPropertyArtwork] = ns_image_to_media_item_artwork(
data_to_ns_image(song.art.data)
)
return nowplaying_info
def nothing_to_media_item() -> NSMutableDictionary:
nowplaying_info = NSMutableDictionary.dictionary()
nowplaying_info[MPNowPlayingInfoPropertyMediaType] = MPNowPlayingInfoMediaTypeNone
nowplaying_info[MPMediaItemPropertyArtwork] = MPD_LOGO
nowplaying_info[MPMediaItemPropertyTitle] = "MPD (stopped)"
nowplaying_info[MPNowPlayingInfoPropertyPlaybackRate] = 0.0
return nowplaying_info
MPD_LOGO = ns_image_to_media_item_artwork(logo_to_ns_image())
from .convert.playback_to_media_item import playback_to_media_item
from .convert.to_state import playback_state_to_cocoa
class CocoaLoopFactory(LoopFactory[CoreFoundationEventLoop]):
@ -191,13 +78,9 @@ class CocoaNowPlayingReceiver(Receiver):
# unpause with remote commands.
self.info_center.setPlaybackState_(MPMusicPlaybackStatePlaying)
async def update(self, song: Song | None) -> None:
if song:
self.info_center.setNowPlayingInfo_(song_to_media_item(song))
self.info_center.setPlaybackState_(playback_state_to_cocoa(song.state))
else:
self.info_center.setNowPlayingInfo_(nothing_to_media_item())
self.info_center.setPlaybackState_(MPMusicPlaybackStateStopped)
async def update(self, playback: Playback) -> None:
self.info_center.setNowPlayingInfo_(playback_to_media_item(playback))
self.info_center.setPlaybackState_(playback_state_to_cocoa(playback.song.state))
def _create_handler(
self, player: Callable[[], Coroutine[None, None, PlaybackState | None]]

View file

@ -6,8 +6,8 @@ from websockets.server import WebSocketServerProtocol, serve
from yarl import URL
from ...config.model import WebsocketsReceiverConfig
from ...playback import Playback
from ...player import Player
from ...song import Song
from ...song_receiver import DefaultLoopFactory, Receiver
MSGPACK_NULL = ormsgpack.packb(None)
@ -37,7 +37,9 @@ class WebsocketsReceiver(Receiver):
async def start(self, player: Player) -> None:
self.player = player
await serve(self.handle, host=self.config.host, port=self.config.port, reuse_port=True)
await serve(
self.handle, host=self.config.host, port=self.config.port, reuse_port=True
)
async def handle(self, conn: WebSocketServerProtocol) -> None:
self.connections.add(conn)
@ -47,9 +49,6 @@ class WebsocketsReceiver(Receiver):
finally:
self.connections.remove(conn)
async def update(self, song: Song | None) -> None:
if song is None:
self.last_status = MSGPACK_NULL
else:
self.last_status = ormsgpack.packb(song, default=default)
async def update(self, playback: Playback) -> None:
self.last_status = ormsgpack.packb(playback, default=default)
broadcast(self.connections, self.last_status)

View file

@ -1,12 +1,13 @@
from .artwork import Artwork, ArtworkSchema, to_artwork
from .musicbrainz import to_brainz
from .song import PlaybackState, Song
from .song import Song
from .stopped import Stopped
__all__ = (
"Artwork",
"ArtworkSchema",
"to_artwork",
"to_brainz",
"PlaybackState",
"Song",
"Stopped",
)

View file

@ -1,30 +1,19 @@
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from typing import Literal
from ..playback.state import PlaybackState
from ..tools.schema.define import schema
from ..tools.schema.fields import Url
from .artwork import Artwork
from .musicbrainz import MusicBrainzIds
class PlaybackState(StrEnum):
play = "play"
pause = "pause"
stop = "stop"
@schema("https://cdn.00dani.me/m/schemata/mpd-now-playable/song-v1.json")
@dataclass(slots=True, kw_only=True)
class Song:
#: Whether MPD is currently playing, paused, or stopped. Pretty simple.
state: PlaybackState
#: The zero-based index of the current song in MPD's queue.
queue_index: int
#: The total length of MPD's queue - the last song in the queue will have
#: the index one less than this, since queue indices are zero-based.
queue_length: int
#: Whether MPD is currently playing or paused. Pretty simple.
state: Literal[PlaybackState.play, PlaybackState.pause]
#: The relative path to the current song inside the music directory. MPD
#: itself uses this path as a stable identifier for the audio file in many

View file

@ -0,0 +1,9 @@
from dataclasses import dataclass, field
from typing import Literal
from ..playback.state import PlaybackState
@dataclass(slots=True, kw_only=True)
class Stopped:
state: Literal[PlaybackState.stop] = field(default=PlaybackState.stop, repr=False)

View file

@ -4,8 +4,8 @@ from importlib import import_module
from typing import Generic, Iterable, Literal, Protocol, TypeVar, cast
from .config.model import BaseReceiverConfig
from .playback import Playback
from .player import Player
from .song import Song
from .tools.types import not_none
T = TypeVar("T", bound=AbstractEventLoop, covariant=True)
@ -25,7 +25,7 @@ class Receiver(Protocol):
def loop_factory(cls) -> LoopFactory[AbstractEventLoop]: ...
async def start(self, player: Player) -> None: ...
async def update(self, song: Song | None) -> None: ...
async def update(self, playback: Playback) -> None: ...
class ReceiverModule(Protocol):

View file

@ -49,7 +49,9 @@ class MyGenerateJsonSchema(GenerateJsonSchema):
if __name__ == "__main__":
from ...config.model import Config
from ...playback import Playback
from ...song import Song
write(Config)
write(Playback, mode="serialization")
write(Song, mode="serialization")