feat: add close to clients; style: formatting pass
This commit is contained in:
parent
458b686769
commit
c5ca63b014
|
|
@ -8,16 +8,17 @@ import base64
|
|||
from datetime import datetime
|
||||
from .objects import *
|
||||
from .exceptions import *
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..pool import Node
|
||||
|
||||
AM_URL_REGEX = re.compile(r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)")
|
||||
AM_SINGLE_IN_ALBUM_REGEX = re.compile(r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>.+)(\?i=)(?P<id2>.+)")
|
||||
AM_URL_REGEX = re.compile(
|
||||
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)"
|
||||
)
|
||||
AM_SINGLE_IN_ALBUM_REGEX = re.compile(
|
||||
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>.+)(\?i=)(?P<id2>.+)"
|
||||
)
|
||||
AM_REQ_URL = "https://api.music.apple.com/v1/catalog/{country}/{type}s/{id}"
|
||||
AM_BASE_URL = "https://api.music.apple.com"
|
||||
|
||||
|
||||
class Client:
|
||||
"""The base Apple Music client for Pomice.
|
||||
This will do all the heavy lifting of getting tracks from Apple Music
|
||||
|
|
@ -30,29 +31,31 @@ class Client:
|
|||
self.session: aiohttp.ClientSession = None
|
||||
self.headers = None
|
||||
|
||||
|
||||
async def request_token(self):
|
||||
if not self.session:
|
||||
self.session = aiohttp.ClientSession()
|
||||
|
||||
async with self.session.get("https://music.apple.com/assets/index.919fe17f.js") as resp:
|
||||
async with self.session.get(
|
||||
"https://music.apple.com/assets/index.919fe17f.js"
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
raise AppleMusicRequestException(
|
||||
f"Error while fetching results: {resp.status} {resp.reason}"
|
||||
)
|
||||
text = await resp.text()
|
||||
result = re.search("\"(eyJ.+?)\"", text).group(1)
|
||||
result = re.search('"(eyJ.+?)"', text).group(1)
|
||||
self.token = result
|
||||
self.headers = {
|
||||
'Authorization': f"Bearer {result}",
|
||||
'Origin': 'https://apple.com',
|
||||
"Authorization": f"Bearer {result}",
|
||||
"Origin": "https://apple.com",
|
||||
}
|
||||
token_split = self.token.split(".")[1]
|
||||
token_json = base64.b64decode(token_split + '=' * (-len(token_split) % 4)).decode()
|
||||
token_json = base64.b64decode(
|
||||
token_split + "=" * (-len(token_split) % 4)
|
||||
).decode()
|
||||
token_data = json.loads(token_json)
|
||||
self.expiry = datetime.fromtimestamp(token_data["exp"])
|
||||
|
||||
|
||||
async def search(self, query: str):
|
||||
if not self.token or datetime.utcnow() > self.expiry:
|
||||
await self.request_token()
|
||||
|
|
@ -73,7 +76,6 @@ class Client:
|
|||
else:
|
||||
request_url = AM_REQ_URL.format(country=country, type=type, id=id)
|
||||
|
||||
|
||||
async with self.session.get(request_url, headers=self.headers) as resp:
|
||||
if resp.status != 200:
|
||||
raise AppleMusicRequestException(
|
||||
|
|
@ -83,7 +85,6 @@ class Client:
|
|||
|
||||
data = data["data"][0]
|
||||
|
||||
|
||||
if type == "song":
|
||||
return Song(data)
|
||||
|
||||
|
|
@ -91,7 +92,9 @@ class Client:
|
|||
return Album(data)
|
||||
|
||||
elif type == "artist":
|
||||
async with self.session.get(f"{request_url}/view/top-songs", headers=self.headers) as resp:
|
||||
async with self.session.get(
|
||||
f"{request_url}/view/top-songs", headers=self.headers
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
raise AppleMusicRequestException(
|
||||
f"Error while fetching results: {resp.status} {resp.reason}"
|
||||
|
|
@ -108,13 +111,17 @@ class Client:
|
|||
tracks = [Song(track) for track in track_data.get("data")]
|
||||
|
||||
if not len(tracks):
|
||||
raise AppleMusicRequestException("This playlist is empty and therefore cannot be queued.")
|
||||
raise AppleMusicRequestException(
|
||||
"This playlist is empty and therefore cannot be queued."
|
||||
)
|
||||
|
||||
if track_data.get("next"):
|
||||
next_page_url = AM_BASE_URL + track_data.get("next")
|
||||
|
||||
while next_page_url is not None:
|
||||
async with self.session.get(next_page_url, headers=self.headers) as resp:
|
||||
async with self.session.get(
|
||||
next_page_url, headers=self.headers
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
raise AppleMusicRequestException(
|
||||
f"Error while fetching results: {resp.status} {resp.reason}"
|
||||
|
|
@ -128,6 +135,9 @@ class Client:
|
|||
else:
|
||||
next_page_url = None
|
||||
|
||||
|
||||
|
||||
return Playlist(data, tracks)
|
||||
|
||||
async def close(self):
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
|
|
|
|||
|
|
@ -322,10 +322,10 @@ class Node:
|
|||
await self._websocket.close()
|
||||
await self._session.close()
|
||||
if self._spotify_client:
|
||||
await self._spotify_client.session.close()
|
||||
await self._spotify_client.close()
|
||||
|
||||
if self._apple_music_client:
|
||||
await self._apple_music_client.session.close()
|
||||
await self._apple_music_client.close()
|
||||
|
||||
del self._pool._nodes[self._identifier]
|
||||
self.available = False
|
||||
|
|
|
|||
|
|
@ -11,8 +11,6 @@ from .exceptions import InvalidSpotifyURL, SpotifyRequestException
|
|||
from .objects import *
|
||||
|
||||
|
||||
|
||||
|
||||
GRANT_URL = "https://accounts.spotify.com/api/token"
|
||||
REQUEST_URL = "https://api.spotify.com/v1/{type}s/{id}"
|
||||
SPOTIFY_URL_REGEX = re.compile(
|
||||
|
|
@ -34,7 +32,9 @@ class Client:
|
|||
|
||||
self._bearer_token: str = None
|
||||
self._expiry = 0
|
||||
self._auth_token = b64encode(f"{self._client_id}:{self._client_secret}".encode())
|
||||
self._auth_token = b64encode(
|
||||
f"{self._client_id}:{self._client_secret}".encode()
|
||||
)
|
||||
self._grant_headers = {"Authorization": f"Basic {self._auth_token.decode()}"}
|
||||
self._bearer_headers = None
|
||||
|
||||
|
|
@ -44,7 +44,9 @@ class Client:
|
|||
if not self.session:
|
||||
self.session = aiohttp.ClientSession()
|
||||
|
||||
async with self.session.post(GRANT_URL, data=_data, headers=self._grant_headers) as resp:
|
||||
async with self.session.post(
|
||||
GRANT_URL, data=_data, headers=self._grant_headers
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
raise SpotifyRequestException(
|
||||
f"Error fetching bearer token: {resp.status} {resp.reason}"
|
||||
|
|
@ -82,28 +84,35 @@ class Client:
|
|||
elif spotify_type == "album":
|
||||
return Album(data)
|
||||
elif spotify_type == "artist":
|
||||
async with self.session.get(f"{request_url}/top-tracks?market=US", headers=self._bearer_headers) as resp:
|
||||
async with self.session.get(
|
||||
f"{request_url}/top-tracks?market=US", headers=self._bearer_headers
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
raise SpotifyRequestException(
|
||||
f"Error while fetching results: {resp.status} {resp.reason}"
|
||||
)
|
||||
|
||||
track_data: dict = await resp.json(loads=json.loads)
|
||||
tracks = track_data['tracks']
|
||||
tracks = track_data["tracks"]
|
||||
return Artist(data, tracks)
|
||||
else:
|
||||
tracks = [
|
||||
Track(track["track"])
|
||||
for track in data["tracks"]["items"] if track["track"] is not None
|
||||
for track in data["tracks"]["items"]
|
||||
if track["track"] is not None
|
||||
]
|
||||
|
||||
if not len(tracks):
|
||||
raise SpotifyRequestException("This playlist is empty and therefore cannot be queued.")
|
||||
raise SpotifyRequestException(
|
||||
"This playlist is empty and therefore cannot be queued."
|
||||
)
|
||||
|
||||
next_page_url = data["tracks"]["next"]
|
||||
|
||||
while next_page_url is not None:
|
||||
async with self.session.get(next_page_url, headers=self._bearer_headers) as resp:
|
||||
async with self.session.get(
|
||||
next_page_url, headers=self._bearer_headers
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
raise SpotifyRequestException(
|
||||
f"Error while fetching results: {resp.status} {resp.reason}"
|
||||
|
|
@ -113,7 +122,8 @@ class Client:
|
|||
|
||||
tracks += [
|
||||
Track(track["track"])
|
||||
for track in next_data["items"] if track["track"] is not None
|
||||
for track in next_data["items"]
|
||||
if track["track"] is not None
|
||||
]
|
||||
next_page_url = next_data["next"]
|
||||
|
||||
|
|
@ -133,7 +143,9 @@ class Client:
|
|||
if not spotify_type == "track":
|
||||
raise InvalidSpotifyURL("The provided query is not a Spotify track.")
|
||||
|
||||
request_url = REQUEST_URL.format(type="recommendation", id=f"?seed_tracks={spotify_id}")
|
||||
request_url = REQUEST_URL.format(
|
||||
type="recommendation", id=f"?seed_tracks={spotify_id}"
|
||||
)
|
||||
|
||||
async with self.session.get(request_url, headers=self._bearer_headers) as resp:
|
||||
if resp.status != 200:
|
||||
|
|
@ -146,3 +158,8 @@ class Client:
|
|||
tracks = [Track(track) for track in data["tracks"]]
|
||||
|
||||
return tracks
|
||||
|
||||
async def close(self) -> None:
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
|
|
|
|||
Loading…
Reference in New Issue