fix formatting

This commit is contained in:
cloudwithax 2023-03-11 10:41:46 -05:00
parent 8860df99de
commit 4f86e44fec
17 changed files with 185 additions and 142 deletions

2
.gitignore vendored
View File

@ -6,7 +6,7 @@ pomice.egg-info/
docs/_build/
build/
.gitpod.yml
.python-verson
.python-version
Pipfile.lock
.mypy_cache/
.vscode/

View File

@ -11,10 +11,14 @@ repos:
- id: requirements-txt-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: stable
rev: 23.1.0
hooks:
- id: black
language_version: python3.8
- repo: https://github.com/asottile/blacken-docs
rev: 1.13.0
hooks:
- id: blacken-docs
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
hooks:

View File

@ -4,22 +4,23 @@ import os
import sys
from typing import Any
from typing import Dict
sys.path.insert(0, os.path.abspath('.'))
sys.path.insert(0, os.path.abspath('..'))
sys.path.insert(0, os.path.abspath("."))
sys.path.insert(0, os.path.abspath(".."))
project = 'Pomice'
copyright = '2023, cloudwithax'
author = 'cloudwithax'
project = "Pomice"
copyright = "2023, cloudwithax"
author = "cloudwithax"
release = '2.2'
release = "2.2"
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.autosummary',
'sphinx.ext.linkcode',
'myst_parser',
"sphinx.ext.autodoc",
"sphinx.ext.autosummary",
"sphinx.ext.linkcode",
"myst_parser",
]
myst_enable_extensions = [
@ -40,9 +41,9 @@ myst_enable_extensions = [
myst_heading_anchors = 3
templates_path = ['_templates']
templates_path = ["_templates"]
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
# We need to include this because discord.py has special tags
# they inlcude within their docstrings that dont parse
@ -55,9 +56,9 @@ rst_prolog = """
.. _coroutine_link: https://docs.python.org/3/library/asyncio-task.html#coroutine
"""
html_theme = 'furo'
html_theme = "furo"
html_static_path = ['_static']
html_static_path = ["_static"]
html_title = "Pomice"
@ -89,9 +90,9 @@ def linkcode_resolve(domain, info):
# i absolutely MUST add this here or else
# the docs will not build. fuck sphinx
try:
if domain != 'py':
if domain != "py":
return None
if not info['module']:
if not info["module"]:
return None
mod = importlib.import_module(info["module"])

View File

@ -56,12 +56,14 @@ class Player(pomice.Player):
if track.is_stream:
embed = discord.Embed(
title="Now playing", description=f":red_circle: **LIVE** [{track.title}]({track.uri}) [{track.requester.mention}]",
title="Now playing",
description=f":red_circle: **LIVE** [{track.title}]({track.uri}) [{track.requester.mention}]",
)
self.controller = await self.context.send(embed=embed)
else:
embed = discord.Embed(
title=f"Now playing", description=f"[{track.title}]({track.uri}) [{track.requester.mention}]",
title=f"Now playing",
description=f"[{track.title}]({track.uri}) [{track.requester.mention}]",
)
self.controller = await self.context.send(embed=embed)
@ -110,7 +112,7 @@ class Music(commands.Cog):
channel = self.bot.get_channel(int(player.channel.id))
required = math.ceil((len(channel.members) - 1) / 2.5)
if ctx.command.name == 'stop':
if ctx.command.name == "stop":
if len(channel.members) == 3:
required = 2
@ -140,12 +142,14 @@ class Music(commands.Cog):
async def on_pomice_track_exception(self, player: Player, track, _):
await player.do_next()
@commands.command(aliases=['joi', 'j', 'summon', 'su', 'con', 'connect'])
@commands.command(aliases=["joi", "j", "summon", "su", "con", "connect"])
async def join(self, ctx: commands.Context, *, channel: discord.VoiceChannel = None) -> None:
if not channel:
channel = getattr(ctx.author.voice, "channel", None)
if not channel:
return await ctx.send("You must be in a voice channel in order to use this command!")
return await ctx.send(
"You must be in a voice channel in order to use this command!",
)
# With the release of discord.py 1.7, you can now add a compatible
# VoiceProtocol class as an argument in VoiceChannel.connect().
@ -157,15 +161,18 @@ class Music(commands.Cog):
await player.set_context(ctx=ctx)
await ctx.send(f"Joined the voice channel `{channel.name}`")
@commands.command(aliases=['disconnect', 'dc', 'disc', 'lv', 'fuckoff'])
@commands.command(aliases=["disconnect", "dc", "disc", "lv", "fuckoff"])
async def leave(self, ctx: commands.Context):
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
await player.destroy()
await ctx.send("Player has left the channel.")
@commands.command(aliases=['pla', 'p'])
@commands.command(aliases=["pla", "p"])
async def play(self, ctx: commands.Context, *, search: str) -> None:
# Checks if the player is in the channel before we play anything
if not (player := ctx.voice_client):
@ -194,17 +201,20 @@ class Music(commands.Cog):
if not player.is_playing:
await player.do_next()
@commands.command(aliases=['pau', 'pa'])
@commands.command(aliases=["pau", "pa"])
async def pause(self, ctx: commands.Context):
"""Pause the currently playing song."""
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
if player.is_paused or not player.is_connected:
return
if self.is_privileged(ctx):
await ctx.send('An admin or DJ has paused the player.', delete_after=10)
await ctx.send("An admin or DJ has paused the player.", delete_after=10)
player.pause_votes.clear()
return await player.set_pause(True)
@ -213,23 +223,29 @@ class Music(commands.Cog):
player.pause_votes.add(ctx.author)
if len(player.pause_votes) >= required:
await ctx.send('Vote to pause passed. Pausing player.', delete_after=10)
await ctx.send("Vote to pause passed. Pausing player.", delete_after=10)
player.pause_votes.clear()
await player.set_pause(True)
else:
await ctx.send(f'{ctx.author.mention} has voted to pause the player. Votes: {len(player.pause_votes)}/{required}', delete_after=15)
await ctx.send(
f"{ctx.author.mention} has voted to pause the player. Votes: {len(player.pause_votes)}/{required}",
delete_after=15,
)
@commands.command(aliases=['res', 'r'])
@commands.command(aliases=["res", "r"])
async def resume(self, ctx: commands.Context):
"""Resume a currently paused player."""
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
if not player.is_paused or not player.is_connected:
return
if self.is_privileged(ctx):
await ctx.send('An admin or DJ has resumed the player.', delete_after=10)
await ctx.send("An admin or DJ has resumed the player.", delete_after=10)
player.resume_votes.clear()
return await player.set_pause(False)
@ -238,29 +254,35 @@ class Music(commands.Cog):
player.resume_votes.add(ctx.author)
if len(player.resume_votes) >= required:
await ctx.send('Vote to resume passed. Resuming player.', delete_after=10)
await ctx.send("Vote to resume passed. Resuming player.", delete_after=10)
player.resume_votes.clear()
await player.set_pause(False)
else:
await ctx.send(f'{ctx.author.mention} has voted to resume the player. Votes: {len(player.resume_votes)}/{required}', delete_after=15)
await ctx.send(
f"{ctx.author.mention} has voted to resume the player. Votes: {len(player.resume_votes)}/{required}",
delete_after=15,
)
@commands.command(aliases=['n', 'nex', 'next', 'sk'])
@commands.command(aliases=["n", "nex", "next", "sk"])
async def skip(self, ctx: commands.Context):
"""Skip the currently playing song."""
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
if not player.is_connected:
return
if self.is_privileged(ctx):
await ctx.send('An admin or DJ has skipped the song.', delete_after=10)
await ctx.send("An admin or DJ has skipped the song.", delete_after=10)
player.skip_votes.clear()
return await player.stop()
if ctx.author == player.current.requester:
await ctx.send('The song requester has skipped the song.', delete_after=10)
await ctx.send("The song requester has skipped the song.", delete_after=10)
player.skip_votes.clear()
return await player.stop()
@ -269,48 +291,63 @@ class Music(commands.Cog):
player.skip_votes.add(ctx.author)
if len(player.skip_votes) >= required:
await ctx.send('Vote to skip passed. Skipping song.', delete_after=10)
await ctx.send("Vote to skip passed. Skipping song.", delete_after=10)
player.skip_votes.clear()
await player.stop()
else:
await ctx.send(f'{ctx.author.mention} has voted to skip the song. Votes: {len(player.skip_votes)}/{required} ', delete_after=15)
await ctx.send(
f"{ctx.author.mention} has voted to skip the song. Votes: {len(player.skip_votes)}/{required} ",
delete_after=15,
)
@commands.command()
async def stop(self, ctx: commands.Context):
"""Stop the player and clear all internal states."""
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
if not player.is_connected:
return
if self.is_privileged(ctx):
await ctx.send('An admin or DJ has stopped the player.', delete_after=10)
await ctx.send("An admin or DJ has stopped the player.", delete_after=10)
return await player.teardown()
required = self.required(ctx)
player.stop_votes.add(ctx.author)
if len(player.stop_votes) >= required:
await ctx.send('Vote to stop passed. Stopping the player.', delete_after=10)
await ctx.send("Vote to stop passed. Stopping the player.", delete_after=10)
await player.teardown()
else:
await ctx.send(f'{ctx.author.mention} has voted to stop the player. Votes: {len(player.stop_votes)}/{required}', delete_after=15)
await ctx.send(
f"{ctx.author.mention} has voted to stop the player. Votes: {len(player.stop_votes)}/{required}",
delete_after=15,
)
@commands.command(aliases=['mix', 'shuf'])
@commands.command(aliases=["mix", "shuf"])
async def shuffle(self, ctx: commands.Context):
"""Shuffle the players queue."""
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
if not player.is_connected:
return
if player.queue.qsize() < 3:
return await ctx.send('The queue is empty. Add some songs to shuffle the queue.', delete_after=15)
return await ctx.send(
"The queue is empty. Add some songs to shuffle the queue.",
delete_after=15,
)
if self.is_privileged(ctx):
await ctx.send('An admin or DJ has shuffled the queue.', delete_after=10)
await ctx.send("An admin or DJ has shuffled the queue.", delete_after=10)
player.shuffle_votes.clear()
return player.queue.shuffle()
@ -318,29 +355,35 @@ class Music(commands.Cog):
player.shuffle_votes.add(ctx.author)
if len(player.shuffle_votes) >= required:
await ctx.send('Vote to shuffle passed. Shuffling the queue.', delete_after=10)
await ctx.send("Vote to shuffle passed. Shuffling the queue.", delete_after=10)
player.shuffle_votes.clear()
player.queue.shuffle()
else:
await ctx.send(f'{ctx.author.mention} has voted to shuffle the queue. Votes: {len(player.shuffle_votes)}/{required}', delete_after=15)
await ctx.send(
f"{ctx.author.mention} has voted to shuffle the queue. Votes: {len(player.shuffle_votes)}/{required}",
delete_after=15,
)
@commands.command(aliases=['v', 'vol'])
@commands.command(aliases=["v", "vol"])
async def volume(self, ctx: commands.Context, *, vol: int):
"""Change the players volume, between 1 and 100."""
if not (player := ctx.voice_client):
return await ctx.send("You must have the bot in a channel in order to use this command", delete_after=7)
return await ctx.send(
"You must have the bot in a channel in order to use this command",
delete_after=7,
)
if not player.is_connected:
return
if not self.is_privileged(ctx):
return await ctx.send('Only the DJ or admins may change the volume.')
return await ctx.send("Only the DJ or admins may change the volume.")
if not 0 < vol < 101:
return await ctx.send('Please enter a value between 1 and 100.')
return await ctx.send("Please enter a value between 1 and 100.")
await player.set_volume(vol)
await ctx.send(f'Set the volume to **{vol}**%', delete_after=7)
await ctx.send(f"Set the volume to **{vol}**%", delete_after=7)
async def setup(bot: commands.Bot):

View File

@ -9,7 +9,8 @@ class MyBot(commands.Bot):
super().__init__(
command_prefix="!",
activity=discord.Activity(
type=discord.ActivityType.listening, name="to music!",
type=discord.ActivityType.listening,
name="to music!",
),
)

View File

@ -13,9 +13,7 @@ import orjson as json
from .exceptions import *
from .objects import *
__all__ = (
"Client",
)
__all__ = ("Client",)
AM_URL_REGEX = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)",
@ -109,7 +107,8 @@ class Client:
if type == "artist":
async with self.session.get(
f"{request_url}/view/top-songs", headers=self.headers,
f"{request_url}/view/top-songs",
headers=self.headers,
) as resp:
if resp.status != 200:
raise AppleMusicRequestException(
@ -121,10 +120,7 @@ class Client:
return Artist(data, tracks=artist_tracks)
track_data: dict = data["relationships"]["tracks"]
album_tracks: List[Song] = [
Song(track)
for track in track_data["data"]
]
album_tracks: List[Song] = [Song(track) for track in track_data["data"]]
if not len(album_tracks):
raise AppleMusicRequestException(

View File

@ -61,9 +61,7 @@ class Album:
self.id: str = data["id"]
self.artists: str = data["attributes"]["artistName"]
self.total_tracks: int = data["attributes"]["trackCount"]
self.tracks: List[Song] = [
Song(track) for track in data["relationships"]["tracks"]["data"]
]
self.tracks: List[Song] = [Song(track) for track in data["relationships"]["tracks"]["data"]]
self.image: str = data["attributes"]["artwork"]["url"].replace(
"{w}x{h}",
f'{data["attributes"]["artwork"]["width"]}x{data["attributes"]["artwork"]["height"]}',
@ -83,9 +81,7 @@ class Artist:
self.name: str = f'Top tracks for {data["attributes"]["name"]}'
self.url: str = data["attributes"]["url"]
self.id: str = data["id"]
self.genres: str = ", ".join(
genre for genre in data["attributes"]["genreNames"]
)
self.genres: str = ", ".join(genre for genre in data["attributes"]["genreNames"])
self.tracks: List[Song] = [Song(track) for track in tracks]
self.image: str = data["attributes"]["artwork"]["url"].replace(
"{w}x{h}",

View File

@ -58,7 +58,6 @@ class TrackStartEvent(PomiceEvent):
)
def __init__(self, data: dict, player: Player):
self.player: Player = player
assert self.player._current is not None
self.track: Track = self.player._current
@ -80,7 +79,6 @@ class TrackEndEvent(PomiceEvent):
__slots__ = ("player", "track", "reason")
def __init__(self, data: dict, player: Player):
self.player: Player = player
assert self.player._ending_track is not None
self.track: Track = self.player._ending_track
@ -107,7 +105,6 @@ class TrackStuckEvent(PomiceEvent):
__slots__ = ("player", "track", "threshold")
def __init__(self, data: dict, player: Player):
self.player: Player = player
assert self.player._ending_track is not None
self.track: Track = self.player._ending_track
@ -133,13 +130,13 @@ class TrackExceptionEvent(PomiceEvent):
__slots__ = ("player", "track", "exception")
def __init__(self, data: dict, player: Player):
self.player: Player = player
assert self.player._ending_track is not None
self.track: Track = self.player._ending_track
# Error is for Lavalink <= 3.3
self.exception: str = data.get(
"error", "",
"error",
"",
) or data.get("exception", "")
# on_pomice_track_exception(player, track, error)
@ -153,9 +150,7 @@ class WebSocketClosedPayload:
__slots__ = ("guild", "code", "reason", "by_remote")
def __init__(self, data: dict):
self.guild: Optional[Guild] = NodePool.get_node(
).bot.get_guild(int(data["guildId"]))
self.guild: Optional[Guild] = NodePool.get_node().bot.get_guild(int(data["guildId"]))
self.code: int = data["code"]
self.reason: str = data["code"]
self.by_remote: bool = data["byRemote"]
@ -196,7 +191,6 @@ class WebSocketOpenEvent(PomiceEvent):
__slots__ = ("target", "ssrc")
def __init__(self, data: dict, _: Any) -> None:
self.target: str = data["target"]
self.ssrc: int = data["ssrc"]

View File

@ -236,6 +236,7 @@ class Karaoke(Filter):
"""Filter which filters the vocal track from any song and leaves the instrumental.
Best for karaoke as the filter implies.
"""
__slots__ = ("level", "mono_level", "filter_band", "filter_width")
def __init__(
@ -274,6 +275,7 @@ class Tremolo(Filter):
"""Filter which produces a wavering tone in the music,
causing it to sound like the music is changing in volume rapidly.
"""
__slots__ = ("frequency", "depth")
def __init__(self, *, tag: str, frequency: float = 2.0, depth: float = 0.5):
@ -293,7 +295,8 @@ class Tremolo(Filter):
self.payload: dict = {
"tremolo": {
"frequency": self.frequency, "depth": self.depth,
"frequency": self.frequency,
"depth": self.depth,
},
}
@ -307,6 +310,7 @@ class Vibrato(Filter):
"""Filter which produces a wavering tone in the music, similar to the Tremolo filter,
but changes in pitch rather than volume.
"""
__slots__ = ("frequency", "depth")
def __init__(self, *, tag: str, frequency: float = 2.0, depth: float = 0.5):
@ -326,7 +330,8 @@ class Vibrato(Filter):
self.payload: dict = {
"vibrato": {
"frequency": self.frequency, "depth": self.depth,
"frequency": self.frequency,
"depth": self.depth,
},
}
@ -479,6 +484,7 @@ class LowPass(Filter):
"""Filter which supresses higher frequencies and allows lower frequencies to pass.
You can also do this with the Equalizer filter, but this is an easier way to do it.
"""
__slots__ = ("smoothing", "payload")
def __init__(self, *, tag: str, smoothing: float = 20):

View File

@ -60,7 +60,6 @@ class Track:
timestamp: Optional[float] = None,
requester: Optional[Union[Member, User, ClientUser]] = None,
):
self.track_id: str = track_id
self.info: dict = info
self.track_type: TrackType = track_type
@ -137,7 +136,6 @@ class Playlist:
thumbnail: Optional[str] = None,
uri: Optional[str] = None,
):
self.playlist_info: dict = playlist_info
self.tracks: List[Track] = tracks
self.name: str = playlist_info.get("name", "Unknown Playlist")

View File

@ -135,7 +135,6 @@ class Player(VoiceProtocol):
*,
node: Optional[Node] = None,
) -> None:
self.client: Client = client
self.channel: VoiceChannel = channel
@ -353,7 +352,9 @@ class Player(VoiceProtocol):
self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False
) -> None:
await self.guild.change_voice_state(
channel=self.channel, self_deaf=self_deaf, self_mute=self_mute,
channel=self.channel,
self_deaf=self_deaf,
self_mute=self_mute,
)
self._node._players[self.guild.id] = self
self._is_connected = True
@ -388,7 +389,9 @@ class Player(VoiceProtocol):
self._node._players.pop(self.guild.id)
await self._node.send(
method="DELETE", path=self._player_endpoint_uri, guild_id=self._guild.id,
method="DELETE",
path=self._player_endpoint_uri,
guild_id=self._guild.id,
)
async def play(
@ -405,15 +408,20 @@ class Player(VoiceProtocol):
raise
search = (
await self._node.get_tracks(f"{track._search_type}:{track.isrc}", ctx=track.ctx)
)[0] # type: ignore
)[
0
] # type: ignore
except Exception:
# First method didn't work, lets try just searching it up
try:
search = (
await self._node.get_tracks(
f"{track._search_type}:{track.title} - {track.author}", ctx=track.ctx,
f"{track._search_type}:{track.title} - {track.author}",
ctx=track.ctx,
)
)[0] # type: ignore
)[
0
] # type: ignore
except:
# The song wasn't able to be found, raise error
raise TrackLoadError(

View File

@ -102,7 +102,6 @@ class Node:
apple_music: bool = False,
fallback: bool = False,
):
self._bot: commands.Bot = bot
self._host: str = host
self._port: int = port
@ -147,7 +146,8 @@ class Node:
if self._spotify_client_id and self._spotify_client_secret:
self._spotify_client: spotify.Client = spotify.Client(
self._spotify_client_id, self._spotify_client_secret,
self._spotify_client_id,
self._spotify_client_secret,
)
if apple_music:
@ -227,10 +227,7 @@ class Node:
return
async def _handle_node_switch(self) -> None:
nodes = [
node for node in self.pool._nodes.copy().values()
if node.is_connected
]
nodes = [node for node in self.pool._nodes.copy().values() if node.is_connected]
new_node = random.choice(nodes)
for player in self.players.copy().values():
@ -304,7 +301,10 @@ class Node:
)
async with self._session.request(
method=method, url=uri, headers=self._headers, json=data or {},
method=method,
url=uri,
headers=self._headers,
json=data or {},
) as resp:
if resp.status >= 300:
resp_data: dict = await resp.json()
@ -405,7 +405,9 @@ class Node:
"""
data: dict = await self.send(
method="GET", path="decodetrack", query=f"encodedTrack={identifier}",
method="GET",
path="decodetrack",
query=f"encodedTrack={identifier}",
)
return Track(
track_id=identifier,
@ -499,7 +501,8 @@ class Node:
return Playlist(
playlist_info={
"name": apple_music_results.name, "selectedTrack": 0,
"name": apple_music_results.name,
"selectedTrack": 0,
},
tracks=tracks,
playlist_type=PlaylistType.APPLE_MUSIC,
@ -565,7 +568,8 @@ class Node:
return Playlist(
playlist_info={
"name": spotify_results.name, "selectedTrack": 0,
"name": spotify_results.name,
"selectedTrack": 0,
},
tracks=tracks,
playlist_type=PlaylistType.SPOTIFY,
@ -575,7 +579,9 @@ class Node:
elif discord_url := URLRegex.DISCORD_MP3_URL.match(query):
data: dict = await self.send(
method="GET", path="loadtracks", query=f"identifier={quote(query)}",
method="GET",
path="loadtracks",
query=f"identifier={quote(query)}",
)
track: dict = data["tracks"][0]
@ -611,7 +617,9 @@ class Node:
query = match.group("video")
data = await self.send(
method="GET", path="loadtracks", query=f"identifier={quote(query)}",
method="GET",
path="loadtracks",
query=f"identifier={quote(query)}",
)
load_type = data.get("loadType")
@ -748,9 +756,7 @@ class NodePool:
based on how players it has. This method will return a node with
the least amount of players
"""
available_nodes: List[Node] = [
node for node in cls._nodes.values() if node._available
]
available_nodes: List[Node] = [node for node in cls._nodes.values() if node._available]
if not available_nodes:
raise NoNodesAvailable("There are no nodes available.")
@ -760,10 +766,7 @@ class NodePool:
return min(tested_nodes, key=tested_nodes.get) # type: ignore
elif algorithm == NodeAlgorithm.by_players:
tested_nodes = {
node: len(node.players.keys())
for node in available_nodes
}
tested_nodes = {node: len(node.players.keys()) for node in available_nodes}
return min(tested_nodes, key=tested_nodes.get) # type: ignore
else:
@ -839,9 +842,7 @@ class NodePool:
async def disconnect(cls) -> None:
"""Disconnects all available nodes from the node pool."""
available_nodes: List[Node] = [
node for node in cls._nodes.values() if node._available
]
available_nodes: List[Node] = [node for node in cls._nodes.values() if node._available]
for node in available_nodes:
await node.disconnect()

View File

@ -14,9 +14,7 @@ from .exceptions import QueueException
from .exceptions import QueueFull
from .objects import Track
__all__ = (
"Queue",
)
__all__ = ("Queue",)
class Queue(Iterable[Track]):
@ -36,7 +34,6 @@ class Queue(Iterable[Track]):
*,
overflow: bool = True,
):
self.max_size: Optional[int] = max_size
self._current_item: Track
self._queue: List[Track] = []
@ -353,5 +350,5 @@ class Queue(Iterable[Track]):
def jump(self, item: Track) -> None:
"""Removes all tracks before the."""
index = self.find_position(item)
new_queue = self._queue[index: self.size]
new_queue = self._queue[index : self.size]
self._queue = new_queue

View File

@ -15,9 +15,7 @@ from .exceptions import InvalidSpotifyURL
from .exceptions import SpotifyRequestException
from .objects import *
__all__ = (
"Client",
)
__all__ = ("Client",)
GRANT_URL = "https://accounts.spotify.com/api/token"
@ -96,7 +94,8 @@ class Client:
return Album(data)
elif spotify_type == "artist":
async with self.session.get(
f"{request_url}/top-tracks?market=US", headers=self._bearer_headers,
f"{request_url}/top-tracks?market=US",
headers=self._bearer_headers,
) as resp:
if resp.status != 200:
raise SpotifyRequestException(
@ -155,7 +154,8 @@ class Client:
)
request_url = REQUEST_URL.format(
type="recommendation", id=f"?seed_tracks={spotify_id}",
type="recommendation",
id=f"?seed_tracks={spotify_id}",
)
async with self.session.get(request_url, headers=self._bearer_headers) as resp:

View File

@ -14,9 +14,7 @@ class Track:
def __init__(self, data: dict, image: Optional[str] = None) -> None:
self.name: str = data["name"]
self.artists: str = ", ".join(
artist["name"] for artist in data["artists"]
)
self.artists: str = ", ".join(artist["name"] for artist in data["artists"])
self.length: float = data["duration_ms"]
self.id: str = data["id"]
@ -66,14 +64,9 @@ class Album:
def __init__(self, data: dict) -> None:
self.name: str = data["name"]
self.artists: str = ", ".join(
artist["name"] for artist in data["artists"]
)
self.artists: str = ", ".join(artist["name"] for artist in data["artists"])
self.image: str = data["images"][0]["url"]
self.tracks = [
Track(track, image=self.image)
for track in data["tracks"]["items"]
]
self.tracks = [Track(track, image=self.image) for track in data["tracks"]["items"]]
self.total_tracks: int = data["total_tracks"]
self.id: str = data["id"]
self.uri: str = data["external_urls"]["spotify"]

View File

@ -87,7 +87,6 @@ class NodeStats:
)
def __init__(self, data: Dict[str, Any]) -> None:
memory: dict = data.get("memory", {})
self.used = memory.get("used")
self.free = memory.get("free")
@ -117,7 +116,6 @@ class FailingIPBlock:
__slots__ = ("address", "failing_time")
def __init__(self, data: dict) -> None:
self.address = data.get("address")
self.failing_time = datetime.fromtimestamp(
float(data.get("failingTimestamp", 0)),
@ -143,7 +141,6 @@ class RouteStats:
)
def __init__(self, data: Dict[str, Any]) -> None:
self.strategy = RouteStrategy(data.get("class"))
details: dict = data.get("details", {})
@ -154,7 +151,8 @@ class RouteStats:
self.failing_addresses = [
FailingIPBlock(
data,
) for data in details.get("failingAddresses", [])
)
for data in details.get("failingAddresses", [])
]
self.block_index = details.get("blockIndex")
@ -220,7 +218,8 @@ class Ping:
s = self._create_socket(socket.AF_INET, socket.SOCK_STREAM)
cost_time = self.timer.cost(
(s.connect, s.shutdown), ((self._host, self._port), None),
(s.connect, s.shutdown),
((self._host, self._port), None),
)
s_runtime = 1000 * (cost_time)

View File

@ -6,7 +6,9 @@ version = ""
requirements = ["discord.py>=2.0.0", "aiohttp>=3.7.4,<4", "orjson"]
with open("pomice/__init__.py") as f:
version = re.search(
r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', f.read(), re.MULTILINE,
r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
f.read(),
re.MULTILINE,
).group(1)
if not version:
@ -18,13 +20,17 @@ if version.endswith(("a", "b", "rc")):
import subprocess
p = subprocess.Popen(
["git", "rev-list", "--count", "HEAD"], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
["git", "rev-list", "--count", "HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = p.communicate()
if out:
version += out.decode("utf-8").strip()
p = subprocess.Popen(
["git", "rev-parse", "--short", "HEAD"], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
["git", "rev-parse", "--short", "HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = p.communicate()
if out: