manually sync out of date changes into v2

This commit is contained in:
cloudwithax 2023-02-05 13:27:43 -05:00
parent d08f07ffdc
commit c1a9d7603f
7 changed files with 128 additions and 32 deletions

View File

@ -7,9 +7,7 @@ from a queue system, advanced queue control and more.
import discord
import pomice
import asyncio
import math
import random
from discord.ext import commands
from contextlib import suppress
@ -21,7 +19,7 @@ class Player(pomice.Player):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = asyncio.Queue()
self.queue = pomice.Queue()
self.controller: discord.Message = None
# Set context here so we can send a now playing embed
self.context: commands.Context = None
@ -49,8 +47,8 @@ class Player(pomice.Player):
# Queue up the next track, else teardown the player
try:
track: pomice.Track = self.queue.get_nowait()
except asyncio.queues.QueueEmpty:
track: pomice.Track = self.queue.get()
except pomice.QueueEmpty:
return await self.teardown()
await self.play(track)
@ -143,7 +141,7 @@ class Music(commands.Cog):
async def on_pomice_track_exception(self, player: Player, track, _):
await player.do_next()
@commands.command(aliases=['join', 'joi', 'j', 'summon', 'su', 'con'])
@commands.command(aliases=['joi', 'j', 'summon', 'su', 'con', 'connect'])
async def join(self, ctx: commands.Context, *, channel: discord.VoiceChannel = None) -> None:
if not channel:
channel = getattr(ctx.author.voice, "channel", None)
@ -157,7 +155,7 @@ class Music(commands.Cog):
player: Player = ctx.voice_client
# Set the player context so we can use it so send messages
player.set_context(ctx=ctx)
await player.set_context(ctx=ctx)
await ctx.send(f"Joined the voice channel `{channel.name}`")
@commands.command(aliases=['disconnect', 'dc', 'disc', 'lv', 'fuckoff'])
@ -172,7 +170,9 @@ class Music(commands.Cog):
async def play(self, ctx: commands.Context, *, search: str) -> None:
# Checks if the player is in the channel before we play anything
if not (player := ctx.voice_client):
await ctx.invoke(self.join)
await ctx.author.voice.channel.connect(cls=Player)
player: Player = ctx.voice_client
await player.set_context(ctx=ctx)
# If you search a keyword, Pomice will automagically search the result using YouTube
# You can pass in "search_type=" as an argument to change the search type
@ -187,10 +187,10 @@ class Music(commands.Cog):
if isinstance(results, pomice.Playlist):
for track in results.tracks:
await player.queue.put(track)
player.queue.put(track)
else:
track = results[0]
await player.queue.put(track)
player.queue.put(track)
if not player.is_playing:
await player.do_next()
@ -315,7 +315,7 @@ class Music(commands.Cog):
if self.is_privileged(ctx):
await ctx.send('An admin or DJ has shuffled the queue.', delete_after=10)
player.shuffle_votes.clear()
return random.shuffle(player.queue._queue)
return player.queue.shuffle()
required = self.required(ctx)
player.shuffle_votes.add(ctx.author)
@ -323,7 +323,7 @@ class Music(commands.Cog):
if len(player.shuffle_votes) >= required:
await ctx.send('Vote to shuffle passed. Shuffling the queue.', delete_after=10)
player.shuffle_votes.clear()
random.shuffle(player.queue._queue)
player.queue.shuffle()
else:
await ctx.send(f'{ctx.author.mention} has voted to shuffle the queue. Votes: {len(player.shuffle_votes)}/{required}', delete_after=15)
@ -348,7 +348,6 @@ class Music(commands.Cog):
def setup(bot: commands.Bot):
bot.add_cog(Music(bot))
async def setup(bot: commands.Bot):
await bot.add_cog(Music(bot))

View File

@ -1,5 +1,6 @@
import re
from typing import List, Optional
from typing import List, Optional, Union
from discord import Member, User
from discord.ext import commands
@ -25,12 +26,15 @@ class Track:
spotify: bool = False,
search_type: SearchType = SearchType.ytsearch,
spotify_track = None,
filters: Optional[List[Filter]] = None
filters: Optional[List[Filter]] = None,
timestamp: Optional[float] = None,
requester: Optional[Union[Member, User]] = None
):
self.track_id = track_id
self.info = info
self.spotify = spotify
self.filters: List[Filter] = filters
self.timestamp: Optional[float] = timestamp
self.original: Optional[Track] = None if spotify else self
self._search_type = search_type
@ -54,6 +58,9 @@ class Track:
self.length = info.get("length")
self.ctx = ctx
if requester:
self.requester = requester
else:
self.requester = self.ctx.author if ctx else None
self.is_stream = info.get("isStream")
self.is_seekable = info.get("isSeekable")

View File

@ -33,10 +33,6 @@ class Filters:
"""Property which checks if any applied filters were preloaded"""
return any(f for f in self._filters if f.preload == True)
@property
def has_global(self):
"""Property which checks if any applied filters are global"""
return any(f for f in self._filters if f.preload == False)
@property
def empty(self):
@ -284,6 +280,14 @@ class Player(VoiceProtocol):
"""
return await self._node.get_tracks(query, ctx=ctx, search_type=search_type, filters=filters)
async def get_recommendations(self, *, query: str, ctx: Optional[commands.Context] = None):
"""
Gets recommendations from Spotify. Query must be a valid Spotify Track URL.
You can pass in a discord.py Context object to get a
Context object on all tracks that get recommended.
"""
return await self._node.get_recommendations(query=query, ctx=ctx)
async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False):
await self.guild.change_voice_state(channel=self.channel, self_deaf=self_deaf, self_mute=self_mute)
self._node._players[self.guild.id] = self

View File

@ -48,6 +48,14 @@ DISCORD_MP3_URL_REGEX = re.compile(
r"(?P<message_id>[0-9]+)/(?P<file>[a-zA-Z0-9_.]+)+"
)
YOUTUBE_PLAYLIST_REGEX = re.compile(
r"(?P<video>^.*?v.*?)(?P<list>&list.*)"
)
YOUTUBE_TIMESTAMP_REGEX = re.compile(
r"(?P<video>^.*?)(\?t|&start)=(?P<time>\d+)?.*"
)
AM_URL_REGEX = re.compile(
r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+)"
)
@ -168,6 +176,11 @@ class Node:
"""Property which returns the latency of the node"""
return Ping(self._host, port=self._port).get_ping()
@property
def ping(self):
"""Alias for `Node.latency`, returns the latency of the node"""
return self.latency
async def _update_handler(self, data: dict):
await self._bot.wait_until_ready()
@ -350,6 +363,8 @@ class Node:
to be applied to your track once it plays.
"""
timestamp = None
if not URL_REGEX.match(query) and not re.match(r"(?:ytm?|sc)search:.", query):
query = f"{search_type}:{query}"
@ -460,6 +475,17 @@ class Node:
]
else:
# If YouTube url contains a timestamp, capture it for use later.
if (match := YOUTUBE_TIMESTAMP_REGEX.match(query)):
timestamp = float(match.group("time"))
# If query is a video thats part of a playlist, get the video and queue that instead
# (I can't tell you how much i've wanted to implement this in here)
if (match := YOUTUBE_PLAYLIST_REGEX.match(query)):
query = match.group("video")
async with self._session.get(
url=f"{self._rest_uri}/v3/loadtracks?identifier={quote(query)}",
headers={"Authorization": self._password}
@ -492,11 +518,43 @@ class Node:
track_id=track["track"],
info=track["info"],
ctx=ctx,
filters=filters
filters=filters,
timestamp=timestamp
)
for track in data["tracks"]
]
async def get_recommendations(self, *, query: str, ctx: Optional[commands.Context] = None):
"""
Gets recommendations from Spotify. Query must be a valid Spotify Track URL.
You can pass in a discord.py Context object to get a
Context object on all tracks that get recommended.
"""
results = await self._spotify_client.get_recommendations(query=query)
tracks = [
Track(
track_id=track.id,
ctx=ctx,
spotify=True,
spotify_track=track,
info={
"title": track.name,
"author": track.artists,
"length": track.length,
"identifier": track.id,
"uri": track.uri,
"isStream": False,
"isSeekable": True,
"position": 0,
"thumbnail": track.image,
"isrc": track.isrc
},
requester=self.bot.user
) for track in results
]
return tracks
@ -505,7 +563,7 @@ class NodePool:
This holds all the nodes that are to be used by the bot.
"""
_nodes = {}
_nodes: dict = {}
def __repr__(self):
return f"<Pomice.NodePool node_count={self.node_count}>"

View File

@ -125,8 +125,8 @@ class Queue(Iterable[Track]):
def _insert(self, index: int, item: Track) -> None:
self._queue.insert(index, item)
def _remove(self, index: int) -> None:
self._queue.remove(index)
def _remove(self, item: Track) -> None:
self._queue.remove(item)
def _get_random_float(self) -> float:
return random.random()
@ -183,7 +183,7 @@ class Queue(Iterable[Track]):
return self._queue
def get(self) -> Track:
def get(self):
"""Return next immediately available item in queue if any.
Raises QueueEmpty if no items in queue.
"""
@ -233,7 +233,7 @@ class Queue(Iterable[Track]):
Removes a item within the queue.
Raises ValueError if item is not in queue.
"""
return self._remove(self._index(self._check_track(item)))
return self._remove(self._check_track(item))
def find_position(self, item: Track) -> int:

View File

@ -111,3 +111,31 @@ class Client:
next_page_url = next_data["next"]
return Playlist(data, tracks)
async def get_recommendations(self, *, query: str):
if not self._bearer_token or time.time() >= self._expiry:
await self._fetch_bearer_token()
result = SPOTIFY_URL_REGEX.match(query)
spotify_type = result.group("type")
spotify_id = result.group("id")
if not result:
raise InvalidSpotifyURL("The Spotify link provided is not valid.")
if not spotify_type == "track":
raise InvalidSpotifyURL("The provided query is not a Spotify track.")
request_url = REQUEST_URL.format(type="recommendation", id=f"?seed_tracks={spotify_id}")
async with self.session.get(request_url, headers=self._bearer_headers) as resp:
if resp.status != 200:
raise SpotifyRequestException(
f"Error while fetching results: {resp.status} {resp.reason}"
)
data: dict = await resp.json(loads=json.loads)
tracks = [Track(track) for track in data["tracks"]]
return tracks

View File

@ -5,10 +5,10 @@ class Track:
"""The base class for a Spotify Track"""
def __init__(self, data: dict, image = None) -> None:
self.name = data["name"]
self.name: str = data["name"]
self.artists = ", ".join(artist["name"] for artist in data["artists"])
self.length = data["duration_ms"]
self.id = data["id"]
self.length: float = data["duration_ms"]
self.id: str = data["id"]
if data.get("external_ids"):
self.isrc = data["external_ids"]["isrc"]