style changes

This commit is contained in:
vveeps 2021-10-03 23:27:26 +03:00
parent c0b4e1a446
commit 032dc0dc65
10 changed files with 474 additions and 303 deletions

View File

@ -4,7 +4,6 @@ __version__ = "1.0.3"
__title__ = "pomice"
__author__ = "cloudwithax"
from .exceptions import *
from .events import *
from .filters import *

View File

@ -3,47 +3,54 @@ from .pool import NodePool
class PomiceEvent:
"""The base class for all events dispatched by a node.
Every event must be formatted within your bots code as a listener.
Every event must be formatted within your botäs code as a listener.
i.e: If you want to listen for when a track starts, the event would be:
```py
@bot.listen
async def on_pomice_track_start(self, event):
```
"""
def __init__(self):
pass
name = "event"
name = 'event'
class TrackStartEvent(PomiceEvent):
"""Fired when a track has successfully started. Returns the player associated with said track and the track ID"""
"""Fired when a track has successfully started.
Returns the player associated with the track and the track ID
"""
def __init__(self, data):
super().__init__()
self.name = "track_start"
self.player = NodePool.get_node().get_player(int(data["guildId"]))
self.track_id = data['track']
self.track_id = data["track"]
def __repr__(self) -> str:
return f"<Pomice.TrackStartEvent track_id={self.track_id}>"
class TrackEndEvent(PomiceEvent):
"""Fired when a track has successfully ended. Returns the player associated with the track along with the track ID and reason."""
"""Fired when a track has successfully ended.
Returns the player associated with the track along with the track ID and reason.
"""
def __init__(self, data):
super().__init__()
self.name = "track_end"
self.player = NodePool.get_node().get_player(int(data["guildId"]))
self.track_id = data['track']
self.reason = data['reason']
self.track_id = data["track"]
self.reason = data["reason"]
def __repr__(self) -> str:
return f"<Pomice.TrackEndEvent track_id={self.track_id} reason={self.reason}>"
class TrackStuckEvent(PomiceEvent):
"""Fired when a track is stuck and cannot be played. Returns the player associated with the track along with a track ID to be further parsed by the end user."""
"""Fired when a track is stuck and cannot be played. Returns the player
associated with the track along with a track ID to be further parsed by the end user.
"""
def __init__(self, data):
super().__init__()
@ -56,8 +63,12 @@ class TrackStuckEvent(PomiceEvent):
def __repr__(self) -> str:
return f"<Pomice.TrackStuckEvent track_id={self.track_id} threshold={self.threshold}>"
class TrackExceptionEvent(PomiceEvent):
"""Fired when a track error has occured. Returns the player associated with the track along with the error code and execption"""
"""Fired when a track error has occured.
Returns the player associated with the track along with the error code and exception.
"""
def __init__(self, data):
super().__init__()
@ -70,8 +81,12 @@ class TrackExceptionEvent(PomiceEvent):
def __repr__(self) -> str:
return f"<Pomice.TrackExceptionEvent> error={self.error} exeception={self.exception}"
class WebsocketClosedEvent(PomiceEvent):
"""Fired when a websocket connection to a node has been closed. Returns the reason and the error code."""
"""Fired when a websocket connection to a node has been closed.
Returns the reason and the error code.
"""
def __init__(self, data):
super().__init__()
@ -83,15 +98,19 @@ class WebsocketClosedEvent(PomiceEvent):
def __repr__(self) -> str:
return f"<Pomice.WebsocketClosedEvent reason={self.reason} code={self.code}>"
class WebsocketOpenEvent(PomiceEvent):
"""Fired when a websocket connection to a node has been initiated. Returns the target and the session SSRC."""
"""Fired when a websocket connection to a node has been initiated.
Returns the target and the session SSRC.
"""
def __init__(self, data):
super().__init__()
self.name = "websocket_open"
self.target: str = data['target']
self.ssrc: int = data['ssrc']
self.target: str = data["target"]
self.ssrc: int = data["ssrc"]
def __repr__(self) -> str:
return f"<Pomice.WebsocketOpenEvent target={self.target} ssrc={self.ssrc}>"

View File

@ -15,12 +15,12 @@ class NodeConnectionFailure(NodeException):
class NodeConnectionClosed(NodeException):
"""The nodes connection is closed."""
"""The node's connection is closed."""
pass
class NodeNotAvailable(PomiceException):
"""The node is not currently available."""
"""The node is currently unavailable."""
pass
@ -43,18 +43,22 @@ class FilterInvalidArgument(PomiceException):
"""An invalid argument was passed to a filter."""
pass
class SpotifyAlbumLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load an album"""
"""The pomice Spotify client was unable to load an album."""
pass
class SpotifyTrackLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load a track"""
"""The pomice Spotify client was unable to load a track."""
pass
class SpotifyPlaylistLoadFailed(PomiceException):
"""The pomice Spotify client was unable to load a playlist"""
"""The pomice Spotify client was unable to load a playlist."""
pass
class InvalidSpotifyClientAuthorization(PomiceException):
"""No Spotify client authorization was provided in order to use the Spotify track search feature"""
"""No Spotify client authorization was provided for track searching."""
pass

View File

@ -1,27 +1,29 @@
from . import exceptions
from .exceptions import FilterInvalidArgument
class Filter:
def __init__(self):
self.payload = None
class Timescale(Filter):
"""Filter which changes the speed and pitch of a track. Do be warned that this filter is bugged as of the lastest
Lavalink dev version due to the filter patch not corresponding with the track time. In short this means that your
track will either end prematurely or end later due to this. This is not the library's fault.
"""Filter which changes the speed and pitch of a track.
Do be warned that this filter is bugged as of the lastest Lavalink dev version
due to the filter patch not corresponding with the track time.
In short this means that your track will either end prematurely or end later due to this.
This is not the library's fault.
"""
def __init__(self, *, speed: float = 1.0, pitch: float = 1.0, rate: float = 1.0):
super().__init__()
if speed < 0:
raise exceptions.FilterInvalidArgument("Timescale speed must be more than 0.")
raise FilterInvalidArgument("Timescale speed must be more than 0.")
if pitch < 0:
raise exceptions.FilterInvalidArgument("Timescale pitch must be more than 0.")
raise FilterInvalidArgument("Timescale pitch must be more than 0.")
if rate < 0:
raise exceptions.FilterInvalidArgument("Timescale rate must be more than 0.")
raise FilterInvalidArgument("Timescale rate must be more than 0.")
self.speed = speed
self.pitch = pitch
@ -36,11 +38,18 @@ class Timescale(Filter):
class Karaoke(Filter):
"""
Filter which filters the vocal track from any song and leaves the instrumental. Best for karaoke as the filter implies.
"""Filter which filters the vocal track from any song and leaves the instrumental.
Best for karaoke as the filter implies.
"""
def __init__(self, *, level: float, mono_level: float, filter_band: float, filter_width: float):
def __init__(
self,
*,
level: float,
mono_level: float,
filter_band: float,
filter_width: float
):
super().__init__()
self.level = level
@ -54,19 +63,24 @@ class Karaoke(Filter):
"filterWidth": self.filter_width}}
def __repr__(self):
return f"<Pomice.KaraokeFilter level={self.level} mono_level={self.mono_level} filter_band={self.filter_band} filter_width={self.filter_width}>"
return (
f"<Pomice.KaraokeFilter level={self.level} mono_level={self.mono_level} "
f"filter_band={self.filter_band} filter_width={self.filter_width}>"
)
class Tremolo(Filter):
"""Filter which produces a wavering tone in the music, causing it to sound like the music is changing in volume rapidly."""
"""Filter which produces a wavering tone in the music,
causing it to sound like the music is changing in volume rapidly.
"""
def __init__(self, *, frequency: float, depth: float):
super().__init__()
if frequency < 0:
raise exceptions.FilterInvalidArgument("Tremolo frequency must be more than 0.")
raise FilterInvalidArgument("Tremolo frequency must be more than 0.")
if depth < 0 or depth > 1:
raise exceptions.FilterInvalidArgument("Tremolo depth must be between 0 and 1.")
raise FilterInvalidArgument("Tremolo depth must be between 0 and 1.")
self.frequency = frequency
self.depth = depth
@ -79,15 +93,17 @@ class Tremolo(Filter):
class Vibrato(Filter):
"""Filter which produces a wavering tone in the music, similar to the Tremolo filter, but changes in pitch rather than volume."""
"""Filter which produces a wavering tone in the music, similar to the Tremolo filter,
but changes in pitch rather than volume.
"""
def __init__(self, *, frequency: float, depth: float):
super().__init__()
if frequency < 0 or frequency > 14:
raise exceptions.FilterInvalidArgument("Vibrato frequency must be between 0 and 14.")
raise FilterInvalidArgument("Vibrato frequency must be between 0 and 14.")
if depth < 0 or depth > 1:
raise exceptions.FilterInvalidArgument("Vibrato depth must be between 0 and 1.")
raise FilterInvalidArgument("Vibrato depth must be between 0 and 1.")
self.frequency = frequency
self.depth = depth

View File

@ -1,31 +1,50 @@
from os import strerror
import aiohttp
import discord
import asyncio
import typing
import json
import re
import socket
import time
import re
from discord.ext import commands
from typing import Optional, Union
from typing import Optional, Type
from urllib.parse import quote
from . import spotify
from . import events
from . import exceptions
from . import objects
from . import __version__
import aiohttp
import discord
from discord.ext import commands
from . import __version__, objects, spotify
from .exceptions import (
InvalidSpotifyClientAuthorization,
NodeConnectionFailure,
NodeNotAvailable,
SpotifyAlbumLoadFailed,
SpotifyPlaylistLoadFailed,
SpotifyTrackLoadFailed,
TrackLoadError
)
from .spotify import SpotifyException
from .utils import ExponentialBackoff, NodeStats
SPOTIFY_URL_REGEX = re.compile(
r"https?://open.spotify.com/(?P<type>album|playlist|track)/(?P<id>[a-zA-Z0-9]+)"
)
SPOTIFY_URL_REGEX = re.compile(r'https?://open.spotify.com/(?P<type>album|playlist|track)/(?P<id>[a-zA-Z0-9]+)')
class Node:
"""The base class for a node.
This node object represents a Lavalink node.
If you want to enable Spotify searching, pass in a proper Spotify Client ID and Spotify Client Secret"""
def __init__(self, pool, bot: Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient], host: str, port: int, password: str, identifier: str, spotify_client_id: Optional[str], spotify_client_secret: Optional[str]):
To enable Spotify searching, pass in a proper Spotify Client ID and Spotify Client Secret
"""
def __init__(
self,
pool,
bot: Type[discord.Client],
host: str,
port: int,
password: str,
identifier: str,
spotify_client_id: Optional[str],
spotify_client_secret: Optional[str]
):
self._bot = bot
self._host = host
self._port = port
@ -52,17 +71,24 @@ class Node:
self._players = {}
self._spotify_client_id: str = spotify_client_id
self._spotify_client_secret: str = spotify_client_secret
self._spotify_client_id = spotify_client_id
self._spotify_client_secret = spotify_client_secret
if self._spotify_client_id and self._spotify_client_secret:
self._spotify_client: spotify.Client = spotify.Client(self._spotify_client_id, self._spotify_client_secret)
self._spotify_http_client: spotify.HTTPClient = spotify.HTTPClient(self._spotify_client_id, self._spotify_client_secret)
self._spotify_client = spotify.Client(
self._spotify_client_id, self._spotify_client_secret
)
self._spotify_http_client = spotify.HTTPClient(
self._spotify_client_id, self._spotify_client_secret
)
self._bot.add_listener(self._update_handler, "on_socket_response")
def __repr__(self):
return f"<Pomice.node ws_uri={self._websocket_uri} rest_uri={self._rest_uri} player_count={len(self._players)}>"
return (
f"<Pomice.node ws_uri={self._websocket_uri} rest_uri={self._rest_uri} "
f"player_count={len(self._players)}>"
)
@property
def is_connected(self) -> bool:
@ -74,15 +100,14 @@ class Node:
"""Property which returns the latency of the node in milliseconds"""
start_time = time.time()
await self.send(op="ping")
end_time = await self._bot.wait_for(f"node_ping")
end_time = await self._bot.wait_for("node_ping")
return (end_time - start_time) * 1000
@property
async def stats(self):
"""Property which returns the node stats at any given time.
Typically, accessing this property is rare due to the fact that Lavalink automatically sends updated node stats every minutes."""
"""Property which returns the node stats."""
await self.send(op="get-stats")
node_stats = await self._bot.wait_for(f"node_stats")
node_stats = await self._bot.wait_for("node_stats")
return node_stats
@property
@ -92,9 +117,13 @@ class Node:
@property
def bot(self):
"""Property which returns the discord.py Bot object linked to this node"""
"""Property which returns the discord.py client linked to this node"""
return self._bot
@property
def player_count(self):
return len(self.players)
@property
def pool(self):
"""Property which returns the node pool this node is a part of."""
@ -106,9 +135,7 @@ class Node:
if not data:
return
if data["t"] == "VOICE_SERVER_UPDATE":
guild_id = int(data["d"]["guild_id"])
try:
player = self._players[guild_id]
@ -117,7 +144,6 @@ class Node:
return
elif data["t"] == "VOICE_STATE_UPDATE":
if int(data["d"]["user_id"]) != self._bot.user.id:
return
@ -128,9 +154,6 @@ class Node:
except KeyError:
return
else:
return
async def _listen(self):
backoff = ExponentialBackoff(base=7)
@ -145,27 +168,27 @@ class Node:
else:
self._bot.loop.create_task(self._handle_payload(msg.json()))
async def _handle_payload(self, data: dict) -> None:
op = data.get('op', None)
async def _handle_payload(self, data: dict):
op = data.get("op", None)
if not op:
return
if op == 'stats':
if op == "stats":
self._stats = NodeStats(data)
return
if not (player := self._players.get(int(data['guildId']))):
if not (player := self._players.get(int(data["guildId"]))):
return
if op == 'event':
if op == "event":
await player._dispatch_event(data)
elif op == 'playerUpdate':
elif op == "playerUpdate":
await player._update_state(data)
async def send(self, **data):
if not self.available:
raise exceptions.NodeNotAvailable(f"The node '{self.identifier}' is not currently available.")
raise NodeNotAvailable(
f"The node '{self.identifier}' is not currently available.")
await self._websocket.send_str(json.dumps(data))
@ -178,21 +201,30 @@ class Node:
await self._bot.wait_until_ready()
try:
self._websocket = await self._session.ws_connect(self._websocket_uri, headers=self._headers, heartbeat=60)
self._websocket = await self._session.ws_connect(
self._websocket_uri, headers=self._headers, heartbeat=60
)
self._task = self._bot.loop.create_task(self._listen())
self._pool._nodes[self._identifier] = self
self.available = True
return self
except aiohttp.WSServerHandshakeError:
raise exceptions.NodeConnectionFailure(f"The password for node '{self.identifier}' is invalid.")
raise NodeConnectionFailure(
f"The password for node '{self.identifier}' is invalid."
)
except aiohttp.InvalidURL:
raise exceptions.NodeConnectionFailure(f"The URI for node '{self.identifier}' is invalid.")
raise NodeConnectionFailure(
f"The URI for node '{self.identifier}' is invalid."
)
except socket.gaierror:
raise exceptions.NodeConnectionFailure(f"The node '{self.identifier}' failed to connect.")
raise NodeConnectionFailure(
f"The node '{self.identifier}' failed to connect."
)
async def disconnect(self):
"""Disconnects a connected Lavalink node and removes it from the node pool. This also destroys any players connected to the node."""
"""Disconnects a connected Lavalink node and removes it from the node pool.
This also destroys any players connected to the node.
"""
for player in self.players.copy().values():
await player.destroy()
@ -203,20 +235,30 @@ class Node:
async def get_tracks(self, query: str, ctx: commands.Context = None):
"""Fetches tracks from the node's REST api to parse into Lavalink.
If you passed in Spotify API credentials, you can also pass in a Spotify URL of a playlist, album or track
and it will be parsed accordingly.
You can also pass in a discord.py Context object to get a Context object on any track you search.
If you passed in Spotify API credentials, you can also pass in a
Spotify URL of a playlist, album or track and it will be parsed accordingly.
You can also pass in a discord.py Context object to get a
Context object on any track you search.
"""
if spotify_url_check := SPOTIFY_URL_REGEX.match(query):
if not self._spotify_client_id and not self._spotify_client_secret:
raise exceptions.InvalidSpotifyClientAuthorization("You did not provide proper Spotify client authorization credentials. If you would like to use the Spotify searching feature, please obtain Spotify API credentials here: https://developer.spotify.com/")
raise InvalidSpotifyClientAuthorization(
"You did not provide proper Spotify client authorization credentials. "
"If you would like to use the Spotify searching feature, "
"please obtain Spotify API credentials here: https://developer.spotify.com/"
)
search_type = spotify_url_check.group("type")
spotify_id = spotify_url_check.group("id")
search_type = spotify_url_check.group('type')
spotify_id = spotify_url_check.group('id')
if search_type == "playlist":
results: spotify.Playlist = spotify.Playlist(client=self._spotify_client, data=await self._spotify_http_client.get_playlist(spotify_id))
results = spotify.Playlist(
client=self._spotify_client,
data=await self._spotify_http_client.get_playlist(spotify_id)
)
try:
search_tracks = await results.get_all_tracks()
tracks = [
@ -224,17 +266,37 @@ class Node:
track_id=track.id,
ctx=ctx,
spotify=True,
info={'title': track.name or 'Unknown', 'author': ', '.join(artist.name for artist in track.artists) or 'Unknown',
'length': track.duration or 0, 'identifier': track.id or 'Unknown', 'uri': track.url or 'spotify',
'isStream': False, 'isSeekable': False, 'position': 0, 'thumbnail': track.images[0].url if track.images else None},
info={
"title": track.name or "Unknown",
"author": ", ".join(
artist.name for artist in track.artists
) or "Unknown",
"length": track.duration or 0,
"identifier": track.id or "Unknown",
"uri": track.url or "spotify",
"isStream": False,
"isSeekable": False,
"position": 0,
"thumbnail": track.images[0].url if track.images else None
},
) for track in search_tracks
]
return objects.Playlist(playlist_info={"name": results.name, "selectedTrack": tracks[0]}, tracks=tracks, ctx=ctx, spotify=True)
except:
raise exceptions.SpotifyPlaylistLoadFailed(f"Unable to find results for {query}")
return objects.Playlist(
playlist_info={"name": results.name, "selectedTrack": tracks[0]},
tracks=tracks,
ctx=ctx,
spotify=True
)
except SpotifyException:
raise SpotifyPlaylistLoadFailed(
f"Unable to find results for {query}"
)
elif search_type == "album":
results: spotify.Album = await self._spotify_client.get_album(spotify_id=spotify_id)
results = await self._spotify_client.get_album(spotify_id=spotify_id)
try:
search_tracks = await results.get_all_tracks()
tracks = [
@ -242,49 +304,94 @@ class Node:
track_id=track.id,
ctx=ctx,
spotify=True,
info={'title': track.name or 'Unknown', 'author': ', '.join(artist.name for artist in track.artists) or 'Unknown',
'length': track.duration or 0, 'identifier': track.id or 'Unknown', 'uri': track.url or 'spotify',
'isStream': False, 'isSeekable': False, 'position': 0, 'thumbnail': track.images[0].url if track.images else None},
info={
"title": track.name or "Unknown",
"author": ", ".join(
artist.name for artist in track.artists
) or "Unknown",
"length": track.duration or 0,
"identifier": track.id or "Unknown",
"uri": track.url or "spotify",
"isStream": False,
"isSeekable": False,
"position": 0,
"thumbnail": track.images[0].url if track.images else None
},
) for track in search_tracks
]
return objects.Playlist(playlist_info={"name": results.name, "selectedTrack": tracks[0]}, tracks=tracks, ctx=ctx, spotify=True)
except:
raise exceptions.SpotifyAlbumLoadFailed(f"Unable to find results for {query}")
return objects.Playlist(
playlist_info={"name": results.name, "selectedTrack": tracks[0]},
tracks=tracks,
ctx=ctx,
spotify=True
)
except SpotifyException:
raise SpotifyAlbumLoadFailed(f"Unable to find results for {query}")
elif search_type == 'track':
try:
results: spotify.Track = await self._spotify_client.get_track(spotify_id=spotify_id)
return [objects.Track(
results = await self._spotify_client.get_track(spotify_id=spotify_id)
return [
objects.Track(
track_id=results.id,
ctx=ctx,
spotify=True,
info={'title': results.name or 'Unknown', 'author': ', '.join(artist.name for artist in results.artists) or 'Unknown',
'length': results.duration or 0, 'identifier': results.id or 'Unknown', 'uri': results.url or 'spotify',
'isStream': False, 'isSeekable': False, 'position': 0, 'thumbnail': results.images[0].url if results.images else None},)]
except:
raise exceptions.SpotifyTrackLoadFailed(f"Unable to find results for {query}")
info={
"title": results.name or "Unknown",
"author": ", ".join(
artist.name for artist in results.artists
) or "Unknown",
"length": results.duration or 0,
"identifier": results.id or "Unknown",
"uri": results.url or "spotify",
"isStream": False,
"isSeekable": False,
"position": 0,
"thumbnail": results.images[0].url if results.images else None
},
)
]
except SpotifyException:
raise SpotifyTrackLoadFailed(f"Unable to find results for {query}")
else:
async with self._session.get(url=f"{self._rest_uri}/loadtracks?identifier={quote(query)}", headers={"Authorization": self._password}) as response:
async with self._session.get(
url=f"{self._rest_uri}/loadtracks?identifier={quote(query)}",
headers={"Authorization": self._password}
) as response:
data = await response.json()
load_type = data.get("loadType")
if not load_type:
raise exceptions.TrackLoadError("There was an error while trying to load this track.")
raise TrackLoadError("There was an error while trying to load this track.")
elif load_type == "LOAD_FAILED":
raise exceptions.TrackLoadError(f"There was an error of severity '{data['severity']}' while loading tracks.\n\n{data['cause']}")
raise TrackLoadError(
f"There was an error of severity '{data['severity']}' "
f"while loading tracks.\n\n{data['cause']}"
)
elif load_type == "NO_MATCHES":
return None
elif load_type == "PLAYLIST_LOADED":
return objects.Playlist(playlist_info=data["playlistInfo"], tracks=data["tracks"], ctx=ctx)
return objects.Playlist(
playlist_info=data["playlistInfo"],
tracks=data["tracks"],
ctx=ctx
)
elif load_type == "SEARCH_RESULT" or load_type == "TRACK_LOADED":
return [objects.Track(track_id=track["track"], info=track["info"], ctx=ctx) for track in data["tracks"]]
return [
objects.Track(
track_id=track["track"],
info=track["info"],
ctx=ctx
)
for track in data["tracks"]
]

View File

@ -1,13 +1,20 @@
from typing import Optional
from discord.ext import commands
class Track:
"""
The base track object. Returns critical track information needed to be parsed by Lavalink.
You can also pass in commands.Context to get a discord.py Context object by passing in a valid Context object when you search for a track.
"""The base track object. Returns critical track information needed for parsing by Lavalink.
You can also pass in commands.Context to get a discord.py Context object in your track.
"""
def __init__(self, track_id: str, info: dict, ctx: commands.Context = None, spotify: bool = False):
def __init__(
self,
track_id: str,
info: dict,
ctx: Optional[commands.Context] = None,
spotify: bool = False
):
self.track_id = track_id
self.info = info
self.spotify = spotify
@ -15,9 +22,8 @@ class Track:
self.title = info.get("title")
self.author = info.get("author")
self.length = info.get("length")
if ctx:
self.ctx: commands.Context = ctx
self.requester = self.ctx.author
self.ctx = ctx
self.requester = self.ctx.author if ctx else None
self.identifier = info.get("identifier")
self.uri = info.get("uri")
self.is_stream = info.get("isStream")
@ -32,13 +38,18 @@ class Track:
class Playlist:
"""
The base playlist object. Returns critcal playlist information like the name of the playlist and what tracks are included to be parsed by Lavalink.
You can also pass in commands.Context to get a discord.py Context object by passing in a valid Context object when you search for a track.
"""The base playlist object.
Returns critical playlist information needed for parsing by Lavalink.
You can also pass in commands.Context to get a discord.py Context object in your tracks.
"""
def __init__(self, playlist_info: dict, tracks: list, ctx: commands.Context = None, spotify: bool = False):
def __init__(
self,
playlist_info: dict,
tracks: list,
ctx: Optional[commands.Context] = None,
spotify: bool = False
):
self.playlist_info = playlist_info
self.tracks_raw = tracks
self.spotify = spotify
@ -46,11 +57,15 @@ class Playlist:
self.name = playlist_info.get("name")
self.selected_track = playlist_info.get("selectedTrack")
if self.spotify == True:
if self.spotify:
self.tracks = tracks
else:
self.tracks = [Track(track_id=track["track"], info=track["info"], ctx=ctx) for track in self.tracks_raw]
self.tracks = [
Track(track_id=track["track"], info=track["info"], ctx=ctx)
for track in self.tracks_raw
]
self.track_count = len(self.tracks)
def __str__(self):
return self.name

View File

@ -1,62 +1,60 @@
import time
from typing import Any, Dict, Type
import discord
from . import exceptions
from . import filters
from . import objects
from .node import Node
from .pool import NodePool
from . import events
import discord
from discord import VoiceProtocol, VoiceChannel
from discord import VoiceChannel, VoiceProtocol
from discord.ext import commands
from typing import Dict, Optional, Any, Union
from . import events, filters, objects
from .exceptions import TrackInvalidPosition
from .pool import NodePool
class Player(VoiceProtocol):
"""The base player class for Pomice. In order to initiate a player, you must pass it in as a cls when you connect to a channel.
"""The base player class for Pomice.
In order to initiate a player, you must pass it in as a cls when you connect to a channel.
i.e: ```py
await ctx.author.voice.channel.connect(cls=pomice.Player)```
await ctx.author.voice.channel.connect(cls=pomice.Player)
```
"""
def __init__(self, client: Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient], channel: VoiceChannel):
def __init__(self, client: Type[discord.Client], channel: VoiceChannel):
super().__init__(client=client, channel=channel)
self.client: Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient] = client
self._bot: Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient] = client
self.channel: VoiceChannel = channel
self._guild: discord.Guild = channel.guild
self._dj: discord.Member = None
self.client = client
self.bot = client
self.channel = channel
self.guild: discord.Guild = self.channel.guild
self.dj: discord.Member = None
self._node: Node = NodePool.get_node()
self._current: objects.Track = None
self._filter: filters.Filter = None
self._volume: int = 100
self._paused: bool = False
self._is_connected: bool = False
self.node = NodePool.get_node()
self.current: objects.Track = None
self.filter: filters.Filter = None
self.volume = 100
self.paused = False
self.is_connected = False
self._position: int = 0
self._last_update: int = 0
self._current_track_id = None
self._voice_server_update_data = {}
self._position = 0
self._last_position = 0
self._last_update = 0
self._voice_state = {}
def __repr__(self):
return f"<Pomice.player bot={self._bot} guildId={self._guild.id} is_connected={self.is_connected} is_playing={self.is_playing}>"
return (
f"<Pomice.player bot={self.bot} guildId={self.guild.id} "
f"is_connected={self.is_connected} is_playing={self.is_playing}>"
)
@property
def position(self):
"""Property which returns the player's position in a track in milliseconds"""
if not self.is_playing or not self._current:
if not self.is_playing or not self.current:
return 0
if self.is_paused:
return min(self._last_position, self._current.length)
return min(self._last_position, self.current.length)
difference = (time.time() * 1000) - self._last_update
position = self._last_position + difference
@ -66,141 +64,140 @@ class Player(VoiceProtocol):
return min(position, self.current.length)
@property
def is_connected(self):
"""Property which returns whether or not the player is connected to a node."""
return self._is_connected
@property
def is_playing(self):
"""Property which returns whether or not the player is actively playing a track."""
return self._is_connected and self._current is not None
return self.is_connected and self.current is not None
@property
def is_paused(self):
"""Property which returns whether or not the player has a track which is paused or not."""
return self._is_connected and self._paused is True
@property
def node(self):
"""Property which returns what node is associated with this player."""
return self._node
@property
def current(self):
"""Property which returns the current track as a Pomice Track object"""
return self._current
@property
def volume(self):
"""Property which returns the players current volume as an integer"""
return self._volume
return self.is_connected and self.paused
async def _update_state(self, data: dict):
state: dict = data.get('state')
state: dict = data.get("state")
self._last_update = time.time() * 1000
self._is_connected = state.get('connected')
self._last_position = state.get('position')
self.is_connected = state.get("connected")
self._last_position = state.get("position")
async def _dispatch_voice_update(self, voice_data: Dict[str, Any]) -> None:
if {'sessionId', 'event'} != self._voice_server_update_data.keys():
async def _dispatch_voice_update(self, voice_data: Dict[str, Any]):
if {"sessionId", "event"} != self._voice_state.keys():
return
await self._node.send(
op='voiceUpdate',
guildId=str(self._guild.id),
await self.node.send(
op="voiceUpdate",
guildId=str(self.guild.id),
**voice_data
)
async def _voice_server_update(self, data: dict):
self._voice_server_update_data.update({'event': data})
await self._dispatch_voice_update(self._voice_server_update_data)
self._voice_state.update({"event": data})
await self._dispatch_voice_update(self._voice_state)
async def _voice_state_update(self, data: dict):
self._voice_server_update_data.update({'sessionId': data.get('session_id')})
if not (channel_id := data.get('channel_id')):
self._voice_state.update({"sessionId": data.get("session_id")})
if not (channel_id := data.get("channel_id")):
self.channel = None
self._voice_server_update_data.clear()
self._voice_state.clear()
return
self.channel = self._guild.get_channel(int(channel_id))
await self._dispatch_voice_update({**self._voice_server_update_data, "event": data})
self.channel = self.guild.get_channel(int(channel_id))
await self._dispatch_voice_update({**self._voice_state, "event": data})
async def _dispatch_event(self, data: dict):
event_type = data.get('type')
event_type = data.get("type")
event = getattr(events, event_type, None)
event = event(data)
self._bot.dispatch(f"pomice_{event.name}", event)
self.bot.dispatch(f"pomice_{event.name}", event)
async def get_tracks(self, query: str, ctx: commands.Context = None):
"""Fetches tracks from the node's REST api to parse into Lavalink.
If you passed in Spotify API credentials, you can also pass in a Spotify URL of a playlist, album or track
and it will be parsed accordingly.
You can also pass in a discord.py Context object to get a Context object on any track you search.
If you passed in Spotify API credentials, you can also pass in a Spotify URL of a playlist,
album or track and it will be parsed accordingly.
You can also pass in a discord.py Context object to get a
Context object on any track you search.
"""
return await self._node.get_tracks(query, ctx)
return await self.node.get_tracks(query, ctx)
async def connect(self, *, timeout: float, reconnect: bool):
await self._guild.change_voice_state(channel=self.channel)
self._node._players[self._guild.id] = self
self._is_connected = True
await self.guild.change_voice_state(channel=self.channel)
self.node._players[self.guild.id] = self
self.is_connected = True
async def stop(self):
"""Stops a currently playing track."""
self._current = None
await self._node.send(op='stop', guildId=str(self._guild.id))
self.current = None
await self.node.send(op="stop", guildId=str(self.guild.id))
async def disconnect(self, *, force: bool = False):
await self.stop()
await self._guild.change_voice_state(channel=None)
await self.guild.change_voice_state(channel=None)
self.cleanup()
self.channel = None
self._is_connected = False
del self._node._players[self._guild.id]
self.is_connected = False
del self.node._players[self.guild.id]
async def destroy(self):
"""Disconnects a player and destroys the player instance."""
await self.disconnect()
await self._node.send(op='destroy', guildId=str(self._guild.id))
await self.node.send(op="destroy", guildId=str(self.guild.id))
async def play(self, track: objects.Track, start_position: int = 0):
"""Plays a track. If a Spotify track is passed in, it will be handled accordingly."""
if track.spotify == True:
spotify_track: objects.Track = await self._node.get_tracks(f"ytmsearch:{track.title} {track.author} audio")
await self._node.send(op='play', guildId=str(self._guild.id), track=spotify_track[0].track_id, startTime=start_position, endTime=spotify_track[0].length, noReplace=False)
if track.spotify:
spotify_track: objects.Track = (await self.node.get_tracks(
f"ytmsearch:{track.title} {track.author} audio"
))[0]
await self.node.send(
op="play",
guildId=str(self.guild.id),
track=spotify_track.track_id,
startTime=start_position,
endTime=spotify_track.length,
noReplace=False
)
else:
await self._node.send(op='play', guildId=str(self._guild.id), track=track.track_id, startTime=start_position, endTime=track.length, noReplace=False)
self._current = track
return self._current
await self.node.send(
op="play",
guildId=str(self.guild.id),
track=track.track_id,
startTime=start_position,
endTime=track.length,
noReplace=False
)
self.current = track
return self.current
async def seek(self, position: float):
"""Seeks to a position in the currently playing track milliseconds"""
if position < 0 or position > self.current.length:
raise exceptions.TrackInvalidPosition(f"Seek position must be between 0 and the track length")
raise TrackInvalidPosition(
f"Seek position must be between 0 and the track length"
)
await self._node.send(op='seek', guildId=str(self._guild.id), position=position)
await self.node.send(op="seek", guildId=str(self.guild.id), position=position)
return self.position
async def set_pause(self, pause: bool):
"""Sets the pause state of the currently playing track."""
await self._node.send(op='pause', guildId=str(self._guild.id), pause=pause)
self._paused = pause
return self._paused
await self.node.send(op="pause", guildId=str(self.guild.id), pause=pause)
self.paused = pause
return self.paused
async def set_volume(self, volume: int):
"""Sets the volume of the player as an integer. Lavalink accepts an amount from 0 to 500."""
await self._node.send(op='volume', guildId=str(self._guild.id), volume=volume)
self._volume = volume
return self._volume
await self.node.send(op="volume", guildId=str(self.guild.id), volume=volume)
self.volume = volume
return self.volume
async def set_filter(self, filter: filters.Filter):
"""Sets a filter of the player. Takes a pomice.Filter object. This will only work if you are using the development version of Lavalink."""
await self._node.send(op='filters', guildId=str(self._guild.id), **filter.payload)
"""Sets a filter of the player. Takes a pomice.Filter object.
This will only work if you are using the development version of Lavalink.
"""
await self.node.send(op="filters", guildId=str(self.guild.id), **filter.payload)
await self.seek(self.position)
self._filter = filter
self.filter = filter
return filter

View File

@ -1,34 +1,39 @@
import discord
import typing
import random
from typing import Optional, Type
from . import exceptions
import discord
from .exceptions import NodeCreationError, NoNodesAvailable
from .node import Node
from typing import Optional
from discord.ext import commands
class NodePool:
"""The base class for the node poll. This holds all the nodes that are to be used by the bot."""
"""The base class for the node pool.
This holds all the nodes that are to be used by the bot.
"""
_nodes: dict = {}
def __repr__(self):
return f"<Pomice.NodePool node_count={len(self._nodes.values())}>"
return f"<Pomice.NodePool node_count={self.node_count}>"
@property
def nodes(self):
"""Property which returns a dict with the node identifier and the Node object."""
return self._nodes
@property
def node_count(self):
return len(self._nodes.values())
@classmethod
def get_node(self, *, identifier: str = None) -> Node:
"""Fetches a node from the node pool using it's identifier. If no identifier is provided, it will choose a node at random."""
"""Fetches a node from the node pool using it's identifier.
If no identifier is provided, it will choose a node at random.
"""
available_nodes = {identifier: node for identifier, node in self._nodes.items()}
if not available_nodes:
raise exceptions.NoNodesAvailable('There are no nodes available.')
raise NoNodesAvailable('There are no nodes available.')
if identifier is None:
return random.choice(list(available_nodes.values()))
@ -36,16 +41,28 @@ class NodePool:
return available_nodes.get(identifier, None)
@classmethod
async def create_node(self, bot: typing.Union[commands.Bot, discord.Client, commands.AutoShardedBot, discord.AutoShardedClient], host: str, port: str, password: str, identifier: str, spotify_client_id: Optional[str] = None, spotify_client_secret: Optional[str] = None) -> Node:
"""Creates a Node object to be then added into the node pool. If you like to have Spotify searching capabilites, pass in valid Spotify API credentials."""
async def create_node(
self,
bot: Type[discord.Client],
host: str,
port: str,
password: str,
identifier: str,
spotify_client_id: Optional[str] = None,
spotify_client_secret: Optional[str] = None
) -> Node:
"""Creates a Node object to be then added into the node pool.
For Spotify searching capabilites, pass in valid Spotify API credentials.
"""
if identifier in self._nodes.keys():
raise exceptions.NodeCreationError(f"A node with identifier '{identifier}' already exists.")
raise NodeCreationError(f"A node with identifier '{identifier}' already exists.")
node = Node(
pool=self, bot=bot, host=host, port=port, password=password,
identifier=identifier, spotify_client_id=spotify_client_id,
spotify_client_secret=spotify_client_secret
)
node = Node(pool=self, bot=bot, host=host, port=port, password=password, identifier=identifier, spotify_client_id=spotify_client_id, spotify_client_secret=spotify_client_secret)
await node.connect()
self._nodes[node._identifier] = node
return node

View File

@ -21,7 +21,6 @@ DEALINGS IN THE SOFTWARE.
import random
import time
__all__ = [
'ExponentialBackoff',
'PomiceStats'
@ -56,6 +55,7 @@ class ExponentialBackoff:
self._exp = min(self._exp + 1, self._max)
return self._randfunc(0, self._base * 2 ** self._exp)
class NodeStats:
"""The base class for the node stats object. Gives critcical information on the node, which is updated every minute."""
@ -78,5 +78,3 @@ class NodeStats:
def __repr__(self) -> str:
return f'<Pomice.NodeStats total_players={self.players_total} playing_active={self.players_active}>'

View File

@ -3,7 +3,6 @@ import setuptools
with open("README.md") as f:
readme = f.read()
setuptools.setup(
name="pomice",
author="cloudwithax",