This commit is contained in:
cloudwithax 2023-02-20 04:07:10 -05:00
parent 2c380671e9
commit d63e1f61c5
11 changed files with 487 additions and 214 deletions

View File

@ -18,7 +18,7 @@ if not discord.version_info.major >= 2:
"using 'pip install discord.py'"
)
__version__ = "2.0.1a"
__version__ = "2.1"
__title__ = "pomice"
__author__ = "cloudwithax"
@ -30,4 +30,5 @@ from .objects import *
from .queue import *
from .player import *
from .pool import *
from .routeplanner import *

View File

@ -93,13 +93,16 @@ class Client:
return Artist(data, tracks=tracks)
else:
tracks = [Song(track) for track in data["relationships"]["tracks"]["data"]]
track_data: dict = data["relationships"]["tracks"]
tracks = [Song(track) for track in track_data.get("data")]
if not len(tracks):
raise AppleMusicRequestException("This playlist is empty and therefore cannot be queued.")
if data["relationships"]["tracks"]["next"]:
next_page_url = AM_BASE_URL + data["relationships"]["tracks"]["next"]
if track_data.get("next"):
next_page_url = AM_BASE_URL + track_data.get("next")
while next_page_url is not None:
async with self.session.get(next_page_url, headers=self.headers) as resp:
@ -112,12 +115,10 @@ class Client:
tracks += [Song(track) for track in next_data["data"]]
if next_data.get("next"):
next_page_url = AM_BASE_URL + next_data["next"]
next_page_url = AM_BASE_URL + next_data.get("next")
else:
next_page_url = None
return Playlist(data, tracks)

View File

@ -6,6 +6,7 @@ from typing import List
class Song:
"""The base class for an Apple Music song"""
def __init__(self, data: dict) -> None:
self.name: str = data["attributes"]["name"]
self.url: str = data["attributes"]["url"]
self.isrc: str = data["attributes"]["isrc"]
@ -36,6 +37,7 @@ class Playlist:
# we'll use the first song's image as the image for the playlist
# because apple dynamically generates playlist covers client-side
self.image = self.tracks[0].image
print("worked")
def __repr__(self) -> str:
return (

View File

@ -1,8 +1,11 @@
import re
from enum import Enum
class SearchType(Enum):
"""The enum for the different search types for Pomice.
"""
The enum for the different search types for Pomice.
This feature is exclusively for the Spotify search feature of Pomice.
If you are not using this feature, this class is not necessary.
@ -22,8 +25,59 @@ class SearchType(Enum):
def __str__(self) -> str:
return self.value
class TrackType(Enum):
"""
The enum for the different track types for Pomice.
TrackType.YOUTUBE defines that the track is from YouTube
TrackType.SOUNDCLOUD defines that the track is from SoundCloud.
TrackType.SPOTIFY defines that the track is from Spotify
TrackType.APPLE_MUSIC defines that the track is from Apple Music.
TrackType.HTTP defines that the track is from an HTTP source.
"""
# We don't have to define anything special for these, since these just serve as flags
YOUTUBE = "youtube_track"
SOUNDCLOUD = "soundcloud_track"
SPOTIFY = "spotify_track"
APPLE_MUSIC = "apple_music_track"
HTTP = "http_source"
def __str__(self) -> str:
return self.value
class PlaylistType(Enum):
"""
The enum for the different playlist types for Pomice.
PlaylistType.YOUTUBE defines that the playlist is from YouTube
PlaylistType.SOUNDCLOUD defines that the playlist is from SoundCloud.
PlaylistType.SPOTIFY defines that the playlist is from Spotify
PlaylistType.APPLE_MUSIC defines that the playlist is from Apple Music.
"""
# We don't have to define anything special for these, since these just serve as flags
YOUTUBE = "youtube_playlist"
SOUNDCLOUD = "soundcloud_playlist"
SPOTIFY = "spotify_playlist"
APPLE_MUSIC = "apple_music_list"
def __str__(self) -> str:
return self.value
class NodeAlgorithm(Enum):
"""The enum for the different node algorithms in Pomice.
"""
The enum for the different node algorithms in Pomice.
The enums in this class are to only differentiate different
methods, since the actual method is handled in the
@ -45,7 +99,8 @@ class NodeAlgorithm(Enum):
return self.value
class LoopMode(Enum):
"""The enum for the different loop modes.
"""
The enum for the different loop modes.
This feature is exclusively for the queue utility of pomice.
If you are not using this feature, this class is not necessary.
@ -55,9 +110,151 @@ class LoopMode(Enum):
"""
# We don't have to define anything special for these, since these just serve as flags
TRACK = "TRACK"
TRACK = "track"
QUEUE = "queue"
def __str__(self) -> str:
return self.value
class PlatformRecommendation(Enum):
"""
The enum for choosing what platform you want for recommendations.
This feature is exclusively for the recommendations function.
If you are not using this feature, this class is not necessary.
PlatformRecommendation.SPOTIFY sets the recommendations to come from Spotify
PlatformRecommendation.YOUTUBE sets the recommendations to come from YouTube
"""
# We don't have to define anything special for these, since these just serve as flags
SPOTIFY = "spotify"
YOUTUBE = "youtube"
def __str__(self) -> str:
return self.value
class RouteStrategy(Enum):
"""
The enum for specifying the route planner strategy for Lavalink.
This feature is exclusively for the RoutePlanner class.
If you are not using this feature, this class is not necessary.
RouteStrategy.ROTATE_ON_BAN specifies that the node is rotating IPs
whenever they get banned by Youtube.
RouteStrategy.LOAD_BALANCE specifies that the node is selecting
random IPs to balance out requests between them.
RouteStrategy.NANO_SWITCH specifies that the node is switching
between IPs every CPU clock cycle.
RouteStrategy.ROTATING_NANO_SWITCH specifies that the node is switching
between IPs every CPU clock cycle and is rotating between IP blocks on
ban.
"""
ROTATE_ON_BAN = "RotatingIpRoutePlanner"
LOAD_BALANCE = "BalancingIpRoutePlanner"
NANO_SWITCH = "NanoIpRoutePlanner"
ROTATING_NANO_SWITCH = "RotatingNanoIpRoutePlanner"
class RouteIPType(Enum):
"""
The enum for specifying the route planner IP block type for Lavalink.
This feature is exclusively for the RoutePlanner class.
If you are not using this feature, this class is not necessary.
RouteIPType.IPV4 specifies that the IP block type is IPV4
RouteIPType.IPV6 specifies that the IP block type is IPV6
"""
IPV4 = "Inet4Address"
IPV6 = "Inet6Address"
class URLRegex():
"""
The enums for all the URL Regexes in use by Pomice.
URLRegex.SPOTIFY_URL returns the Spotify URL Regex.
URLRegex.DISCORD_MP3_URL returns the Discord MP3 URL Regex.
URLRegex.YOUTUBE_URL returns the Youtube URL Regex.
URLRegex.YOUTUBE_PLAYLIST returns the Youtube Playlist Regex.
URLRegex.YOUTUBE_TIMESTAMP returns the Youtube Timestamp Regex.
URLRegex.AM_URL returns the Apple Music URL Regex.
URLRegex.SOUNDCLOUD_URL returns the SoundCloud URL Regex.
URLRegex.BASE_URL returns the standard URL Regex.
"""
SPOTIFY_URL = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
)
DISCORD_MP3_URL = re.compile(
r"https?://cdn.discordapp.com/attachments/(?P<channel_id>[0-9]+)/"
r"(?P<message_id>[0-9]+)/(?P<file>[a-zA-Z0-9_.]+)+"
)
YOUTUBE_URL = re.compile(
r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu.be))"
r"(\/(?:[\w\-]+\?v=|embed\/|v\/)?)([\w\-]+)(\S+)?$"
)
YOUTUBE_PLAYLIST_URL = re.compile(
r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu.be))/playlist\?list=.*"
)
YOUTUBE_VID_IN_PLAYLIST = re.compile(
r"(?P<video>^.*?v.*?)(?P<list>&list.*)"
)
YOUTUBE_TIMESTAMP = re.compile(
r"(?P<video>^.*?)(\?t|&start)=(?P<time>\d+)?.*"
)
AM_URL = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/"
r"(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)"
)
AM_SINGLE_IN_ALBUM_REGEX = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/"
r"(?P<name>.+)/(?P<id>.+)(\?i=)(?P<id2>.+)"
)
SOUNDCLOUD_URL = re.compile(
r"((?:https?:)?\/\/)?((?:www|m)\.)?soundcloud.com\/.*/.*"
)
SOUNDCLOUD_PLAYLIST_URL = re.compile(
r"^(https?:\/\/)?(www.)?(m\.)?soundcloud\.com\/.*/sets/.*"
)
SOUNDCLOUD_TRACK_IN_SET_URL = re.compile(
r"^(https?:\/\/)?(www.)?(m\.)?soundcloud\.com/[a-zA-Z0-9-._]+/[a-zA-Z0-9-._]+(\?in)"
)
LAVALINK_SEARCH = re.compile(
r"(?P<type>ytm?|sc)search:"
)
BASE_URL = re.compile(
r"https?://(?:www\.)?.+"
)

View File

@ -1,8 +1,16 @@
from __future__ import annotations
from discord import Client
from discord.ext import commands
from .pool import NodePool
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
from .player import Player
class PomiceEvent:
"""The base class for all events dispatched by a node.
Every event must be formatted within your bot's code as a listener.
@ -15,7 +23,7 @@ class PomiceEvent:
name = "event"
handler_args = ()
def dispatch(self, bot: Client):
def dispatch(self, bot: Union[Client, commands.Bot]):
bot.dispatch(f"pomice_{self.name}", *self.handler_args)
@ -25,7 +33,7 @@ class TrackStartEvent(PomiceEvent):
"""
name = "track_start"
def __init__(self, data: dict, player):
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._current
@ -42,7 +50,7 @@ class TrackEndEvent(PomiceEvent):
"""
name = "track_end"
def __init__(self, data: dict, player):
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._ending_track
self.reason: str = data["reason"]
@ -64,7 +72,7 @@ class TrackStuckEvent(PomiceEvent):
"""
name = "track_stuck"
def __init__(self, data: dict, player):
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._ending_track
self.threshold: float = data["thresholdMs"]
@ -83,7 +91,7 @@ class TrackExceptionEvent(PomiceEvent):
"""
name = "track_exception"
def __init__(self, data: dict, player):
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._ending_track
if data.get('error'):

View File

@ -1,10 +1,10 @@
import re
from __future__ import annotations
from typing import List, Optional, Union
from discord import Member, User
from discord.ext import commands
from .enums import SearchType
from .enums import SearchType, TrackType, PlaylistType
from .filters import Filter
from . import (
@ -12,10 +12,6 @@ from . import (
applemusic
)
SOUNDCLOUD_URL_REGEX = re.compile(
r"^(https?:\/\/)?(www.)?(m\.)?soundcloud\.com\/[\w\-\.]+(\/)+[\w\-\.]+/?$"
)
class Track:
"""The base track object. Returns critical track information needed for parsing by Lavalink.
@ -28,29 +24,25 @@ class Track:
track_id: str,
info: dict,
ctx: Optional[commands.Context] = None,
spotify: bool = False,
apple_music: bool = False,
am_track: applemusic.Song = None,
track_type: TrackType,
search_type: SearchType = SearchType.ytsearch,
spotify_track: spotify.Track = None,
filters: Optional[List[Filter]] = None,
timestamp: Optional[float] = None,
requester: Optional[Union[Member, User]] = None
requester: Optional[Union[Member, User]] = None,
):
self.track_id = track_id
self.info = info
self.spotify = spotify
self.apple_music = apple_music
self.filters: List[Filter] = filters
self.track_type: TrackType = track_type
self.filters: Optional[List[Filter]] = filters
self.timestamp: Optional[float] = timestamp
if spotify or apple_music:
if self.track_type == TrackType.SPOTIFY or self.track_type == TrackType.APPLE_MUSIC:
self.original: Optional[Track] = None
else:
self.original = self
self._search_type = search_type
self.spotify_track = spotify_track
self.am_track = am_track
self.playlist: Playlist = None
self.title = info.get("title")
self.author = info.get("author")
@ -61,7 +53,7 @@ class Track:
if self.uri:
if info.get("thumbnail"):
self.thumbnail = info.get("thumbnail")
elif SOUNDCLOUD_URL_REGEX.match(self.uri):
elif self.track_type == TrackType.SOUNDCLOUD:
# ok so theres no feasible way of getting a Soundcloud image URL
# so we're just gonna leave it blank for brevity
self.thumbnail = None
@ -105,40 +97,20 @@ class Playlist:
*,
playlist_info: dict,
tracks: list,
ctx: Optional[commands.Context] = None,
spotify: bool = False,
spotify_playlist: spotify.Playlist = None,
apple_music: bool = False,
am_playlist: applemusic.Playlist = None
playlist_type: PlaylistType,
thumbnail: Optional[str] = None,
uri: Optional[str] = None
):
self.playlist_info = playlist_info
self.tracks_raw = tracks
self.spotify = spotify
self.tracks: List[Track] = tracks
self.name = playlist_info.get("name")
self.spotify_playlist = spotify_playlist
self.apple_music = apple_music
self.am_playlist = am_playlist
self.playlist_type = playlist_type
self._thumbnail = None
self._uri = None
self._thumbnail = thumbnail
self._uri = uri
if self.spotify:
self.tracks = tracks
self._thumbnail = self.spotify_playlist.image
self._uri = self.spotify_playlist.uri
elif self.apple_music:
self.tracks = tracks
self._thumbnail = self.am_playlist.image
self._uri = self.am_playlist.url
else:
self.tracks = [
Track(track_id=track["track"], info=track["info"], ctx=ctx)
for track in self.tracks_raw
]
self._thumbnail = None
self._uri = None
for track in self.tracks:
track.playlist = self
if (index := playlist_info.get("selectedTrack")) == -1:
self.selected_track = None

View File

@ -3,7 +3,8 @@ from typing import (
Any,
Dict,
List,
Optional
Optional,
Union
)
from discord import (
@ -15,11 +16,11 @@ from discord import (
from discord.ext import commands
from . import events
from .enums import SearchType
from .enums import SearchType, PlatformRecommendation
from .events import PomiceEvent, TrackEndEvent, TrackStartEvent
from .exceptions import FilterInvalidArgument, FilterTagAlreadyInUse, FilterTagInvalid, TrackInvalidPosition, TrackLoadError
from .filters import Filter
from .objects import Track
from .objects import Track, Playlist
from .pool import Node, NodePool
class Filters:
@ -111,7 +112,7 @@ class Player(VoiceProtocol):
node: Node = None
):
self.client = client
self._bot = client
self._bot: Union[Client, commands.Bot] = client
self.channel = channel
self._guild = channel.guild if channel else None
@ -197,7 +198,7 @@ class Player(VoiceProtocol):
return self._filters
@property
def bot(self) -> Client:
def bot(self) -> Union[Client, commands.Bot]:
"""Property which returns the bot associated with this player instance"""
return self._bot
@ -284,13 +285,18 @@ class Player(VoiceProtocol):
"""
return await self._node.get_tracks(query, ctx=ctx, search_type=search_type, filters=filters)
async def get_recommendations(self, *, query: str, ctx: Optional[commands.Context] = None):
async def get_recommendations(
self,
*,
track: Track,
ctx: Optional[commands.Context] = None
) -> Union[List[Track], None]:
"""
Gets recommendations from Spotify. Query must be a valid Spotify Track URL.
Gets recommendations from either YouTube or Spotify.
You can pass in a discord.py Context object to get a
Context object on all tracks that get recommended.
"""
return await self._node.get_recommendations(query=query, ctx=ctx)
return await self._node.get_recommendations(track=track, ctx=ctx)
async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False):
await self.guild.change_voice_state(channel=self.channel, self_deaf=self_deaf, self_mute=self_mute)
@ -373,6 +379,11 @@ class Player(VoiceProtocol):
}
# Lets set the current track before we play it so any
# corresponding events can capture it correctly
self._current = track
# Remove preloaded filters if last track had any
if self.filters.has_preload:
for filter in self.filters.get_preload_filters():
@ -384,6 +395,7 @@ class Player(VoiceProtocol):
# Check if theres no global filters and if the track has any filters
# that need to be applied
if track.filters and not self.filters.has_global:
# Now apply all filters
for filter in track.filters:
@ -400,7 +412,6 @@ class Player(VoiceProtocol):
query=f"noReplace={ignore_if_playing}"
)
self._current = track
return self._current
async def seek(self, position: float) -> float:

View File

@ -17,7 +17,7 @@ from . import (
applemusic
)
from .enums import SearchType, NodeAlgorithm
from .enums import *
from .exceptions import (
AppleMusicNotEnabled,
InvalidSpotifyClientAuthorization,
@ -32,37 +32,11 @@ from .exceptions import (
from .filters import Filter
from .objects import Playlist, Track
from .utils import ExponentialBackoff, NodeStats, Ping
from .routeplanner import RoutePlanner
if TYPE_CHECKING:
from .player import Player
SPOTIFY_URL_REGEX = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
)
DISCORD_MP3_URL_REGEX = re.compile(
r"https?://cdn.discordapp.com/attachments/(?P<channel_id>[0-9]+)/"
r"(?P<message_id>[0-9]+)/(?P<file>[a-zA-Z0-9_.]+)+"
)
YOUTUBE_PLAYLIST_REGEX = re.compile(
r"(?P<video>^.*?v.*?)(?P<list>&list.*)"
)
YOUTUBE_TIMESTAMP_REGEX = re.compile(
r"(?P<video>^.*?)(\?t|&start)=(?P<time>\d+)?.*"
)
AM_URL_REGEX = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)"
)
URL_REGEX = re.compile(
r"https?://(?:www\.)?.+"
)
class Node:
"""The base class for a node.
@ -75,7 +49,7 @@ class Node:
self,
*,
pool,
bot: Client,
bot: Union[Client, commands.Bot],
host: str,
port: int,
password: str,
@ -108,6 +82,8 @@ class Node:
self._session_id: str = None
self._metadata = None
self._available = None
self._version: str = None
self._route_planner = RoutePlanner(self)
self._headers = {
"Authorization": self._password,
@ -156,7 +132,7 @@ class Node:
@property
def bot(self) -> Client:
def bot(self) -> Union[Client, commands.Bot]:
"""Property which returns the discord.py client linked to this node"""
return self._bot
@ -240,10 +216,37 @@ class Node:
elif op == "playerUpdate":
await player._update_state(data)
def _get_type(self, query: str):
if match := URLRegex.LAVALINK_SEARCH.match(query):
type = match.group("type")
if type == "sc":
return TrackType.SOUNDCLOUD
return TrackType.YOUTUBE
elif URLRegex.YOUTUBE_URL.match(query):
if URLRegex.YOUTUBE_PLAYLIST_URL.match(query):
return PlaylistType.YOUTUBE
return TrackType.YOUTUBE
elif URLRegex.SOUNDCLOUD_URL.match(query):
if URLRegex.SOUNDCLOUD_TRACK_IN_SET_URL.match(query):
return TrackType.SOUNDCLOUD
if URLRegex.SOUNDCLOUD_PLAYLIST_URL.match(query):
return PlaylistType.SOUNDCLOUD
return TrackType.SOUNDCLOUD
else:
return TrackType.HTTP
async def send(
self,
method: str,
path: str,
include_version: bool = True,
guild_id: Optional[Union[int, str]] = None,
query: Optional[str] = None,
data: Optional[Union[dict, str]] = None
@ -254,18 +257,21 @@ class Node:
)
uri: str = f'{self._rest_uri}/' \
f'v3/' \
f'{f"v{self._version}/" if include_version else ""}' \
f'{path}' \
f'{f"/{guild_id}" if guild_id else ""}' \
f'{f"?{query}" if query else ""}'
async with self._session.request(method=method, url=uri, headers={"Authorization": self._password}, json=data or {}) as resp:
async with self._session.request(method=method, url=uri, headers=self._headers, json=data or {}) as resp:
if resp.status >= 300:
raise NodeRestException(f'Error fetching from Lavalink REST api: {resp.status} {resp.reason}')
if method == "DELETE":
if method == "DELETE" or resp.status == 204:
return await resp.json(content_type=None)
if resp.content_type == "text/plain":
return await resp.text()
return await resp.json()
@ -284,8 +290,7 @@ class Node:
)
self._task = self._bot.loop.create_task(self._listen())
self._available = True
async with self._session.get(f'{self._rest_uri}/version', headers={"Authorization": self._password}) as resp:
version: str = await resp.text()
version = await self.send(method="GET", path="version", include_version=False)
version = version.replace(".", "")
if int(version) < 370:
raise LavalinkVersionIncompatible(
@ -293,6 +298,7 @@ class Node:
"Lavalink version 3.7.0 or above is required to use this library."
)
self._version = version[:1]
return self
except aiohttp.ClientConnectorError:
@ -332,17 +338,9 @@ class Node:
Context object on the track it builds.
"""
async with self._session.get(
f"{self._rest_uri}/v3/decodetrack?",
headers={"Authorization": self._password},
params={"track": identifier}
) as resp:
if not resp.status == 200:
raise TrackLoadError(
f"Failed to build track. Check if the identifier is correct and try again."
)
data: dict = await resp.json()
data: dict = await self.send(method="GET", path="decodetrack", query=f"encodedTrack={identifier}")
return Track(track_id=identifier, ctx=ctx, info=data)
async def get_tracks(
@ -367,15 +365,14 @@ class Node:
timestamp = None
if not URL_REGEX.match(query) and not re.match(r"(?:ytm?|sc)search:.", query):
if not URLRegex.BASE_URL.match(query) and not re.match(r"(?:ytm?|sc)search:.", query):
query = f"{search_type}:{query}"
if filters:
for filter in filters:
filter.set_preload()
if AM_URL_REGEX.match(query):
if URLRegex.AM_URL.match(query):
if not self._apple_music_client:
raise AppleMusicNotEnabled(
"You must have Apple Music functionality enabled in order to play Apple Music tracks."
@ -388,9 +385,8 @@ class Node:
Track(
track_id=apple_music_results.id,
ctx=ctx,
track_type=TrackType.APPLE_MUSIC,
search_type=search_type,
apple_music=True,
am_track=apple_music_results,
filters=filters,
info={
"title": apple_music_results.name,
@ -411,9 +407,8 @@ class Node:
Track(
track_id=track.id,
ctx=ctx,
track_type=TrackType.APPLE_MUSIC,
search_type=search_type,
apple_music=True,
am_track=track,
filters=filters,
info={
"title": track.name,
@ -433,13 +428,13 @@ class Node:
return Playlist(
playlist_info={"name": apple_music_results.name, "selectedTrack": 0},
tracks=tracks,
ctx=ctx,
apple_music=True,
am_playlist=apple_music_results
playlist_type=PlaylistType.APPLE_MUSIC,
thumbnail=apple_music_results.image,
uri=apple_music_results.url
)
elif SPOTIFY_URL_REGEX.match(query):
elif URLRegex.SPOTIFY_URL.match(query):
if not self._spotify_client_id and not self._spotify_client_secret:
raise InvalidSpotifyClientAuthorization(
"You did not provide proper Spotify client authorization credentials. "
@ -454,9 +449,8 @@ class Node:
Track(
track_id=spotify_results.id,
ctx=ctx,
track_type=TrackType.SPOTIFY,
search_type=search_type,
spotify=True,
spotify_track=spotify_results,
filters=filters,
info={
"title": spotify_results.name,
@ -477,9 +471,8 @@ class Node:
Track(
track_id=track.id,
ctx=ctx,
track_type=TrackType.SPOTIFY,
search_type=search_type,
spotify=True,
spotify_track=track,
filters=filters,
info={
"title": track.name,
@ -499,17 +492,14 @@ class Node:
return Playlist(
playlist_info={"name": spotify_results.name, "selectedTrack": 0},
tracks=tracks,
ctx=ctx,
spotify=True,
spotify_playlist=spotify_results
playlist_type=PlaylistType.SPOTIFY,
thumbnail=spotify_results.image,
uri=spotify_results.uri
)
elif discord_url := DISCORD_MP3_URL_REGEX.match(query):
async with self._session.get(
url=f"{self._rest_uri}/v3/loadtracks?identifier={quote(query)}",
headers={"Authorization": self._password}
) as response:
data: dict = await response.json()
elif discord_url := URLRegex.DISCORD_MP3_URL.match(query):
data: dict = await self.send(method="GET", path="loadtracks", query=f"identifier={quote(query)}")
track: dict = data["tracks"][0]
info: dict = track.get("info")
@ -526,6 +516,7 @@ class Node:
"identifier": info.get("identifier")
},
ctx=ctx,
track_type=TrackType.HTTP,
filters=filters
)
]
@ -533,24 +524,21 @@ class Node:
else:
# If YouTube url contains a timestamp, capture it for use later.
if (match := YOUTUBE_TIMESTAMP_REGEX.match(query)):
if (match := URLRegex.YOUTUBE_TIMESTAMP.match(query)):
timestamp = float(match.group("time"))
# If query is a video thats part of a playlist, get the video and queue that instead
# (I can't tell you how much i've wanted to implement this in here)
if (match := YOUTUBE_PLAYLIST_REGEX.match(query)):
if (match := URLRegex.YOUTUBE_VID_IN_PLAYLIST.match(query)):
query = match.group("video")
async with self._session.get(
url=f"{self._rest_uri}/v3/loadtracks?identifier={quote(query)}",
headers={"Authorization": self._password}
) as response:
data = await response.json()
data: dict = await self.send(method="GET", path="loadtracks", query=f"identifier={quote(query)}")
load_type = data.get("loadType")
query_type = self._get_type(query)
if not load_type:
raise TrackLoadError("There was an error while trying to load this track.")
@ -562,10 +550,21 @@ class Node:
return None
elif load_type == "PLAYLIST_LOADED":
if query_type == PlaylistType.SOUNDCLOUD:
track_type = TrackType.SOUNDCLOUD
else:
track_type = TrackType.YOUTUBE
tracks = [
Track(track_id=track["track"], info=track["info"], ctx=ctx, track_type=track_type)
for track in data["tracks"]
]
return Playlist(
playlist_info=data["playlistInfo"],
tracks=data["tracks"],
ctx=ctx
tracks=tracks,
playlist_type=query_type,
thumbnail=tracks[0].thumbnail,
uri=query
)
elif load_type == "SEARCH_RESULT" or load_type == "TRACK_LOADED":
@ -574,25 +573,33 @@ class Node:
track_id=track["track"],
info=track["info"],
ctx=ctx,
track_type=query_type,
filters=filters,
timestamp=timestamp
)
for track in data["tracks"]
]
async def get_recommendations(self, *, query: str, ctx: Optional[commands.Context] = None):
async def get_recommendations(
self,
*,
track: Track,
ctx: Optional[commands.Context] = None
) -> Union[List[Track], None]:
"""
Gets recommendations from Spotify. Query must be a valid Spotify Track URL.
Gets recommendations from either YouTube or Spotify.
The track that is passed in must be either from
YouTube or Spotify or else this will not work.
You can pass in a discord.py Context object to get a
Context object on all tracks that get recommended.
"""
results = await self._spotify_client.get_recommendations(query=query)
if track.track_type == TrackType.SPOTIFY:
results = await self._spotify_client.get_recommendations(query=track.uri)
tracks = [
Track(
track_id=track.id,
ctx=ctx,
spotify=True,
spotify_track=track,
track_type=TrackType.SPOTIFY,
info={
"title": track.name,
"author": track.artists,
@ -610,8 +617,11 @@ class Node:
]
return tracks
elif track.track_type == TrackType.YOUTUBE:
tracks = await self.get_tracks(query=f"ytsearch:https://www.youtube.com/watch?v={track.identifier}&list=RD{track.identifier}", ctx=ctx)
return tracks
else:
raise TrackLoadError("The specfied track must be either a YouTube or Spotify track to recieve recommendations.")
class NodePool:

30
pomice/routeplanner.py Normal file
View File

@ -0,0 +1,30 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .pool import Node
from .utils import RouteStats
class RoutePlanner:
"""
The base route planner class for Pomice.
Handles all requests made to the route planner API for Lavalink.
"""
def __init__(self, node: Node) -> None:
self.node = node
self.session = node._session
async def get_status(self):
"""Gets the status of the route planner API."""
data: dict = await self.node.send(method="GET", path="routeplanner/status")
return RouteStats(data)
async def free_address(self, ip: str):
"""Frees an address using the route planner API"""
await self.node.send(method="POST", path="routeplanner/free/address", data={"address": ip})
async def free_all_addresses(self):
"""Frees all available addresses using the route planner api"""
await self.node.send(method="POST", path="routeplanner/free/address/all")

View File

@ -43,7 +43,7 @@ class Playlist:
if data.get("images") and len(data["images"]):
self.image: str = data["images"][0]["url"]
else:
self.image = None
self.image = self.tracks[0].image
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:

View File

@ -1,8 +1,11 @@
import random
import time
import socket
from .enums import RouteStrategy, RouteIPType
from timeit import default_timer as timer
from itertools import zip_longest
from datetime import datetime
__all__ = [
@ -84,6 +87,44 @@ class NodeStats:
def __repr__(self) -> str:
return f"<Pomice.NodeStats total_players={self.players_total!r} playing_active={self.players_active!r}>"
class FailingIPBlock:
"""
The base class for the failing IP block object from the route planner stats.
Gives critical information about any failing addresses on the block
and the time they failed.
"""
def __init__(self, data: dict) -> None:
self.address = data.get("address")
self.failing_time = datetime.fromtimestamp(float(data.get("failingTimestamp")))
def __repr__(self) -> str:
return f"<Pomice.FailingIPBlock address={self.address} failing_time={self.failing_time}>"
class RouteStats:
"""
The base class for the route planner stats object.
Gives critical information about the route planner strategy on the node.
"""
def __init__(self, data: dict) -> None:
self.strategy = RouteStrategy(data.get("class"))
details: dict = data.get("details")
ip_block: dict = details.get("ipBlock")
self.ip_block_type = RouteIPType(ip_block.get("type"))
self.ip_block_size = ip_block.get("size")
self.failing_addresses = [FailingIPBlock(data) for data in details.get("failingAddresses")]
self.block_index = details.get("blockIndex")
self.address_index = details.get("currentAddressIndex")
def __repr__(self) -> str:
return f"<Pomice.RouteStats route_strategy={self.strategy!r} failing_addresses={len(self.failing_addresses)}>"
class Ping:
# Thanks to https://github.com/zhengxiaowai/tcping for the nice ping impl