Update client.py

This commit is contained in:
Crussader 2022-03-27 20:57:36 +04:00 committed by GitHub
parent 331202ae24
commit 1ee7b65d71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 103 additions and 40 deletions

View File

@ -1,18 +1,27 @@
import asyncio
import re import re
import itertools
import time import time
from base64 import b64encode from base64 import b64encode
import aiohttp import aiohttp
try:
import orjson
json = orjson
except ImportError:
import json
from .album import Album from .album import Album
from .exceptions import InvalidSpotifyURL, SpotifyRequestException from .exceptions import InvalidSpotifyURL, SpotifyRequestException
from .playlist import Playlist from .playlist import Playlist
from .track import Track from .track import Track
from .artist import Artist
GRANT_URL = "https://accounts.spotify.com/api/token" GRANT_URL = "https://accounts.spotify.com/api/token"
REQUEST_URL = "https://api.spotify.com/v1/{type}s/{id}" REQUEST_URL = "https://api.spotify.com/v1/{type}s/{id}"
SPOTIFY_URL_REGEX = re.compile( SPOTIFY_URL_REGEX = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track)/(?P<id>[a-zA-Z0-9]+)" r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
) )
@ -22,7 +31,12 @@ class Client:
for any Spotify URL you throw at it. for any Spotify URL you throw at it.
""" """
def __init__(self, client_id: str, client_secret: str) -> None: def __init__(
self,
client_id: str,
client_secret: str,
) -> None:
self._client_id = client_id self._client_id = client_id
self._client_secret = client_secret self._client_secret = client_secret
@ -30,8 +44,10 @@ class Client:
self._bearer_token: str = None self._bearer_token: str = None
self._expiry = 0 self._expiry = 0
self._auth_token = b64encode(f"{self._client_id}:{self._client_secret}".encode()) self._auth_token = b64encode(
self._grant_headers = {"Authorization": f"Basic {self._auth_token.decode()}"} f"{self._client_id}:{self._client_secret}".encode())
self._grant_headers = {
"Authorization": f"Basic {self._auth_token.decode()}"}
self._bearer_headers = None self._bearer_headers = None
async def _fetch_bearer_token(self) -> None: async def _fetch_bearer_token(self) -> None:
@ -43,15 +59,69 @@ class Client:
f"Error fetching bearer token: {resp.status} {resp.reason}" f"Error fetching bearer token: {resp.status} {resp.reason}"
) )
data: dict = await resp.json() data: dict = await resp.json(loads=json.loads)
self._bearer_token = data["access_token"] self._bearer_token = data["access_token"]
self._expiry = time.time() + (int(data["expires_in"]) - 10) self._expiry = time.time() + (int(data["expires_in"]) - 10)
self._bearer_headers = {"Authorization": f"Bearer {self._bearer_token}"} self._bearer_headers = {
"Authorization": f"Bearer {self._bearer_token}"}
async def _load(self, type_: str, request_url: str, data: dict, tracks: list, other: dict = None, full: bool = False):
async def _fetch_async(count, url):
async with self.session.get(url, headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
next_data: dict = await resp.json(loads=json.loads)
inner = [count]
inner.extend(Track(track.get('track') or track)
for track in next_data['items'])
tracks.append(inner)
processes = []
if type_ == 'playlist':
if not len(tracks):
raise SpotifyRequestException(
"This playlist is empty and therefore cannot be queued.")
urls = [f"{request_url}/tracks?offset={off}"
for off in range(100, data['tracks']['total']+100, 100)]
cls = Playlist(data, [])
else:
urls = [REQUEST_URL.format(type="album", id=album['id'])+'/tracks'
for album in (data['items'] if not other else other['items'])]
cls = Artist(data, [])
processes = [_fetch_async(url, count)
for url, count in enumerate(urls[:9] if not full else urls, start=1)]
async def search(self, *, query: str): try:
await asyncio.gather(*processes)
except TimeoutError:
# dont know why it TimeoutError simply happens
# but if it does then manually waiting for each func
[await func for func in processes]
# tracks are jumbled for huge playlists so we use this to sort it out
tracks.sort(key=lambda i: False if isinstance(i, Track) else i[0])
tracks = [track for track in itertools.chain(
*tracks) if not isinstance(track, int)]
cls.tracks = tracks
if cls.total_tracks == 0:
cls.total_tracks = len(tracks)
return cls
async def search(self, *, query: str, raw: bool = False):
if not self._bearer_token or time.time() >= self._expiry: if not self._bearer_token or time.time() >= self._expiry:
await self._fetch_bearer_token() await self._fetch_bearer_token()
@ -70,41 +140,34 @@ class Client:
f"Error while fetching results: {resp.status} {resp.reason}" f"Error while fetching results: {resp.status} {resp.reason}"
) )
data: dict = await resp.json() data = await resp.json(loads=json.loads)
if spotify_type == "track": if raw:
return data
if spotify_type == 'track':
return Track(data) return Track(data)
elif spotify_type == "album": elif spotify_type == 'album':
return Album(data) return Album(data)
else: else:
tracks = []
if spotify_type == 'playlist':
inner = [0]
inner.extend(Track(track['track'])
for track in data['tracks']['items'] if track['track'])
tracks.append(inner)
tracks = [ other = None # is for artist data
Track(track["track"]) else:
for track in data["tracks"]["items"] if track["track"] is not None if not data.get('items'):
] async with self.session.get(f"{request_url}/albums", headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
if not len(tracks): other = await resp.json(loads=json.loads)
raise SpotifyRequestException("This playlist is empty and therefore cannot be queued.") else:
other = None
next_page_url = data["tracks"]["next"]
while next_page_url is not None:
async with self.session.get(next_page_url, headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
next_data: dict = await resp.json()
tracks += [
Track(track["track"])
for track in next_data["items"] if track["track"] is not None
]
next_page_url = next_data["next"]
return Playlist(data, tracks)
async def close(self):
await self.session.close()
return await self._load(spotify_type, request_url, data, tracks, other)