get basic code for the apple music client in place and working
This commit is contained in:
parent
25c6d399e8
commit
809bb4aa3f
|
|
@ -1 +1,5 @@
|
|||
"""Apple Music module for Pomice, made possible by cloudwithax 2023"""
|
||||
|
||||
from .exceptions import *
|
||||
from .objects import *
|
||||
from .client import Client
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
import re
|
||||
import aiohttp
|
||||
import json
|
||||
|
||||
AM_URL_REGEX = re.compile(r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)")
|
||||
AM_SINGLE_IN_ALBUM_REGEX = re.compile(r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>.+)(\?i=)(?P<id2>.+)")
|
||||
AM_REQ_URL = "https://api.music.apple.com/v1/catalog/{country}/{type}s/{id}"
|
||||
|
||||
class Client:
|
||||
"""The base Apple Music client for Pomice.
|
||||
This will do all the heavy lifting of getting tracks from Apple Music
|
||||
and translating it to a valid Lavalink track. No client auth is required here.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.token: str = None
|
||||
self.origin: str = None
|
||||
self.session: aiohttp.ClientSession = None
|
||||
self.headers = None
|
||||
|
||||
|
||||
async def request_token(self):
|
||||
self.session = aiohttp.ClientSession()
|
||||
async with self.session.get("https://music.apple.com/assets/index.919fe17f.js") as resp:
|
||||
text = await resp.text()
|
||||
result = re.search("\"(eyJ.+?)\"", text).group(1)
|
||||
self.token = result
|
||||
self.headers = {
|
||||
'Authorization': f"Bearer {result}",
|
||||
'Origin': 'https://apple.com',
|
||||
}
|
||||
|
||||
|
||||
async def search(self, query: str):
|
||||
if not self.token:
|
||||
await self.request_token()
|
||||
|
||||
result = AM_URL_REGEX.match(query)
|
||||
|
||||
country = result.group("country")
|
||||
type = result.group("type")
|
||||
id = result.group("id")
|
||||
|
||||
if type == "album" and (sia_result := AM_SINGLE_IN_ALBUM_REGEX.match(query)):
|
||||
# apple music likes to generate links for singles off an album
|
||||
# by adding a param at the end of the url
|
||||
# so we're gonna scan for that and correct it
|
||||
|
||||
id = sia_result.group("id2")
|
||||
type = "song"
|
||||
request_url = AM_REQ_URL.format(country=country, type=type, id=id)
|
||||
else:
|
||||
request_url = AM_REQ_URL.format(country=country, type=type, id=id)
|
||||
|
||||
print(request_url)
|
||||
|
||||
print(self.token)
|
||||
|
||||
|
||||
if type == "playlist":
|
||||
async with self.session.get(request_url, headers=self.headers) as resp:
|
||||
print(resp.status)
|
||||
data = await resp.json()
|
||||
|
||||
elif type == "album":
|
||||
async with self.session.get(request_url, headers=self.headers) as resp:
|
||||
print(resp.status)
|
||||
data = await resp.json()
|
||||
|
||||
elif type == "song":
|
||||
async with self.session.get(request_url, headers=self.headers) as resp:
|
||||
print(resp.status)
|
||||
data = await resp.json()
|
||||
|
||||
elif type == "artist":
|
||||
async with self.session.get(request_url, headers=self.headers) as resp:
|
||||
print(resp.status)
|
||||
data = await resp.json()
|
||||
|
||||
with open('yes.txt', 'w') as file:
|
||||
file.write(json.dumps(data))
|
||||
|
||||
|
||||
|
||||
await self.session.close()
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
class AppleMusicRequestException(Exception):
|
||||
"""An error occurred when making a request to the Apple Music API"""
|
||||
pass
|
||||
|
||||
|
||||
class InvalidAppleMusicURL(Exception):
|
||||
"""An invalid Apple Music URL was passed"""
|
||||
pass
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
class Track:
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class Album:
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
class Playlist:
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
class Artist:
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
|
@ -17,6 +17,7 @@ from discord.ext import commands
|
|||
from . import (
|
||||
__version__,
|
||||
spotify,
|
||||
applemusic
|
||||
)
|
||||
|
||||
from .enums import SearchType, NodeAlgorithm
|
||||
|
|
@ -46,6 +47,11 @@ DISCORD_MP3_URL_REGEX = re.compile(
|
|||
r"(?P<message_id>[0-9]+)/(?P<file>[a-zA-Z0-9_.]+)+"
|
||||
)
|
||||
|
||||
AM_URL_REGEX = re.compile(
|
||||
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)"
|
||||
)
|
||||
|
||||
|
||||
URL_REGEX = re.compile(
|
||||
r"https?://(?:www\.)?.+"
|
||||
)
|
||||
|
|
@ -56,6 +62,7 @@ class Node:
|
|||
"""The base class for a node.
|
||||
This node object represents a Lavalink node.
|
||||
To enable Spotify searching, pass in a proper Spotify Client ID and Spotify Client Secret
|
||||
To enable Apple music, set the "apple_music" parameter to "True"
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
|
|
@ -72,6 +79,7 @@ class Node:
|
|||
session: Optional[aiohttp.ClientSession] = None,
|
||||
spotify_client_id: Optional[str] = None,
|
||||
spotify_client_secret: Optional[str] = None,
|
||||
apple_music: bool = False
|
||||
|
||||
):
|
||||
self._bot = bot
|
||||
|
|
@ -91,7 +99,7 @@ class Node:
|
|||
self._websocket: aiohttp.ClientWebSocketResponse = None
|
||||
self._task: asyncio.Task = None
|
||||
|
||||
self._session_id = None
|
||||
self._session_id: str = None
|
||||
self._metadata = None
|
||||
self._available = None
|
||||
|
||||
|
|
@ -111,6 +119,9 @@ class Node:
|
|||
self._spotify_client_id, self._spotify_client_secret
|
||||
)
|
||||
|
||||
if apple_music:
|
||||
self._apple_music_client = applemusic.Client()
|
||||
|
||||
self._bot.add_listener(self._update_handler, "on_socket_response")
|
||||
|
||||
def __repr__(self):
|
||||
|
|
@ -204,8 +215,12 @@ class Node:
|
|||
self._stats = NodeStats(data)
|
||||
return
|
||||
|
||||
if not (player := self._players.get(int(data["guildId"]))):
|
||||
return
|
||||
if op == "ready":
|
||||
self._session_id = data.get("sessionId")
|
||||
|
||||
if "guildId" in data:
|
||||
if not (player := self._players.get(int(data["guildId"]))):
|
||||
return
|
||||
|
||||
if op == "event":
|
||||
await player._dispatch_event(data)
|
||||
|
|
@ -216,9 +231,9 @@ class Node:
|
|||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
guild_id: Optional[Union[int, str]],
|
||||
query: Optional[str],
|
||||
data: Optional[Union[dict, str]]
|
||||
guild_id: Optional[Union[int, str]] = None,
|
||||
query: Optional[str] = None,
|
||||
data: Optional[Union[dict, str]] = None
|
||||
):
|
||||
if not self._available:
|
||||
raise NodeNotAvailable(
|
||||
|
|
@ -231,10 +246,13 @@ class Node:
|
|||
f'{f"/{guild_id}" if guild_id else ""}' \
|
||||
f'{f"?{query}" if query else ""}'
|
||||
|
||||
async with self._session.request(method=method, url=uri, json=data or {}) as resp:
|
||||
async with self._session.request(method=method, url=uri, headers={"Authorization": self._password}, json=data or {}) as resp:
|
||||
if resp.status >= 300:
|
||||
raise NodeRestException(f'Error fetching from Lavalink REST api: {resp.status} {resp.reason}')
|
||||
|
||||
if method == "DELETE":
|
||||
return await resp.json(content_type=None)
|
||||
|
||||
return await resp.json()
|
||||
|
||||
|
||||
|
|
@ -253,12 +271,10 @@ class Node:
|
|||
)
|
||||
self._task = self._bot.loop.create_task(self._listen())
|
||||
self._available = True
|
||||
self._session_id = f"pomice_{secrets.token_hex(20)}"
|
||||
async with self._session.get(f'{self._rest_uri}/version') as resp:
|
||||
async with self._session.get(f'{self._rest_uri}/version', headers={"Authorization": self._password}) as resp:
|
||||
version: str = await resp.text()
|
||||
# To make version comparasion easier, lets remove the periods
|
||||
# from the version numbers and compare them like whole numbers
|
||||
version = int(version.translate(str.maketrans('', '', string.punctuation)).replace(" ", ""))
|
||||
print(version)
|
||||
|
||||
return self
|
||||
|
|
@ -340,7 +356,12 @@ class Node:
|
|||
for filter in filters:
|
||||
filter.set_preload()
|
||||
|
||||
if SPOTIFY_URL_REGEX.match(query):
|
||||
|
||||
if AM_URL_REGEX.match(query):
|
||||
await self._apple_music_client.search(query=query)
|
||||
|
||||
|
||||
elif SPOTIFY_URL_REGEX.match(query):
|
||||
if not self._spotify_client_id and not self._spotify_client_secret:
|
||||
raise InvalidSpotifyClientAuthorization(
|
||||
"You did not provide proper Spotify client authorization credentials. "
|
||||
|
|
|
|||
Loading…
Reference in New Issue