remove build dir

This commit is contained in:
Clxud 2023-02-28 17:13:38 +00:00
parent 8071a85533
commit e5af15a237
19 changed files with 0 additions and 3353 deletions

View File

@ -1,34 +0,0 @@
"""
Pomice
~~~~~~
The modern Lavalink wrapper designed for discord.py.
:copyright: 2023, cloudwithax
:license: GPL-3.0
"""
import discord
if not discord.version_info.major >= 2:
class DiscordPyOutdated(Exception):
pass
raise DiscordPyOutdated(
"You must have discord.py (v2.0 or greater) to use this library. "
"Uninstall your current version and install discord.py 2.0 "
"using 'pip install discord.py'"
)
__version__ = "2.1.1"
__title__ = "pomice"
__author__ = "cloudwithax"
from .enums import *
from .events import *
from .exceptions import *
from .filters import *
from .objects import *
from .queue import *
from .player import *
from .pool import *
from .routeplanner import *

View File

@ -1,5 +0,0 @@
"""Apple Music module for Pomice, made possible by cloudwithax 2023"""
from .exceptions import *
from .objects import *
from .client import Client

View File

@ -1,124 +0,0 @@
import re
import aiohttp
import orjson as json
import base64
from datetime import datetime
from .objects import *
from .exceptions import *
AM_URL_REGEX = re.compile(r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)")
AM_SINGLE_IN_ALBUM_REGEX = re.compile(r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>.+)(\?i=)(?P<id2>.+)")
AM_REQ_URL = "https://api.music.apple.com/v1/catalog/{country}/{type}s/{id}"
AM_BASE_URL = "https://api.music.apple.com"
class Client:
"""The base Apple Music client for Pomice.
This will do all the heavy lifting of getting tracks from Apple Music
and translating it to a valid Lavalink track. No client auth is required here.
"""
def __init__(self) -> None:
self.token: str = None
self.expiry: datetime = None
self.session: aiohttp.ClientSession = aiohttp.ClientSession()
self.headers = None
async def request_token(self):
async with self.session.get("https://music.apple.com/assets/index.919fe17f.js") as resp:
if resp.status != 200:
raise AppleMusicRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
text = await resp.text()
result = re.search("\"(eyJ.+?)\"", text).group(1)
self.token = result
self.headers = {
'Authorization': f"Bearer {result}",
'Origin': 'https://apple.com',
}
token_split = self.token.split(".")[1]
token_json = base64.b64decode(token_split + '=' * (-len(token_split) % 4)).decode()
token_data = json.loads(token_json)
self.expiry = datetime.fromtimestamp(token_data["exp"])
async def search(self, query: str):
if not self.token or datetime.utcnow() > self.expiry:
await self.request_token()
result = AM_URL_REGEX.match(query)
country = result.group("country")
type = result.group("type")
id = result.group("id")
if type == "album" and (sia_result := AM_SINGLE_IN_ALBUM_REGEX.match(query)):
# apple music likes to generate links for singles off an album
# by adding a param at the end of the url
# so we're gonna scan for that and correct it
id = sia_result.group("id2")
type = "song"
request_url = AM_REQ_URL.format(country=country, type=type, id=id)
else:
request_url = AM_REQ_URL.format(country=country, type=type, id=id)
async with self.session.get(request_url, headers=self.headers) as resp:
if resp.status != 200:
raise AppleMusicRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
data: dict = await resp.json(loads=json.loads)
data = data["data"][0]
if type == "song":
return Song(data)
elif type == "album":
return Album(data)
elif type == "artist":
async with self.session.get(f"{request_url}/view/top-songs", headers=self.headers) as resp:
if resp.status != 200:
raise AppleMusicRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
top_tracks: dict = await resp.json(loads=json.loads)
tracks: dict = top_tracks["data"]
return Artist(data, tracks=tracks)
else:
track_data: dict = data["relationships"]["tracks"]
tracks = [Song(track) for track in track_data.get("data")]
if not len(tracks):
raise AppleMusicRequestException("This playlist is empty and therefore cannot be queued.")
if track_data.get("next"):
next_page_url = AM_BASE_URL + track_data.get("next")
while next_page_url is not None:
async with self.session.get(next_page_url, headers=self.headers) as resp:
if resp.status != 200:
raise AppleMusicRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
next_data: dict = await resp.json(loads=json.loads)
tracks += [Song(track) for track in next_data["data"]]
if next_data.get("next"):
next_page_url = AM_BASE_URL + next_data.get("next")
else:
next_page_url = None
return Playlist(data, tracks)

View File

@ -1,8 +0,0 @@
class AppleMusicRequestException(Exception):
"""An error occurred when making a request to the Apple Music API"""
pass
class InvalidAppleMusicURL(Exception):
"""An invalid Apple Music URL was passed"""
pass

View File

@ -1,87 +0,0 @@
"""Module for managing Apple Music objects"""
from typing import List
class Song:
"""The base class for an Apple Music song"""
def __init__(self, data: dict) -> None:
self.name: str = data["attributes"]["name"]
self.url: str = data["attributes"]["url"]
self.isrc: str = data["attributes"]["isrc"]
self.length: float = data["attributes"]["durationInMillis"]
self.id: str = data["id"]
self.artists: str = data["attributes"]["artistName"]
self.image: str = data["attributes"]["artwork"]["url"].replace(
"{w}x{h}",
f'{data["attributes"]["artwork"]["width"]}x{data["attributes"]["artwork"]["height"]}'
)
def __repr__(self) -> str:
return (
f"<Pomice.applemusic.Song name={self.name} artists={self.artists} "
f"length={self.length} id={self.id} isrc={self.isrc}>"
)
class Playlist:
"""The base class for an Apple Music playlist"""
def __init__(self, data: dict, tracks: List[Song]) -> None:
self.name: str = data["attributes"]["name"]
self.owner: str = data["attributes"]["curatorName"]
self.id: str = data["id"]
self.tracks: List[Song] = tracks
self.total_tracks: int = len(tracks)
self.url: str = data["attributes"]["url"]
# we'll use the first song's image as the image for the playlist
# because apple dynamically generates playlist covers client-side
self.image = self.tracks[0].image
def __repr__(self) -> str:
return (
f"<Pomice.applemusic.Playlist name={self.name} owner={self.owner} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)
class Album:
"""The base class for an Apple Music album"""
def __init__(self, data: dict) -> None:
self.name: str = data["attributes"]["name"]
self.url: str = data["attributes"]["url"]
self.id: str = data["id"]
self.artists: str = data["attributes"]["artistName"]
self.total_tracks: int = data["attributes"]["trackCount"]
self.tracks: List[Song] = [Song(track) for track in data["relationships"]["tracks"]["data"]]
self.image: str = data["attributes"]["artwork"]["url"].replace(
"{w}x{h}",
f'{data["attributes"]["artwork"]["width"]}x{data["attributes"]["artwork"]["height"]}'
)
def __repr__(self) -> str:
return (
f"<Pomice.applemusic.Album name={self.name} artists={self.artists} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)
class Artist:
"""The base class for an Apple Music artist"""
def __init__(self, data: dict, tracks: dict) -> None:
self.name: str = f'Top tracks for {data["attributes"]["name"]}'
self.url: str = data["attributes"]["url"]
self.id: str = data["id"]
self.genres: str = ", ".join(genre for genre in data["attributes"]["genreNames"])
self.tracks: List[Song] = [Song(track) for track in tracks]
self.image: str = data["attributes"]["artwork"]["url"].replace(
"{w}x{h}",
f'{data["attributes"]["artwork"]["width"]}x{data["attributes"]["artwork"]["height"]}'
)
def __repr__(self) -> str:
return (
f"<Pomice.applemusic.Artist name={self.name} id={self.id} "
f"tracks={self.tracks}>"
)

View File

@ -1,260 +0,0 @@
import re
from enum import Enum
class SearchType(Enum):
"""
The enum for the different search types for Pomice.
This feature is exclusively for the Spotify search feature of Pomice.
If you are not using this feature, this class is not necessary.
SearchType.ytsearch searches using regular Youtube,
which is best for all scenarios.
SearchType.ytmsearch searches using YouTube Music,
which is best for getting audio-only results.
SearchType.scsearch searches using SoundCloud,
which is an alternative to YouTube or YouTube Music.
"""
ytsearch = "ytsearch"
ytmsearch = "ytmsearch"
scsearch = "scsearch"
def __str__(self) -> str:
return self.value
class TrackType(Enum):
"""
The enum for the different track types for Pomice.
TrackType.YOUTUBE defines that the track is from YouTube
TrackType.SOUNDCLOUD defines that the track is from SoundCloud.
TrackType.SPOTIFY defines that the track is from Spotify
TrackType.APPLE_MUSIC defines that the track is from Apple Music.
TrackType.HTTP defines that the track is from an HTTP source.
"""
# We don't have to define anything special for these, since these just serve as flags
YOUTUBE = "youtube_track"
SOUNDCLOUD = "soundcloud_track"
SPOTIFY = "spotify_track"
APPLE_MUSIC = "apple_music_track"
HTTP = "http_source"
def __str__(self) -> str:
return self.value
class PlaylistType(Enum):
"""
The enum for the different playlist types for Pomice.
PlaylistType.YOUTUBE defines that the playlist is from YouTube
PlaylistType.SOUNDCLOUD defines that the playlist is from SoundCloud.
PlaylistType.SPOTIFY defines that the playlist is from Spotify
PlaylistType.APPLE_MUSIC defines that the playlist is from Apple Music.
"""
# We don't have to define anything special for these, since these just serve as flags
YOUTUBE = "youtube_playlist"
SOUNDCLOUD = "soundcloud_playlist"
SPOTIFY = "spotify_playlist"
APPLE_MUSIC = "apple_music_list"
def __str__(self) -> str:
return self.value
class NodeAlgorithm(Enum):
"""
The enum for the different node algorithms in Pomice.
The enums in this class are to only differentiate different
methods, since the actual method is handled in the
get_best_node() method.
NodeAlgorithm.by_ping returns a node based on it's latency,
preferring a node with the lowest response time
NodeAlgorithm.by_players return a nodes based on how many players it has.
This algorithm prefers nodes with the least amount of players.
"""
# We don't have to define anything special for these, since these just serve as flags
by_ping = "BY_PING"
by_players = "BY_PLAYERS"
def __str__(self) -> str:
return self.value
class LoopMode(Enum):
"""
The enum for the different loop modes.
This feature is exclusively for the queue utility of pomice.
If you are not using this feature, this class is not necessary.
LoopMode.TRACK sets the queue loop to the current track.
LoopMode.QUEUE sets the queue loop to the whole queue.
"""
# We don't have to define anything special for these, since these just serve as flags
TRACK = "track"
QUEUE = "queue"
def __str__(self) -> str:
return self.value
class PlatformRecommendation(Enum):
"""
The enum for choosing what platform you want for recommendations.
This feature is exclusively for the recommendations function.
If you are not using this feature, this class is not necessary.
PlatformRecommendation.SPOTIFY sets the recommendations to come from Spotify
PlatformRecommendation.YOUTUBE sets the recommendations to come from YouTube
"""
# We don't have to define anything special for these, since these just serve as flags
SPOTIFY = "spotify"
YOUTUBE = "youtube"
def __str__(self) -> str:
return self.value
class RouteStrategy(Enum):
"""
The enum for specifying the route planner strategy for Lavalink.
This feature is exclusively for the RoutePlanner class.
If you are not using this feature, this class is not necessary.
RouteStrategy.ROTATE_ON_BAN specifies that the node is rotating IPs
whenever they get banned by Youtube.
RouteStrategy.LOAD_BALANCE specifies that the node is selecting
random IPs to balance out requests between them.
RouteStrategy.NANO_SWITCH specifies that the node is switching
between IPs every CPU clock cycle.
RouteStrategy.ROTATING_NANO_SWITCH specifies that the node is switching
between IPs every CPU clock cycle and is rotating between IP blocks on
ban.
"""
ROTATE_ON_BAN = "RotatingIpRoutePlanner"
LOAD_BALANCE = "BalancingIpRoutePlanner"
NANO_SWITCH = "NanoIpRoutePlanner"
ROTATING_NANO_SWITCH = "RotatingNanoIpRoutePlanner"
class RouteIPType(Enum):
"""
The enum for specifying the route planner IP block type for Lavalink.
This feature is exclusively for the RoutePlanner class.
If you are not using this feature, this class is not necessary.
RouteIPType.IPV4 specifies that the IP block type is IPV4
RouteIPType.IPV6 specifies that the IP block type is IPV6
"""
IPV4 = "Inet4Address"
IPV6 = "Inet6Address"
class URLRegex():
"""
The enums for all the URL Regexes in use by Pomice.
URLRegex.SPOTIFY_URL returns the Spotify URL Regex.
URLRegex.DISCORD_MP3_URL returns the Discord MP3 URL Regex.
URLRegex.YOUTUBE_URL returns the Youtube URL Regex.
URLRegex.YOUTUBE_PLAYLIST returns the Youtube Playlist Regex.
URLRegex.YOUTUBE_TIMESTAMP returns the Youtube Timestamp Regex.
URLRegex.AM_URL returns the Apple Music URL Regex.
URLRegex.SOUNDCLOUD_URL returns the SoundCloud URL Regex.
URLRegex.BASE_URL returns the standard URL Regex.
"""
SPOTIFY_URL = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
)
DISCORD_MP3_URL = re.compile(
r"https?://cdn.discordapp.com/attachments/(?P<channel_id>[0-9]+)/"
r"(?P<message_id>[0-9]+)/(?P<file>[a-zA-Z0-9_.]+)+"
)
YOUTUBE_URL = re.compile(
r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu.be))"
r"(\/(?:[\w\-]+\?v=|embed\/|v\/)?)([\w\-]+)(\S+)?$"
)
YOUTUBE_PLAYLIST_URL = re.compile(
r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu.be))/playlist\?list=.*"
)
YOUTUBE_VID_IN_PLAYLIST = re.compile(
r"(?P<video>^.*?v.*?)(?P<list>&list.*)"
)
YOUTUBE_TIMESTAMP = re.compile(
r"(?P<video>^.*?)(\?t|&start)=(?P<time>\d+)?.*"
)
AM_URL = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/"
r"(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)"
)
AM_SINGLE_IN_ALBUM_REGEX = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/"
r"(?P<name>.+)/(?P<id>.+)(\?i=)(?P<id2>.+)"
)
SOUNDCLOUD_URL = re.compile(
r"((?:https?:)?\/\/)?((?:www|m)\.)?soundcloud.com\/.*/.*"
)
SOUNDCLOUD_PLAYLIST_URL = re.compile(
r"^(https?:\/\/)?(www.)?(m\.)?soundcloud\.com\/.*/sets/.*"
)
SOUNDCLOUD_TRACK_IN_SET_URL = re.compile(
r"^(https?:\/\/)?(www.)?(m\.)?soundcloud\.com/[a-zA-Z0-9-._]+/[a-zA-Z0-9-._]+(\?in)"
)
LAVALINK_SEARCH = re.compile(
r"(?P<type>ytm?|sc)search:"
)
BASE_URL = re.compile(
r"https?://(?:www\.)?.+"
)

View File

@ -1,154 +0,0 @@
from __future__ import annotations
from discord import Client
from discord.ext import commands
from .pool import NodePool
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
from .player import Player
class PomiceEvent:
"""The base class for all events dispatched by a node.
Every event must be formatted within your bot's code as a listener.
i.e: If you want to listen for when a track starts, the event would be:
```py
@bot.listen
async def on_pomice_track_start(self, event):
```
"""
name = "event"
handler_args = ()
def dispatch(self, bot: Union[Client, commands.Bot]):
bot.dispatch(f"pomice_{self.name}", *self.handler_args)
class TrackStartEvent(PomiceEvent):
"""Fired when a track has successfully started.
Returns the player associated with the event and the pomice.Track object.
"""
name = "track_start"
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._current
# on_pomice_track_start(player, track)
self.handler_args = self.player, self.track
def __repr__(self) -> str:
return f"<Pomice.TrackStartEvent player={self.player} track_id={self.track.track_id}>"
class TrackEndEvent(PomiceEvent):
"""Fired when a track has successfully ended.
Returns the player associated with the event along with the pomice.Track object and reason.
"""
name = "track_end"
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._ending_track
self.reason: str = data["reason"]
# on_pomice_track_end(player, track, reason)
self.handler_args = self.player, self.track, self.reason
def __repr__(self) -> str:
return (
f"<Pomice.TrackEndEvent player={self.player} track_id={self.track.track_id} "
f"reason={self.reason}>"
)
class TrackStuckEvent(PomiceEvent):
"""Fired when a track is stuck and cannot be played. Returns the player
associated with the event along with the pomice.Track object
to be further parsed by the end user.
"""
name = "track_stuck"
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._ending_track
self.threshold: float = data["thresholdMs"]
# on_pomice_track_stuck(player, track, threshold)
self.handler_args = self.player, self.track, self.threshold
def __repr__(self) -> str:
return f"<Pomice.TrackStuckEvent player={self.player!r} track={self.track!r} " \
f"threshold={self.threshold!r}>"
class TrackExceptionEvent(PomiceEvent):
"""Fired when a track error has occured.
Returns the player associated with the event along with the error code and exception.
"""
name = "track_exception"
def __init__(self, data: dict, player: Player):
self.player = player
self.track = self.player._ending_track
if data.get('error'):
# User is running Lavalink <= 3.3
self.exception: str = data["error"]
else:
# User is running Lavalink >=3.4
self.exception: str = data["exception"]
# on_pomice_track_exception(player, track, error)
self.handler_args = self.player, self.track, self.exception
def __repr__(self) -> str:
return f"<Pomice.TrackExceptionEvent player={self.player!r} exception={self.exception!r}>"
class WebSocketClosedPayload:
def __init__(self, data: dict):
self.guild = NodePool.get_node().bot.get_guild(int(data["guildId"]))
self.code: int = data["code"]
self.reason: str = data["code"]
self.by_remote: bool = data["byRemote"]
def __repr__(self) -> str:
return f"<Pomice.WebSocketClosedPayload guild={self.guild!r} code={self.code!r} " \
f"reason={self.reason!r} by_remote={self.by_remote!r}>"
class WebSocketClosedEvent(PomiceEvent):
"""Fired when a websocket connection to a node has been closed.
Returns the reason and the error code.
"""
name = "websocket_closed"
def __init__(self, data: dict, _):
self.payload = WebSocketClosedPayload(data)
# on_pomice_websocket_closed(payload)
self.handler_args = self.payload,
def __repr__(self) -> str:
return f"<Pomice.WebsocketClosedEvent payload={self.payload!r}>"
class WebSocketOpenEvent(PomiceEvent):
"""Fired when a websocket connection to a node has been initiated.
Returns the target and the session SSRC.
"""
name = "websocket_open"
def __init__(self, data: dict, _):
self.target: str = data["target"]
self.ssrc: int = data["ssrc"]
# on_pomice_websocket_open(target, ssrc)
self.handler_args = self.target, self.ssrc
def __repr__(self) -> str:
return f"<Pomice.WebsocketOpenEvent target={self.target!r} ssrc={self.ssrc!r}>"

View File

@ -1,98 +0,0 @@
class PomiceException(Exception):
"""Base of all Pomice exceptions."""
class NodeException(Exception):
"""Base exception for nodes."""
class NodeCreationError(NodeException):
"""There was a problem while creating the node."""
class NodeConnectionFailure(NodeException):
"""There was a problem while connecting to the node."""
class NodeConnectionClosed(NodeException):
"""The node's connection is closed."""
pass
class NodeRestException(NodeException):
"""A request made using the node's REST uri failed"""
pass
class NodeNotAvailable(PomiceException):
"""The node is currently unavailable."""
pass
class NoNodesAvailable(PomiceException):
"""There are no nodes currently available."""
pass
class TrackInvalidPosition(PomiceException):
"""An invalid position was chosen for a track."""
pass
class TrackLoadError(PomiceException):
"""There was an error while loading a track."""
pass
class FilterInvalidArgument(PomiceException):
"""An invalid argument was passed to a filter."""
pass
class FilterTagInvalid(PomiceException):
"""An invalid tag was passed or Pomice was unable to find a filter tag"""
pass
class FilterTagAlreadyInUse(PomiceException):
"""A filter with a tag is already in use by another filter"""
pass
class SpotifyAlbumLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load an album."""
pass
class SpotifyTrackLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load a track."""
pass
class SpotifyPlaylistLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load a playlist."""
pass
class InvalidSpotifyClientAuthorization(PomiceException):
"""No Spotify client authorization was provided for track searching."""
pass
class AppleMusicNotEnabled(PomiceException):
"""An Apple Music Link was passed in when Apple Music functionality was not enabled."""
pass
class QueueException(Exception):
"""Base Pomice queue exception."""
pass
class QueueFull(QueueException):
"""Exception raised when attempting to add to a full Queue."""
pass
class QueueEmpty(QueueException):
"""Exception raised when attempting to retrieve from an empty Queue."""
pass
class LavalinkVersionIncompatible(PomiceException):
"""Lavalink version is incompatible. Must be using Lavalink > 3.7.0 to avoid this error."""
pass

View File

@ -1,393 +0,0 @@
import collections
from .exceptions import FilterInvalidArgument
class Filter:
"""
The base class for all filters.
You can use these filters if you have the latest Lavalink version
installed. If you do not have the latest Lavalink version,
these filters will not work.
You must specify a tag for each filter you put on.
This is necessary for the removal of filters.
"""
def __init__(self):
self.payload = None
self.tag: str = None
self.preload: bool = False
def set_preload(self) -> bool:
"""Internal method to set whether or not the filter was preloaded."""
self.preload = True
return self.preload
class Equalizer(Filter):
"""
Filter which represents a 15 band equalizer.
You can adjust the dynamic of the sound using this filter.
i.e: Applying a bass boost filter to emphasize the bass in a song.
The format for the levels is: List[Tuple[int, float]]
"""
def __init__(self, *, tag: str, levels: list):
super().__init__()
self.eq = self._factory(levels)
self.raw = levels
self.payload = {"equalizer": self.eq}
self.tag = tag
def _factory(self, levels: list):
_dict = collections.defaultdict(int)
_dict.update(levels)
_dict = [{"band": i, "gain": _dict[i]} for i in range(15)]
return _dict
def __repr__(self) -> str:
return f"<Pomice.EqualizerFilter tag={self.tag} eq={self.eq} raw={self.raw}>"
@classmethod
def flat(cls):
"""Equalizer preset which represents a flat EQ board,
with all levels set to their default values.
"""
levels = [
(0, 0.0), (1, 0.0), (2, 0.0), (3, 0.0), (4, 0.0),
(5, 0.0), (6, 0.0), (7, 0.0), (8, 0.0), (9, 0.0),
(10, 0.0), (11, 0.0), (12, 0.0), (13, 0.0), (14, 0.0)
]
return cls(tag="flat", levels=levels)
@classmethod
def boost(cls):
"""Equalizer preset which boosts the sound of a track,
making it sound fun and energetic by increasing the bass
and the highs.
"""
levels = [
(0, -0.075), (1, 0.125), (2, 0.125), (3, 0.1), (4, 0.1),
(5, .05), (6, 0.075), (7, 0.0), (8, 0.0), (9, 0.0),
(10, 0.0), (11, 0.0), (12, 0.125), (13, 0.15), (14, 0.05)
]
return cls(tag="boost", levels=levels)
@classmethod
def metal(cls):
"""Equalizer preset which increases the mids of a track,
preferably one of the metal genre, to make it sound
more full and concert-like.
"""
levels = [
(0, 0.0), (1, 0.1), (2, 0.1), (3, 0.15), (4, 0.13),
(5, 0.1), (6, 0.0), (7, 0.125), (8, 0.175), (9, 0.175),
(10, 0.125), (11, 0.125), (12, 0.1), (13, 0.075), (14, 0.0)
]
return cls(tag="metal", levels=levels)
@classmethod
def piano(cls):
"""Equalizer preset which increases the mids and highs
of a track, preferably a piano based one, to make it
stand out.
"""
levels = [
(0, -0.25), (1, -0.25), (2, -0.125), (3, 0.0),
(4, 0.25), (5, 0.25), (6, 0.0), (7, -0.25), (8, -0.25),
(9, 0.0), (10, 0.0), (11, 0.5), (12, 0.25), (13, -0.025)
]
return cls(tag="piano", levels=levels)
class Timescale(Filter):
"""Filter which changes the speed and pitch of a track.
You can make some very nice effects with this filter,
i.e: a vaporwave-esque filter which slows the track down
a certain amount to produce said effect.
"""
def __init__(
self,
*,
tag: str,
speed: float = 1.0,
pitch: float = 1.0,
rate: float = 1.0
):
super().__init__()
if speed < 0:
raise FilterInvalidArgument("Timescale speed must be more than 0.")
if pitch < 0:
raise FilterInvalidArgument("Timescale pitch must be more than 0.")
if rate < 0:
raise FilterInvalidArgument("Timescale rate must be more than 0.")
self.speed = speed
self.pitch = pitch
self.rate = rate
self.tag = tag
self.payload = {"timescale": {"speed": self.speed,
"pitch": self.pitch,
"rate": self.rate}}
@classmethod
def vaporwave(cls):
"""Timescale preset which slows down the currently playing track,
giving it the effect of a half-speed record/casette playing.
This preset will assign the tag 'vaporwave'.
"""
return cls(tag="vaporwave", speed=0.8, pitch=0.8)
@classmethod
def nightcore(cls):
"""Timescale preset which speeds up the currently playing track,
which matches up to nightcore, a genre of sped-up music
This preset will assign the tag 'nightcore'.
"""
return cls(tag="nightcore", speed=1.25, pitch=1.3)
def __repr__(self):
return f"<Pomice.TimescaleFilter tag={self.tag} speed={self.speed} pitch={self.pitch} rate={self.rate}>"
class Karaoke(Filter):
"""Filter which filters the vocal track from any song and leaves the instrumental.
Best for karaoke as the filter implies.
"""
def __init__(
self,
*,
tag: str,
level: float = 1.0,
mono_level: float = 1.0,
filter_band: float = 220.0,
filter_width: float = 100.0
):
super().__init__()
self.level = level
self.mono_level = mono_level
self.filter_band = filter_band
self.filter_width = filter_width
self.tag = tag
self.payload = {"karaoke": {"level": self.level,
"monoLevel": self.mono_level,
"filterBand": self.filter_band,
"filterWidth": self.filter_width}}
def __repr__(self):
return (
f"<Pomice.KaraokeFilter tag={self.tag} level={self.level} mono_level={self.mono_level} "
f"filter_band={self.filter_band} filter_width={self.filter_width}>"
)
class Tremolo(Filter):
"""Filter which produces a wavering tone in the music,
causing it to sound like the music is changing in volume rapidly.
"""
def __init__(
self,
*,
tag: str,
frequency: float = 2.0,
depth: float = 0.5
):
super().__init__()
if frequency < 0:
raise FilterInvalidArgument(
"Tremolo frequency must be more than 0.")
if depth < 0 or depth > 1:
raise FilterInvalidArgument(
"Tremolo depth must be between 0 and 1.")
self.frequency = frequency
self.depth = depth
self.tag = tag
self.payload = {"tremolo": {"frequency": self.frequency,
"depth": self.depth}}
def __repr__(self):
return f"<Pomice.TremoloFilter tag={self.tag} frequency={self.frequency} depth={self.depth}>"
class Vibrato(Filter):
"""Filter which produces a wavering tone in the music, similar to the Tremolo filter,
but changes in pitch rather than volume.
"""
def __init__(
self,
*,
tag: str,
frequency: float = 2.0,
depth: float = 0.5
):
super().__init__()
if frequency < 0 or frequency > 14:
raise FilterInvalidArgument(
"Vibrato frequency must be between 0 and 14.")
if depth < 0 or depth > 1:
raise FilterInvalidArgument(
"Vibrato depth must be between 0 and 1.")
self.frequency = frequency
self.depth = depth
self.tag = tag
self.payload = {"vibrato": {"frequency": self.frequency,
"depth": self.depth}}
def __repr__(self):
return f"<Pomice.VibratoFilter tag={self.tag} frequency={self.frequency} depth={self.depth}>"
class Rotation(Filter):
"""Filter which produces a stereo-like panning effect, which sounds like
the audio is being rotated around the listener's head
"""
def __init__(self, *, tag: str, rotation_hertz: float = 5):
super().__init__()
self.rotation_hertz = rotation_hertz
self.tag = tag
self.payload = {"rotation": {"rotationHz": self.rotation_hertz}}
def __repr__(self) -> str:
return f"<Pomice.RotationFilter tag={self.tag} rotation_hertz={self.rotation_hertz}>"
class ChannelMix(Filter):
"""Filter which manually adjusts the panning of the audio, which can make
for some cool effects when done correctly.
"""
def __init__(
self,
*,
tag: str,
left_to_left: float = 1,
right_to_right: float = 1,
left_to_right: float = 0,
right_to_left: float = 0
):
super().__init__()
if 0 > left_to_left > 1:
raise ValueError(
"'left_to_left' value must be more than or equal to 0 or less than or equal to 1.")
if 0 > right_to_right > 1:
raise ValueError(
"'right_to_right' value must be more than or equal to 0 or less than or equal to 1.")
if 0 > left_to_right > 1:
raise ValueError(
"'left_to_right' value must be more than or equal to 0 or less than or equal to 1.")
if 0 > right_to_left > 1:
raise ValueError(
"'right_to_left' value must be more than or equal to 0 or less than or equal to 1.")
self.left_to_left = left_to_left
self.left_to_right = left_to_right
self.right_to_left = right_to_left
self.right_to_right = right_to_right
self.tag = tag
self.payload = {"channelMix": {"leftToLeft": self.left_to_left,
"leftToRight": self.left_to_right,
"rightToLeft": self.right_to_left,
"rightToRight": self.right_to_right}
}
def __repr__(self) -> str:
return (
f"<Pomice.ChannelMix tag={self.tag} left_to_left={self.left_to_left} left_to_right={self.left_to_right} "
f"right_to_left={self.right_to_left} right_to_right={self.right_to_right}>"
)
class Distortion(Filter):
"""Filter which generates a distortion effect. Useful for certain filter implementations where
distortion is needed.
"""
def __init__(
self,
*,
tag: str,
sin_offset: float = 0,
sin_scale: float = 1,
cos_offset: float = 0,
cos_scale: float = 1,
tan_offset: float = 0,
tan_scale: float = 1,
offset: float = 0,
scale: float = 1
):
super().__init__()
self.sin_offset = sin_offset
self.sin_scale = sin_scale
self.cos_offset = cos_offset
self.cos_scale = cos_scale
self.tan_offset = tan_offset
self.tan_scale = tan_scale
self.offset = offset
self.scale = scale
self.tag = tag
self.payload = {"distortion": {
"sinOffset": self.sin_offset,
"sinScale": self.sin_scale,
"cosOffset": self.cos_offset,
"cosScale": self.cos_scale,
"tanOffset": self.tan_offset,
"tanScale": self.tan_scale,
"offset": self.offset,
"scale": self.scale
}}
def __repr__(self) -> str:
return (
f"<Pomice.Distortion tag={self.tag} sin_offset={self.sin_offset} sin_scale={self.sin_scale}> "
f"cos_offset={self.cos_offset} cos_scale={self.cos_scale} tan_offset={self.tan_offset} "
f"tan_scale={self.tan_scale} offset={self.offset} scale={self.scale}"
)
class LowPass(Filter):
"""Filter which supresses higher frequencies and allows lower frequencies to pass.
You can also do this with the Equalizer filter, but this is an easier way to do it.
"""
def __init__(self, *, tag: str, smoothing: float = 20):
super().__init__()
self.smoothing = smoothing
self.tag = tag
self.payload = {"lowPass": {"smoothing": self.smoothing}}
def __repr__(self) -> str:
return f"<Pomice.LowPass tag={self.tag} smoothing={self.smoothing}>"

View File

@ -1,136 +0,0 @@
from __future__ import annotations
from typing import List, Optional, Union
from discord import Member, User
from discord.ext import commands
from .enums import SearchType, TrackType, PlaylistType
from .filters import Filter
from . import (
spotify,
applemusic
)
class Track:
"""The base track object. Returns critical track information needed for parsing by Lavalink.
You can also pass in commands.Context to get a discord.py Context object in your track.
"""
def __init__(
self,
*,
track_id: str,
info: dict,
ctx: Optional[commands.Context] = None,
track_type: TrackType,
search_type: SearchType = SearchType.ytsearch,
filters: Optional[List[Filter]] = None,
timestamp: Optional[float] = None,
requester: Optional[Union[Member, User]] = None,
):
self.track_id = track_id
self.info = info
self.track_type: TrackType = track_type
self.filters: Optional[List[Filter]] = filters
self.timestamp: Optional[float] = timestamp
if self.track_type == TrackType.SPOTIFY or self.track_type == TrackType.APPLE_MUSIC:
self.original: Optional[Track] = None
else:
self.original = self
self._search_type = search_type
self.playlist: Playlist = None
self.title = info.get("title")
self.author = info.get("author")
self.uri = info.get("uri")
self.identifier = info.get("identifier")
self.isrc = info.get("isrc")
if self.uri:
if info.get("thumbnail"):
self.thumbnail = info.get("thumbnail")
elif self.track_type == TrackType.SOUNDCLOUD:
# ok so theres no feasible way of getting a Soundcloud image URL
# so we're just gonna leave it blank for brevity
self.thumbnail = None
else:
self.thumbnail = f"https://img.youtube.com/vi/{self.identifier}/mqdefault.jpg"
self.length = info.get("length")
self.ctx = ctx
if requester:
self.requester = requester
else:
self.requester = self.ctx.author if ctx else None
self.is_stream = info.get("isStream")
self.is_seekable = info.get("isSeekable")
self.position = info.get("position")
def __eq__(self, other):
if not isinstance(other, Track):
return False
if self.ctx and other.ctx:
return other.track_id == self.track_id and other.ctx.message.id == self.ctx.message.id
return other.track_id == self.track_id
def __str__(self):
return self.title
def __repr__(self):
return f"<Pomice.track title={self.title!r} uri=<{self.uri!r}> length={self.length}>"
class Playlist:
"""The base playlist object.
Returns critical playlist information needed for parsing by Lavalink.
You can also pass in commands.Context to get a discord.py Context object in your tracks.
"""
def __init__(
self,
*,
playlist_info: dict,
tracks: list,
playlist_type: PlaylistType,
thumbnail: Optional[str] = None,
uri: Optional[str] = None
):
self.playlist_info = playlist_info
self.tracks: List[Track] = tracks
self.name = playlist_info.get("name")
self.playlist_type = playlist_type
self._thumbnail = thumbnail
self._uri = uri
for track in self.tracks:
track.playlist = self
if (index := playlist_info.get("selectedTrack")) == -1:
self.selected_track = None
else:
self.selected_track = self.tracks[index]
self.track_count = len(self.tracks)
def __str__(self):
return self.name
def __repr__(self):
return f"<Pomice.playlist name={self.name!r} track_count={len(self.tracks)}>"
@property
def uri(self) -> Optional[str]:
"""Returns either an Apple Music/Spotify URL/URI, or None if its neither of those."""
return self._uri
@property
def thumbnail(self) -> Optional[str]:
"""Returns either an Apple Music/Spotify album/playlist thumbnail, or None if its neither of those."""
return self._thumbnail

View File

@ -1,520 +0,0 @@
import time
from typing import (
Any,
Dict,
List,
Optional,
Union
)
from discord import (
Client,
Guild,
VoiceChannel,
VoiceProtocol
)
from discord.ext import commands
from . import events
from .enums import SearchType, PlatformRecommendation
from .events import PomiceEvent, TrackEndEvent, TrackStartEvent
from .exceptions import FilterInvalidArgument, FilterTagAlreadyInUse, FilterTagInvalid, TrackInvalidPosition, TrackLoadError
from .filters import Filter
from .objects import Track, Playlist
from .pool import Node, NodePool
class Filters:
"""Helper class for filters"""
def __init__(self):
self._filters: List[Filter] = []
@property
def has_preload(self):
"""Property which checks if any applied filters were preloaded"""
return any(f for f in self._filters if f.preload == True)
@property
def has_global(self):
"""Property which checks if any applied filters are global"""
return any(f for f in self._filters if f.preload == False)
@property
def empty(self):
"""Property which checks if the filter list is empty"""
return len(self._filters) == 0
def add_filter(self, *, filter: Filter):
"""Adds a filter to the list of filters applied"""
if any(f for f in self._filters if f.tag == filter.tag):
raise FilterTagAlreadyInUse(
"A filter with that tag is already in use."
)
self._filters.append(filter)
def remove_filter(self, *, filter_tag: str):
"""Removes a filter from the list of filters applied using its filter tag"""
if not any(f for f in self._filters if f.tag == filter_tag):
raise FilterTagInvalid(
"A filter with that tag was not found."
)
for index, filter in enumerate(self._filters):
if filter.tag == filter_tag:
del self._filters[index]
def has_filter(self, *, filter_tag: str):
"""Checks if a filter exists in the list of filters using its filter tag"""
return any(f for f in self._filters if f.tag == filter_tag)
def reset_filters(self):
"""Removes all filters from the list"""
self._filters = []
def get_preload_filters(self):
"""Get all preloaded filters"""
return [f for f in self._filters if f.preload == True]
def get_all_payloads(self):
"""Returns a formatted dict of all the filter payloads"""
payload = {}
for filter in self._filters:
payload.update(filter.payload)
return payload
def get_filters(self):
"""Returns the current list of applied filters"""
return self._filters
class Player(VoiceProtocol):
"""The base player class for Pomice.
In order to initiate a player, you must pass it in as a cls when you connect to a channel.
i.e: ```py
await ctx.author.voice.channel.connect(cls=pomice.Player)
```
"""
def __call__(self, client: Client, channel: VoiceChannel):
self.client: Client = client
self.channel: VoiceChannel = channel
self._guild: Guild = channel.guild
return self
def __init__(
self,
client: Optional[Client] = None,
channel: Optional[VoiceChannel] = None,
*,
node: Node = None
):
self.client = client
self._bot: Union[Client, commands.Bot] = client
self.channel = channel
self._guild = channel.guild if channel else None
self._node = node if node else NodePool.get_node()
self._current: Track = None
self._filters: Filters = Filters()
self._volume = 100
self._paused = False
self._is_connected = False
self._position = 0
self._last_position = 0
self._last_update = 0
self._ending_track: Optional[Track] = None
self._voice_state = {}
self._player_endpoint_uri = f'sessions/{self._node._session_id}/players'
def __repr__(self):
return (
f"<Pomice.player bot={self.bot} guildId={self.guild.id} "
f"is_connected={self.is_connected} is_playing={self.is_playing}>"
)
@property
def position(self) -> float:
"""Property which returns the player's position in a track in milliseconds"""
current = self._current.original
if not self.is_playing or not self._current:
return 0
if self.is_paused:
return min(self._last_position, current.length)
difference = (time.time() * 1000) - self._last_update
position = self._last_position + difference
if position > current.length:
return 0
return min(position, current.length)
@property
def is_playing(self) -> bool:
"""Property which returns whether or not the player is actively playing a track."""
return self._is_connected and self._current is not None
@property
def is_connected(self) -> bool:
"""Property which returns whether or not the player is connected"""
return self._is_connected
@property
def is_paused(self) -> bool:
"""Property which returns whether or not the player has a track which is paused or not."""
return self._is_connected and self._paused
@property
def current(self) -> Track:
"""Property which returns the currently playing track"""
return self._current
@property
def node(self) -> Node:
"""Property which returns the node the player is connected to"""
return self._node
@property
def guild(self) -> Guild:
"""Property which returns the guild associated with the player"""
return self._guild
@property
def volume(self) -> int:
"""Property which returns the players current volume"""
return self._volume
@property
def filters(self) -> Filters:
"""Property which returns the helper class for interacting with filters"""
return self._filters
@property
def bot(self) -> Union[Client, commands.Bot]:
"""Property which returns the bot associated with this player instance"""
return self._bot
@property
def is_dead(self) -> bool:
"""Returns a bool representing whether the player is dead or not.
A player is considered dead if it has been destroyed and removed from stored players.
"""
return self.guild.id not in self._node._players
async def _update_state(self, data: dict):
state: dict = data.get("state")
self._last_update = time.time() * 1000
self._is_connected = state.get("connected")
self._last_position = state.get("position")
async def _dispatch_voice_update(self, voice_data: Dict[str, Any]):
if {"sessionId", "event"} != self._voice_state.keys():
return
data = {
"token": voice_data['event']['token'],
"endpoint": voice_data['event']['endpoint'],
"sessionId": voice_data['sessionId'],
}
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"voice": data}
)
async def on_voice_server_update(self, data: dict):
self._voice_state.update({"event": data})
await self._dispatch_voice_update(self._voice_state)
async def on_voice_state_update(self, data: dict):
self._voice_state.update({"sessionId": data.get("session_id")})
if not (channel_id := data.get("channel_id")):
await self.disconnect()
self._voice_state.clear()
return
self.channel = self.guild.get_channel(int(channel_id))
if not data.get("token"):
return
await self._dispatch_voice_update({**self._voice_state, "event": data})
async def _dispatch_event(self, data: dict):
event_type = data.get("type")
event: PomiceEvent = getattr(events, event_type)(data, self)
if isinstance(event, TrackEndEvent) and event.reason != "REPLACED":
self._current = None
event.dispatch(self._bot)
if isinstance(event, TrackStartEvent):
self._ending_track = self._current
async def get_tracks(
self,
query: str,
*,
ctx: Optional[commands.Context] = None,
search_type: SearchType = SearchType.ytsearch,
filters: Optional[List[Filter]] = None
):
"""Fetches tracks from the node's REST api to parse into Lavalink.
If you passed in Spotify API credentials when you created the node,
you can also pass in a Spotify URL of a playlist, album or track and it will be parsed
accordingly.
You can pass in a discord.py Context object to get a
Context object on any track you search.
You may also pass in a List of filters
to be applied to your track once it plays.
"""
return await self._node.get_tracks(query, ctx=ctx, search_type=search_type, filters=filters)
async def get_recommendations(
self,
*,
track: Track,
ctx: Optional[commands.Context] = None
) -> Union[List[Track], None]:
"""
Gets recommendations from either YouTube or Spotify.
You can pass in a discord.py Context object to get a
Context object on all tracks that get recommended.
"""
return await self._node.get_recommendations(track=track, ctx=ctx)
async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False):
await self.guild.change_voice_state(channel=self.channel, self_deaf=self_deaf, self_mute=self_mute)
self._node._players[self.guild.id] = self
self._is_connected = True
async def stop(self):
"""Stops the currently playing track."""
self._current = None
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={'encodedTrack': None}
)
async def disconnect(self, *, force: bool = False):
"""Disconnects the player from voice."""
try:
await self.guild.change_voice_state(channel=None)
finally:
self.cleanup()
self._is_connected = False
self.channel = None
async def destroy(self):
"""Disconnects and destroys the player, and runs internal cleanup."""
try:
await self.disconnect()
except AttributeError:
# 'NoneType' has no attribute '_get_voice_client_key' raised by self.cleanup() ->
# assume we're already disconnected and cleaned up
assert self.channel is None and not self.is_connected
self._node._players.pop(self.guild.id)
await self._node.send(method="DELETE", path=self._player_endpoint_uri, guild_id=self._guild.id)
async def play(
self,
track: Track,
*,
start: int = 0,
end: int = 0,
ignore_if_playing: bool = False
) -> Track:
"""Plays a track. If a Spotify track is passed in, it will be handled accordingly."""
# Make sure we've never searched the track before
if track.original is None:
# First lets try using the tracks ISRC, every track has one (hopefully)
try:
if not track.isrc:
# We have to bare raise here because theres no other way to skip this block feasibly
raise
search: Track = (await self._node.get_tracks(
f"{track._search_type}:{track.isrc}", ctx=track.ctx))[0]
except Exception:
# First method didn't work, lets try just searching it up
try:
search: Track = (await self._node.get_tracks(
f"{track._search_type}:{track.title} - {track.author}", ctx=track.ctx))[0]
except:
# The song wasn't able to be found, raise error
raise TrackLoadError (
"No equivalent track was able to be found."
)
data = {
"encodedTrack": search.track_id,
"position": str(start),
"endTime": str(end)
}
track.original = search
track.track_id = search.track_id
# Set track_id for later lavalink searches
else:
data = {
"encodedTrack": track.track_id,
"position": str(start),
"endTime": str(end)
}
# Lets set the current track before we play it so any
# corresponding events can capture it correctly
self._current = track
# Remove preloaded filters if last track had any
if self.filters.has_preload:
for filter in self.filters.get_preload_filters():
await self.remove_filter(filter_tag=filter.tag)
# Global filters take precedence over track filters
# So if no global filters are detected, lets apply any
# necessary track filters
# Check if theres no global filters and if the track has any filters
# that need to be applied
if track.filters and not self.filters.has_global:
# Now apply all filters
for filter in track.filters:
await self.add_filter(filter=filter)
if end > 0:
data["endTime"] = str(end)
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data=data,
query=f"noReplace={ignore_if_playing}"
)
return self._current
async def seek(self, position: float) -> float:
"""Seeks to a position in the currently playing track milliseconds"""
if position < 0 or position > self._current.original.length:
raise TrackInvalidPosition(
"Seek position must be between 0 and the track length"
)
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"position": position}
)
return self._position
async def set_pause(self, pause: bool) -> bool:
"""Sets the pause state of the currently playing track."""
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"paused": pause}
)
self._paused = pause
return self._paused
async def set_volume(self, volume: int) -> int:
"""Sets the volume of the player as an integer. Lavalink accepts values from 0 to 500."""
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"volume": volume}
)
self._volume = volume
return self._volume
async def add_filter(self, filter: Filter, fast_apply: bool = False) -> Filter:
"""Adds a filter to the player. Takes a pomice.Filter object.
This will only work if you are using a version of Lavalink that supports filters.
If you would like for the filter to apply instantly, set the `fast_apply` arg to `True`.
(You must have a song playing in order for `fast_apply` to work.)
"""
self._filters.add_filter(filter=filter)
payload = self._filters.get_all_payloads()
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"filters": payload}
)
if fast_apply:
await self.seek(self.position)
return self._filters
async def remove_filter(self, filter_tag: str, fast_apply: bool = False) -> Filter:
"""Removes a filter from the player. Takes a filter tag.
This will only work if you are using a version of Lavalink that supports filters.
If you would like for the filter to apply instantly, set the `fast_apply` arg to `True`.
(You must have a song playing in order for `fast_apply` to work.)
"""
self._filters.remove_filter(filter_tag=filter_tag)
payload = self._filters.get_all_payloads()
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"filters": payload}
)
if fast_apply:
await self.seek(self.position)
return self._filters
async def reset_filters(self, *, fast_apply: bool = False):
"""Resets all currently applied filters to their default parameters.
You must have filters applied in order for this to work.
If you would like the filters to be removed instantly, set the `fast_apply` arg to `True`.
(You must have a song playing in order for `fast_apply` to work.)
"""
if not self._filters:
raise FilterInvalidArgument(
"You must have filters applied first in order to use this method."
)
self._filters.reset_filters()
await self._node.send(
method="PATCH",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
data={"filters": {}}
)
if fast_apply:
await self.seek(self.position)

View File

@ -1,726 +0,0 @@
from __future__ import annotations
import asyncio
import random
import re
from typing import Dict, List, Optional, TYPE_CHECKING, Union
from urllib.parse import quote
import aiohttp
from discord import Client
from discord.ext import commands
from . import (
__version__,
spotify,
applemusic
)
from .enums import *
from .exceptions import (
AppleMusicNotEnabled,
InvalidSpotifyClientAuthorization,
LavalinkVersionIncompatible,
NodeConnectionFailure,
NodeCreationError,
NodeNotAvailable,
NoNodesAvailable,
NodeRestException,
TrackLoadError
)
from .filters import Filter
from .objects import Playlist, Track
from .utils import ExponentialBackoff, NodeStats, Ping
from .routeplanner import RoutePlanner
if TYPE_CHECKING:
from .player import Player
class Node:
"""The base class for a node.
This node object represents a Lavalink node.
To enable Spotify searching, pass in a proper Spotify Client ID and Spotify Client Secret
To enable Apple music, set the "apple_music" parameter to "True"
"""
def __init__(
self,
*,
pool,
bot: Union[Client, commands.Bot],
host: str,
port: int,
password: str,
identifier: str,
secure: bool = False,
heartbeat: int = 30,
session: Optional[aiohttp.ClientSession] = None,
spotify_client_id: Optional[str] = None,
spotify_client_secret: Optional[str] = None,
apple_music: bool = False
):
self._bot = bot
self._host = host
self._port = port
self._pool = pool
self._password = password
self._identifier = identifier
self._heartbeat = heartbeat
self._secure = secure
self._websocket_uri = f"{'wss' if self._secure else 'ws'}://{self._host}:{self._port}/v3/websocket"
self._rest_uri = f"{'https' if self._secure else 'http'}://{self._host}:{self._port}"
self._session = session or aiohttp.ClientSession()
self._websocket: aiohttp.ClientWebSocketResponse = None
self._task: asyncio.Task = None
self._session_id: str = None
self._metadata = None
self._available = None
self._version: str = None
self._route_planner = RoutePlanner(self)
self._headers = {
"Authorization": self._password,
"User-Id": str(self._bot.user.id),
"Client-Name": f"Pomice/{__version__}"
}
self._players: Dict[int, Player] = {}
self._spotify_client_id = spotify_client_id
self._spotify_client_secret = spotify_client_secret
self._apple_music_client = None
if self._spotify_client_id and self._spotify_client_secret:
self._spotify_client = spotify.Client(
self._spotify_client_id, self._spotify_client_secret
)
if apple_music:
self._apple_music_client = applemusic.Client()
self._bot.add_listener(self._update_handler, "on_socket_response")
def __repr__(self):
return (
f"<Pomice.node ws_uri={self._websocket_uri} rest_uri={self._rest_uri} "
f"player_count={len(self._players)}>"
)
@property
def is_connected(self) -> bool:
""""Property which returns whether this node is connected or not"""
return self._websocket is not None and not self._websocket.closed
@property
def stats(self) -> NodeStats:
"""Property which returns the node stats."""
return self._stats
@property
def players(self) -> Dict[int, Player]:
"""Property which returns a dict containing the guild ID and the player object."""
return self._players
@property
def bot(self) -> Union[Client, commands.Bot]:
"""Property which returns the discord.py client linked to this node"""
return self._bot
@property
def player_count(self) -> int:
"""Property which returns how many players are connected to this node"""
return len(self.players)
@property
def pool(self):
"""Property which returns the pool this node is apart of"""
return self._pool
@property
def latency(self):
"""Property which returns the latency of the node"""
return Ping(self._host, port=self._port).get_ping()
@property
def ping(self):
"""Alias for `Node.latency`, returns the latency of the node"""
return self.latency
async def _update_handler(self, data: dict):
await self._bot.wait_until_ready()
if not data:
return
if data["t"] == "VOICE_SERVER_UPDATE":
guild_id = int(data["d"]["guild_id"])
try:
player = self._players[guild_id]
await player.on_voice_server_update(data["d"])
except KeyError:
return
elif data["t"] == "VOICE_STATE_UPDATE":
if int(data["d"]["user_id"]) != self._bot.user.id:
return
guild_id = int(data["d"]["guild_id"])
try:
player = self._players[guild_id]
await player.on_voice_state_update(data["d"])
except KeyError:
return
async def _listen(self):
backoff = ExponentialBackoff(base=7)
while True:
msg = await self._websocket.receive()
if msg.type == aiohttp.WSMsgType.CLOSED:
retry = backoff.delay()
await asyncio.sleep(retry)
if not self.is_connected:
self._bot.loop.create_task(self.connect())
else:
self._bot.loop.create_task(self._handle_payload(msg.json()))
async def _handle_payload(self, data: dict):
op = data.get("op", None)
if not op:
return
if op == "stats":
self._stats = NodeStats(data)
return
if op == "ready":
self._session_id = data.get("sessionId")
if "guildId" in data:
if not (player := self._players.get(int(data["guildId"]))):
return
if op == "event":
await player._dispatch_event(data)
elif op == "playerUpdate":
await player._update_state(data)
def _get_type(self, query: str):
if match := URLRegex.LAVALINK_SEARCH.match(query):
type = match.group("type")
if type == "sc":
return TrackType.SOUNDCLOUD
return TrackType.YOUTUBE
elif URLRegex.YOUTUBE_URL.match(query):
if URLRegex.YOUTUBE_PLAYLIST_URL.match(query):
return PlaylistType.YOUTUBE
return TrackType.YOUTUBE
elif URLRegex.SOUNDCLOUD_URL.match(query):
if URLRegex.SOUNDCLOUD_TRACK_IN_SET_URL.match(query):
return TrackType.SOUNDCLOUD
if URLRegex.SOUNDCLOUD_PLAYLIST_URL.match(query):
return PlaylistType.SOUNDCLOUD
return TrackType.SOUNDCLOUD
else:
return TrackType.HTTP
async def send(
self,
method: str,
path: str,
include_version: bool = True,
guild_id: Optional[Union[int, str]] = None,
query: Optional[str] = None,
data: Optional[Union[dict, str]] = None
):
if not self._available:
raise NodeNotAvailable(
f"The node '{self._identifier}' is unavailable."
)
uri: str = f'{self._rest_uri}/' \
f'{f"v{self._version}/" if include_version else ""}' \
f'{path}' \
f'{f"/{guild_id}" if guild_id else ""}' \
f'{f"?{query}" if query else ""}'
async with self._session.request(method=method, url=uri, headers=self._headers, json=data or {}) as resp:
if resp.status >= 300:
raise NodeRestException(f'Error fetching from Lavalink REST api: {resp.status} {resp.reason}')
if method == "DELETE" or resp.status == 204:
return await resp.json(content_type=None)
if resp.content_type == "text/plain":
return await resp.text()
return await resp.json()
def get_player(self, guild_id: int):
"""Takes a guild ID as a parameter. Returns a pomice Player object."""
return self._players.get(guild_id, None)
async def connect(self):
"""Initiates a connection with a Lavalink node and adds it to the node pool."""
await self._bot.wait_until_ready()
try:
self._websocket = await self._session.ws_connect(
self._websocket_uri, headers=self._headers, heartbeat=self._heartbeat
)
self._task = self._bot.loop.create_task(self._listen())
self._available = True
version = await self.send(method="GET", path="version", include_version=False)
version = version.replace(".", "")
if int(version) < 370:
raise LavalinkVersionIncompatible(
"The Lavalink version you're using is incompatible."
"Lavalink version 3.7.0 or above is required to use this library."
)
self._version = version[:1]
return self
except aiohttp.ClientConnectorError:
raise NodeConnectionFailure(
f"The connection to node '{self._identifier}' failed."
)
except aiohttp.WSServerHandshakeError:
raise NodeConnectionFailure(
f"The password for node '{self._identifier}' is invalid."
)
except aiohttp.InvalidURL:
raise NodeConnectionFailure(
f"The URI for node '{self._identifier}' is invalid."
)
async def disconnect(self):
"""Disconnects a connected Lavalink node and removes it from the node pool.
This also destroys any players connected to the node.
"""
for player in self.players.copy().values():
await player.destroy()
await self._websocket.close()
del self._pool.nodes[self._identifier]
self.available = False
self._task.cancel()
async def build_track(
self,
identifier: str,
ctx: Optional[commands.Context] = None
) -> Track:
"""
Builds a track using a valid track identifier
You can also pass in a discord.py Context object to get a
Context object on the track it builds.
"""
data: dict = await self.send(method="GET", path="decodetrack", query=f"encodedTrack={identifier}")
return Track(track_id=identifier, ctx=ctx, info=data)
async def get_tracks(
self,
query: str,
*,
ctx: Optional[commands.Context] = None,
search_type: SearchType = SearchType.ytsearch,
filters: Optional[List[Filter]] = None
):
"""Fetches tracks from the node's REST api to parse into Lavalink.
If you passed in Spotify API credentials, you can also pass in a
Spotify URL of a playlist, album or track and it will be parsed accordingly.
You can pass in a discord.py Context object to get a
Context object on any track you search.
You may also pass in a List of filters
to be applied to your track once it plays.
"""
timestamp = None
if not URLRegex.BASE_URL.match(query) and not re.match(r"(?:ytm?|sc)search:.", query):
query = f"{search_type}:{query}"
if filters:
for filter in filters:
filter.set_preload()
if URLRegex.AM_URL.match(query):
if not self._apple_music_client:
raise AppleMusicNotEnabled(
"You must have Apple Music functionality enabled in order to play Apple Music tracks."
"Please set apple_music to True in your Node class."
)
apple_music_results = await self._apple_music_client.search(query=query)
if isinstance(apple_music_results, applemusic.Song):
return [
Track(
track_id=apple_music_results.id,
ctx=ctx,
track_type=TrackType.APPLE_MUSIC,
search_type=search_type,
filters=filters,
info={
"title": apple_music_results.name,
"author": apple_music_results.artists,
"length": apple_music_results.length,
"identifier": apple_music_results.id,
"uri": apple_music_results.url,
"isStream": False,
"isSeekable": True,
"position": 0,
"thumbnail": apple_music_results.image,
"isrc": apple_music_results.isrc
}
)
]
tracks = [
Track(
track_id=track.id,
ctx=ctx,
track_type=TrackType.APPLE_MUSIC,
search_type=search_type,
filters=filters,
info={
"title": track.name,
"author": track.artists,
"length": track.length,
"identifier": track.id,
"uri": track.url,
"isStream": False,
"isSeekable": True,
"position": 0,
"thumbnail": track.image,
"isrc": track.isrc
}
) for track in apple_music_results.tracks
]
return Playlist(
playlist_info={"name": apple_music_results.name, "selectedTrack": 0},
tracks=tracks,
playlist_type=PlaylistType.APPLE_MUSIC,
thumbnail=apple_music_results.image,
uri=apple_music_results.url
)
elif URLRegex.SPOTIFY_URL.match(query):
if not self._spotify_client_id and not self._spotify_client_secret:
raise InvalidSpotifyClientAuthorization(
"You did not provide proper Spotify client authorization credentials. "
"If you would like to use the Spotify searching feature, "
"please obtain Spotify API credentials here: https://developer.spotify.com/"
)
spotify_results = await self._spotify_client.search(query=query)
if isinstance(spotify_results, spotify.Track):
return [
Track(
track_id=spotify_results.id,
ctx=ctx,
track_type=TrackType.SPOTIFY,
search_type=search_type,
filters=filters,
info={
"title": spotify_results.name,
"author": spotify_results.artists,
"length": spotify_results.length,
"identifier": spotify_results.id,
"uri": spotify_results.uri,
"isStream": False,
"isSeekable": True,
"position": 0,
"thumbnail": spotify_results.image,
"isrc": spotify_results.isrc
}
)
]
tracks = [
Track(
track_id=track.id,
ctx=ctx,
track_type=TrackType.SPOTIFY,
search_type=search_type,
filters=filters,
info={
"title": track.name,
"author": track.artists,
"length": track.length,
"identifier": track.id,
"uri": track.uri,
"isStream": False,
"isSeekable": True,
"position": 0,
"thumbnail": track.image,
"isrc": track.isrc
}
) for track in spotify_results.tracks
]
return Playlist(
playlist_info={"name": spotify_results.name, "selectedTrack": 0},
tracks=tracks,
playlist_type=PlaylistType.SPOTIFY,
thumbnail=spotify_results.image,
uri=spotify_results.uri
)
elif discord_url := URLRegex.DISCORD_MP3_URL.match(query):
data: dict = await self.send(method="GET", path="loadtracks", query=f"identifier={quote(query)}")
track: dict = data["tracks"][0]
info: dict = track.get("info")
return [
Track(
track_id=track["track"],
info={
"title": discord_url.group("file"),
"author": "Unknown",
"length": info.get("length"),
"uri": info.get("uri"),
"position": info.get("position"),
"identifier": info.get("identifier")
},
ctx=ctx,
track_type=TrackType.HTTP,
filters=filters
)
]
else:
# If YouTube url contains a timestamp, capture it for use later.
if (match := URLRegex.YOUTUBE_TIMESTAMP.match(query)):
timestamp = float(match.group("time"))
# If query is a video thats part of a playlist, get the video and queue that instead
# (I can't tell you how much i've wanted to implement this in here)
if (match := URLRegex.YOUTUBE_VID_IN_PLAYLIST.match(query)):
query = match.group("video")
data: dict = await self.send(method="GET", path="loadtracks", query=f"identifier={quote(query)}")
load_type = data.get("loadType")
query_type = self._get_type(query)
if not load_type:
raise TrackLoadError("There was an error while trying to load this track.")
elif load_type == "LOAD_FAILED":
exception = data["exception"]
raise TrackLoadError(f"{exception['message']} [{exception['severity']}]")
elif load_type == "NO_MATCHES":
return None
elif load_type == "PLAYLIST_LOADED":
if query_type == PlaylistType.SOUNDCLOUD:
track_type = TrackType.SOUNDCLOUD
else:
track_type = TrackType.YOUTUBE
tracks = [
Track(track_id=track["track"], info=track["info"], ctx=ctx, track_type=track_type)
for track in data["tracks"]
]
return Playlist(
playlist_info=data["playlistInfo"],
tracks=tracks,
playlist_type=query_type,
thumbnail=tracks[0].thumbnail,
uri=query
)
elif load_type == "SEARCH_RESULT" or load_type == "TRACK_LOADED":
return [
Track(
track_id=track["track"],
info=track["info"],
ctx=ctx,
track_type=query_type,
filters=filters,
timestamp=timestamp
)
for track in data["tracks"]
]
async def get_recommendations(
self,
*,
track: Track,
ctx: Optional[commands.Context] = None
) -> Union[List[Track], None]:
"""
Gets recommendations from either YouTube or Spotify.
The track that is passed in must be either from
YouTube or Spotify or else this will not work.
You can pass in a discord.py Context object to get a
Context object on all tracks that get recommended.
"""
if track.track_type == TrackType.SPOTIFY:
results = await self._spotify_client.get_recommendations(query=track.uri)
tracks = [
Track(
track_id=track.id,
ctx=ctx,
track_type=TrackType.SPOTIFY,
info={
"title": track.name,
"author": track.artists,
"length": track.length,
"identifier": track.id,
"uri": track.uri,
"isStream": False,
"isSeekable": True,
"position": 0,
"thumbnail": track.image,
"isrc": track.isrc
},
requester=self.bot.user
) for track in results
]
return tracks
elif track.track_type == TrackType.YOUTUBE:
tracks = await self.get_tracks(query=f"ytsearch:https://www.youtube.com/watch?v={track.identifier}&list=RD{track.identifier}", ctx=ctx)
return tracks
else:
raise TrackLoadError("The specfied track must be either a YouTube or Spotify track to recieve recommendations.")
class NodePool:
"""The base class for the node pool.
This holds all the nodes that are to be used by the bot.
"""
_nodes: dict = {}
def __repr__(self):
return f"<Pomice.NodePool node_count={self.node_count}>"
@property
def nodes(self) -> Dict[str, Node]:
"""Property which returns a dict with the node identifier and the Node object."""
return self._nodes
@property
def node_count(self):
return len(self._nodes.values())
@classmethod
def get_best_node(cls, *, algorithm: NodeAlgorithm) -> Node:
"""Fetches the best node based on an NodeAlgorithm.
This option is preferred if you want to choose the best node
from a multi-node setup using either the node's latency
or the node's voice region.
Use NodeAlgorithm.by_ping if you want to get the best node
based on the node's latency.
Use NodeAlgorithm.by_players if you want to get the best node
based on how players it has. This method will return a node with
the least amount of players
"""
available_nodes = [node for node in cls._nodes.values() if node._available]
if not available_nodes:
raise NoNodesAvailable("There are no nodes available.")
if algorithm == NodeAlgorithm.by_ping:
tested_nodes = {node: node.latency for node in available_nodes}
return min(tested_nodes, key=tested_nodes.get)
elif algorithm == NodeAlgorithm.by_players:
tested_nodes = {node: len(node.players.keys()) for node in available_nodes}
return min(tested_nodes, key=tested_nodes.get)
@classmethod
def get_node(cls, *, identifier: str = None) -> Node:
"""Fetches a node from the node pool using it's identifier.
If no identifier is provided, it will choose a node at random.
"""
available_nodes = {
identifier: node
for identifier, node in cls._nodes.items() if node._available
}
if not available_nodes:
raise NoNodesAvailable("There are no nodes available.")
if identifier is None:
return random.choice(list(available_nodes.values()))
return available_nodes.get(identifier, None)
@classmethod
async def create_node(
cls,
*,
bot: Client,
host: str,
port: str,
password: str,
identifier: str,
secure: bool = False,
heartbeat: int = 30,
spotify_client_id: Optional[str] = None,
spotify_client_secret: Optional[str] = None,
session: Optional[aiohttp.ClientSession] = None,
apple_music: bool = False
) -> Node:
"""Creates a Node object to be then added into the node pool.
For Spotify searching capabilites, pass in valid Spotify API credentials.
"""
if identifier in cls._nodes.keys():
raise NodeCreationError(f"A node with identifier '{identifier}' already exists.")
node = Node(
pool=cls, bot=bot, host=host, port=port, password=password,
identifier=identifier, secure=secure, heartbeat=heartbeat,
spotify_client_id=spotify_client_id,
session=session, spotify_client_secret=spotify_client_secret,
apple_music=apple_music
)
await node.connect()
cls._nodes[node._identifier] = node
return node

View File

@ -1,344 +0,0 @@
from __future__ import annotations
import random
from copy import copy
from typing import (
Iterable,
Iterator,
List,
Optional,
Union,
)
from .objects import Track
from .enums import LoopMode
from .exceptions import QueueEmpty, QueueException, QueueFull
class Queue(Iterable[Track]):
"""Queue for Pomice. This queue takes pomice.Track as an input and includes looping and shuffling."""
def __init__(
self,
max_size: Optional[int] = None,
*,
overflow: bool = True,
):
self.max_size: Optional[int] = max_size
self._queue: List[Track] = [] # type: ignore
self._overflow: bool = overflow
self._loop_mode: Optional[LoopMode] = None
self._current_item: Optional[Track] = None
def __str__(self) -> str:
"""String showing all Track objects appearing as a list."""
return str(list(f"'{t}'" for t in self))
def __repr__(self) -> str:
"""Official representation with max_size and member count."""
return (
f"<{self.__class__.__name__} max_size={self.max_size} members={self.count}>"
)
def __bool__(self) -> bool:
"""Treats the queue as a bool, with it evaluating True when it contains members."""
return bool(self.count)
def __call__(self, item: Track) -> None:
"""Allows the queue instance to be called directly in order to add a member."""
self.put(item)
def __len__(self) -> int:
"""Return the number of members in the queue."""
return self.count
def __getitem__(self, index: int) -> Track:
"""Returns a member at the given position.
Does not remove item from queue.
"""
if not isinstance(index, int):
raise ValueError("'int' type required.'")
return self._queue[index]
def __setitem__(self, index: int, item: Track):
"""Inserts an item at given position."""
if not isinstance(index, int):
raise ValueError("'int' type required.'")
self.put_at_index(index, item)
def __delitem__(self, index: int) -> None:
"""Delete item at given position."""
self._queue.__delitem__(index)
def __iter__(self) -> Iterator[Track]:
"""Iterate over members in the queue.
Does not remove items when iterating.
"""
return self._queue.__iter__()
def __reversed__(self) -> Iterator[Track]:
"""Iterate over members in reverse order."""
return self._queue.__reversed__()
def __contains__(self, item: Track) -> bool:
"""Check if an item is a member of the queue."""
return item in self._queue
def __add__(self, other: Iterable[Track]) -> Queue:
"""Return a new queue containing all members.
The new queue will have the same max_size as the original.
"""
if not isinstance(other, Iterable):
raise TypeError(f"Adding with the '{type(other)}' type is not supported.")
new_queue = self.copy()
new_queue.extend(other)
return new_queue
def __iadd__(self, other: Union[Iterable[Track], Track]) -> Queue:
"""Add items to queue."""
if isinstance(other, Track):
self.put(other)
return self
if isinstance(other, Iterable):
self.extend(other)
return self
raise TypeError(f"Adding '{type(other)}' type to the queue is not supported.")
def _get(self) -> Track:
return self._queue.pop(0)
def _drop(self) -> Track:
return self._queue.pop()
def _index(self, item: Track) -> int:
return self._queue.index(item)
def _put(self, item: Track) -> None:
self._queue.append(item)
def _insert(self, index: int, item: Track) -> None:
self._queue.insert(index, item)
def _remove(self, item: Track) -> None:
self._queue.remove(item)
def _get_random_float(self) -> float:
return random.random()
@staticmethod
def _check_track(item: Track) -> Track:
if not isinstance(item, Track):
raise TypeError("Only pomice.Track objects are supported.")
return item
@classmethod
def _check_track_container(cls, iterable: Iterable) -> List[Track]:
iterable = list(iterable)
for item in iterable:
cls._check_track(item)
return iterable
@property
def count(self) -> int:
"""Returns queue member count."""
return len(self._queue)
@property
def is_empty(self) -> bool:
"""Returns True if queue has no members."""
return not bool(self.count)
@property
def is_full(self) -> bool:
"""Returns True if queue item count has reached max_size."""
return False if self.max_size is None else self.count >= self.max_size
@property
def is_looping(self) -> bool:
"""Returns True if the queue is looping either a track or the queue"""
return bool(self._loop_mode)
@property
def loop_mode(self) -> LoopMode:
"""Returns the LoopMode enum set in the queue object"""
return self._loop_mode
@property
def size(self) -> int:
"""Returns the amount of items in the queue"""
return len(self._queue)
def get_queue(self) -> List:
"""Returns the queue as a List"""
return self._queue
def get(self):
"""Return next immediately available item in queue if any.
Raises QueueEmpty if no items in queue.
"""
if self._loop_mode == LoopMode.TRACK:
return self._current_item
if self.is_empty:
raise QueueEmpty("No items in the queue.")
if self._loop_mode == LoopMode.QUEUE:
# recurse if the item isnt in the queue
if self._current_item not in self._queue:
self.get()
# set current item to first track in queue if not set already
if not self._current_item:
self._current_item = self._queue[0]
item = self._current_item
# we reached the end of the queue, go back to first track
if self._index(self._current_item) == len(self._queue) - 1:
item = self._queue[0]
# we are in the middle of the queue, go the next item
else:
index = self._index(self._current_item) + 1
item = self._queue[index]
else:
item = self._get()
self._current_item = item
return item
def pop(self) -> Track:
"""Return item from the right end side of the queue.
Raises QueueEmpty if no items in queue.
"""
if self.is_empty:
raise QueueEmpty("No items in the queue.")
return self._queue.pop()
def remove(self, item: Track) -> None:
"""
Removes a item within the queue.
Raises ValueError if item is not in queue.
"""
return self._remove(self._check_track(item))
def find_position(self, item: Track) -> int:
"""Find the position a given item within the queue.
Raises ValueError if item is not in queue.
"""
return self._index(self._check_track(item))
def put(self, item: Track) -> None:
"""Put the given item into the back of the queue."""
if self.is_full:
if not self._overflow:
raise QueueFull(f"Queue max_size of {self.max_size} has been reached.")
self._drop()
return self._put(self._check_track(item))
def put_at_index(self, index: int, item: Track) -> None:
"""Put the given item into the queue at the specified index."""
if self.is_full:
if not self._overflow:
raise QueueFull(f"Queue max_size of {self.max_size} has been reached.")
self._drop()
return self._insert(index, self._check_track(item))
def put_at_front(self, item: Track) -> None:
"""Put the given item into the front of the queue."""
return self.put_at_index(0, item)
def extend(self, iterable: Iterable[Track], *, atomic: bool = True) -> None:
"""
Add the members of the given iterable to the end of the queue.
If atomic is set to True, no tracks will be added upon any exceptions.
If atomic is set to False, as many tracks will be added as possible.
When overflow is enabled for the queue, `atomic=True` won't prevent dropped items.
"""
if atomic:
iterable = self._check_track_container(iterable)
if not self._overflow and self.max_size is not None:
new_len = len(iterable)
if (new_len + self.count) > self.max_size:
raise QueueFull(
f"Queue has {self.count}/{self.max_size} items, "
f"cannot add {new_len} more."
)
for item in iterable:
self.put(item)
def copy(self) -> Queue:
"""Create a copy of the current queue including it's members."""
new_queue = self.__class__(max_size=self.max_size)
new_queue._queue = copy(self._queue)
return new_queue
def clear(self) -> None:
"""Remove all items from the queue."""
self._queue.clear()
def set_loop_mode(self, mode: LoopMode):
"""
Sets the loop mode of the queue.
Takes the LoopMode enum as an argument.
"""
self._loop_mode = mode
if self._loop_mode == LoopMode.QUEUE:
try:
index = self._index(self._current_item)
except ValueError:
index = 0
if self._current_item not in self._queue:
self._queue.insert(index, self._current_item)
self._current_item = self._queue[index]
def disable_loop(self):
"""
Disables loop mode if set.
Raises QueueException if loop mode is already None.
"""
if not self._loop_mode:
raise QueueException("Queue loop is already disabled.")
if self._loop_mode == LoopMode.QUEUE:
index = self.find_position(self._current_item) + 1
self._queue = self._queue[index:]
self._loop_mode = None
def shuffle(self):
"""Shuffles the queue."""
return random.shuffle(self._queue)
def clear_track_filters(self):
"""Clears all filters applied to tracks"""
for track in self._queue:
track.filters = None
def jump(self, item: Track):
"""Returns a new queue with the specified track at the beginning."""
index = self.find_position(item)
new_queue = self._queue[index:self.size]
self._queue = new_queue

View File

@ -1,30 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .pool import Node
from .utils import RouteStats
class RoutePlanner:
"""
The base route planner class for Pomice.
Handles all requests made to the route planner API for Lavalink.
"""
def __init__(self, node: Node) -> None:
self.node = node
self.session = node._session
async def get_status(self):
"""Gets the status of the route planner API."""
data: dict = await self.node.send(method="GET", path="routeplanner/status")
return RouteStats(data)
async def free_address(self, ip: str):
"""Frees an address using the route planner API"""
await self.node.send(method="POST", path="routeplanner/free/address", data={"address": ip})
async def free_all_addresses(self):
"""Frees all available addresses using the route planner api"""
await self.node.send(method="POST", path="routeplanner/free/address/all")

View File

@ -1,5 +0,0 @@
"""Spotify module for Pomice, made possible by cloudwithax 2023"""
from .exceptions import *
from .objects import *
from .client import Client

View File

@ -1,141 +0,0 @@
import re
import time
from base64 import b64encode
import aiohttp
import orjson as json
from .exceptions import InvalidSpotifyURL, SpotifyRequestException
from .objects import *
GRANT_URL = "https://accounts.spotify.com/api/token"
REQUEST_URL = "https://api.spotify.com/v1/{type}s/{id}"
SPOTIFY_URL_REGEX = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
)
class Client:
"""The base client for the Spotify module of Pomice.
This class will do all the heavy lifting of getting all the metadata
for any Spotify URL you throw at it.
"""
def __init__(self, client_id: str, client_secret: str) -> None:
self._client_id = client_id
self._client_secret = client_secret
self.session = aiohttp.ClientSession()
self._bearer_token: str = None
self._expiry = 0
self._auth_token = b64encode(f"{self._client_id}:{self._client_secret}".encode())
self._grant_headers = {"Authorization": f"Basic {self._auth_token.decode()}"}
self._bearer_headers = None
async def _fetch_bearer_token(self) -> None:
_data = {"grant_type": "client_credentials"}
async with self.session.post(GRANT_URL, data=_data, headers=self._grant_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error fetching bearer token: {resp.status} {resp.reason}"
)
data: dict = await resp.json(loads=json.loads)
self._bearer_token = data["access_token"]
self._expiry = time.time() + (int(data["expires_in"]) - 10)
self._bearer_headers = {"Authorization": f"Bearer {self._bearer_token}"}
async def search(self, *, query: str):
if not self._bearer_token or time.time() >= self._expiry:
await self._fetch_bearer_token()
result = SPOTIFY_URL_REGEX.match(query)
spotify_type = result.group("type")
spotify_id = result.group("id")
if not result:
raise InvalidSpotifyURL("The Spotify link provided is not valid.")
request_url = REQUEST_URL.format(type=spotify_type, id=spotify_id)
async with self.session.get(request_url, headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
data: dict = await resp.json(loads=json.loads)
if spotify_type == "track":
return Track(data)
elif spotify_type == "album":
return Album(data)
elif spotify_type == "artist":
async with self.session.get(f"{request_url}/top-tracks?market=US", headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
track_data: dict = await resp.json(loads=json.loads)
tracks = track_data['tracks']
return Artist(data, tracks)
else:
tracks = [
Track(track["track"])
for track in data["tracks"]["items"] if track["track"] is not None
]
if not len(tracks):
raise SpotifyRequestException("This playlist is empty and therefore cannot be queued.")
next_page_url = data["tracks"]["next"]
while next_page_url is not None:
async with self.session.get(next_page_url, headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
next_data: dict = await resp.json(loads=json.loads)
tracks += [
Track(track["track"])
for track in next_data["items"] if track["track"] is not None
]
next_page_url = next_data["next"]
return Playlist(data, tracks)
async def get_recommendations(self, *, query: str):
if not self._bearer_token or time.time() >= self._expiry:
await self._fetch_bearer_token()
result = SPOTIFY_URL_REGEX.match(query)
spotify_type = result.group("type")
spotify_id = result.group("id")
if not result:
raise InvalidSpotifyURL("The Spotify link provided is not valid.")
if not spotify_type == "track":
raise InvalidSpotifyURL("The provided query is not a Spotify track.")
request_url = REQUEST_URL.format(type="recommendation", id=f"?seed_tracks={spotify_id}")
async with self.session.get(request_url, headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
data: dict = await resp.json(loads=json.loads)
tracks = [Track(track) for track in data["tracks"]]
return tracks

View File

@ -1,8 +0,0 @@
class SpotifyRequestException(Exception):
"""An error occurred when making a request to the Spotify API"""
pass
class InvalidSpotifyURL(Exception):
"""An invalid Spotify URL was passed"""
pass

View File

@ -1,89 +0,0 @@
from typing import List
class Track:
"""The base class for a Spotify Track"""
def __init__(self, data: dict, image = None) -> None:
self.name: str = data["name"]
self.artists: str = ", ".join(artist["name"] for artist in data["artists"])
self.length: float = data["duration_ms"]
self.id: str = data["id"]
if data.get("external_ids"):
self.isrc: str = data["external_ids"]["isrc"]
else:
self.isrc = None
if data.get("album") and data["album"].get("images"):
self.image: str = data["album"]["images"][0]["url"]
else:
self.image: str = image
if data["is_local"]:
self.uri = None
else:
self.uri: str = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Track name={self.name} artists={self.artists} "
f"length={self.length} id={self.id} isrc={self.isrc}>"
)
class Playlist:
"""The base class for a Spotify playlist"""
def __init__(self, data: dict, tracks: List[Track]) -> None:
self.name: str = data["name"]
self.tracks = tracks
self.owner: str = data["owner"]["display_name"]
self.total_tracks: int = data["tracks"]["total"]
self.id: str = data["id"]
if data.get("images") and len(data["images"]):
self.image: str = data["images"][0]["url"]
else:
self.image = self.tracks[0].image
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Playlist name={self.name} owner={self.owner} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)
class Album:
"""The base class for a Spotify album"""
def __init__(self, data: dict) -> None:
self.name: str = data["name"]
self.artists: str = ", ".join(artist["name"] for artist in data["artists"])
self.image: str = data["images"][0]["url"]
self.tracks = [Track(track, image=self.image) for track in data["tracks"]["items"]]
self.total_tracks: int = data["total_tracks"]
self.id: str = data["id"]
self.uri: str = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Album name={self.name} artists={self.artists} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)
class Artist:
"""The base class for a Spotify artist"""
def __init__(self, data: dict, tracks: dict) -> None:
self.name: str = f"Top tracks for {data['name']}" # Setting that because its only playing top tracks
self.genres: str = ", ".join(genre for genre in data["genres"])
self.followers: int = data["followers"]["total"]
self.image: str = data["images"][0]["url"]
self.tracks = [Track(track, image=self.image) for track in tracks]
self.id: str = data["id"]
self.uri: str = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Artist name={self.name} id={self.id} "
f"tracks={self.tracks}>"
)

View File

@ -1,191 +0,0 @@
import random
import time
import socket
from .enums import RouteStrategy, RouteIPType
from timeit import default_timer as timer
from itertools import zip_longest
from datetime import datetime
__all__ = [
"ExponentialBackoff",
"NodeStats"
]
class ExponentialBackoff:
"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
def __init__(self, base: int = 1, *, integral: bool = False) -> None:
self._base = base
self._exp = 0
self._max = 10
self._reset_time = base * 2 ** 11
self._last_invocation = time.monotonic()
rand = random.Random()
rand.seed()
self._randfunc = rand.randrange if integral else rand.uniform
def delay(self) -> float:
invocation = time.monotonic()
interval = invocation - self._last_invocation
self._last_invocation = invocation
if interval > self._reset_time:
self._exp = 0
self._exp = min(self._exp + 1, self._max)
return self._randfunc(0, self._base * 2 ** self._exp)
class NodeStats:
"""The base class for the node stats object.
Gives critical information on the node, which is updated every minute.
"""
def __init__(self, data: dict) -> None:
memory: dict = data.get("memory")
self.used = memory.get("used")
self.free = memory.get("free")
self.reservable = memory.get("reservable")
self.allocated = memory.get("allocated")
cpu: dict = data.get("cpu")
self.cpu_cores = cpu.get("cores")
self.cpu_system_load = cpu.get("systemLoad")
self.cpu_process_load = cpu.get("lavalinkLoad")
self.players_active = data.get("playingPlayers")
self.players_total = data.get("players")
self.uptime = data.get("uptime")
def __repr__(self) -> str:
return f"<Pomice.NodeStats total_players={self.players_total!r} playing_active={self.players_active!r}>"
class FailingIPBlock:
"""
The base class for the failing IP block object from the route planner stats.
Gives critical information about any failing addresses on the block
and the time they failed.
"""
def __init__(self, data: dict) -> None:
self.address = data.get("address")
self.failing_time = datetime.fromtimestamp(float(data.get("failingTimestamp")))
def __repr__(self) -> str:
return f"<Pomice.FailingIPBlock address={self.address} failing_time={self.failing_time}>"
class RouteStats:
"""
The base class for the route planner stats object.
Gives critical information about the route planner strategy on the node.
"""
def __init__(self, data: dict) -> None:
self.strategy = RouteStrategy(data.get("class"))
details: dict = data.get("details")
ip_block: dict = details.get("ipBlock")
self.ip_block_type = RouteIPType(ip_block.get("type"))
self.ip_block_size = ip_block.get("size")
self.failing_addresses = [FailingIPBlock(data) for data in details.get("failingAddresses")]
self.block_index = details.get("blockIndex")
self.address_index = details.get("currentAddressIndex")
def __repr__(self) -> str:
return f"<Pomice.RouteStats route_strategy={self.strategy!r} failing_addresses={len(self.failing_addresses)}>"
class Ping:
# Thanks to https://github.com/zhengxiaowai/tcping for the nice ping impl
def __init__(self, host, port, timeout=5):
self.timer = self.Timer()
self._successed = 0
self._failed = 0
self._conn_time = None
self._host = host
self._port = port
self._timeout = timeout
class Socket(object):
def __init__(self, family, type_, timeout):
s = socket.socket(family, type_)
s.settimeout(timeout)
self._s = s
def connect(self, host, port):
self._s.connect((host, int(port)))
def shutdown(self):
self._s.shutdown(socket.SHUT_RD)
def close(self):
self._s.close()
class Timer(object):
def __init__(self):
self._start = 0
self._stop = 0
def start(self):
self._start = timer()
def stop(self):
self._stop = timer()
def cost(self, funcs, args):
self.start()
for func, arg in zip_longest(funcs, args):
if arg:
func(*arg)
else:
func()
self.stop()
return self._stop - self._start
def _create_socket(self, family, type_):
return self.Socket(family, type_, self._timeout)
def get_ping(self):
s = self._create_socket(socket.AF_INET, socket.SOCK_STREAM)
cost_time = self.timer.cost(
(s.connect, s.shutdown),
((self._host, self._port), None))
s_runtime = 1000 * (cost_time)
return s_runtime