From eb7c529c14ebc8f69c443e8ff586593a290458d8 Mon Sep 17 00:00:00 2001 From: cloudwithax Date: Sat, 2 Oct 2021 20:51:05 -0400 Subject: [PATCH] Added spotify track queueing support --- pomice/__init__.py | 2 +- pomice/exceptions.py | 12 + pomice/node.py | 86 +- pomice/objects.py | 6 +- pomice/player.py | 2 + pomice/spotify/__init__.py | 28 + pomice/spotify/client.py | 348 ++++++ pomice/spotify/errors.py | 41 + pomice/spotify/http.py | 1789 +++++++++++++++++++++++++++++ pomice/spotify/models/__init__.py | 26 + pomice/spotify/models/album.py | 121 ++ pomice/spotify/models/artist.py | 186 +++ pomice/spotify/models/base.py | 147 +++ pomice/spotify/models/common.py | 109 ++ pomice/spotify/models/library.py | 189 +++ pomice/spotify/models/player.py | 262 +++++ pomice/spotify/models/playlist.py | 525 +++++++++ pomice/spotify/models/track.py | 123 ++ pomice/spotify/models/typing.py | 9 + pomice/spotify/models/user.py | 562 +++++++++ pomice/spotify/oauth.py | 202 ++++ pomice/spotify/sync/__init__.py | 26 + pomice/spotify/sync/models.py | 83 ++ pomice/spotify/sync/thread.py | 48 + pomice/spotify/utils.py | 60 + pomice/utils.py | 2 + 26 files changed, 4977 insertions(+), 17 deletions(-) create mode 100644 pomice/spotify/__init__.py create mode 100644 pomice/spotify/client.py create mode 100644 pomice/spotify/errors.py create mode 100644 pomice/spotify/http.py create mode 100644 pomice/spotify/models/__init__.py create mode 100644 pomice/spotify/models/album.py create mode 100644 pomice/spotify/models/artist.py create mode 100644 pomice/spotify/models/base.py create mode 100644 pomice/spotify/models/common.py create mode 100644 pomice/spotify/models/library.py create mode 100644 pomice/spotify/models/player.py create mode 100644 pomice/spotify/models/playlist.py create mode 100644 pomice/spotify/models/track.py create mode 100644 pomice/spotify/models/typing.py create mode 100644 pomice/spotify/models/user.py create mode 100644 pomice/spotify/oauth.py create mode 100644 pomice/spotify/sync/__init__.py create mode 100644 pomice/spotify/sync/models.py create mode 100644 pomice/spotify/sync/thread.py create mode 100644 pomice/spotify/utils.py diff --git a/pomice/__init__.py b/pomice/__init__.py index fabc583..21e3da6 100644 --- a/pomice/__init__.py +++ b/pomice/__init__.py @@ -11,4 +11,4 @@ from .filters import * from .objects import * from .pool import NodePool from .node import Node -from .player import Player +from .player import Player \ No newline at end of file diff --git a/pomice/exceptions.py b/pomice/exceptions.py index 8898a0c..12a00f3 100644 --- a/pomice/exceptions.py +++ b/pomice/exceptions.py @@ -41,4 +41,16 @@ class TrackLoadError(PomiceException): class FilterInvalidArgument(PomiceException): """An invalid argument was passed to a filter.""" + pass + +class SpotifyAlbumLoadFailed(PomiceException): + """The pomice Spotify client was unable to load an album""" + pass + +class SpotifyTrackLoadFailed(PomiceException): + """The pomice Spotify client was unable to load a track""" + pass + +class SpotifyPlaylistLoadFailed(PomiceException): + """The pomice Spotify client was unable to load a playlist""" pass \ No newline at end of file diff --git a/pomice/node.py b/pomice/node.py index 282755d..4b188dc 100644 --- a/pomice/node.py +++ b/pomice/node.py @@ -1,3 +1,4 @@ +from os import strerror import aiohttp import discord import asyncio @@ -5,18 +6,23 @@ import typing import json import socket import time +import re from discord.ext import commands from typing import Optional, Union from urllib.parse import quote +from . import spotify from . import events from . import exceptions from . import objects from . import __version__ from .utils import ExponentialBackoff, NodeStats + +SPOTIFY_URL_REGEX = re.compile(r'https?://open.spotify.com/(?Palbum|playlist|track)/(?P[a-zA-Z0-9]+)') + class Node: - def __init__(self, pool, bot: Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient], host: str, port: int, password: str, identifier: str, **kwargs): + def __init__(self, pool, bot: Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient], host: str, port: int, password: str, identifier: str, spotify_client_id: Optional[str], spotify_client_secret: Optional[str]): self._bot = bot self._host = host self._port = port @@ -43,6 +49,13 @@ class Node: self._players = {} + self._spotify_client_id: str = spotify_client_id + self._spotify_client_secret: str = spotify_client_secret + + if self._spotify_client_id and self._spotify_client_secret: + self._spotify_client: spotify.Client = spotify.Client(self._spotify_client_id, self._spotify_client_secret) + self._spotify_http_client: spotify.HTTPClient = spotify.HTTPClient(self._spotify_client_id, self._spotify_client_secret) + self._bot.add_listener(self._update_handler, "on_socket_response") def __repr__(self): @@ -178,8 +191,61 @@ class Node: async def get_tracks(self, query: str, ctx: commands.Context = None): - async with self._session.get(url=f"{self._rest_uri}/loadtracks?identifier={quote(query)}", headers={"Authorization": self._password}) as response: - data = await response.json() + if spotify_url_check := SPOTIFY_URL_REGEX.match(query): + + search_type = spotify_url_check.group('type') + spotify_id = spotify_url_check.group('id') + if search_type == "playlist": + results: spotify.Playlist = spotify.Playlist(client=self._spotify_client, data=await self._spotify_http_client.get_playlist(spotify_id)) + try: + search_tracks = await results.get_all_tracks() + tracks = [ + objects.Track( + track_id='spotify', + ctx=ctx, + info={'title': track.name or 'Unknown', 'author': ', '.join(artist.name for artist in track.artists) or 'Unknown', + 'length': track.duration or 0, 'identifier': track.id or 'Unknown', 'uri': track.url or 'spotify', + 'isStream': False, 'isSeekable': False, 'position': 0, 'thumbnail': track.images[0].url if track.images else None}, + + ) for track in search_tracks + ] + return objects.Playlist(playlist_info={"name": results.name, "selectedTrack": search_tracks[0]}, tracks=tracks, ctx=ctx) + except: + raise exceptions.SpotifyPlaylistLoadFailed(f"Unable to find results for {query}") + elif search_type == "album": + results: spotify.Album = await self._spotify_client.get_album(spotify_id=spotify_id) + try: + search_tracks = await results.get_all_tracks() + tracks = [ + objects.Track( + track_id='spotify', + ctx=ctx, + info={'title': track.name or 'Unknown', 'author': ', '.join(artist.name for artist in track.artists) or 'Unknown', + 'length': track.duration or 0, 'identifier': track.id or 'Unknown', 'uri': track.url or 'spotify', + 'isStream': False, 'isSeekable': False, 'position': 0, 'thumbnail': track.images[0].url if track.images else None}, + + ) for track in search_tracks + ] + + return objects.Playlist(playlist_info={"name": results.name, "selectedTrack": search_tracks[0]}, tracks=tracks, ctx=ctx) + except: + raise exceptions.SpotifyAlbumLoadFailed(f"Unable to find results for {query}") + elif search_type == 'track': + try: + results: spotify.Track = await self._spotify_client.get_track(spotify_id=spotify_id) + return objects.Track( + track_id='spotify', + ctx=ctx, + info={'title': results.name or 'Unknown', 'author': ', '.join(artist.name for artist in results.artists) or 'Unknown', + 'length': results.duration or 0, 'identifier': results.id or 'Unknown', 'uri': results.url or 'spotify', + 'isStream': False, 'isSeekable': False, 'position': 0, 'thumbnail': results.images[0].url if results.images else None},) + except: + raise exceptions.SpotifyTrackLoadFailed(f"Unable to find results for {query}") + + + else: + async with self._session.get(url=f"{self._rest_uri}/loadtracks?identifier={quote(query)}", headers={"Authorization": self._password}) as response: + data = await response.json() load_type = data.get("loadType") @@ -193,13 +259,9 @@ class Node: return None elif load_type == "PLAYLIST_LOADED": - if ctx: - return objects.Playlist(playlist_info=data["playlistInfo"], tracks=data["tracks"], ctx=ctx) - else: - return objects.Playlist(playlist_info=data["playlistInfo"], tracks=data["tracks"]) - + return objects.Playlist(playlist_info=data["playlistInfo"], tracks=data["tracks"], ctx=ctx) + elif load_type == "SEARCH_RESULT" or load_type == "TRACK_LOADED": - if ctx: - return [objects.Track(track_id=track["track"], info=track["info"], ctx=ctx) for track in data["tracks"]] - else: - return [objects.Track(track_id=track["track"], info=track["info"]) for track in data["tracks"]] + return [objects.Track(track_id=track["track"], info=track["info"], ctx=ctx) for track in data["tracks"]] + + diff --git a/pomice/objects.py b/pomice/objects.py index 7bcb9e7..71a3ed1 100644 --- a/pomice/objects.py +++ b/pomice/objects.py @@ -36,10 +36,8 @@ class Playlist: self.name = playlist_info.get("name") self.selected_track = playlist_info.get("selectedTrack") - if ctx: - self.tracks = [Track(track_id=track["track"], info=track["info"], ctx=ctx) for track in self.tracks_raw] - else: - self.tracks = [Track(track_id=track["track"], info=track["info"]) for track in self.tracks_raw] + self.tracks = [Track(track_id=track["track"], info=track["info"], ctx=ctx) for track in self.tracks_raw] + def __str__(self): return self.name diff --git a/pomice/player.py b/pomice/player.py index 352296e..a2b7e7f 100644 --- a/pomice/player.py +++ b/pomice/player.py @@ -151,6 +151,8 @@ class Player(VoiceProtocol): await self._node.send(op='destroy', guildId=str(self._guild.id)) async def play(self, track: objects.Track, start_position: int = 0): + if track.track_id == "spotify": + track: objects.Track = await self._node.get_tracks(f"{track.title} {track.author}") await self._node.send(op='play', guildId=str(self._guild.id), track=track.track_id, startTime=start_position, endTime=track.length, noReplace=False) self._current = track return self._current diff --git a/pomice/spotify/__init__.py b/pomice/spotify/__init__.py new file mode 100644 index 0000000..c9c95af --- /dev/null +++ b/pomice/spotify/__init__.py @@ -0,0 +1,28 @@ +__version__ = "0.10.2" +__title__ = "spotify" +__author__ = "mental" +__license__ = "MIT" + +from typing import Dict, Type + +from .oauth import * +from .utils import clean as _clean_namespace +from .errors import * +from .models import * +from .client import * +from .models import SpotifyBase +from .http import HTTPClient, HTTPUserClient + +__all__ = tuple(name for name in locals() if name[0] != "_") + +_locals = locals() # pylint: disable=invalid-name + +_types: Dict[str, Type[Union[SpotifyBase, HTTPClient]]] +with _clean_namespace(locals(), "_locals", "_clean_namespace"): + _types = dict( # pylint: disable=invalid-name + (name, _locals[name]) + for name, obj in _locals.items() + if isinstance(obj, type) and issubclass(obj, SpotifyBase) + ) + _types["HTTPClient"] = HTTPClient + _types["HTTPUserClient"] = HTTPUserClient diff --git a/pomice/spotify/client.py b/pomice/spotify/client.py new file mode 100644 index 0000000..4befee8 --- /dev/null +++ b/pomice/spotify/client.py @@ -0,0 +1,348 @@ +import asyncio +from typing import Optional, List, Iterable, NamedTuple, Type, Union, Dict + +from .http import HTTPClient +from .utils import to_id +from . import OAuth2, Artist, Album, Track, User, Playlist + +__all__ = ("Client", "SearchResults") + +_TYPES = {"artist": Artist, "album": Album, "playlist": Playlist, "track": Track} + +_SEARCH_TYPES = {"track", "playlist", "artist", "album"} +_SEARCH_TYPE_ERR = ( + 'Bad queary type! got "%s" expected any of: track, playlist, artist, album' +) + + +class SearchResults(NamedTuple): + """A namedtuple of search results. + + Attributes + ---------- + artists : List[:class:`Artist`] + The artists of the search. + playlists : List[:class:`Playlist`] + The playlists of the search. + albums : List[:class:`Album`] + The albums of the search. + tracks : List[:class:`Track`] + The tracks of the search. + """ + + artists: Optional[List[Artist]] = None + playlists: Optional[List[Playlist]] = None + albums: Optional[List[Album]] = None + tracks: Optional[List[Track]] = None + + +class Client: + """Represents a Client app on Spotify. + + This class is used to interact with the Spotify API. + + Parameters + ---------- + client_id : :class:`str` + The client id provided by spotify for the app. + client_secret : :class:`str` + The client secret for the app. + loop : Optional[:class:`asyncio.AbstractEventLoop`] + The event loop the client should run on, if no loop is specified `asyncio.get_event_loop()` is called and used instead. + + Attributes + ---------- + client_id : :class:`str` + The applications client_id, also aliased as `id` + http : :class:`HTTPClient` + The HTTPClient that is being used. + loop : Optional[:class:`asyncio.AbstractEventLoop`] + The event loop the client is running on. + """ + + _default_http_client: Type[HTTPClient] = HTTPClient + + def __init__( + self, + client_id: str, + client_secret: str, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + if not isinstance(client_id, str): + raise TypeError("client_id must be a string.") + + if not isinstance(client_secret, str): + raise TypeError("client_secret must be a string.") + + if loop is not None and not isinstance(loop, asyncio.AbstractEventLoop): + raise TypeError( + "loop argument must be None or an instance of asyncio.AbstractEventLoop." + ) + + self.loop = loop = loop or asyncio.get_event_loop() + self.http = self._default_http_client(client_id, client_secret, loop=loop) + + def __repr__(self): + return f"" + + async def __aenter__(self) -> "Client": + return self + + async def __aexit__(self, exc_type, exc_value, traceback) -> None: + await self.close() + + # Properties + + @property + def client_id(self) -> str: + """:class:`str` - The Spotify client ID.""" + return self.http.client_id + + @property + def id(self): # pylint: disable=invalid-name + """:class:`str` - The Spotify client ID.""" + return self.http.client_id + + # Public api + + def oauth2_url( + self, + redirect_uri: str, + scopes: Optional[Union[Iterable[str], Dict[str, bool]]] = None, + state: Optional[str] = None, + ) -> str: + """Generate an oauth2 url for user authentication. + + This is an alias to :meth:`OAuth2.url_only` but the + difference is that the client id is autmatically + passed in to the constructor. + + Parameters + ---------- + redirect_uri : :class:`str` + Where spotify should redirect the user to after authentication. + scopes : Optional[Iterable[:class:`str`], Dict[:class:`str`, :class:`bool`]] + The scopes to be requested. + state : Optional[:class:`str`] + Using a state value can increase your assurance that an incoming connection is the result of an + authentication request. + + Returns + ------- + url : :class:`str` + The OAuth2 url. + """ + return OAuth2.url_only( + client_id=self.http.client_id, + redirect_uri=redirect_uri, + scopes=scopes, + state=state, + ) + + async def close(self) -> None: + """Close the underlying HTTP session to Spotify.""" + await self.http.close() + + async def user_from_token(self, token: str) -> User: + """Create a user session from a token. + + .. note:: + + This code is equivelent to `User.from_token(client, token)` + + Parameters + ---------- + token : :class:`str` + The token to attatch the user session to. + + Returns + ------- + user : :class:`spotify.User` + The user from the ID + """ + return await User.from_token(self, token) + + async def get_album(self, spotify_id: str, *, market: str = "US") -> Album: + """Retrive an album with a spotify ID. + + Parameters + ---------- + spotify_id : :class:`str` + The ID to search for. + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code + + Returns + ------- + album : :class:`spotify.Album` + The album from the ID + """ + data = await self.http.album(to_id(spotify_id), market=market) + return Album(self, data) + + async def get_artist(self, spotify_id: str) -> Artist: + """Retrive an artist with a spotify ID. + + Parameters + ---------- + spotify_id : str + The ID to search for. + + Returns + ------- + artist : Artist + The artist from the ID + """ + data = await self.http.artist(to_id(spotify_id)) + return Artist(self, data) + + async def get_track(self, spotify_id: str) -> Track: + """Retrive an track with a spotify ID. + + Parameters + ---------- + spotify_id : str + The ID to search for. + + Returns + ------- + track : Track + The track from the ID + """ + data = await self.http.track(to_id(spotify_id)) + return Track(self, data) + + async def get_user(self, spotify_id: str) -> User: + """Retrive an user with a spotify ID. + + Parameters + ---------- + spotify_id : str + The ID to search for. + + Returns + ------- + user : User + The user from the ID + """ + data = await self.http.user(to_id(spotify_id)) + return User(self, data) + + # Get multiple objects + + async def get_albums(self, *ids: str, market: str = "US") -> List[Album]: + """Retrive multiple albums with a list of spotify IDs. + + Parameters + ---------- + ids : List[str] + the ID to look for + market : Optional[str] + An ISO 3166-1 alpha-2 country code + + Returns + ------- + albums : List[Album] + The albums from the IDs + """ + data = await self.http.albums( + ",".join(to_id(_id) for _id in ids), market=market + ) + return list(Album(self, album) for album in data["albums"]) + + async def get_artists(self, *ids: str) -> List[Artist]: + """Retrive multiple artists with a list of spotify IDs. + + Parameters + ---------- + ids : List[:class:`str`] + The IDs to look for. + + Returns + ------- + artists : List[:class:`Artist`] + The artists from the IDs + """ + data = await self.http.artists(",".join(to_id(_id) for _id in ids)) + return list(Artist(self, artist) for artist in data["artists"]) + + async def search( # pylint: disable=invalid-name + self, + q: str, + *, + types: Iterable[str] = ("track", "playlist", "artist", "album"), + limit: int = 20, + offset: int = 0, + market: str = "US", + should_include_external: bool = False, + ) -> SearchResults: + """Access the spotify search functionality. + + >>> results = client.search('Cadet', types=['artist']) + >>> for artist in result.get('artists', []): + ... if artist.name.lower() == 'cadet': + ... print(repr(artist)) + ... break + + Parameters + ---------- + q : :class:`str` + the search query + types : Optional[Iterable[`:class:`str`]] + A sequence of search types (can be any of `track`, `playlist`, `artist` or `album`) to refine the search request. + A `ValueError` may be raised if a search type is found that is not valid. + limit : Optional[:class:`int`] + The limit of search results to return when searching. + Maximum limit is 50, any larger may raise a :class:`HTTPException` + offset : Optional[:class:`int`] + The offset from where the api should start from in the search results. + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code. Provide this parameter if you want to apply Track Relinking. + should_include_external : :class:`bool` + If `True` is specified, the response will include any relevant audio content + that is hosted externally. By default external content is filtered out from responses. + + Returns + ------- + results : :class:`SearchResults` + The results of the search. + + Raises + ------ + TypeError + Raised when a parameter with a bad type is passed. + ValueError + Raised when a bad search type is passed with the `types` argument. + """ + if not hasattr(types, "__iter__"): + raise TypeError("types must be an iterable.") + + types_ = set(types) + + if not types_.issubset(_SEARCH_TYPES): + raise ValueError(_SEARCH_TYPE_ERR % types_.difference(_SEARCH_TYPES).pop()) + + query_type = ",".join(tp.strip() for tp in types) + + include_external: Optional[str] + if should_include_external: + include_external = "audio" + else: + include_external = None + + data = await self.http.search( + q=q, + query_type=query_type, + market=market, + limit=limit, + offset=offset, + include_external=include_external, + ) + + return SearchResults( + **{ + key: [_TYPES[obj["type"]](self, obj) for obj in value["items"]] + for key, value in data.items() + } + ) diff --git a/pomice/spotify/errors.py b/pomice/spotify/errors.py new file mode 100644 index 0000000..c29ba97 --- /dev/null +++ b/pomice/spotify/errors.py @@ -0,0 +1,41 @@ +__all__ = ("SpotifyException", "HTTPException", "Forbidden", "NotFound") + + +class SpotifyException(Exception): + """Base exception class for spotify.py.""" + + +class HTTPException(SpotifyException): + """A generic exception that's thrown when a HTTP operation fails.""" + + def __init__(self, response, message): + self.response = response + self.status = response.status + error = message.get("error") + + if isinstance(error, dict): + self.text = error.get("message", "") + else: + self.text = message.get("error_description", "") + + fmt = "{0.reason} (status code: {0.status})" + if self.text.strip(): + fmt += ": {1}" + + super().__init__(fmt.format(self.response, self.text)) + + +class Forbidden(HTTPException): + """An exception that's thrown when status code 403 occurs.""" + + +class NotFound(HTTPException): + """An exception that's thrown when status code 404 occurs.""" + + +class BearerTokenError(HTTPException): + """An exception that's thrown when Spotify could not provide a valid Bearer Token""" + + +class RateLimitedException(Exception): + """An exception that gets thrown when a rate limit is encountered.""" diff --git a/pomice/spotify/http.py b/pomice/spotify/http.py new file mode 100644 index 0000000..b20f1cc --- /dev/null +++ b/pomice/spotify/http.py @@ -0,0 +1,1789 @@ +# pylint: skip-file + +import asyncio +import sys +import json +import time +from typing import ( + Optional, + List, + Sequence, + Union, + Dict, + Awaitable, + BinaryIO, + Tuple, + Any, +) +from base64 import b64encode +from urllib.parse import quote + +import aiohttp +import backoff # type: ignore + +from . import __version__ +from .utils import filter_items +from .errors import ( + HTTPException, + Forbidden, + NotFound, + SpotifyException, + BearerTokenError, + RateLimitedException, +) + +__all__ = ("HTTPClient", "HTTPUserClient") + +_GET_BEARER_ARG_ERR = "{name} was `None` when getting a bearer token." +_PYTHON_VERSION = ".".join(str(_) for _ in sys.version_info[:3]) +_AIOHTTP_VERSION = aiohttp.__version__ + + +class HTTPClient: + """A class responsible for handling all HTTP logic. + + This class combines a small amount of stateful logic control + with the :meth:`request` method and a very thin wrapper over + the raw HTTP API. + + All endpoint methods mirror the default arguments the API + uses and is best described as a series of "good defaults" + for the routes. + + Parameters + ---------- + client_id : str + The client id provided by spotify for the app. + client_secret : str + The client secret for the app. + loop : Optional[event loop] + The event loop the client should run on, if no loop is specified `asyncio.get_event_loop()` is called and used instead. + + + Attributes + ---------- + loop : AbstractEventLoop + The loop the client is running with. + client_id : str + The client id of the app. + client_secret : str + The client secret. + """ + + RETRY_AMOUNT = 10 + DEFAULT_USER_AGENT = ( + user_agent + ) = f"Application (https://github.com/mental32/spotify.py {__version__}) Python/{_PYTHON_VERSION} aiohttp/{_AIOHTTP_VERSION}" + + def __init__(self, client_id: str, client_secret: str, loop=None): + self.loop = loop or asyncio.get_event_loop() + self._session = aiohttp.ClientSession(loop=self.loop) + + self.client_id = client_id + self.client_secret = client_secret + + self.bearer_info: Optional[Dict[str, str]] = None + + self.__request_barrier_lock = asyncio.Lock() + self.__request_barrier = asyncio.Event() + self.__request_barrier.set() + + @staticmethod + def route( + method: str, path: str, *, base: str = "https://api.spotify.com/v1", **kwargs + ) -> Tuple[str, str]: + """Used for constructing URLs for API endpoints. + + Parameters + ---------- + method : str + The HTTP/REST method used. + path : str + A path to be formatted. + kwargs : Any + The arguments used to format the path. + + Returns + ------- + route : Tuple[str, str] + A tuple of the method and formatted url path to use. + """ + url = base + path + + if kwargs: + url = url.format( + **{ + key: (quote(value) if isinstance(value, str) else value) + for key, value in kwargs.items() + } + ) + + return (method, url) + + async def get_bearer_info( + self, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + session: Optional[aiohttp.ClientSession] = None, + ): + """Get the application bearer token from client_id and client_secret. + + Raises + ------ + SpotifyException + This will be raised when either `client_id` or + `client_secret` is `None` + """ + client_id = client_id or self.client_id + client_secret = client_secret or self.client_secret + + if client_id is None: + raise SpotifyException(_GET_BEARER_ARG_ERR.format(name="client_id")) + + if client_secret is None: + raise SpotifyException(_GET_BEARER_ARG_ERR.format(name="client_secret")) + + token = b64encode(":".join((client_id, client_secret)).encode()) + + data = {"grant_type": "client_credentials"} + headers = {"Authorization": f"Basic {token.decode()}"} + + session = session or self._session + + async with session.post( + "https://accounts.spotify.com/api/token", data=data, headers=headers + ) as response: + bearer_info = json.loads(await response.text(encoding="utf-8")) + + if "error" in bearer_info.keys(): + raise BearerTokenError(response=response, message=bearer_info) + + return bearer_info + + @backoff.on_exception(backoff.expo, RateLimitedException) + async def request(self, route, **kwargs): + r"""Make a request to the spotify API with the current bearer credentials. + + Parameters + ---------- + route : Tuple[str, str] + A tuple of the method and url gained from :meth:`route`. + \*\*kwargs : Any + keyword arguments to pass into :class:`aiohttp.ClientSession.request` + """ + assert isinstance(route, tuple), "route parameter was not a tuple!" + assert len(route) == 2, "route parameter must have exactly two items" + + method, url, = route + + headers = kwargs.pop("headers", {}) + if "Authorization" not in headers: + if self.bearer_info is None: + self.bearer_info = bearer_info = await self.get_bearer_info() + access_token = bearer_info["access_token"] + else: + access_token = self.bearer_info["access_token"] + + headers["Authorization"] = "Bearer " + access_token + + headers = { + "Content-Type": kwargs.pop("content_type", "application/json"), + "User-Agent": self.user_agent, + **headers, + } + + if "json" in kwargs: + headers["Content-Type"] = "application/json" + kwargs["data"] = json.dumps( + kwargs.pop("json"), separators=(",", ":"), ensure_ascii=True + ) + + for current_retry in range(self.RETRY_AMOUNT): + await self.__request_barrier.wait() + + response = await self._session.request( + method, url, headers=headers, **kwargs + ) + + try: + status = response.status + + try: + data = json.loads(await response.text(encoding="utf-8")) + except json.decoder.JSONDecodeError: + data = {} + + if 300 > status >= 200: + return data + + if status == 401: + self.bearer_info = bearer_info = await self.get_bearer_info() + headers["Authorization"] = "Bearer " + bearer_info["access_token"] + continue + + if status == 429: + # we're being rate limited. + + self.__request_barrier.clear() + amount = int(response.headers.get("Retry-After")) + checkpoint = int(time.time()) + + async with self.__request_barrier_lock: + if (int(time.time()) - checkpoint) < amount: + self.__request_barrier.clear() + await asyncio.sleep(int(amount), loop=self.loop) + self.__request_barrier.set() + + continue + + if status in (502, 503): + # unconditional retry + continue + + if status == 403: + raise Forbidden(response, data) + + if status == 404: + raise NotFound(response, data) + finally: + await response.release() + + if response.status == 429: + raise RateLimitedException((amount, _max_retries - current_retry)) + + raise HTTPException(response, data) + + async def close(self): + """Close the underlying HTTP session.""" + await self._session.close() + + # Methods are defined in the order that they are listed in + # the api docs (https://developer.spotify.com/documentation/web-api/reference/) + + # Album related endpoints + + def album(self, spotify_id: str, market: Optional[str] = "US") -> Awaitable: + """Get Spotify catalog information for a single album. + + Parameters + ---------- + spotify_id : str + The spotify_id to search by. + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + """ + route = self.route("GET", "/albums/{spotify_id}", spotify_id=spotify_id) + payload: Dict[str, Any] = {} + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def album_tracks( + self, + spotify_id: str, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + market="US", + ) -> Awaitable: + """Get Spotify catalog information about an album’s tracks. + + Parameters + ---------- + spotify_id : str + The spotify_id to search by. + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optiona[int] + The offset of which Spotify should start yielding from. + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + """ + route = self.route("GET", "/albums/{spotify_id}/tracks", spotify_id=spotify_id) + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def albums(self, spotify_ids, market="US") -> Awaitable: + """Get Spotify catalog information for multiple albums identified by their Spotify IDs. + + Parameters + ---------- + spotify_ids : List[str] + The spotify_ids to search by. + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + """ + route = self.route("GET", "/albums/") + payload: Dict[str, Any] = {"ids": spotify_ids} + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + # Artist related endpoints. + + def artist(self, spotify_id) -> Awaitable: + """Get Spotify catalog information for a single artist identified by their unique Spotify ID. + + Parameters + ---------- + spotify_id : str + The spotify_id to search by. + """ + route = self.route("GET", "/artists/{spotify_id}", spotify_id=spotify_id) + return self.request(route) + + def artist_albums( + self, + spotify_id, + include_groups=None, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + market="US", + ): + """Get Spotify catalog information about an artist’s albums. + + Parameters + ---------- + spotify_id : str + The spotify_id to search by. + include_groups : INCLUDE_GROUPS_TP + INCLUDE_GROUPS + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optiona[int] + The offset of which Spotify should start yielding from. + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + """ + route = self.route("GET", "/artists/{spotify_id}/albums", spotify_id=spotify_id) + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if include_groups: + payload["include_groups"] = include_groups + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def artist_top_tracks(self, spotify_id, country) -> Awaitable: + """Get Spotify catalog information about an artist’s top tracks by country. + + Parameters + ---------- + spotify_id : str + The spotify_id to search by. + country : COUNTRY_TP + COUNTRY + """ + route = self.route( + "GET", "/artists/{spotify_id}/top-tracks", spotify_id=spotify_id + ) + payload: Dict[str, Any] = {"country": country} + return self.request(route, params=payload) + + def artist_related_artists(self, spotify_id) -> Awaitable: + """Get Spotify catalog information about artists similar to a given artist. + + Similarity is based on analysis of the Spotify community’s listening history. + + Parameters + ---------- + spotify_id : str + The spotify_id to search by. + """ + route = self.route( + "GET", "/artists/{spotify_id}/related-artists", spotify_id=spotify_id + ) + return self.request(route) + + def artists(self, spotify_ids) -> Awaitable: + """Get Spotify catalog information for several artists based on their Spotify IDs. + + Parameters + ---------- + spotify_id : List[str] + The spotify_ids to search with. + """ + route = self.route("GET", "/artists") + payload: Dict[str, Any] = {"ids": spotify_ids} + return self.request(route, params=payload) + + # Browse endpoints. + + def category(self, category_id, country=None, locale=None) -> Awaitable: + """Get a single category used to tag items in Spotify (on, for example, the Spotify player’s “Browse” tab). + + Parameters + ---------- + category_id : str + The Spotify category ID for the category. + country : COUNTRY_TP + COUNTRY + locale : LOCALE_TP + LOCALE + """ + route = self.route( + "GET", "/browse/categories/{category_id}", category_id=category_id + ) + payload: Dict[str, Any] = {} + + if country: + payload["country"] = country + + if locale: + payload["locale"] = locale + + return self.request(route, params=payload) + + def category_playlists( + self, + category_id, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + country=None, + ) -> Awaitable: + """Get a list of Spotify playlists tagged with a particular category. + + Parameters + ---------- + category_id : str + The Spotify category ID for the category. + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first item to return. Default: 0 + country : COUNTRY_TP + COUNTRY + """ + route = self.route( + "GET", "/browse/categories/{category_id}/playlists", category_id=category_id + ) + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if country: + payload["country"] = country + + return self.request(route, params=payload) + + def categories( + self, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + country=None, + locale=None, + ) -> Awaitable: + """Get a list of categories used to tag items in Spotify (on, for example, the Spotify player’s “Browse” tab). + + Parameters + ---------- + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first item to return. Default: 0 + country : COUNTRY_TP + COUNTRY + locale : LOCALE_TP + LOCALE + """ + route = self.route("GET", "/browse/categories") + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if country: + payload["country"] = country + + if locale: + payload["locale"] = locale + + return self.request(route, params=payload) + + def featured_playlists( + self, + locale=None, + country=None, + timestamp=None, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + ): + """Get a list of Spotify featured playlists (shown, for example, on a Spotify player’s ‘Browse’ tab). + + Parameters + ---------- + locale : LOCALE_TP + LOCALE + country : COUNTRY_TP + COUNTRY + timestamp : TIMESTAMP_TP + TIMESTAMP + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first item to return. Default: 0 + """ + route = self.route("GET", "/browse/featured-playlists") + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if country: + payload["country"] = country + + if locale: + payload["locale"] = locale + + if timestamp: + payload["timestamp"] = timestamp + + return self.request(route, params=payload) + + def new_releases( + self, *, country=None, limit: Optional[int] = 20, offset: Optional[int] = 0 + ) -> Awaitable: + """Get a list of new album releases featured in Spotify (shown, for example, on a Spotify player’s “Browse” tab). + + Parameters + ---------- + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first item to return. Default: 0 + country : COUNTRY_TP + COUNTRY + """ + route = self.route("GET", "/browse/new-releases") + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if country: + payload["country"] = country + + return self.request(route, params=payload) + + def recommendations( + self, + seed_artists, + seed_genres, + seed_tracks, + *, + limit: Optional[int] = 20, + market=None, + **filters, + ): + """Get Recommendations Based on Seeds. + + Parameters + ---------- + seed_artists : str + A comma separated list of Spotify IDs for seed artists. Up to 5 seed values may be provided. + seed_genres : str + A comma separated list of any genres in the set of available genre seeds. Up to 5 seed values may be provided. + seed_tracks : str + A comma separated list of Spotify IDs for a seed track. Up to 5 seed values may be provided. + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + max_* : Optional[Keyword arguments] + For each tunable track attribute, a hard ceiling on the selected track attribute’s value can be provided. + min_* : Optional[Keyword arguments] + For each tunable track attribute, a hard floor on the selected track attribute’s value can be provided. + target_* : Optional[Keyword arguments] + For each of the tunable track attributes (below) a target value may be provided. + """ + route = self.route("GET", "/recommendations") + payload: Dict[str, Any] = { + "seed_artists": seed_artists, + "seed_genres": seed_genres, + "seed_tracks": seed_tracks, + "limit": limit, + } + + if market: + payload["market"] = market + + if filters: + payload.update(filters) + + return self.request(route, params=payload) + + # Follow related endpoints. + + def following_artists_or_users(self, ids, *, type_="artist") -> Awaitable: + """Check to see if the current user is following one or more artists or other Spotify users. + + Parameters + ---------- + ids : List[:class:`str`] + A comma-separated list of the artist or the user Spotify IDs to check. + A maximum of 50 IDs can be sent in one request. + type_ : Optional[:class:`str`] + The ID type: either "artist" or "user". + Default: "artist" + """ + route = self.route("GET", "/me/following/contains") + payload: Dict[str, Any] = {"ids": ids, "type": type_} + + return self.request(route, params=payload) + + def following_playlists(self, playlist_id: str, ids: List[str]) -> Awaitable: + """Check to see if one or more Spotify users are following a specified playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID of the playlist. + ids : List[:class:`str`] + A list of the artist or the user Spotify IDs. + A maximum of five IDs are allowed. + """ + route = self.route( + "GET", + "/playlists/{playlist_id}/followers/contains", + playlist_id=playlist_id, + ) + payload: Dict[str, Any] = {"ids": ids} + + return self.request(route, params=payload) + + def follow_artist_or_user(self, type_: str, ids: List[str]) -> Awaitable: + """Add the current user as a follower of one or more artists or other Spotify users. + + Parameters + ---------- + type_ : :class:`str` + either artist or user. + ids : List[:class:`str`] + A list of the artist or the user Spotify IDs. + """ + route = self.route("PUT", "/me/following") + payload: Dict[str, Any] = {"ids": ids, "type": type_} + + return self.request(route, params=payload) + + def followed_artists( + self, *, limit: Optional[int] = 20, after: Optional[str] = None + ) -> Awaitable: + """Get the current user’s followed artists. + + Parameters + ---------- + limit : Optional[:class:`int`] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + after : Optional[:class:`str`] + The last artist ID retrieved from the previous request. + """ + route = self.route("GET", "/me/following") + payload: Dict[str, Any] = {"limit": limit, "type": "artist"} + + if after: + payload["after"] = after + + return self.request(route, params=payload) + + def unfollow_artists_or_users(self, type_: str, ids: List[str]) -> Awaitable: + """Remove the current user as a follower of one or more artists or other Spotify users. + + Parameters + ---------- + type_ : :class:`str` + either artist or user. + ids : List[:class:`str`] + A list of the artist or the user Spotify IDs. + """ + route = self.route("DELETE", "/me/following") + payload: Dict[str, Any] = {"ids": ids, "type": type_} + + return self.request(route, params=payload) + + def unfollow_playlist(self, playlist_id: str) -> Awaitable: + """Remove the current user as a follower of a playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID of the playlist that is to be no longer followed. + """ + route = self.route( + "DELETE", "/playlists/{playlist_id}/followers", playlist_id=playlist_id + ) + + return self.request(route) + + def is_saved_album(self, ids) -> Awaitable: + """Check if one or more albums is already saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + ids : List[:class:`str`] + A list of the Spotify IDs. + """ + route = self.route("GET", "/me/albums/contains") + payload: Dict[str, Any] = {"ids": ",".join(ids)} + + return self.request(route, params=payload) + + def is_saved_track(self, ids: List[str]) -> Awaitable: + """Check if one or more tracks is already saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + ids : List[:class:`str`] + A list of the Spotify IDs. + """ + route = self.route("GET", "/me/tracks/contains") + payload: Dict[str, Any] = {"ids": ",".join(ids)} + + return self.request(route, params=payload) + + def saved_albums( + self, + *, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + market: Optional[str] = None, + ) -> Awaitable: + """Get a list of the albums saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + limit : Optional[:class:`str`] + The maximum number of objects to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[:class:`str`] + The index of the first object to return. Default: 0 (i.e., the first object). Use with limit to get the next set of objects. + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string from_token. Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/me/albums") + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if market is not None: + payload["market"] = market + + return self.request(route, params=payload) + + def saved_tracks( + self, + *, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + market: Optional[str] = None, + ) -> Awaitable: + """Get a list of the songs saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + limit : Optional[:class:`str`] + The maximum number of objects to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[:class:`str`] + The index of the first object to return. Default: 0 (i.e., the first object). Use with limit to get the next set of objects. + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string from_token. Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/me/tracks") + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def delete_saved_albums(self, ids: List[str]) -> Awaitable: + """Remove one or more albums from the current user’s ‘Your Music’ library. + + Parameters + ---------- + ids : List[:class:`str`] + A list of the Spotify IDs. + """ + route = self.route("DELETE", "/me/albums") + return self.request(route, json=ids) + + def delete_saved_tracks(self, ids: List[str]) -> Awaitable: + """Remove one or more tracks from the current user’s ‘Your Music’ library. + + Parameters + ---------- + ids : List[:class:`str`] + A list of the Spotify IDs. + """ + route = self.route("DELETE", "/me/tracks") + return self.request(route, json=ids) + + def save_tracks(self, ids: List[str]) -> Awaitable: + """Save one or more tracks to the current user’s ‘Your Music’ library. + + Parameters + ---------- + ids : List[:class:`str`] + A list of the Spotify IDs. + """ + route = self.route("PUT", "/me/tracks") + return self.request(route, json=ids) + + def save_albums(self, ids: List[str]) -> Awaitable: + """Save one or more albums to the current user’s ‘Your Music’ library. + + Parameters + ---------- + ids : List[:class:`str`] + A list of the Spotify IDs. + """ + route = self.route("PUT", "/me/albums") + return self.request(route, json=ids) + + def top_artists_or_tracks( + self, + type_: str, + *, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + time_range: Optional[str] = None, + ) -> Awaitable: + """Get the current user’s top artists or tracks based on calculated affinity. + + Affinity is a measure of the expected preference a user has for a particular track or artist. + It is based on user behavior, including play history, but does not include actions made while in incognito mode. + Light or infrequent users of Spotify may not have sufficient play history to generate a full affinity data set. + + As a user’s behavior is likely to shift over time, this preference data is available over three time spans. + + For each time range, the top 50 tracks and artists are available for each user. + In the future, it is likely that this restriction will be relaxed. This data is typically updated once each day for each user. + + Parameters + ---------- + type_ : :class;`str` + The type of entity to return. Valid values: "artists" or "tracks". + limit : Optional[:class:`int`] + The number of entities to return. Default: 20. Minimum: 1. Maximum: 50. For example: limit=2 + offset : Optional[:class:`int`] + The index of the first entity to return. Default: 0 (i.e., the first track). Use with limit to get the next set of entities. + time_range : Optional[:class:`str`] + Over what time frame the affinities are computed. + Valid values: + - "long_term" (calculated from several years of data and including all new data as it becomes available) + - "medium_term" (approximately last 6 months) + - "short_term" (approximately last 4 weeks). Default: medium_term. + """ + route = self.route("GET", "/me/top/{type_}", type_=type_) + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if time_range is not None: + payload["time_range"] = time_range + + return self.request(route, params=payload) + + def available_devices(self) -> Awaitable: + """Get information about a user’s available devices.""" + route = self.route("GET", "/me/player/devices") + return self.request(route) + + def current_player(self, *, market: Optional[str] = None) -> Awaitable: + """Get information about the user’s current playback state, including track, track progress, and active device. + + Parameters + ---------- + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string from_token. Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/me/player") + payload: Dict[str, Any] = {} + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def playback_queue(self, *, uri: str, device_id: Optional[str] = None) -> Awaitable: + """Add an item to the end of the user’s current playback queue. + + Parameters + ---------- + uri : :class:`str` + The uri of the item to add to the queue. Must be a track or an + episode uri. + device_id : :class:`str` + The id of the device this command is targeting. If not supplied, + the user’s currently active device is the target. + """ + route = self.route("POST", "/me/player/queue") + params = {"uri": uri} + + if device_id is not None: + params["device_id"] = device_id + + return self.request(route, params=params) + + def recently_played( + self, + *, + limit: Optional[int] = 20, + before: Optional[str] = None, + after: Optional[str] = None, + ) -> Awaitable: + """Get tracks from the current user’s recently played tracks. + + Returns the most recent 50 tracks played by a user. + Note that a track currently playing will not be visible in play history until it has completed. + A track must be played for more than 30 seconds to be included in play history. + + Any tracks listened to while the user had “Private Session” enabled in their client will not be returned in the list of recently played tracks. + + The endpoint uses a bidirectional cursor for paging. + Follow the next field with the before parameter to move back in time, or use the after parameter to move forward in time. + If you supply no before or after parameter, the endpoint will return the most recently played songs, and the next link will page back in time. + + Parameters + ---------- + limit : Optional[:class:`int`] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + after : Optional[:class:`str`] + A Unix timestamp in milliseconds. Returns all items after (but not including) this cursor position. If after is specified, before must not be specified. + before : Optional[:class:`str`] + A Unix timestamp in milliseconds. Returns all items before (but not including) this cursor position. If before is specified, after must not be specified. + """ + route = self.route("GET", "/me/player/recently-played") + payload: Dict[str, Any] = {"limit": limit} + + if before: + payload["before"] = before + elif after: + payload["after"] = after + + return self.request(route, params=payload) + + def currently_playing(self, *, market: Optional[str] = None) -> Awaitable: + """Get the object currently being played on the user’s Spotify account. + + Parameters + ---------- + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string from_token. Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/me/player/currently-playing") + payload: Dict[str, Any] = {} + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def pause_playback(self, *, device_id: Optional[str] = None) -> Awaitable: + """Pause playback on the user’s account. + + Parameters + ---------- + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("PUT", "/me/player/pause") + payload: Dict[str, Any] = {} + + if device_id: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def seek_playback( + self, position_ms: int, *, device_id: Optional[str] = None + ) -> Awaitable: + """Seeks to the given position in the user’s currently playing track. + + Parameters + ---------- + position_ms : :class:`int` + The position in milliseconds to seek to. Must be a positive number. Passing in a position that is greater than the length of the track will cause the player to start playing the next song. + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("PUT", "/me/player/seek") + payload: Dict[str, Any] = {"position_ms": position_ms} + + if device_id: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def repeat_playback( + self, state: str, *, device_id: Optional[str] = None + ) -> Awaitable: + """Set the repeat mode for the user’s playback. Options are repeat-track, repeat-context, and off. + + Parameters + ---------- + state : :class:`str` + "track", "context" or "off". + - track will repeat the current track. + - context will repeat the current context. + - off will turn repeat off. + device_id : Optional[str] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("PUT", "/me/player/repeat") + payload: Dict[str, Any] = {"state": state} + + if device_id: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def set_playback_volume( + self, volume: int, *, device_id: Optional[str] = None + ) -> Awaitable: + """Set the volume for the user’s current playback device. + + Parameters + ---------- + volume : :class:`int` + The volume to set. Must be a value from 0 to 100 inclusive. + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("PUT", "/me/player/volume") + payload: Dict[str, Any] = {"volume_percent": volume} + + if device_id: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def skip_next(self, *, device_id: Optional[str] = None) -> Awaitable: + """Skips to next track in the user’s queue. + + Parameters + ---------- + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("POST", "/me/player/next") + payload: Dict[str, Any] = {} + + if device_id: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def skip_previous(self, *, device_id: Optional[str] = None) -> Awaitable: + """Skips to previous track in the user’s queue. + + Parameters + ---------- + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("POST", "/me/player/previous") + payload: Dict[str, Any] = {} + + if device_id: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def play_playback( + self, + context_uri: Union[str, Sequence[str]], + *, + offset: Optional[Union[str, int]] = None, + device_id: Optional[str] = None, + position_ms: Optional[int] = 0, + ) -> Awaitable: + """Start a new context or resume current playback on the user’s active device. + + .. note:: + + In order to resume playback set the context_uri to None. + + Parameters + ---------- + context_uri : Union[str, Sequence[:class:`str`]] + The context to play, if it is a string + then it must be a uri of a album, artist + or playlist. + + Otherwise a sequece of strings can be passed + in and they must all be track URIs + offset : Optional[Union[:class:`str`, :class:`int`]] + The offset of which to start from, + must either be an integer or a track URI. + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + position_ms : Optional[:class:`int`] + indicates from what position to start playback. Must be a positive number. + Passing in a position that is greater than the length of the track will cause the player to start playing the next song. + """ + route = self.route("PUT", "/me/player/play") + payload: Dict[str, Any] = {"position_ms": position_ms} + params: Dict[str, Any] = {} + can_set_offset: bool = False + + if isinstance(context_uri, str): + payload["context_uri"] = context_uri + can_set_offset = "playlist" in context_uri or "album" in context_uri + + elif hasattr(context_uri, "__iter__"): + payload["uris"] = list(context_uri) + can_set_offset = True + + elif context_uri is None: + pass # Do nothing, context_uri == None is allowed and intended for resume's + + else: + raise TypeError( + f"`context_uri` must be a string or an iterable object, got {type(context_uri)}" + ) + + if offset is not None: + if can_set_offset: + _offset: Dict[str, Union[int, str]] + + if isinstance(offset, str): + _offset = {"uri": offset} + + elif isinstance(offset, int): + _offset = {"position": offset} + + else: + raise TypeError( + f"`offset` should be either a string or an integer, got {type(offset)}" + ) + + payload["offset"] = _offset + else: + raise ValueError( + "not able to set `offset` as either `context_uri` was not a list or it was a playlist or album uri." + ) + + if device_id is not None: + params["device_id"] = device_id + + return self.request(route, params=params, json=payload) + + def shuffle_playback( + self, state: bool, *, device_id: Optional[str] = None + ) -> Awaitable: + """Toggle shuffle on or off for user’s playback. + + Parameters + ---------- + state : :class:`bool` + True : Shuffle user’s playback + False : Do not shuffle user’s playback. + device_id : Optional[:class:`str`] + The id of the device this command is targeting. If not supplied, the user’s currently active device is the target. + """ + route = self.route("PUT", "/me/player/shuffle") + payload: Dict[str, Any] = {"state": f"{bool(state)}".lower()} + if device_id is not None: + payload["device_id"] = device_id + + return self.request(route, params=payload) + + def transfer_player( + self, device_id: str, *, play: Optional[bool] = False + ) -> Awaitable: + """Transfer playback to a new device and determine if it should start playing. + + .. note: + + Note that a value of false for the play parameter when also transferring to another device_id will not pause playback. + To ensure that playback is paused on the new device you should send a pause command to the currently active device before transferring to the new device_id. + + Parameters + ---------- + device_id : :class:`str` + A Spotify Device ID + play : Optional[:class:`bool`] + True: ensure playback happens on new device. + False or not provided: keep the current playback state. + """ + route = self.route("PUT", "/me/player") + payload: Dict[str, Any] = {"device_ids": [device_id], "play": play} + + return self.request(route, json=payload) + + def add_playlist_tracks( + self, + playlist_id: str, + tracks: Sequence[Union[str]], + position: Optional[int] = None, + ) -> Awaitable: + """Add one or more tracks to a user’s playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + tracks : Sequence[Union[:class:`str`]] + A sequence of track URIs. + position : Optional[:class:`int`] + The position to insert the tracks, a zero-based index. + """ + route = self.route( + "POST", "/playlists/{playlist_id}/tracks", playlist_id=playlist_id + ) + + payload: Dict[str, Any] = {"uris": [track for track in tracks]} + + if position is not None: + payload["position"] = position + + return self.request(route, json=payload) + + def change_playlist_details( + self, + playlist_id: str, + *, + name: Optional[str] = None, + public: Optional[bool] = None, + collaborative: Optional[bool] = None, + description: Optional[str] = None, + ) -> Awaitable: + """Change a playlist’s name and public/private state. (The user must, of course, own the playlist.) + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + name : :class:`str` + The name for the new playlist + public : Optional[:class:`bool`] + Defaults to true . If true the playlist will be public, if false it will be private + collaborative : Optional[:class:`bool`] + Defaults to false . If true the playlist will be collaborative. + + .. note:: + to create a collaborative playlist you must also set public to false + description : Optional[:class:`str`] + The value for playlist description as displayed in Spotify Clients and in the Web API. + """ + route = self.route("PUT", "/playlists/{playlist_id}", playlist_id=playlist_id) + + payload: Dict[str, Any] = filter_items( + { + "name": name, + "public": public, + "collaborative": collaborative, + "description": description, + } + ) + + return self.request(route, json=payload) + + def create_playlist( + self, + user_id: str, + *, + name: str, + public: Optional[bool] = True, + collaborative: Optional[bool] = False, + description: Optional[str] = "", + ) -> Awaitable: + """Create a playlist for a Spotify user. (The playlist will be empty until you add tracks.) + + Parameters + ---------- + user_id : :class:`str` + The user’s Spotify user ID. + name : :class:`str` + The name for the new playlist + public : Optional[:class:`bool`] + Defaults to true . If true the playlist will be public, if false it will be private + collaborative : Optional[:class:`bool`] + Defaults to false . If true the playlist will be collaborative. + + .. note:: + to create a collaborative playlist you must also set public to false + description : Optional[:class:`str`] + The value for playlist description as displayed in Spotify Clients and in the Web API. + """ + route = self.route("POST", "/users/{user_id}/playlists", user_id=user_id) + + payload: Dict[str, Any] = { + "name": name, + "public": public, + "collaborative": collaborative, + "description": description, + } + + return self.request(route, json=payload) + + def follow_playlist( + self, playlist_id: str, *, public: Optional[bool] = True + ) -> Awaitable: + """Add the current user as a follower of a playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID of the playlist. Any playlist can be followed, regardless of its public/private status, as long as you know its playlist ID. + public : Optional[:class:`bool`] + Defaults to true. If true the playlist will be included in user’s public playlists, if false it will remain private. + """ + route = self.route( + "PUT", "/playlists/{playlist_id}/followers", playlist_id=playlist_id + ) + + payload: Dict[str, Any] = {"public": public} + + return self.request(route, json=payload) + + def current_playlists( + self, *, limit: Optional[int] = 20, offset: Optional[int] = 0 + ) -> Awaitable: + """Get a list of the playlists owned or followed by the current Spotify user. + + Parameters + ---------- + limit : Optional[:class:`str`] + The maximum number of playlists to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[:class:`str`] + he index of the first playlist to return. Default: 0 (the first object). Maximum offset: 100.000. + """ + route = self.route("GET", "/me/playlists") + return self.request(route, params={"limit": limit, "offset": offset}) + + def get_playlists( + self, user_id: str, *, limit: Optional[int] = 20, offset: Optional[int] = 0 + ) -> Awaitable: + """Get a list of the playlists owned or followed by a Spotify user. + + Parameters + ---------- + user_id : :class:`str` + The user’s Spotify user ID. + limit : Optional[:class:`str`] + The maximum number of playlists to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[:class:`str`] + he index of the first playlist to return. Default: 0 (the first object). Maximum offset: 100.000. + """ + route = self.route("GET", "/users/{user_id}/playlists", user_id=user_id) + return self.request(route, params={"limit": limit, "offset": offset}) + + def get_playlist_cover_image(self, playlist_id: str) -> Awaitable: + """Get the current image associated with a specific playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + """ + route = self.route( + "GET", "/playlists/{playlist_id}/images", playlist_id=playlist_id + ) + return self.request(route) + + def get_playlist( + self, + playlist_id: str, + *, + fields: Optional[str] = None, + market: Optional[str] = None, + ) -> Awaitable: + """Get a playlist owned by a Spotify user. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + fields: Optional[:class:`str`] + Filters for the query: a comma-separated list of the fields to return. + If omitted, all fields are returned. For example, to get just the total number of tracks and the request limit: `fields=total,limit` + + A dot separator can be used to specify non-reoccurring fields, while parentheses can be used to specify reoccurring fields within objects. + For example, to get just the added date and user ID of the adder: `fields=items(added_at,added_by.id)` + + Use multiple parentheses to drill down into nested objects, for example: `fields=items(track(name,href,album(name,href)))` + + Fields can be excluded by prefixing them with an exclamation mark, for example: `fields=items.track.album(!external_urls,images)` + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string "from_token". + Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/playlists/{playlist_id}", playlist_id=playlist_id) + payload: Dict[str, Any] = {} + + if fields: + payload["fields"] = fields + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def get_playlist_tracks( + self, + playlist_id: str, + *, + fields: Optional[str] = None, + market: Optional[str] = None, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + ) -> Awaitable: + """Get full details of the tracks of a playlist owned by a Spotify user. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + fields: Optional[:class:`str`] + Filters for the query: a comma-separated list of the fields to return. + If omitted, all fields are returned. For example, to get just the total number of tracks and the request limit: `fields=total,limit` + + A dot separator can be used to specify non-reoccurring fields, while parentheses can be used to specify reoccurring fields within objects. + For example, to get just the added date and user ID of the adder: `fields=items(added_at,added_by.id)` + + Use multiple parentheses to drill down into nested objects, for example: `fields=items(track(name,href,album(name,href)))` + + Fields can be excluded by prefixing them with an exclamation mark, for example: `fields=items.track.album(!external_urls,images)` + limit : Optional[:class:`str`] + The maximum number of tracks to return. Default: 100. Minimum: 1. Maximum: 100. + offset : Optional[:class:`str`] + The index of the first track to return. Default: 0 (the first object). + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string "from_token". + Provide this parameter if you want to apply Track Relinking. + """ + route = self.route( + "GET", "/playlists/{playlist_id}/tracks", playlist_id=playlist_id + ) + payload: Dict[str, Any] = {"limit": limit, "offset": offset} + + if fields: + payload["fields"] = fields + + if market: + payload["market"] = market + + return self.request(route, params=payload) + + def remove_playlist_tracks( + self, + playlist_id: str, + tracks: Sequence[Union[str, Dict[str, Any]]], + *, + snapshot_id: str = None, + ) -> Awaitable: + """Remove one or more tracks from a user’s playlist. + + Parameters + ---------- + playlist_id : str + The id of the playlist to target + tracks : Sequence[Union[str, Dict[str, Union[str, int]]]] + Either a sequence of track URIs to remove a specific occurence + of a track or for targeted removal pass in a dict that looks like + `{'uri': URI, 'position': POSITIONS}` where `URI` is track URI and + `POSITIONS` is an list of integers + snapshot_id : Optional[str] + The snapshot to target. + """ + route = self.route( + "DELETE ", "/playlists/{playlist_id}/tracks", playlist_id=playlist_id + ) + payload: Dict[str, Any] = { + "tracks": [ + ({"uri": track} if isinstance(track, str) else track) + for track in tracks + ] + } + + if snapshot_id: + payload["snapshot_id"] = snapshot_id + + return self.request(route, json=payload) + + def reorder_playlists_tracks( + self, + playlist_id: str, + range_start: int, + range_length: int, + insert_before: int, + *, + snapshot_id: Optional[str] = None, + ) -> Awaitable: + """Reorder a track or a group of tracks in a playlist. + + Visualization of how reordering tracks works + + .. image:: /images/visualization-reordering-tracks.png + + .. note:: + When reordering tracks, the timestamp indicating when they were added and the user who added them will be kept untouched. + In addition, the users following the playlists won’t be notified about changes in the playlists when the tracks are reordered. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + range_start : :class:`int` + The position of the first track to be reordered. + range_length : :class:`int` + The amount of tracks to be reordered. Defaults to 1 if not set. + + The range of tracks to be reordered begins from the range_start position, and includes the range_length subsequent tracks. + insert_before : :class:`int` + The position where the tracks should be inserted. + + To reorder the tracks to the end of the playlist, simply set insert_before to the position after the last track. + snapshot_id : Optional[:class:`str`] + The playlist’s snapshot ID against which you want to make the changes. + """ + route = self.route( + "PUT", "/playlists/{playlist_id}/tracks", playlist_id=playlist_id + ) + payload: Dict[str, Any] = { + "range_start": range_start, + "range_length": range_length, + "insert_before": insert_before, + } + + if snapshot_id: + payload["snapshot_id"] = snapshot_id + + return self.request(route, json=payload) + + def replace_playlist_tracks( + self, playlist_id: str, tracks: Sequence[str] + ) -> Awaitable: + """Replace all the tracks in a playlist, overwriting its existing tracks. + + .. note:: + + This powerful request can be useful for replacing tracks, re-ordering existing tracks, or clearing the playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + tracks : Sequence[:class:`str`] + A list of tracks to replace with. + """ + route = self.route( + "PUT", "/playlists/{playlist_id}/tracks", playlist_id=playlist_id + ) + payload: Dict[str, Any] = {"uris": tuple(tracks)} + + return self.request(route, json=payload) + + def upload_playlist_cover_image( + self, playlist_id: str, file: BinaryIO + ) -> Awaitable: + """Replace the image used to represent a specific playlist. + + Parameters + ---------- + playlist_id : :class:`str` + The Spotify ID for the playlist. + file : File-like object + An file-like object that supports reading + the contents that are being read should be :class:`bytes` + """ + route = self.route( + "PUT", "/playlists/{playlist_id}/images", playlist_id=playlist_id + ) + return self.request( + route, data=b64encode(file.read()), content_type="image/jpeg" + ) + + def track_audio_analysis(self, track_id: str) -> Awaitable: + """Get a detailed audio analysis for a single track identified by its unique Spotify ID. + + The Audio Analysis endpoint provides low-level audio analysis for all of the tracks in the Spotify catalog. + The Audio Analysis describes the track’s structure and musical content, including rhythm, pitch, and timbre. + All information is precise to the audio sample. + + Many elements of analysis include confidence values, a floating-point number ranging from 0.0 to 1.0. + Confidence indicates the reliability of its corresponding attribute. + Elements carrying a small confidence value should be considered speculative. + There may not be sufficient data in the audio to compute the attribute with high certainty. + + Parameters + ---------- + track_id : :class:`str` + The Spotify ID for the track. + """ + route = self.route("GET", "/audio-analysis/{id}", id=track_id) + return self.request(route) + + def track_audio_features(self, track_id: str) -> Awaitable: + """Get audio feature information for a single track identified by its unique Spotify ID. + + Parameters + ---------- + track_id : :class:`str` + The Spotify ID for the track. + """ + route = self.route("GET", "/audio-features/{id}", id=track_id) + return self.request(route) + + def audio_features(self, track_ids: List[str]) -> Awaitable: + """Get audio features for multiple tracks based on their Spotify IDs. + + Parameters + ---------- + track_ids : List[:class:`str`] + A comma-separated list of the Spotify IDs for the tracks. Maximum: 100 IDs. + """ + route = self.route("GET", "/audio-features") + return self.request(route, params={"ids": track_ids}) + + def track(self, track_id: str, market: Optional[str] = None) -> Awaitable: + """Get Spotify catalog information for a single track identified by its unique Spotify ID. + + Parameters + ---------- + track_id : :class:`str` + The Spotify ID for the track. + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string "from_token". + Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/tracks/{id}", id=track_id) + payload: Dict[str, Any] = {} + + if market is not None: + payload["market"] = market + + return self.request(route, params=payload) + + def tracks(self, track_ids: List[str], market: Optional[str] = None) -> Awaitable: + """Get Spotify catalog information for multiple tracks based on their Spotify IDs. + + Parameters + ---------- + track_ids : List[:class:`str`] + A comma-separated list of the Spotify IDs for the tracks. Maximum: 50 IDs. + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string "from_token". + Provide this parameter if you want to apply Track Relinking. + """ + route = self.route("GET", "/tracks") + payload: Dict[str, Any] = {"ids": track_ids} + + if market is not None: + payload["market"] = market + + return self.request(route, params=payload) + + def current_user(self) -> Awaitable: + """Get detailed profile information about the current user (including the current user’s username).""" + route = self.route("GET", "/me") + return self.request(route) + + def user(self, user_id: str) -> Awaitable: + """Get public profile information about a Spotify user. + + Parameters + --------- + user_id : class:`str` + The user’s Spotify user ID. + """ + route = self.route("GET", "/users/{user_id}", user_id=user_id) + return self.request(route) + + def search( # pylint: disable=invalid-name + self, + q: str, + query_type: str = "track,playlist,artist,album", + market: str = "US", + limit: int = 20, + offset: int = 0, + include_external: Optional[str] = None, + ) -> Awaitable: + """Get Spotify Catalog information about artists, albums, tracks or playlists that match a keyword string. + + Parameters + ---------- + q : :class:`str` + Search query keywords and optional field filters and operators. e.g. `roadhouse blues.` + + query_type : Optional[:class:`str`] + A comma-separated list of item types to search across. (default: "track,playlist,artist,album") + Valid types are: album, artist, playlist, and track. + Search results include hits from all the specified item types. + + market : Optional[:class:`str`] + An ISO 3166-1 alpha-2 country code or the string "from_token". (default: "US") + If a country code is specified, only artists, albums, and tracks with content that is playable in that market is returned. + + .. note:: + - Playlist results are not affected by the market parameter. + - If market is set to "from_token", and a valid access token is specified in the request header, only + content playable in the country associated with the user account, is returned. + - Users can view the country that is associated with their account in the account settings. A user must + grant access to the user-read-private scope prior to when the access token is issued. + + limit : Optional[:class:`int`] + Maximum number of results to return. (Default: 20, Minimum: 1, Maximum: 50) + + .. note:: + The limit is applied within each type, not on the total response. + For example, if the limit value is 3 and the type is artist,album, the response contains 3 artists and 3 albums. + + offset : Optional[:class:`int`] + The index of the first result to return. + Default: 0 (the first result). + Maximum offset (including limit): 10,000. + Use with limit to get the next page of search results. + + include_external : Optional[:class:`str`] + Possible values: `audio` + If `include_external=audio` is specified the response will include any relevant audio content that is hosted externally. + By default external content is filtered out from responses. + + """ + route = self.route("GET", "/search") + payload: Dict[str, Any] = { + "q": quote(q), + "type": query_type, + "limit": limit, + "offset": offset, + } + + if market: + payload["market"] = market + + if include_external is not None: + payload["include_external"] = include_external + + return self.request(route, params=payload) + + +REFRESH_TOKEN_URL = "https://accounts.spotify.com/api/token?grant_type=refresh_token&refresh_token={refresh_token}" + + +class HTTPUserClient(HTTPClient): + """HTTPClient for access to user endpoints.""" + + def __init__( + self, + client_id: str, + client_secret: str, + token: str = None, + refresh_token: str = None, + loop=None, + ): + assert token or refresh_token + super().__init__(client_id, client_secret, loop=loop) + if token: + self.bearer_info = {"access_token": token} + self.refresh_token = refresh_token + + async def get_bearer_info(self, *_, **__): + if not self.refresh_token: + # Should only happen if User.from_token didn't receive refresh_token + raise SpotifyException( + "Access token expired and no refresh token was provided" + ) + + headers = { + "Authorization": f"Basic {b64encode(':'.join((self.client_id, self.client_secret)).encode()).decode()}", + "Content-Type": "application/x-www-form-urlencoded", + } + + route = ("POST", REFRESH_TOKEN_URL.format(refresh_token=self.refresh_token)) + return await self.request(route, headers=headers) diff --git a/pomice/spotify/models/__init__.py b/pomice/spotify/models/__init__.py new file mode 100644 index 0000000..96e5e63 --- /dev/null +++ b/pomice/spotify/models/__init__.py @@ -0,0 +1,26 @@ +from .. import _clean_namespace +from . import typing + +from .base import AsyncIterable, SpotifyBase, URIBase +from .common import Device, Context, Image +from .artist import Artist +from .track import Track, PlaylistTrack +from .player import Player +from .album import Album +from .library import Library +from .playlist import Playlist +from .user import User + +__all__ = ( + "User", + "Track", + "PlaylistTrack", + "Artist", + "Album", + "Playlist", + "Library", + "Player", + "Device", + "Context", + "Image", +) diff --git a/pomice/spotify/models/album.py b/pomice/spotify/models/album.py new file mode 100644 index 0000000..dce1955 --- /dev/null +++ b/pomice/spotify/models/album.py @@ -0,0 +1,121 @@ +from functools import partial +from typing import Optional, List + +from ..oauth import set_required_scopes +from . import AsyncIterable, URIBase, Image, Artist, Track + + +class Album(URIBase, AsyncIterable): # pylint: disable=too-many-instance-attributes + """A Spotify Album. + + Attributes + ---------- + artists : List[Artist] + The artists for the album. + id : str + The ID of the album. + name : str + The name of the album. + href : str + The HTTP API URL for the album. + uri : str + The URI for the album. + album_group : str + ossible values are “album”, “single”, “compilation”, “appears_on”. + Compare to album_type this field represents relationship between the artist and the album. + album_type : str + The type of the album: one of "album" , "single" , or "compilation". + release_date : str + The date the album was first released. + release_date_precision : str + The precision with which release_date value is known: year, month or day. + genres : List[str] + A list of the genres used to classify the album. + label : str + The label for the album. + popularity : int + The popularity of the album. The value will be between 0 and 100, with 100 being the most popular. + copyrights : List[Dict] + The copyright statements of the album. + markets : List[str] + The markets in which the album is available: ISO 3166-1 alpha-2 country codes. + """ + + def __init__(self, client, data): + self.__client = client + + # Simple object attributes. + self.type = data.pop("album_type", None) + self.group = data.pop("album_group", None) + self.artists = [Artist(client, artist) for artist in data.pop("artists", [])] + + self.artist = self.artists[0] if self.artists else None + self.markets = data.pop("avaliable_markets", None) + self.url = data.pop("external_urls").get("spotify", None) + self.id = data.pop("id", None) # pylint: disable=invalid-name + self.name = data.pop("name", None) + self.href = data.pop("href", None) + self.uri = data.pop("uri", None) + self.release_date = data.pop("release_date", None) + self.release_date_precision = data.pop("release_date_precision", None) + self.images = [Image(**image) for image in data.pop("images", [])] + self.restrictions = data.pop("restrictions", None) + + # Full object attributes + self.genres = data.pop("genres", None) + self.copyrights = data.pop("copyrights", None) + self.label = data.pop("label", None) + self.popularity = data.pop("popularity", None) + self.total_tracks = data.pop("total_tracks", None) + + # AsyncIterable attrs + self.__aiter_klass__ = Track + self.__aiter_fetch__ = partial( + self.__client.http.album_tracks, self.id, limit=50 + ) + + def __repr__(self): + return f"" + + # Public + + @set_required_scopes(None) + async def get_tracks( + self, *, limit: Optional[int] = 20, offset: Optional[int] = 0 + ) -> List[Track]: + """get the albums tracks from spotify. + + Parameters + ---------- + limit : Optional[int] + The limit on how many tracks to retrieve for this album (default is 20). + offset : Optional[int] + The offset from where the api should start from in the tracks. + + Returns + ------- + tracks : List[Track] + The tracks of the artist. + """ + data = await self.__client.http.album_tracks( + self.id, limit=limit, offset=offset + ) + return list(Track(self.__client, item, album=self) for item in data["items"]) + + @set_required_scopes(None) + async def get_all_tracks( + self, *, market: Optional[str] = "US" + ) -> List[Track]: # pylint: disable=unused-argument + """loads all of the albums tracks, depending on how many the album has this may be a long operation. + + Parameters + ---------- + market : Optional[str] + An ISO 3166-1 alpha-2 country code. Provide this parameter if you want to apply Track Relinking. + + Returns + ------- + tracks : List[:class:`spotify.Track`] + The tracks of the artist. + """ + return [track async for track in self] diff --git a/pomice/spotify/models/artist.py b/pomice/spotify/models/artist.py new file mode 100644 index 0000000..80ef633 --- /dev/null +++ b/pomice/spotify/models/artist.py @@ -0,0 +1,186 @@ +from functools import partial +from typing import Optional, List, TYPE_CHECKING + +from ..oauth import set_required_scopes +from . import AsyncIterable, URIBase, Image + +if TYPE_CHECKING: + import spotify + + +class Artist(URIBase, AsyncIterable): # pylint: disable=too-many-instance-attributes + """A Spotify Artist. + + Attributes + ---------- + id : str + The Spotify ID of the artist. + uri : str + The URI of the artist. + url : str + The open.spotify URL. + href : str + A link to the Web API endpoint providing full details of the artist. + name : str + The name of the artist. + genres : List[str] + A list of the genres the artist is associated with. + For example: "Prog Rock" , "Post-Grunge". (If not yet classified, the array is empty.) + followers : Optional[int] + The total number of followers. + popularity : int + The popularity of the artist. + The value will be between 0 and 100, with 100 being the most popular. + The artist’s popularity is calculated from the popularity of all the artist’s tracks. + images : List[Image] + Images of the artist in various sizes, widest first. + """ + + def __init__(self, client, data): + self.__client = client + + # Simplified object attributes + self.id = data.pop("id") # pylint: disable=invalid-name + self.uri = data.pop("uri") + self.url = data.pop("external_urls").get("spotify", None) + self.href = data.pop("href") + self.name = data.pop("name") + + # Full object attributes + self.genres = data.pop("genres", None) + self.followers = data.pop("followers", {}).get("total", None) + self.popularity = data.pop("popularity", None) + self.images = list(Image(**image) for image in data.pop("images", [])) + + # AsyncIterable attrs + from .album import Album + + self.__aiter_klass__ = Album + self.__aiter_fetch__ = partial( + self.__client.http.artist_albums, self.id, limit=50 + ) + + def __repr__(self): + return f"" + + # Public + + @set_required_scopes(None) + async def get_albums( + self, + *, + limit: Optional[int] = 20, + offset: Optional[int] = 0, + include_groups=None, + market: Optional[str] = None, + ) -> List["spotify.Album"]: + """Get the albums of a Spotify artist. + + Parameters + ---------- + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optiona[int] + The offset of which Spotify should start yielding from. + include_groups : INCLUDE_GROUPS_TP + INCLUDE_GROUPS + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + + Returns + ------- + albums : List[Album] + The albums of the artist. + """ + from .album import Album + + data = await self.__client.http.artist_albums( + self.id, + limit=limit, + offset=offset, + include_groups=include_groups, + market=market, + ) + return list(Album(self.__client, item) for item in data["items"]) + + @set_required_scopes(None) + async def get_all_albums(self, *, market="US") -> List["spotify.Album"]: + """loads all of the artists albums, depending on how many the artist has this may be a long operation. + + Parameters + ---------- + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + + Returns + ------- + albums : List[Album] + The albums of the artist. + """ + from .album import Album + + albums: List[Album] = [] + offset = 0 + total = await self.total_albums(market=market) + + while len(albums) < total: + data = await self.__client.http.artist_albums( + self.id, limit=50, offset=offset, market=market + ) + + offset += 50 + albums += list(Album(self.__client, item) for item in data["items"]) + + return albums + + @set_required_scopes(None) + async def total_albums(self, *, market: str = None) -> int: + """get the total amout of tracks in the album. + + Parameters + ---------- + market : Optional[str] + An ISO 3166-1 alpha-2 country code. + + Returns + ------- + total : int + The total amount of albums. + """ + data = await self.__client.http.artist_albums( + self.id, limit=1, offset=0, market=market + ) + return data["total"] + + @set_required_scopes(None) + async def top_tracks(self, country: str = "US") -> List["spotify.Track"]: + """Get Spotify catalog information about an artist’s top tracks by country. + + Parameters + ---------- + country : str + The country to search for, it defaults to 'US'. + + Returns + ------- + tracks : List[Track] + The artists top tracks. + """ + from .track import Track + + top = await self.__client.http.artist_top_tracks(self.id, country=country) + return list(Track(self.__client, item) for item in top["tracks"]) + + @set_required_scopes(None) + async def related_artists(self) -> List["Artist"]: + """Get Spotify catalog information about artists similar to a given artist. + + Similarity is based on analysis of the Spotify community’s listening history. + + Returns + ------- + artists : List[Artist] + The artists deemed similar. + """ + related = await self.__client.http.artist_related_artists(self.id) + return list(Artist(self.__client, item) for item in related["artists"]) diff --git a/pomice/spotify/models/base.py b/pomice/spotify/models/base.py new file mode 100644 index 0000000..5347b9e --- /dev/null +++ b/pomice/spotify/models/base.py @@ -0,0 +1,147 @@ +from typing import Optional, Callable, Type + +import spotify + + +class SpotifyBase: + """The base class all Spotify models **must** derive from. + + This base class is used to transparently construct spotify + models based on the :class:`spotify,Client` type. + + Currently it is used to detect whether a Client is a synchronous + client and, if as such, construct and return the appropriate + synchronous model. + """ + + __slots__ = () + + def __new__(cls, client, *_, **__): + if not isinstance(client, spotify.Client): + raise TypeError( + f"{cls!r}: expected client argument to be an instance of `spotify.Client`. Instead got {type(client)}" + ) + + if hasattr(client, "__client_thread__"): + cls = getattr( # pylint: disable=self-cls-assignment + spotify.sync.models, cls.__name__ + ) + + return object.__new__(cls) + + async def from_href(self): + """Get the full object from spotify with a `href` attribute. + + .. note :: + + This can be used to get an updated model of the object. + + Returns + ------- + model : SpotifyBase + An instance of whatever the class was before. + + Raises + ------ + TypeError + This is raised if the model has no `href` attribute. + + Additionally if the model has no `http` attribute and + the model has no way to access its client, while theoretically + impossible its a failsafe, this will be raised. + """ + if not hasattr(self, "href"): + raise TypeError( + "Spotify object has no `href` attribute, therefore cannot be retrived" + ) + + if hasattr(self, "http"): + return await self.http.request( # pylint: disable=no-member + ("GET", self.href) # pylint: disable=no-member + ) + + klass = type(self) + + try: + client = getattr(self, f"_{klass.__name__}__client") + except AttributeError: + raise TypeError("Spotify object has no way to access a HTTPClient.") + else: + http = client.http # pylint: disable=no-member + + data = await http.request(("GET", self.href)) # pylint: disable=no-member + + return klass(client, data) + + +class URIBase(SpotifyBase): + """Base class used for inheriting magic methods for models who have URIs. + + This class inherits from :class:`SpotifyBase` and is used to reduce boilerplate + in spotify models by supplying a `__eq__`, `__ne__`, and `__str__` double underscore + methods. + + The objects that inherit from :class:`URIBase` support equality and string casting. + + - Two objects are equal if **They are strictly the same type and have the same uri** + - Casting to a string will return the uri of the object. + """ + + uri = repr(None) + + def __hash__(self): + return hash(self.uri) # pylint: disable=no-member + + def __eq__(self, other): + return ( + type(self) is type(other) and self.uri == other.uri + ) # pylint: disable=no-member + + def __ne__(self, other): + return not self.__eq__(other) + + def __str__(self): + return self.uri # pylint: disable=no-member + + +class AsyncIterable(SpotifyBase): + """Base class intended for all models that can be asynchronously iterated over. + + This class implements two magic class vars: + + * `__aiter_fetch__` ~ A coroutine function that accepts a keyword argument named `option` + * `__aiter_klass__` ~ A spotify model class, essentially a type that subclasses `SpotifyBase` + + Additionally the class implements `__aiter__` that will exhaust the paging + objects returned by the `__aiter_fetch__` calls and yield each data item in + said paging objects as an instance of `__aiter_klass__`. + """ + + __aiter_fetch__: Optional[Callable] = None + __aiter_klass__: Optional[Type[SpotifyBase]] = None + + async def __aiter__(self): + client = getattr(self, f"_{type(self).__name__}__client") + + assert self.__aiter_fetch__ is not None + fetch = self.__aiter_fetch__ + + assert self.__aiter_klass__ is not None + klass = self.__aiter_klass__ + + total = None + processed = offset = 0 + + while total is None or processed < total: + data = await fetch(offset=offset) # pylint: disable=not-callable + + if total is None: + assert "total" in data + total = data["total"] + + assert "items" in data + for item in data["items"]: + processed += 1 + yield klass(client, item) # pylint: disable=not-callable + + offset += 50 diff --git a/pomice/spotify/models/common.py b/pomice/spotify/models/common.py new file mode 100644 index 0000000..ac9b9f4 --- /dev/null +++ b/pomice/spotify/models/common.py @@ -0,0 +1,109 @@ +class Image: + """An object representing a Spotify image resource. + + Attributes + ---------- + height : :class:`str` + The height of the image. + width : :class:`str` + The width of the image. + url : :class:`str` + The URL of the image. + """ + + __slots__ = ("height", "width", "url") + + def __init__(self, *, height: str, width: str, url: str): + self.height = height + self.width = width + self.url = url + + def __repr__(self): + return f"" + + def __eq__(self, other): + return type(self) is type(other) and self.url == other.url + + +class Context: + """A Spotify Context. + + Attributes + ---------- + type : str + The object type, e.g. “artist”, “playlist”, “album”. + href : str + A link to the Web API endpoint providing full details of the track. + external_urls : str + External URLs for this context. + uri : str + The Spotify URI for the context. + """ + + __slots__ = ("external_urls", "type", "href", "uri") + + def __init__(self, data): + self.external_urls = data.get("external_urls") + self.type = data.get("type") + + self.href = data.get("href") + self.uri = data.get("uri") + + def __repr__(self): + return f"" + + def __eq__(self, other): + return type(self) is type(other) and self.uri == other.uri + + +class Device: + """A Spotify Users device. + + Attributes + ---------- + id : str + The device ID + name : int + The name of the device. + type : str + A Device type, such as “Computer”, “Smartphone” or “Speaker”. + volume : int + The current volume in percent. This may be null. + is_active : bool + if this device is the currently active device. + is_restricted : bool + Whether controlling this device is restricted. + At present if this is “true” then no Web API commands will be accepted by this device. + is_private_session : bool + If this device is currently in a private session. + """ + + __slots__ = ( + "id", + "name", + "type", + "volume", + "is_active", + "is_restricted", + "is_private_session", + ) + + def __init__(self, data): + self.id = data.get("id") # pylint: disable=invalid-name + self.name = data.get("name") + self.type = data.get("type") + + self.volume = data.get("volume_percent") + + self.is_active = data.get("is_active") + self.is_restricted = data.get("is_restricted") + self.is_private_session = data.get("is_private_session") + + def __eq__(self, other): + return type(self) is type(other) and self.id == other.id + + def __repr__(self): + return f"" + + def __str__(self): + return self.id diff --git a/pomice/spotify/models/library.py b/pomice/spotify/models/library.py new file mode 100644 index 0000000..9891260 --- /dev/null +++ b/pomice/spotify/models/library.py @@ -0,0 +1,189 @@ +from typing import Sequence, Union, List + +from ..oauth import set_required_scopes +from . import SpotifyBase +from .track import Track +from .album import Album + + +class Library(SpotifyBase): + """A Spotify Users Library. + + Attributes + ---------- + user : :class:`Spotify.User` + The user which this library object belongs to. + """ + + def __init__(self, client, user): + self.user = user + self.__client = client + + def __repr__(self): + return f"" + + def __eq__(self, other): + return type(self) is type(other) and self.user == other.user + + def __ne__(self, other): + return not self.__eq__(other) + + @set_required_scopes("user-library-read") + async def contains_albums(self, *albums: Sequence[Union[str, Album]]) -> List[bool]: + """Check if one or more albums is already saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + albums : Union[Album, str] + A sequence of artist objects or spotify IDs + """ + _albums = [str(obj) for obj in albums] + return await self.user.http.is_saved_album(_albums) + + @set_required_scopes("user-library-read") + async def contains_tracks(self, *tracks: Sequence[Union[str, Track]]) -> List[bool]: + """Check if one or more tracks is already saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + tracks : Union[Track, str] + A sequence of track objects or spotify IDs + """ + _tracks = [str(obj) for obj in tracks] + return await self.user.http.is_saved_track(_tracks) + + @set_required_scopes("user-library-read") + async def get_tracks(self, *, limit=20, offset=0) -> List[Track]: + """Get a list of the songs saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first item to return. Default: 0 + """ + data = await self.user.http.saved_tracks(limit=limit, offset=offset) + + return [Track(self.__client, item["track"]) for item in data["items"]] + + @set_required_scopes("user-library-read") + async def get_all_tracks(self) -> List[Track]: + """Get a list of all the songs saved in the current Spotify user’s ‘Your Music’ library. + + Returns + ------- + tracks : List[:class:`Track`] + The tracks of the artist. + """ + tracks: List[Track] = [] + total = None + offset = 0 + + while True: + data = await self.user.http.saved_tracks(limit=50, offset=offset) + + if total is None: + total = data["total"] + + offset += 50 + tracks += list( + Track(self.__client, item["track"]) for item in data["items"] + ) + + if len(tracks) >= total: + break + + return tracks + + @set_required_scopes("user-library-read") + async def get_albums(self, *, limit=20, offset=0) -> List[Album]: + """Get a list of the albums saved in the current Spotify user’s ‘Your Music’ library. + + Parameters + ---------- + limit : Optional[int] + The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first item to return. Default: 0 + """ + data = await self.user.http.saved_albums(limit=limit, offset=offset) + + return [Album(self.__client, item["album"]) for item in data["items"]] + + @set_required_scopes("user-library-read") + async def get_all_albums(self) -> List[Album]: + """Get a list of the albums saved in the current Spotify user’s ‘Your Music’ library. + + Returns + ------- + albums : List[:class:`Album`] + The albums. + """ + albums: List[Album] = [] + total = None + offset = 0 + + while True: + data = await self.user.http.saved_albums(limit=50, offset=offset) + + if total is None: + total = data["total"] + + offset += 50 + albums += list( + Album(self.__client, item["album"]) for item in data["items"] + ) + + if len(albums) >= total: + break + + return albums + + @set_required_scopes("user-library-modify") + async def remove_albums(self, *albums): + """Remove one or more albums from the current user’s ‘Your Music’ library. + + Parameters + ---------- + albums : Sequence[Union[Album, str]] + A sequence of artist objects or spotify IDs + """ + _albums = [(obj if isinstance(obj, str) else obj.id) for obj in albums] + await self.user.http.delete_saved_albums(",".join(_albums)) + + @set_required_scopes("user-library-modify") + async def remove_tracks(self, *tracks): + """Remove one or more tracks from the current user’s ‘Your Music’ library. + + Parameters + ---------- + tracks : Sequence[Union[Track, str]] + A sequence of track objects or spotify IDs + """ + _tracks = [(obj if isinstance(obj, str) else obj.id) for obj in tracks] + await self.user.http.delete_saved_tracks(",".join(_tracks)) + + @set_required_scopes("user-library-modify") + async def save_albums(self, *albums): + """Save one or more albums to the current user’s ‘Your Music’ library. + + Parameters + ---------- + albums : Sequence[Union[Album, str]] + A sequence of artist objects or spotify IDs + """ + _albums = [(obj if isinstance(obj, str) else obj.id) for obj in albums] + await self.user.http.save_albums(",".join(_albums)) + + @set_required_scopes("user-library-modify") + async def save_tracks(self, *tracks): + """Save one or more tracks to the current user’s ‘Your Music’ library. + + Parameters + ---------- + tracks : Sequence[Union[Track, str]] + A sequence of track objects or spotify IDs + """ + _tracks = [(obj if isinstance(obj, str) else obj.id) for obj in tracks] + await self.user.http.save_tracks(_tracks) diff --git a/pomice/spotify/models/player.py b/pomice/spotify/models/player.py new file mode 100644 index 0000000..9c3de26 --- /dev/null +++ b/pomice/spotify/models/player.py @@ -0,0 +1,262 @@ +from typing import Union, Optional, List + +from ..oauth import set_required_scopes +from . import SpotifyBase, Device, Track +from .typing import SomeURIs, SomeURI + +Offset = Union[int, str, Track] +SomeDevice = Union[Device, str] + + +class Player(SpotifyBase): # pylint: disable=too-many-instance-attributes + """A Spotify Users current playback. + + Attributes + ---------- + device : :class:`spotify.Device` + The device that is currently active. + repeat_state : :class:`str` + "off", "track", "context" + shuffle_state : :class:`bool` + If shuffle is on or off. + is_playing : :class:`bool` + If something is currently playing. + """ + + def __init__(self, client, user, data): + self.__client = client + self.__user = user + + self.repeat_state = data.get("repeat_state", None) + self.shuffle_state = data.pop("shuffle_state", None) + self.is_playing = data.pop("is_playing", None) + self.device = Device(data=data.pop("device", None)) + + def __repr__(self): + return f"" + + # Properties + + @property + def user(self): + return self.__user + + # Public methods + + @set_required_scopes("user-modify-playback-state") + async def pause(self, *, device: Optional[SomeDevice] = None): + """Pause playback on the user’s account. + + Parameters + ---------- + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.pause_playback(device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def resume(self, *, device: Optional[SomeDevice] = None): + """Resume playback on the user's account. + + Parameters + ---------- + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.play_playback(None, device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def seek(self, pos, *, device: Optional[SomeDevice] = None): + """Seeks to the given position in the user’s currently playing track. + + Parameters + ---------- + pos : int + The position in milliseconds to seek to. + Must be a positive number. + Passing in a position that is greater than the length of the track will cause the player to start playing the next song. + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.seek_playback(pos, device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def set_repeat(self, state, *, device: Optional[SomeDevice] = None): + """Set the repeat mode for the user’s playback. + + Parameters + ---------- + state : str + Options are repeat-track, repeat-context, and off + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.repeat_playback(state, device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def set_volume(self, volume: int, *, device: Optional[SomeDevice] = None): + """Set the volume for the user’s current playback device. + + Parameters + ---------- + volume : int + The volume to set. Must be a value from 0 to 100 inclusive. + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.set_playback_volume(volume, device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def next(self, *, device: Optional[SomeDevice] = None): + """Skips to next track in the user’s queue. + + Parameters + ---------- + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.skip_next(device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def previous(self, *, device: Optional[SomeDevice] = None): + """Skips to previous track in the user’s queue. + + Note that this will ALWAYS skip to the previous track, regardless of the current track’s progress. + Returning to the start of the current track should be performed using :meth:`seek` + + Parameters + ---------- + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + return await self.user.http.skip_previous(device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def enqueue(self, uri: SomeURI, device: Optional[SomeDevice] = None): + """Add an item to the end of the user’s current playback queue. + + Parameters + ---------- + uri : Union[:class:`spotify.URIBase`, :class:`str`] + The uri of the item to add to the queue. Must be a track or an + episode uri. + device_id : Optional[Union[Device, :class:`str`]] + The id of the device this command is targeting. If not supplied, + the user’s currently active device is the target. + """ + device_id: Optional[str] + if device is not None: + if not isinstance(device, (Device, str)): + raise TypeError( + f"Expected `device` to either be a spotify.Device or a string. got {type(device)!r}" + ) + + device_id = str(device) + else: + device_id = None + + await self.user.http.playback_queue(uri=str(uri), device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def play( + self, + *uris: SomeURIs, + offset: Optional[Offset] = 0, + device: Optional[SomeDevice] = None, + ): + """Start a new context or resume current playback on the user’s active device. + + The method treats a single argument as a Spotify context, such as a Artist, Album and playlist objects/URI. + When called with multiple positional arguments they are interpreted as a array of Spotify Track objects/URIs. + + Parameters + ---------- + *uris : SomeURI + When a single argument is passed in that argument is treated + as a context (except if it is a track or track uri). + Valid contexts are: albums, artists, playlists. + Album, Artist and Playlist objects are accepted too. + Otherwise when multiple arguments are passed in they, + A sequence of Spotify Tracks or Track URIs to play. + offset : Optional[:obj:`Offset`] + Indicates from where in the context playback should start. + Only available when `context` corresponds to an album or playlist object, + or when the `uris` parameter is used. when an integer offset is zero based and can’t be negative. + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + context_uri: Union[List[str], str] + + if ( + len(uris) > 1 + or isinstance(uris[0], Track) + or (isinstance(uris[0], str) and "track" in uris[0]) + ): + # Regular uris paramter + context_uri = [str(uri) for uri in uris] + else: + # Treat it as a context URI + context_uri = str(uris[0]) + + device_id: Optional[str] + if device is not None: + if not isinstance(device, (Device, str)): + raise TypeError( + f"Expected `device` to either be a spotify.Device or a string. got {type(device)!r}" + ) + + device_id = str(device) + else: + device_id = None + + await self.user.http.play_playback( + context_uri, offset=offset, device_id=device_id + ) + + @set_required_scopes("user-modify-playback-state") + async def shuffle( + self, state: Optional[bool] = None, *, device: Optional[SomeDevice] = None + ): + """shuffle on or off for user’s playback. + + Parameters + ---------- + state : Optional[bool] + if `True` then Shuffle user’s playback. + else if `False` do not shuffle user’s playback. + device : Optional[:obj:`SomeDevice`] + The Device object or id of the device this command is targeting. + If not supplied, the user’s currently active device is the target. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.shuffle_playback(state, device_id=device_id) + + @set_required_scopes("user-modify-playback-state") + async def transfer(self, device: SomeDevice, ensure_playback: bool = False): + """Transfer playback to a new device and determine if it should start playing. + + Parameters + ---------- + device : :obj:`SomeDevice` + The device on which playback should be started/transferred. + ensure_playback : bool + if `True` ensure playback happens on new device. + else keep the current playback state. + """ + device_id: Optional[str] = str(device) if device is not None else None + await self.user.http.transfer_player(device_id=device_id, play=ensure_playback) diff --git a/pomice/spotify/models/playlist.py b/pomice/spotify/models/playlist.py new file mode 100644 index 0000000..70890ae --- /dev/null +++ b/pomice/spotify/models/playlist.py @@ -0,0 +1,525 @@ +from functools import partial +from itertools import islice +from typing import List, Optional, Union, Callable, Tuple, Iterable, TYPE_CHECKING, Any, Dict, Set + +from ..oauth import set_required_scopes +from ..http import HTTPUserClient, HTTPClient +from . import AsyncIterable, URIBase, Track, PlaylistTrack, Image + +if TYPE_CHECKING: + import spotify + + +class MutableTracks: + __slots__ = ( + "playlist", + "tracks", + "was_empty", + "is_empty", + "replace_tracks", + "get_all_tracks", + ) + + def __init__(self, playlist: "Playlist") -> None: + self.playlist = playlist + self.tracks = tracks = getattr(playlist, "_Playlist__tracks") + + if tracks is not None: + self.was_empty = self.is_empty = not tracks + + self.replace_tracks = playlist.replace_tracks + self.get_all_tracks = playlist.get_all_tracks + + async def __aenter__(self): + if self.tracks is None: + self.tracks = tracks = list(await self.get_all_tracks()) + self.was_empty = self.is_empty = not tracks + else: + tracks = list(self.tracks) + + return tracks + + async def __aexit__(self, typ, value, traceback): + if self.was_empty and self.is_empty: + # the tracks were empty and is still empty. + # skip the api call. + return + + tracks = self.tracks + + await self.replace_tracks(*tracks) + setattr(self.playlist, "_Playlist__tracks", tuple(self.tracks)) + + +class Playlist(URIBase, AsyncIterable): # pylint: disable=too-many-instance-attributes + """A Spotify Playlist. + + Attributes + ---------- + collaborative : :class:`bool` + Returns true if context is not search and the owner allows other users to modify the playlist. Otherwise returns false. + description : :class:`str` + The playlist description. Only returned for modified, verified playlists, otherwise null. + url : :class:`str` + The open.spotify URL. + followers : :class:`int` + The total amount of followers + href : :class:`str` + A link to the Web API endpoint providing full details of the playlist. + id : :class:`str` + The Spotify ID for the playlist. + images : List[:class:`spotify.Image`] + Images for the playlist. + The array may be empty or contain up to three images. + The images are returned by size in descending order. + If returned, the source URL for the image ( url ) is temporary and will expire in less than a day. + name : :class:`str` + The name of the playlist. + owner : :class:`spotify.User` + The user who owns the playlist + public : :class`bool` + The playlist’s public/private status: + true the playlist is public, + false the playlist is private, + null the playlist status is not relevant. + snapshot_id : :class:`str` + The version identifier for the current playlist. + tracks : Optional[Tuple[:class:`PlaylistTrack`]] + A tuple of :class:`PlaylistTrack` objects or `None`. + """ + + __slots__ = ( + "collaborative", + "description", + "url", + "followers", + "href", + "id", + "images", + "name", + "owner", + "public", + "uri", + "total_tracks", + "__client", + "__http", + "__tracks", + ) + + __tracks: Optional[Tuple[PlaylistTrack, ...]] + __http: Union[HTTPUserClient, HTTPClient] + total_tracks: Optional[int] + + def __init__( + self, + client: "spotify.Client", + data: Union[dict, "Playlist"], + *, + http: Optional[HTTPClient] = None, + ): + self.__client = client + self.__http = http or client.http + + assert self.__http is not None + + self.__tracks = None + self.total_tracks = None + + if not isinstance(data, (Playlist, dict)): + raise TypeError("data must be a Playlist instance or a dict.") + + if isinstance(data, dict): + self.__from_raw(data) + else: + for name in filter((lambda name: name[0] != "_"), Playlist.__slots__): + setattr(self, name, getattr(data, name)) + + # AsyncIterable attrs + self.__aiter_klass__ = PlaylistTrack + self.__aiter_fetch__ = partial( + client.http.get_playlist_tracks, self.id, limit=50 + ) + + def __repr__(self): + return f'' + + def __len__(self): + return self.total_tracks + + # Internals + + def __from_raw(self, data: dict) -> None: + from .user import User + + client = self.__client + + self.id = data.pop("id") # pylint: disable=invalid-name + + self.images = tuple(Image(**image) for image in data.pop("images", [])) + self.owner = User(client, data=data.pop("owner")) + + self.public = data.pop("public") + self.collaborative = data.pop("collaborative") + self.description = data.pop("description", None) + self.followers = data.pop("followers", {}).get("total", None) + self.href = data.pop("href") + self.name = data.pop("name") + self.url = data.pop("external_urls").get("spotify", None) + self.uri = data.pop("uri") + + tracks: Optional[Tuple[PlaylistTrack, ...]] = ( + tuple(PlaylistTrack(client, item) for item in data["tracks"]["items"]) + if "items" in data["tracks"] + else None + ) + + self.__tracks = tracks + + self.total_tracks = ( + data["tracks"]["total"] + ) + + # Track retrieval + + @set_required_scopes(None) + async def get_tracks( + self, *, limit: Optional[int] = 20, offset: Optional[int] = 0 + ) -> Tuple[PlaylistTrack, ...]: + """Get a fraction of a playlists tracks. + + Parameters + ---------- + limit : Optional[int] + The limit on how many tracks to retrieve for this playlist (default is 20). + offset : Optional[int] + The offset from where the api should start from in the tracks. + + Returns + ------- + tracks : Tuple[PlaylistTrack] + The tracks of the playlist. + """ + data = await self.__http.get_playlist_tracks( + self.id, limit=limit, offset=offset + ) + return tuple(PlaylistTrack(self.__client, item) for item in data["items"]) + + @set_required_scopes(None) + async def get_all_tracks(self) -> Tuple[PlaylistTrack, ...]: + """Get all playlist tracks from the playlist. + + Returns + ------- + tracks : Tuple[:class:`PlaylistTrack`] + The playlists tracks. + """ + tracks: List[PlaylistTrack] = [] + offset = 0 + + if self.total_tracks is None: + self.total_tracks = ( + await self.__http.get_playlist_tracks(self.id, limit=1, offset=0) + )["total"] + + while len(tracks) < self.total_tracks: + data = await self.__http.get_playlist_tracks( + self.id, limit=50, offset=offset + ) + + tracks += [PlaylistTrack(self.__client, item) for item in data["items"]] + offset += 50 + + self.total_tracks = len(tracks) + return tuple(tracks) + + # Playlist structure modification + + # Basic api wrapping + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def add_tracks(self, *tracks) -> str: + """Add one or more tracks to a user’s playlist. + + Parameters + ---------- + tracks : Iterable[Union[:class:`str`, :class:`Track`]] + Tracks to add to the playlist + + Returns + ------- + snapshot_id : :class:`str` + The snapshot id of the playlist. + """ + data = await self.__http.add_playlist_tracks( + self.id, tracks=[str(track) for track in tracks] + ) + return data["snapshot_id"] + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def remove_tracks( + self, *tracks: Union[str, Track, Tuple[Union[str, Track], List[int]]] + ): + """Remove one or more tracks from a user’s playlist. + + Parameters + ---------- + tracks : Iterable[Union[:class:`str`, :class:`Track`]] + Tracks to remove from the playlist + + Returns + ------- + snapshot_id : :class:`str` + The snapshot id of the playlist. + """ + tracks_: List[Union[str, Dict[str, Union[str, Set[int]]]]] = [] + + for part in tracks: + if not isinstance(part, (Track, str, tuple)): + raise TypeError( + "Track argument of tracks parameter must be a Track instance, string or a tuple of those and an iterator of positive integers." + ) + + if isinstance(part, (Track, str)): + tracks_.append(str(part)) + continue + + track, positions, = part + + if not isinstance(track, (Track, str)): + raise TypeError( + "Track argument of tuple track parameter must be a Track instance or a string." + ) + + if not hasattr(positions, "__iter__"): + raise TypeError("Positions element of track tuple must be a iterator.") + + if not all(isinstance(index, int) for index in positions): + raise TypeError("Members of the positions iterator must be integers.") + + elem: Dict[str, Union[str, Set[int]]] = {"uri": str(track), "positions": set(positions)} + tracks_.append(elem) + + data = await self.__http.remove_playlist_tracks(self.id, tracks=tracks_) + return data["snapshot_id"] + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def replace_tracks(self, *tracks: Union[Track, PlaylistTrack, str]) -> None: + """Replace all the tracks in a playlist, overwriting its existing tracks. + + This powerful request can be useful for replacing tracks, re-ordering existing tracks, or clearing the playlist. + + Parameters + ---------- + tracks : Iterable[Union[:class:`str`, :class:`Track`]] + Tracks to place in the playlist + """ + bucket: List[str] = [] + for track in tracks: + if not isinstance(track, (str, Track)): + raise TypeError( + f"tracks must be a iterable of strings or Track instances. Got {type(track)!r}" + ) + + bucket.append(str(track)) + + body: Tuple[str, ...] = tuple(bucket) + + head: Tuple[str, ...] + tail: Tuple[str, ...] + head, tail = body[:100], body[100:] + + if head: + await self.__http.replace_playlist_tracks(self.id, tracks=head) + + while tail: + head, tail = tail[:100], tail[100:] + await self.extend(head) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def reorder_tracks( + self, + start: int, + insert_before: int, + length: int = 1, + *, + snapshot_id: Optional[str] = None, + ) -> str: + """Reorder a track or a group of tracks in a playlist. + + Parameters + ---------- + start : int + The position of the first track to be reordered. + insert_before : int + The position where the tracks should be inserted. + length : Optional[int] + The amount of tracks to be reordered. Defaults to 1 if not set. + snapshot_id : str + The playlist’s snapshot ID against which you want to make the changes. + + Returns + ------- + snapshot_id : str + The snapshot id of the playlist. + """ + data = await self.__http.reorder_playlists_tracks( + self.id, start, length, insert_before, snapshot_id=snapshot_id + ) + return data["snapshot_id"] + + # Library functionality. + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def clear(self): + """Clear the playlists tracks. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + + .. warning:: + + This is a desctructive operation and can not be reversed! + """ + await self.__http.replace_playlist_tracks(self.id, tracks=[]) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def extend(self, tracks: Union["Playlist", Iterable[Union[Track, str]]]): + """Extend a playlists tracks with that of another playlist or a list of Track/Track URIs. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + + Parameters + ---------- + tracks : Union["Playlist", List[Union[Track, str]]] + Tracks to add to the playlist, acceptable values are: + - A :class:`spotify.Playlist` object + - A :class:`list` of :class:`spotify.Track` objects or Track URIs + + Returns + ------- + snapshot_id : str + The snapshot id of the playlist. + """ + bucket: Iterable[Union[Track, str]] + + if isinstance(tracks, Playlist): + bucket = await tracks.get_all_tracks() + + elif not hasattr(tracks, "__iter__"): + raise TypeError( + f"`tracks` was an invalid type, expected any of: Playlist, Iterable[Union[Track, str]], instead got {type(tracks)}" + ) + + else: + bucket = list(tracks) + + gen: Iterable[str] = (str(track) for track in bucket) + + while True: + head: List[str] = list(islice(gen, 0, 100)) + + if not head: + break + + await self.__http.add_playlist_tracks(self.id, tracks=head) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def insert(self, index, obj: Union[PlaylistTrack, Track]) -> None: + """Insert an object before the index. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + """ + if not isinstance(obj, (PlaylistTrack, Track)): + raise TypeError( + f"Expected a PlaylistTrack or Track object instead got {obj!r}" + ) + + async with MutableTracks(self) as tracks: + tracks.insert(index, obj) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def pop(self, index: int = -1) -> PlaylistTrack: + """Remove and return the track at the specified index. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + + Returns + ------- + playlist_track : :class:`PlaylistTrack` + The track that was removed. + + Raises + ------ + IndexError + If there are no tracks or the index is out of range. + """ + async with MutableTracks(self) as tracks: + return tracks.pop(index) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def sort( + self, + *, + key: Optional[Callable[[PlaylistTrack], bool]] = None, + reverse: Optional[bool] = False, + ) -> None: + """Stable sort the playlist in place. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + """ + async with MutableTracks(self) as tracks: + tracks.sort(key=key, reverse=reverse) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def remove(self, value: Union[PlaylistTrack, Track]) -> None: + """Remove the first occurence of the value. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + + Raises + ------- + ValueError + If the value is not present. + """ + async with MutableTracks(self) as tracks: + tracks.remove(value) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def copy(self) -> "Playlist": + """Return a shallow copy of the playlist object. + + Returns + ------- + playlist : :class:`Playlist` + The playlist object copy. + """ + return Playlist(client=self.__client, data=self, http=self.__http) + + @set_required_scopes("playlist-modify-public", "playlist-modify-private") + async def reverse(self) -> None: + """Reverse the playlist in place. + + .. note:: + + This method will mutate the current + playlist object, and the spotify Playlist. + """ + async with MutableTracks(self) as tracks: + tracks.reverse() diff --git a/pomice/spotify/models/track.py b/pomice/spotify/models/track.py new file mode 100644 index 0000000..e4ddd73 --- /dev/null +++ b/pomice/spotify/models/track.py @@ -0,0 +1,123 @@ +"""Source implementation for spotify Tracks, and any other semantically relevent, implementation.""" + +import datetime +from itertools import starmap + +from ..oauth import set_required_scopes +from . import URIBase, Image, Artist + + +class Track(URIBase): # pylint: disable=too-many-instance-attributes + """A Spotify Track object. + + Attribtues + ---------- + id : :class:`str` + The Spotify ID for the track. + name : :class:`str` + The name of the track. + href : :class:`str` + A link to the Web API endpoint providing full details of the track. + uri : :class:`str` + The Spotify URI for the track. + duration : int + The track length in milliseconds. + explicit : bool + Whether or not the track has explicit + `True` if it does. + `False` if it does not (or unknown) + disc_number : int + The disc number (usually 1 unless the album consists of more than one disc). + track_number : int + The number of the track. + If an album has several discs, the track number is the number on the specified disc. + url : :class:`str` + The open.spotify URL for this Track + is_local : bool + Whether or not the track is from a local file. + popularity : int + POPULARITY + preview_url : :class:`str` + The preview URL for this Track. + images : List[Image] + The images of the Track. + markets : List[:class:`str`] + The available markets for the Track. + """ + + def __init__(self, client, data, album=None): + from .album import Album + + self.__client = client + + self.artists = artists = list( + Artist(client, artist) for artist in data.pop("artists", []) + ) + self.artist = artists[-1] if artists else None + + album_ = data.pop("album", None) + self.album = Album(client, album_) if album_ else album + + self.id = data.pop("id", None) # pylint: disable=invalid-name + self.name = data.pop("name", None) + self.href = data.pop("href", None) + self.uri = data.pop("uri", None) + self.duration = data.pop("duration_ms", None) + self.explicit = data.pop("explicit", None) + self.disc_number = data.pop("disc_number", None) + self.track_number = data.pop("track_number", None) + self.url = data.pop("external_urls").get("spotify", None) + self.is_local = data.pop("is_local", None) + self.popularity = data.pop("popularity", None) + self.preview_url = data.pop("preview_url", None) + self.markets = data.pop("available_markets", []) + + if "images" in data: + self.images = list(starmap(Image, data.pop("images"))) + else: + self.images = self.album.images.copy() if self.album is not None else [] + + def __repr__(self): + return f"" + + @set_required_scopes(None) + def audio_analysis(self): + """Get a detailed audio analysis for the track.""" + return self.__client.http.track_audio_analysis(self.id) + + @set_required_scopes(None) + def audio_features(self): + """Get audio feature information for the track.""" + return self.__client.http.track_audio_features(self.id) + + +class PlaylistTrack(Track, URIBase): + """A Track on a Playlist. + + Like a regular :class:`Track` but has some additional attributes. + + Attributes + ---------- + added_by : :class:`str` + The Spotify user who added the track. + is_local : bool + Whether this track is a local file or not. + added_at : datetime.datetime + The datetime of when the track was added to the playlist. + """ + + __slots__ = ("added_at", "added_by", "is_local") + + def __init__(self, client, data): + from .user import User + + super().__init__(client, data["track"]) + + self.added_by = User(client, data["added_by"]) + self.added_at = datetime.datetime.strptime( + data["added_at"], "%Y-%m-%dT%H:%M:%SZ" + ) + self.is_local = data["is_local"] + + def __repr__(self): + return f"" diff --git a/pomice/spotify/models/typing.py b/pomice/spotify/models/typing.py new file mode 100644 index 0000000..e43ad61 --- /dev/null +++ b/pomice/spotify/models/typing.py @@ -0,0 +1,9 @@ +"""Type annotation aliases for other Spotify models.""" + +from typing import Union, Sequence + +from .base import URIBase + +SomeURI = Union[URIBase, str] +SomeURIs = Sequence[Union[URIBase, str]] +OneOrMoreURIs = Union[SomeURI, Sequence[SomeURI]] diff --git a/pomice/spotify/models/user.py b/pomice/spotify/models/user.py new file mode 100644 index 0000000..558473e --- /dev/null +++ b/pomice/spotify/models/user.py @@ -0,0 +1,562 @@ +"""Source implementation for a spotify User""" + +import functools +from functools import partial +from base64 import b64encode +from typing import ( + Optional, + Dict, + Union, + List, + Type, + TypeVar, + TYPE_CHECKING, +) + +from ..utils import to_id +from ..http import HTTPUserClient +from . import ( + AsyncIterable, + URIBase, + Image, + Device, + Context, + Player, + Playlist, + Track, + Artist, + Library, +) + +if TYPE_CHECKING: + import spotify + +T = TypeVar("T", Artist, Track) # pylint: disable=invalid-name + + +def ensure_http(func): + func.__ensure_http__ = True + return func + + +class User(URIBase, AsyncIterable): # pylint: disable=too-many-instance-attributes + """A Spotify User. + + Attributes + ---------- + id : :class:`str` + The Spotify user ID for the user. + uri : :class:`str` + The Spotify URI for the user. + url : :class:`str` + The open.spotify URL. + href : :class:`str` + A link to the Web API endpoint for this user. + display_name : :class:`str` + The name displayed on the user’s profile. + `None` if not available. + followers : :class:`int` + The total number of followers. + images : List[:class:`Image`] + The user’s profile image. + email : :class:`str` + The user’s email address, as entered by the user when creating their account. + country : :class:`str` + The country of the user, as set in the user’s account profile. An ISO 3166-1 alpha-2 country code. + birthdate : :class:`str` + The user’s date-of-birth. + product : :class:`str` + The user’s Spotify subscription level: “premium”, “free”, etc. + (The subscription level “open” can be considered the same as “free”.) + """ + + def __init__(self, client: "spotify.Client", data: dict, **kwargs): + self.__client = self.client = client + + if "http" not in kwargs: + self.library = None + self.http = client.http + else: + self.http = kwargs.pop("http") + self.library = Library(client, self) + + # Public user object attributes + self.id = data.pop("id") # pylint: disable=invalid-name + self.uri = data.pop("uri") + self.url = data.pop("external_urls").get("spotify", None) + self.display_name = data.pop("display_name", None) + self.href = data.pop("href") + self.followers = data.pop("followers", {}).get("total", None) + self.images = list(Image(**image) for image in data.pop("images", [])) + + # Private user object attributes + self.email = data.pop("email", None) + self.country = data.pop("country", None) + self.birthdate = data.pop("birthdate", None) + self.product = data.pop("product", None) + + # AsyncIterable attrs + self.__aiter_klass__ = Playlist + self.__aiter_fetch__ = partial( + self.__client.http.get_playlists, self.id, limit=50 + ) + + def __repr__(self): + return f"" + + def __getattr__(self, attr): + value = object.__getattribute__(self, attr) + + if ( + hasattr(value, "__ensure_http__") + and getattr(self, "http", None) is not None + ): + + @functools.wraps(value) + def _raise(*args, **kwargs): + raise AttributeError( + "User has not HTTP presence to perform API requests." + ) + + return _raise + return value + + async def __aenter__(self) -> "User": + return self + + async def __aexit__(self, _, __, ___): + await self.http.close() + + # Internals + + async def _get_top(self, klass: Type[T], kwargs: dict) -> List[T]: + target = {Artist: "artists", Track: "tracks"}[klass] + data = { + key: value + for key, value in kwargs.items() + if key in ("limit", "offset", "time_range") + } + + resp = await self.http.top_artists_or_tracks(target, **data) # type: ignore + + return [klass(self.__client, item) for item in resp["items"]] + + ### Alternate constructors + + @classmethod + async def from_code( + cls, client: "spotify.Client", code: str, *, redirect_uri: str, + ): + """Create a :class:`User` object from an authorization code. + + Parameters + ---------- + client : :class:`spotify.Client` + The spotify client to associate the user with. + code : :class:`str` + The authorization code to use to further authenticate the user. + redirect_uri : :class:`str` + The rediriect URI to use in tandem with the authorization code. + """ + route = ("POST", "https://accounts.spotify.com/api/token") + payload = { + "redirect_uri": redirect_uri, + "grant_type": "authorization_code", + "code": code, + } + + client_id = client.http.client_id + client_secret = client.http.client_secret + + headers = { + "Authorization": f"Basic {b64encode(':'.join((client_id, client_secret)).encode()).decode()}", + "Content-Type": "application/x-www-form-urlencoded", + } + + raw = await client.http.request(route, headers=headers, params=payload) + + token = raw["access_token"] + refresh_token = raw["refresh_token"] + + return await cls.from_token(client, token, refresh_token) + + @classmethod + async def from_token( + cls, + client: "spotify.Client", + token: Optional[str], + refresh_token: Optional[str] = None, + ): + """Create a :class:`User` object from an access token. + + Parameters + ---------- + client : :class:`spotify.Client` + The spotify client to associate the user with. + token : :class:`str` + The access token to use for http requests. + refresh_token : :class:`str` + Used to acquire new token when it expires. + """ + client_id = client.http.client_id + client_secret = client.http.client_secret + http = HTTPUserClient(client_id, client_secret, token, refresh_token) + data = await http.current_user() + return cls(client, data=data, http=http) + + @classmethod + async def from_refresh_token(cls, client: "spotify.Client", refresh_token: str): + """Create a :class:`User` object from a refresh token. + It will poll the spotify API for a new access token and + use that to initialize the spotify user. + + Parameters + ---------- + client : :class:`spotify.Client` + The spotify client to associate the user with. + refresh_token: str + Used to acquire token. + """ + return await cls.from_token(client, None, refresh_token) + + ### Contextual methods + + @ensure_http + async def currently_playing(self) -> Dict[str, Union[Track, Context, str]]: + """Get the users currently playing track. + + Returns + ------- + context, track : Dict[str, Union[Track, Context, str]] + A tuple of the context and track. + """ + data = await self.http.currently_playing() # type: ignore + + if "item" in data: + context = data.pop("context", None) + + if context is not None: + data["context"] = Context(context) + else: + data["context"] = None + + data["item"] = Track(self.__client, data.get("item", {}) or {}) + + return data + + @ensure_http + async def get_player(self) -> Player: + """Get information about the users current playback. + + Returns + ------- + player : :class:`Player` + A player object representing the current playback. + """ + player = Player(self.__client, self, await self.http.current_player()) # type: ignore + return player + + @ensure_http + async def get_devices(self) -> List[Device]: + """Get information about the users avaliable devices. + + Returns + ------- + devices : List[:class:`Device`] + The devices the user has available. + """ + data = await self.http.available_devices() # type: ignore + return [Device(item) for item in data["devices"]] + + @ensure_http + async def recently_played( + self, + *, + limit: int = 20, + before: Optional[str] = None, + after: Optional[str] = None, + ) -> List[Dict[str, Union[Track, Context, str]]]: + """Get tracks from the current users recently played tracks. + + Returns + ------- + playlist_history : List[Dict[:class:`str`, Union[Track, Context, :class:`str`]]] + A list of playlist history object. + Each object is a dict with a timestamp, track and context field. + """ + data = await self.http.recently_played(limit=limit, before=before, after=after) # type: ignore + client = self.__client + + # List[T] where T: {'track': Track, 'content': Context: 'timestamp': ISO8601} + return [ + { + "played_at": track.get("played_at"), + "context": Context(track.get("context", {}) or {}), + "track": Track(client, track.get("track", {}) or {}), + } + for track in data["items"] + ] + + ### Playlist track methods + + @ensure_http + async def add_tracks(self, playlist: Union[str, Playlist], *tracks) -> str: + """Add one or more tracks to a user’s playlist. + + Parameters + ---------- + playlist : Union[:class:`str`, Playlist] + The playlist to modify + tracks : Sequence[Union[:class:`str`, Track]] + Tracks to add to the playlist + + Returns + ------- + snapshot_id : :class:`str` + The snapshot id of the playlist. + """ + data = await self.http.add_playlist_tracks( # type: ignore + to_id(str(playlist)), tracks=[str(track) for track in tracks] + ) + return data["snapshot_id"] + + @ensure_http + async def replace_tracks(self, playlist, *tracks) -> None: + """Replace all the tracks in a playlist, overwriting its existing tracks. + + This powerful request can be useful for replacing tracks, re-ordering existing tracks, or clearing the playlist. + + Parameters + ---------- + playlist : Union[:class:`str`, PLaylist] + The playlist to modify + tracks : Sequence[Union[:class:`str`, Track]] + Tracks to place in the playlist + """ + await self.http.replace_playlist_tracks( # type: ignore + to_id(str(playlist)), tracks=",".join(str(track) for track in tracks) + ) + + @ensure_http + async def remove_tracks(self, playlist, *tracks): + """Remove one or more tracks from a user’s playlist. + + Parameters + ---------- + playlist : Union[:class:`str`, Playlist] + The playlist to modify + tracks : Sequence[Union[:class:`str`, Track]] + Tracks to remove from the playlist + + Returns + ------- + snapshot_id : :class:`str` + The snapshot id of the playlist. + """ + data = await self.http.remove_playlist_tracks( # type: ignore + to_id(str(playlist)), tracks=(str(track) for track in tracks) + ) + return data["snapshot_id"] + + @ensure_http + async def reorder_tracks( + self, playlist, start, insert_before, length=1, *, snapshot_id=None + ): + """Reorder a track or a group of tracks in a playlist. + + Parameters + ---------- + playlist : Union[:class:`str`, Playlist] + The playlist to modify + start : int + The position of the first track to be reordered. + insert_before : int + The position where the tracks should be inserted. + length : Optional[int] + The amount of tracks to be reordered. Defaults to 1 if not set. + snapshot_id : :class:`str` + The playlist’s snapshot ID against which you want to make the changes. + + Returns + ------- + snapshot_id : :class:`str` + The snapshot id of the playlist. + """ + data = await self.http.reorder_playlists_tracks( # type: ignore + to_id(str(playlist)), start, length, insert_before, snapshot_id=snapshot_id + ) + return data["snapshot_id"] + + ### Playlist methods + + @ensure_http + async def edit_playlist( + self, playlist, *, name=None, public=None, collaborative=None, description=None + ): + """Change a playlist’s name and public/private, collaborative state and description. + + Parameters + ---------- + playlist : Union[:class:`str`, Playlist] + The playlist to modify + name : Optional[:class:`str`] + The new name of the playlist. + public : Optional[bool] + The public/private status of the playlist. + `True` for public, `False` for private. + collaborative : Optional[bool] + If `True`, the playlist will become collaborative and other users will be able to modify the playlist. + description : Optional[:class:`str`] + The new playlist description + """ + + kwargs = { + "name": name, + "public": public, + "collaborative": collaborative, + "description": description, + } + + await self.http.change_playlist_details(to_id(str(playlist)), **kwargs) # type: ignore + + @ensure_http + async def create_playlist( + self, name, *, public=True, collaborative=False, description=None + ): + """Create a playlist for a Spotify user. + + Parameters + ---------- + name : :class:`str` + The name of the playlist. + public : Optional[bool] + The public/private status of the playlist. + `True` for public, `False` for private. + collaborative : Optional[bool] + If `True`, the playlist will become collaborative and other users will be able to modify the playlist. + description : Optional[:class:`str`] + The playlist description + + Returns + ------- + playlist : :class:`Playlist` + The playlist that was created. + """ + data = {"name": name, "public": public, "collaborative": collaborative} + + if description: + data["description"] = description + + playlist_data = await self.http.create_playlist(self.id, **data) # type: ignore + return Playlist(self.__client, playlist_data, http=self.http) + + @ensure_http + async def follow_playlist( + self, playlist: Union[str, Playlist], *, public: bool = True + ) -> None: + """follow a playlist + + Parameters + ---------- + playlist : Union[:class:`str`, Playlist] + The playlist to modify + public : Optional[bool] + The public/private status of the playlist. + `True` for public, `False` for private. + """ + await self.http.follow_playlist(to_id(str(playlist)), public=public) # type: ignore + + @ensure_http + async def get_playlists( + self, *, limit: int = 20, offset: int = 0 + ) -> List[Playlist]: + """get the users playlists from spotify. + + Parameters + ---------- + limit : Optional[int] + The limit on how many playlists to retrieve for this user (default is 20). + offset : Optional[int] + The offset from where the api should start from in the playlists. + + Returns + ------- + playlists : List[Playlist] + A list of the users playlists. + """ + data = await self.http.get_playlists(self.id, limit=limit, offset=offset) # type: ignore + + return [ + Playlist(self.__client, playlist_data, http=self.http) + for playlist_data in data["items"] + ] + + @ensure_http + async def get_all_playlists(self) -> List[Playlist]: + """Get all of the users playlists from spotify. + + Returns + ------- + playlists : List[:class:`Playlist`] + A list of the users playlists. + """ + playlists: List[Playlist] = [] + total = None + offset = 0 + + while True: + data = await self.http.get_playlists(self.id, limit=50, offset=offset) # type: ignore + + if total is None: + total = data["total"] + + offset += 50 + playlists += [ + Playlist(self.__client, playlist_data, http=self.http) + for playlist_data in data["items"] + ] + + if len(playlists) >= total: + break + + return playlists + + @ensure_http + async def top_artists(self, **data) -> List[Artist]: + """Get the current user’s top artists based on calculated affinity. + + Parameters + ---------- + limit : Optional[int] + The number of entities to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first entity to return. Default: 0 + time_range : Optional[:class:`str`] + Over what time frame the affinities are computed. (long_term, short_term, medium_term) + + Returns + ------- + tracks : List[Artist] + The top artists for the user. + """ + return await self._get_top(Artist, data) + + @ensure_http + async def top_tracks(self, **data) -> List[Track]: + """Get the current user’s top tracks based on calculated affinity. + + Parameters + ---------- + limit : Optional[int] + The number of entities to return. Default: 20. Minimum: 1. Maximum: 50. + offset : Optional[int] + The index of the first entity to return. Default: 0 + time_range : Optional[:class:`str`] + Over what time frame the affinities are computed. (long_term, short_term, medium_term) + + Returns + ------- + tracks : List[Track] + The top tracks for the user. + """ + return await self._get_top(Track, data) diff --git a/pomice/spotify/oauth.py b/pomice/spotify/oauth.py new file mode 100644 index 0000000..08eb5fc --- /dev/null +++ b/pomice/spotify/oauth.py @@ -0,0 +1,202 @@ +from urllib.parse import quote_plus as quote +from typing import Optional, Dict, Iterable, Union, Set, Callable, Tuple, Any + +VALID_SCOPES = ( + # Playlists + "playlist-read-collaborative" + "playlist-modify-private" + "playlist-modify-public" + "playlist-read-private" + # Spotify Connect + "user-modify-playback-state" + "user-read-currently-playing" + "user-read-playback-state" + # Users + "user-read-private" + "user-read-email" + # Library + "user-library-modify" + "user-library-read" + # Follow + "user-follow-modify" + "user-follow-read" + # Listening History + "user-read-recently-played" + "user-top-read" + # Playback + "streaming" + "app-remote-control" +) + + +def set_required_scopes(*scopes: Optional[str]) -> Callable: + """A decorator that lets you attach metadata to functions. + + Parameters + ---------- + scopes : :class:`str` + A series of scopes that are required. + + Returns + ------- + decorator : :class:`typing.Callable` + The decorator that sets the scope metadata. + """ + + def decorate(func) -> Callable: + func.__requires_spotify_scopes__ = tuple(scopes) + return func + + return decorate + + +def get_required_scopes(func: Callable[..., Any]) -> Tuple[str, ...]: + """Get the required scopes for a function. + + Parameters + ---------- + func : Callable[..., Any] + The function to inspect. + + Returns + ------- + scopes : Tuple[:class:`str`, ...] + A tuple of scopes required for a call to succeed. + """ + if not hasattr(func, "__requires_spotify_scopes__"): + raise AttributeError("Scope metadata has not been set for this object!") + return func.__requires_spotify_scopes__ # type: ignore + + +class OAuth2: + """Helper object for Spotify OAuth2 operations. + + At a very basic level you can you oauth2 only for authentication. + + >>> oauth2 = OAuth2(client, 'some://redirect/uri') + >>> print(oauth2.url) + + Working with scopes: + + >>> oauth2 = OAuth2(client, 'some://redirect/uri', scopes=['user-read-currently-playing']) + >>> oauth2.set_scopes(user_read_playback_state=True) + + Parameters + ---------- + client_id : :class:`str` + The client id provided by spotify for the app. + redirect_uri : :class:`str` + The URI Spotify should redirect to. + scopes : Optional[Iterable[:class:`str`], Dict[:class:`str`, :class:`bool`]] + The scopes to be requested. + state : Optional[:class:`str`] + The state used to secure sessions. + + Attributes + ---------- + attrs : Dict[:class:`str`, :class:`str`] + The attributes used when constructing url parameters + parameters : :class:`str` + The URL parameters used + url : :class:`str` + The URL for OAuth2 + """ + + _BASE = "https://accounts.spotify.com/authorize/?response_type=code&{parameters}" + + def __init__( + self, + client_id: str, + redirect_uri: str, + *, + scopes: Optional[Union[Iterable[str], Dict[str, bool]]] = None, + state: str = None, + ): + self.client_id = client_id + self.redirect_uri = redirect_uri + self.state = state + self.__scopes: Set[str] = set() + + if scopes is not None: + if not isinstance(scopes, dict) and hasattr(scopes, "__iter__"): + scopes = {scope: True for scope in scopes} + + if isinstance(scopes, dict): + self.set_scopes(**scopes) + else: + raise TypeError( + f"scopes must be an iterable of strings or a dict of string to bools" + ) + + def __repr__(self): + return f"" + + def __str__(self): + return self.url + + # Alternate constructors + + @classmethod + def from_client(cls, client, *args, **kwargs): + """Construct a OAuth2 object from a `spotify.Client`.""" + return cls(client.http.client_id, *args, **kwargs) + + @staticmethod + def url_only(**kwargs) -> str: + """Construct a OAuth2 URL instead of an OAuth2 object.""" + oauth = OAuth2(**kwargs) + return oauth.url + + # Properties + + @property + def scopes(self): + """:class:`frozenset` - A frozenset of the current scopes""" + return frozenset(self.__scopes) + + @property + def attributes(self): + """Attributes used when constructing url parameters.""" + data = {"client_id": self.client_id, "redirect_uri": quote(self.redirect_uri)} + + if self.scopes: + data["scope"] = quote(" ".join(self.scopes)) + + if self.state is not None: + data["state"] = self.state + + return data + + attrs = attributes + + @property + def parameters(self) -> str: + """:class:`str` - The formatted url parameters that are used.""" + return "&".join("{0}={1}".format(*item) for item in self.attributes.items()) + + @property + def url(self) -> str: + """:class:`str` - The formatted oauth url used for authorization.""" + return self._BASE.format(parameters=self.parameters) + + # Public api + + def set_scopes(self, **scopes: Dict[str, bool]) -> None: + r"""Modify the scopes for the current oauth2 object. + + Add or remove certain scopes from this oauth2 instance. + Since hypens are not allowed, replace the _ with -. + + >>> oauth2.set_scopes(user_read_playback_state=True) + + Parameters + ---------- + \*\*scopes: Dict[:class:`str`, :class:`bool`] + The scopes to enable or disable. + """ + for scope_name, state in scopes.items(): + scope_name = scope_name.replace("_","-") + if state: + self.__scopes.add(scope_name) + else: + self.__scopes.remove(scope_name) diff --git a/pomice/spotify/sync/__init__.py b/pomice/spotify/sync/__init__.py new file mode 100644 index 0000000..963b0a7 --- /dev/null +++ b/pomice/spotify/sync/__init__.py @@ -0,0 +1,26 @@ +# pylint: skip-file + +from spotify import * +from spotify import __all__, _types, Client +from spotify.utils import clean as _clean_namespace + +from . import models +from .models import Client, Synchronous as _Sync + + +with _clean_namespace(locals(), "name", "base", "Mock"): + for name, base in _types.items(): + + class Mock(base, metaclass=_Sync): # type: ignore + __slots__ = {"__client_thread__"} + + Mock.__name__ = base.__name__ + Mock.__qualname__ = base.__qualname__ + Mock.__doc__ = base.__doc__ + + locals()[name] = Mock + setattr(models, name, Mock) + + Client._default_http_client = locals()[ + "HTTPClient" + ] # pylint: disable=protected-access diff --git a/pomice/spotify/sync/models.py b/pomice/spotify/sync/models.py new file mode 100644 index 0000000..8630150 --- /dev/null +++ b/pomice/spotify/sync/models.py @@ -0,0 +1,83 @@ +from functools import wraps +from inspect import getmembers, iscoroutinefunction +from typing import Type, Callable, TYPE_CHECKING + +from .. import Client as _Client +from .thread import EventLoopThread + +if TYPE_CHECKING: + import spotify + + +def _infer_initializer(base: Type, name: str) -> Callable[..., None]: + """Infer the __init__ to use for a given :class:`typing.Type` base and :class:`str` name.""" + if name in {"HTTPClient", "HTTPUserClient"}: + + def initializer(self: "spotify.HTTPClient", *args, **kwargs) -> None: + base.__init__(self, *args, **kwargs) + self.__client_thread__ = kwargs["loop"].__spotify_thread__ # type: ignore + + else: + assert name != "Client" + + def initializer(self: "spotify.SpotifyBase", client: _Client, *args, **kwargs) -> None: # type: ignore + base.__init__(self, client, *args, **kwargs) + self.__client_thread__ = client.__client_thread__ # type: ignore + + return initializer + + +def _normalize_coroutine_function(corofunc): + if isinstance(corofunc, classmethod): + + @classmethod + @wraps(corofunc) + def wrapped(cls, client, *args, **kwargs): + assert isinstance(client, _Client) + client.__client_thread__.run_coroutine_threadsafe( + corofunc(cls, client, *args, **kwargs) + ) + + else: + + @wraps(corofunc) + def wrapped(self, *args, **kwargs): + return self.__client_thread__.run_coroutine_threadsafe( + corofunc(self, *args, **kwargs) + ) + + return wrapped + + +class Synchronous(type): + """Metaclass used for overloading coroutine functions on models.""" + + def __new__(cls, name, bases, dct): + klass = super().__new__(cls, name, bases, dct) + + base = bases[0] + name = base.__name__ + + if name != "Client": + # Models and the HTTP classes get their __init__ overloaded. + initializer = _infer_initializer(base, name) + setattr(klass, "__init__", initializer) + + for ident, obj in getmembers(base): + if not iscoroutinefunction(obj): + continue + + setattr(klass, ident, _normalize_coroutine_function(obj)) + + return klass # type: ignore + + +class Client(_Client, metaclass=Synchronous): + def __init__(self, *args, **kwargs): + thread = EventLoopThread() + thread.start() + + kwargs["loop"] = thread.loop # pylint: disable=protected-access + + super().__init__(*args, **kwargs) + self.__thread = self.__client_thread__ = thread diff --git a/pomice/spotify/sync/thread.py b/pomice/spotify/sync/thread.py new file mode 100644 index 0000000..2627f75 --- /dev/null +++ b/pomice/spotify/sync/thread.py @@ -0,0 +1,48 @@ +from asyncio import new_event_loop, run_coroutine_threadsafe, set_event_loop +from threading import Thread, RLock, get_ident +from typing import Any, Coroutine + + +class EventLoopThread(Thread): + """A surrogate thread that spins an asyncio event loop.""" + + def __init__(self): + super().__init__(daemon=True) + + self.__lock = RLock() + self.__loop = loop = new_event_loop() + loop.__spotify_thread__ = self + + # Properties + + @property + def loop(self): + return self.__loop + + # Overloads + + def run(self): + set_event_loop(self.__loop) + self.__loop.run_forever() + + # Public API + + def run_coroutine_threadsafe(self, coro: Coroutine) -> Any: + """Like :func:`asyncio.run_coroutine_threadsafe` but for this specific thread.""" + + # If the current thread is the same + # as the event loop Thread. + # + # then we're in the process of making + # nested calls to await other coroutines + # and should pass back the coroutine as it should be. + if get_ident() == self.ident: + return coro + + # Double lock because I haven't looked + # into whether this deadlocks under whatever + # conditions, Best to play it safe. + with self.__lock: + future = run_coroutine_threadsafe(coro, self.__loop) + + return future.result() diff --git a/pomice/spotify/utils.py b/pomice/spotify/utils.py new file mode 100644 index 0000000..64a580a --- /dev/null +++ b/pomice/spotify/utils.py @@ -0,0 +1,60 @@ +from re import compile as re_compile +from functools import lru_cache +from contextlib import contextmanager +from typing import Iterable, Hashable, TypeVar, Dict, Tuple + +__all__ = ("clean", "filter_items", "to_id") + +_URI_RE = re_compile(r"^.*:([a-zA-Z0-9]+)$") +_OPEN_RE = re_compile(r"http[s]?:\/\/open\.spotify\.com\/(.*)\/(.*)") + + +@contextmanager +def clean(mapping: dict, *keys: Iterable[Hashable]): + """A helper context manager that defers mutating a mapping.""" + yield + for key in keys: + mapping.pop(key) + + +K = TypeVar("K") # pylint: disable=invalid-name +V = TypeVar("V") # pylint: disable=invalid-name + + +@lru_cache(maxsize=1024) +def _cached_filter_items(data: Tuple[Tuple[K, V], ...]) -> Dict[K, V]: + data_ = {} + for key, value in data: + if value is not None: + data_[key] = value + return data_ + + +def filter_items(data: Dict[K, V]) -> Dict[K, V]: + """Filter the items of a dict where the value is not None.""" + return _cached_filter_items((*data.items(),)) + + +def to_id(value: str) -> str: + """Get a spotify ID from a URI or open.spotify URL. + + Paramters + --------- + value : :class:`str` + The value to operate on. + + Returns + ------- + id : :class:`str` + The Spotify ID from the value. + """ + value = value.strip() + match = _URI_RE.match(value) + + if match is None: + match = _OPEN_RE.match(value) + + if match is None: + return value + return match.group(2) + return match.group(1) diff --git a/pomice/utils.py b/pomice/utils.py index fb731c1..347bd2a 100644 --- a/pomice/utils.py +++ b/pomice/utils.py @@ -77,3 +77,5 @@ class NodeStats: def __repr__(self) -> str: return f'' + +