1.7 update part 1

This commit is contained in:
cloudwithax 2022-06-15 16:18:40 -04:00
parent 822fa1e3c4
commit 9c702876f8
12 changed files with 246 additions and 119 deletions

View File

@ -43,6 +43,14 @@ class FilterInvalidArgument(PomiceException):
"""An invalid argument was passed to a filter."""
pass
class FilterTagInvalid(PomiceException):
"""An invalid tag was passed or Pomice was unable to find a filter tag"""
pass
class FilterTagAlreadyInUse(PomiceException):
"""A filter with a tag is already in use by another filter"""
pass
class SpotifyAlbumLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load an album."""

View File

@ -8,9 +8,13 @@ class Filter:
You can use these filters if you have the latest Lavalink version
installed. If you do not have the latest Lavalink version,
these filters will not work.
You must specify a tag for each filter you put on.
This is necessary for the removal of filters.
"""
def __init__(self):
self.payload = None
self.tag: str = None
class Equalizer(Filter):
@ -21,13 +25,14 @@ class Equalizer(Filter):
The format for the levels is: List[Tuple[int, float]]
"""
def __init__(self, *, levels: list):
def __init__(self, *, tag: str, levels: list):
super().__init__()
self.eq = self._factory(levels)
self.raw = levels
self.payload = {"equalizer": self.eq}
self.tag = tag
def _factory(self, levels: list):
_dict = collections.defaultdict(int)
@ -37,11 +42,6 @@ class Equalizer(Filter):
return _dict
self.eq = self._factory(levels=self.raw)
self.payload = {"equalizer": self.eq}
return self.payload
def __repr__(self) -> str:
return f"<Pomice.EqualizerFilter eq={self.eq} raw={self.raw}>"
@ -56,6 +56,7 @@ class Timescale(Filter):
def __init__(
self,
*,
tag: str,
speed: float = 1.0,
pitch: float = 1.0,
rate: float = 1.0
@ -72,6 +73,7 @@ class Timescale(Filter):
self.speed = speed
self.pitch = pitch
self.rate = rate
self.tag = tag
self.payload = {"timescale": {"speed": self.speed,
"pitch": self.pitch,
@ -89,6 +91,7 @@ class Karaoke(Filter):
def __init__(
self,
*,
tag: str,
level: float = 1.0,
mono_level: float = 1.0,
filter_band: float = 220.0,
@ -100,6 +103,7 @@ class Karaoke(Filter):
self.mono_level = mono_level
self.filter_band = filter_band
self.filter_width = filter_width
self.tag = tag
self.payload = {"karaoke": {"level": self.level,
"monoLevel": self.mono_level,
@ -121,6 +125,7 @@ class Tremolo(Filter):
def __init__(
self,
*,
tag: str,
frequency: float = 2.0,
depth: float = 0.5
):
@ -135,6 +140,7 @@ class Tremolo(Filter):
self.frequency = frequency
self.depth = depth
self.tag = tag
self.payload = {"tremolo": {"frequency": self.frequency,
"depth": self.depth}}
@ -151,6 +157,7 @@ class Vibrato(Filter):
def __init__(
self,
*,
tag: str,
frequency: float = 2.0,
depth: float = 0.5
):
@ -165,6 +172,7 @@ class Vibrato(Filter):
self.frequency = frequency
self.depth = depth
self.tag = tag
self.payload = {"vibrato": {"frequency": self.frequency,
"depth": self.depth}}
@ -178,10 +186,11 @@ class Rotation(Filter):
the audio is being rotated around the listener's head
"""
def __init__(self, *, rotation_hertz: float = 5):
def __init__(self, *, tag: str, rotation_hertz: float = 5):
super().__init__()
self.rotation_hertz = rotation_hertz
self.tag = tag
self.payload = {"rotation": {"rotationHz": self.rotation_hertz}}
def __repr__(self) -> str:
@ -196,6 +205,7 @@ class ChannelMix(Filter):
def __init__(
self,
*,
tag: str,
left_to_left: float = 1,
right_to_right: float = 1,
left_to_right: float = 0,
@ -220,6 +230,7 @@ class ChannelMix(Filter):
self.left_to_right = left_to_right
self.right_to_left = right_to_left
self.right_to_right = right_to_right
self.tag = tag
self.payload = {"channelMix": {"leftToLeft": self.left_to_left,
"leftToRight": self.left_to_right,
@ -242,6 +253,7 @@ class Distortion(Filter):
def __init__(
self,
*,
tag: str,
sin_offset: float = 0,
sin_scale: float = 1,
cos_offset: float = 0,
@ -261,6 +273,7 @@ class Distortion(Filter):
self.tan_scale = tan_scale
self.offset = offset
self.scale = scale
self.tag = tag
self.payload = {"distortion": {
"sinOffset": self.sin_offset,
@ -286,10 +299,11 @@ class LowPass(Filter):
You can also do this with the Equalizer filter, but this is an easier way to do it.
"""
def __init__(self, *, smoothing: float = 20):
def __init__(self, *, tag: str, smoothing: float = 20):
super().__init__()
self.smoothing = smoothing
self.tag = tag
self.payload = {"lowPass": {"smoothing": self.smoothing}}
def __repr__(self) -> str:

View File

@ -2,6 +2,7 @@ import time
from typing import (
Any,
Dict,
List,
Optional
)
@ -16,11 +17,56 @@ from discord.ext import commands
from . import events
from .enums import SearchType
from .events import PomiceEvent, TrackEndEvent, TrackStartEvent
from .exceptions import FilterInvalidArgument, TrackInvalidPosition, TrackLoadError
from .exceptions import FilterInvalidArgument, FilterTagAlreadyInUse, FilterTagInvalid, TrackInvalidPosition, TrackLoadError
from .filters import Filter
from .objects import Track
from .pool import Node, NodePool
class Filters:
"""Helper class for filters"""
def __init__(self):
self._filters: List[Filter] = []
def add_filter(self, *, filter: Filter):
"""Adds a filter to the list of filters applied"""
if any(f for f in self._filters if f.tag == filter.tag):
raise FilterTagAlreadyInUse(
"A filter with that tag is already in use."
)
self._filters.append(filter)
def remove_filter(self, *, filter_tag: str):
"""Removes a filter from the list of filters applied using its filter tag"""
if not any(f for f in self._filters if f.tag == filter_tag):
raise FilterTagInvalid(
"A filter with that tag was not found."
)
for index, filter in enumerate(self._filters):
if filter.tag == filter_tag:
del self._filters[index]
def has_filter(self, *, filter_tag: str):
"""Checks if a filter exists in the list of filters using its filter tag"""
return any(f for f in self._filters if f.tag == filter_tag)
def reset_filters(self):
"""Removes all filters from the list"""
self._filters = []
def get_all_payloads(self):
"""Returns a formatted dict of all the filter payloads"""
payload = {}
for filter in self._filters:
payload.update(filter.payload)
return payload
def get_filters(self):
"""Returns the current list of applied filters"""
return self._filters
class Player(VoiceProtocol):
"""The base player class for Pomice.
@ -51,7 +97,7 @@ class Player(VoiceProtocol):
self._node = node if node else NodePool.get_node()
self._current: Track = None
self._filter: Filter = None
self._filters: Filters = Filters()
self._volume = 100
self._paused = False
self._is_connected = False
@ -124,9 +170,9 @@ class Player(VoiceProtocol):
return self._volume
@property
def filter(self) -> Filter:
"""Property which returns the currently applied filter, if one is applied"""
return self._filter
def filters(self) -> Filters:
"""Property which returns the helper class for interacting with filters"""
return self._filters
@property
def bot(self) -> Client:
@ -205,7 +251,7 @@ class Player(VoiceProtocol):
"""
return await self._node.get_tracks(query, ctx=ctx, search_type=search_type)
async def connect(self, *, timeout: float, reconnect: bool):
async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False):
await self.guild.change_voice_state(channel=self.channel)
self._node._players[self.guild.id] = self
self._is_connected = True
@ -246,14 +292,20 @@ class Player(VoiceProtocol):
) -> Track:
"""Plays a track. If a Spotify track is passed in, it will be handled accordingly."""
if track.spotify:
# First lets try using the tracks ISRC, every track has one (hopefully)
try:
search: Track = (await self._node.get_tracks(
f"{track._search_type}:{track.isrc}", ctx=track.ctx))[0]
except:
# First method didn't work, lets try just searching it up
try:
search: Track = (await self._node.get_tracks(
f"{track._search_type}:{track.title} - {track.author}", ctx=track.ctx))[0]
if not search:
except:
# The song wasn't able to be found, raise error
raise TrackLoadError (
"No equivalent track was able to be found."
)
track.original = search
data = {
"op": "play",
"guildId": str(self.guild.id),
@ -261,6 +313,7 @@ class Player(VoiceProtocol):
"startTime": str(start),
"noReplace": ignore_if_playing
}
track.original = search
else:
data = {
"op": "play",
@ -300,31 +353,55 @@ class Player(VoiceProtocol):
self._volume = volume
return self._volume
async def set_filter(self, filter: Filter, fast_apply=False) -> Filter:
"""Sets a filter of the player. Takes a pomice.Filter object.
async def add_filter(self, filter: Filter, fast_apply=False) -> Filter:
"""Adds a filter to the player. Takes a pomice.Filter object.
This will only work if you are using a version of Lavalink that supports filters.
If you would like for the filter to apply instantly, set the `fast_apply` arg to `True`.
(You must have a song playing in order for `fast_apply` to work.)
"""
await self._node.send(op="filters", guildId=str(self.guild.id), **filter.payload)
self._filters.add_filter(filter=filter)
payload = self._filters.get_all_payloads()
await self._node.send(op="filters", guildId=str(self.guild.id), **payload)
if fast_apply:
await self.seek(self.position)
self._filter = filter
return filter
async def reset_filter(self, fast_apply=False):
"""Resets a currently applied filter to its default parameters.
You must have a filter applied in order for this to work
return self._filters
async def remove_filter(self, filter_tag: str, fast_apply=False) -> Filter:
"""Removes a filter from the player. Takes a filter tag.
This will only work if you are using a version of Lavalink that supports filters.
If you would like for the filter to apply instantly, set the `fast_apply` arg to `True`.
(You must have a song playing in order for `fast_apply` to work.)
"""
if not self._filter:
raise FilterInvalidArgument(
"You must have a filter applied first in order to use this method."
)
self._filters.remove_filter(filter_tag=filter_tag)
payload = self._filters.get_all_payloads()
await self._node.send(op="filters", guildId=str(self.guild.id), **payload)
if fast_apply:
await self.seek(self.position)
return self._filters
async def reset_filters(self, *, fast_apply=False):
"""Resets all currently applied filters to their default parameters.
You must have filters applied in order for this to work.
If you would like the filters to be removed instantly, set the `fast_apply` arg to `True`.
(You must have a song playing in order for `fast_apply` to work.)
"""
if not self._filters:
raise FilterInvalidArgument(
"You must have filters applied first in order to use this method."
)
self._filters.reset_filters()
await self._node.send(op="filters", guildId=str(self.guild.id))
if fast_apply:
await self.seek(self.position)
self._filter = None

View File

@ -34,7 +34,7 @@ if TYPE_CHECKING:
from .player import Player
SPOTIFY_URL_REGEX = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track)/(?P<id>[a-zA-Z0-9]+)"
r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
)
DISCORD_MP3_URL_REGEX = re.compile(

View File

@ -1,7 +1,5 @@
"""Spotify module for Pomice, made possible by cloudwithax 2021"""
from .exceptions import InvalidSpotifyURL, SpotifyRequestException
from .track import Track
from .playlist import Playlist
from .album import Album
from .exceptions import *
from .objects import *
from .client import Client

View File

@ -1,20 +0,0 @@
from .track import Track
class Album:
"""The base class for a Spotify album"""
def __init__(self, data: dict) -> None:
self.name = data["name"]
self.artists = ", ".join(artist["name"] for artist in data["artists"])
self.image = data["images"][0]["url"]
self.tracks = [Track(track, image=self.image) for track in data["tracks"]["items"]]
self.total_tracks = data["total_tracks"]
self.id = data["id"]
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Album name={self.name} artists={self.artists} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)

View File

@ -3,16 +3,16 @@ import time
from base64 import b64encode
import aiohttp
import orjson as json
from .album import Album
from .exceptions import InvalidSpotifyURL, SpotifyRequestException
from .playlist import Playlist
from .track import Track
from .objects import *
GRANT_URL = "https://accounts.spotify.com/api/token"
REQUEST_URL = "https://api.spotify.com/v1/{type}s/{id}"
SPOTIFY_URL_REGEX = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track)/(?P<id>[a-zA-Z0-9]+)"
r"https?://open.spotify.com/(?P<type>album|playlist|track|artist)/(?P<id>[a-zA-Z0-9]+)"
)
@ -43,7 +43,7 @@ class Client:
f"Error fetching bearer token: {resp.status} {resp.reason}"
)
data: dict = await resp.json()
data: dict = await resp.json(loads=json.loads)
self._bearer_token = data["access_token"]
self._expiry = time.time() + (int(data["expires_in"]) - 10)
@ -68,14 +68,23 @@ class Client:
f"Error while fetching results: {resp.status} {resp.reason}"
)
data: dict = await resp.json()
data: dict = await resp.json(loads=json.loads)
if spotify_type == "track":
return Track(data)
elif spotify_type == "album":
return Album(data)
else:
elif spotify_type == "artist":
async with self.session.get(f"{request_url}/top-tracks?market=US", headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
track_data: dict = await resp.json(loads=json.loads)
tracks = track_data['tracks']
return Artist(data, tracks)
else:
tracks = [
Track(track["track"])
for track in data["tracks"]["items"] if track["track"] is not None
@ -93,7 +102,7 @@ class Client:
f"Error while fetching results: {resp.status} {resp.reason}"
)
next_data: dict = await resp.json()
next_data: dict = await resp.json(loads=json.loads)
tracks += [
Track(track["track"])

85
pomice/spotify/objects.py Normal file
View File

@ -0,0 +1,85 @@
from typing import List
class Track:
"""The base class for a Spotify Track"""
def __init__(self, data: dict, image = None) -> None:
self.name = data["name"]
self.artists = ", ".join(artist["name"] for artist in data["artists"])
self.length = data["duration_ms"]
self.id = data["id"]
self.isrc = data["external_ids"]["isrc"]
if data.get("album") and data["album"].get("images"):
self.image = data["album"]["images"][0]["url"]
else:
self.image = image
if data["is_local"]:
self.uri = None
else:
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Track name={self.name} artists={self.artists} "
f"length={self.length} id={self.id} isrc={self.isrc}>"
)
class Playlist:
"""The base class for a Spotify playlist"""
def __init__(self, data: dict, tracks: List[Track]) -> None:
self.name = data["name"]
self.tracks = tracks
self.owner = data["owner"]["display_name"]
self.total_tracks = data["tracks"]["total"]
self.id = data["id"]
if data.get("images") and len(data["images"]):
self.image = data["images"][0]["url"]
else:
self.image = None
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Playlist name={self.name} owner={self.owner} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)
class Album:
"""The base class for a Spotify album"""
def __init__(self, data: dict) -> None:
self.name = data["name"]
self.artists = ", ".join(artist["name"] for artist in data["artists"])
self.image = data["images"][0]["url"]
self.tracks = [Track(track, image=self.image) for track in data["tracks"]["items"]]
self.total_tracks = data["total_tracks"]
self.id = data["id"]
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Album name={self.name} artists={self.artists} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)
class Artist:
"""The base class for a Spotify artist"""
def __init__(self, data: dict, tracks: dict) -> None:
self.name = f"Top tracks for {data['name']}" # Setting that because its only playing top tracks
self.genres = ", ".join(genre for genre in data["genres"])
self.followers = data["followers"]["total"]
self.image = data["images"][0]["url"]
self.tracks = [Track(track, image=self.image) for track in tracks]
self.id = data["id"]
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Artist name={self.name} id={self.id} "
f"tracks={self.tracks}>"
)

View File

@ -1,24 +0,0 @@
from .track import Track
from typing import List
class Playlist:
"""The base class for a Spotify playlist"""
def __init__(self, data: dict, tracks: List[Track]) -> None:
self.name = data["name"]
self.tracks = tracks
self.owner = data["owner"]["display_name"]
self.total_tracks = data["tracks"]["total"]
self.id = data["id"]
if data.get("images") and len(data["images"]):
self.image = data["images"][0]["url"]
else:
self.image = None
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Playlist name={self.name} owner={self.owner} id={self.id} "
f"total_tracks={self.total_tracks} tracks={self.tracks}>"
)

View File

@ -1,25 +0,0 @@
class Track:
"""The base class for a Spotify Track"""
def __init__(self, data: dict, image = None) -> None:
self.name = data["name"]
self.artists = ", ".join(artist["name"] for artist in data["artists"])
self.length = data["duration_ms"]
self.id = data["id"]
self.isrc = data["external_ids"]["isrc"]
if data.get("album") and data["album"].get("images"):
self.image = data["album"]["images"][0]["url"]
else:
self.image = image
if data["is_local"]:
self.uri = None
else:
self.uri = data["external_urls"]["spotify"]
def __repr__(self) -> str:
return (
f"<Pomice.spotify.Track name={self.name} artists={self.artists} "
f"length={self.length} id={self.id} isrc={self.isrc}>"
)

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
aiohttp>=3.7.4,<4
orjson

View File

@ -1,5 +1,8 @@
import setuptools
with open("requirements.txt") as f:
requirements = f.read().splitlines()
with open("README.md") as f:
readme = f.read()
@ -14,7 +17,7 @@ setuptools.setup(
long_description=readme,
long_description_content_type="text/markdown",
include_package_data=True,
install_requires=None,
install_requires=requirements,
extra_require=None,
classifiers=[
"Framework :: AsyncIO",