This commit is contained in:
.Arhaan 2025-12-28 16:16:51 +00:00 committed by GitHub
commit 0675854cfb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1792 additions and 37 deletions

226
FEATURES.md Normal file
View File

@ -0,0 +1,226 @@
# Pomice Advanced Features Guide
## 🎉 Overview
Pomice now comes with built-in advanced features to help you build powerful music bots. These features are **integrated directly into the Player and Queue classes**, providing a "batteries-included" experience.
### Key Enhancements
- **Integrated Queue & History**: Every `Player` now has its own `queue` and `history` automatically.
- **Auto-History**: Tracks are automatically added to history when they finish playing.
- **Advanced Analytics**: Detailed statistics available directly via `player.get_stats()` or `queue.get_stats()`.
- **Integrated Utilities**: Filtering, sorting, and playlist management.
---
## 📚 Table of Contents
1. [Integrated Features](#-integrated-features)
2. [Track History](#-track-history)
3. [Queue Statistics](#-queue-statistics)
4. [Playlist Manager](#-playlist-manager)
5. [Track Utilities](#-track-utilities)
6. [Complete Examples](#-complete-examples)
---
## 🚀 Integrated Features
Since these features are now part of the core classes, usage is extremely simple:
```python
# Every player now has a queue and history by default
player = ctx.voice_client
# Access the queue
player.queue.put(track)
# Play the next track from the queue
await player.do_next()
# Access the history (automatically updated)
last_song = player.history.current
# Get real-time statistics
stats = player.get_stats()
print(f"Queue Duration: {stats.format_duration(stats.total_duration)}")
```
---
## 🕐 Track History
The `player.history` object automatically tracks every song that finishes playing.
### Features
- Configurable maximum history size (default: 100)
- Navigation: `get_previous()`, `get_next()`
- Search: `history.search("query")`
- Filter: `get_by_requester(user_id)`
- Unique tracks: `get_unique_tracks()`
### Usage
```python
# Show last 10 songs
recent = player.history.get_last(10)
# Search history
results = player.history.search("Imagine Dragons")
# Play previous track
prev = player.history.get_previous()
if prev:
await player.play(prev)
```
---
## 📊 Queue Statistics
Access advanced analytics via `player.get_stats()` or `player.queue.get_stats()`.
### Features
- Total/Average duration
- Longest/Shortest tracks
- Requester analytics (who added what)
- Author distribution
- Duration breakdown (short/medium/long)
### Usage
```python
stats = player.get_stats()
summary = stats.get_summary()
print(f"Total Tracks: {summary['total_tracks']}")
print(f"Total Duration: {summary['total_duration_formatted']}")
# Who added the most songs?
top = stats.get_top_requesters(3)
for user, count in top:
print(f"{user.display_name}: {count} tracks")
```
---
## 💾 Playlist Manager
Export and import playlists to/from JSON and M3U formats.
### Usage
```python
import pomice
# Export current queue to file
pomice.PlaylistManager.export_queue(
player.queue,
filepath='playlists/party.json',
name='Party Mix'
)
# Import a playlist
data = pomice.PlaylistManager.import_playlist('playlists/rock.json')
uris = pomice.PlaylistManager.get_track_uris('playlists/rock.json')
for uri in uris:
results = await player.get_tracks(query=uri)
if results:
player.queue.put(results[0])
```
---
## 🔧 Track Utilities
Advanced filtering and sorting.
### Filtering
```python
import pomice
tracks = list(player.queue)
# Get tracks under 5 minutes
short = pomice.TrackFilter.by_duration(tracks, max_duration=300000)
# Get tracks by a specific artist
artist_songs = pomice.TrackFilter.by_author(tracks, "Artist Name")
```
### Sorting
```python
# Sort queue by title
sorted_tracks = pomice.SearchHelper.sort_by_title(list(player.queue))
# Clear and refill with sorted tracks
player.queue.clear()
player.queue.extend(sorted_tracks)
```
---
## 🎯 Complete Examples
### Integrated Music Cog
```python
import pomice
from discord.ext import commands
class Music(commands.Cog):
def __init__(self, bot):
self.bot = bot
@commands.command()
async def play(self, ctx, *, search: str):
if not ctx.voice_client:
await ctx.author.voice.channel.connect(cls=pomice.Player)
player = ctx.voice_client
results = await player.get_tracks(query=search, ctx=ctx)
if not results:
return await ctx.send("No results.")
track = results[0]
player.queue.put(track)
await ctx.send(f"Added **{track.title}** to queue.")
if not player.is_playing:
await player.do_next()
@commands.command()
async def history(self, ctx):
"""Show recently played songs."""
player = ctx.voice_client
recent = player.history.get_last(5)
msg = "\n".join(f"{i}. {t.title}" for i, t in enumerate(recent, 1))
await ctx.send(f"**Recently Played:**\n{msg}")
@commands.command()
async def stats(self, ctx):
"""Show queue analytics."""
stats = ctx.voice_client.get_stats()
summary = stats.get_summary()
await ctx.send(
f"**Queue Stats**\n"
f"Tracks: {summary['total_tracks']}\n"
f"Duration: {summary['total_duration_formatted']}"
)
```
---
## 📖 Quick Reference
| Feature | Integrated Access |
| :--- | :--- |
| **Queue** | `player.queue` |
| **History** | `player.history` |
| **Statistics** | `player.get_stats()` |
| **Next Track** | `await player.do_next()` |
---
**Happy coding! 🎵**

View File

@ -301,6 +301,85 @@ class Music(commands.Cog):
delete_after=15, delete_after=15,
) )
@commands.command()
async def loop(self, ctx: commands.Context, mode: str = "off"):
"""Sets the loop mode: off, track, queue."""
player: Player = ctx.voice_client
if not player:
return
mode = mode.lower()
if mode == "track":
player.loop_mode = pomice.LoopMode.TRACK
elif mode == "queue":
player.loop_mode = pomice.LoopMode.QUEUE
else:
player.loop_mode = None
await ctx.send(f"Loop mode set to **{mode}**")
@commands.command()
async def autoplay(self, ctx: commands.Context):
"""Toggles autoplay to keep the music going with recommendations when the queue is empty."""
player: Player = ctx.voice_client
if not player:
return
player.autoplay = not player.autoplay
await ctx.send(f"Autoplay is now **{'on' if player.autoplay else 'off'}**")
@commands.command()
async def move(self, ctx: commands.Context, from_index: int, to_index: int):
"""Moves a track's position in the queue (e.g., !move 5 1)."""
player: Player = ctx.voice_client
if not player or player.queue.is_empty:
return await ctx.send("The queue is empty.")
try:
player.queue.move(from_index - 1, to_index - 1)
await ctx.send(f"Moved track from #{from_index} to #{to_index}.")
except IndexError:
await ctx.send("Sorry, I couldn't find a track at that position.")
@commands.command(aliases=["clean"])
async def deduplicate(self, ctx: commands.Context):
"""Removes any double-posted songs from your queue."""
player: Player = ctx.voice_client
if not player:
return
removed = player.queue.remove_duplicates()
await ctx.send(f"All cleaned up! Removed **{removed}** duplicate tracks.")
@commands.command()
async def filter(self, ctx: commands.Context, preset: str = "off"):
"""Apply a sound preset: pop, soft, metal, boost, nightcore, vaporwave, off."""
player: Player = ctx.voice_client
if not player:
return
preset = preset.lower()
await player.reset_filters()
if preset == "off":
return await ctx.send("Filters cleared.")
presets = {
"pop": pomice.Equalizer.pop(),
"soft": pomice.Equalizer.soft(),
"metal": pomice.Equalizer.metal(),
"boost": pomice.Equalizer.boost(),
"nightcore": pomice.Timescale.nightcore(),
"vaporwave": pomice.Timescale.vaporwave(),
"bass": pomice.Equalizer.bass_boost_light(),
}
if preset not in presets:
return await ctx.send(f"Available presets: {', '.join(presets.keys())}")
await player.add_filter(presets[preset])
await ctx.send(f"Applied the **{preset}** sound preset!")
@commands.command() @commands.command()
async def stop(self, ctx: commands.Context): async def stop(self, ctx: commands.Context):
"""Stop the player and clear all internal states.""" """Stop the player and clear all internal states."""

View File

@ -0,0 +1,134 @@
"""
Example usage of Pomice's integrated advanced features.
This example shows how easy it is to use:
- Integrated Track History (auto-tracking)
- Integrated Player Queue
- Integrated Analytics with player.get_stats()
- Playlist Import/Export
"""
import discord
from discord.ext import commands
import pomice
# Initialize bot
bot = commands.Bot(command_prefix="!", intents=discord.Intents.all())
class IntegratedMusic(commands.Cog):
"""Music cog with integrated advanced features."""
def __init__(self, bot):
self.bot = bot
self.pomice = pomice.NodePool()
async def start_nodes(self):
"""Start Lavalink nodes."""
await self.pomice.create_node(
bot=self.bot,
host="127.0.0.1",
port="3030",
password="youshallnotpass",
identifier="MAIN",
)
@commands.command(name="play")
async def play(self, ctx, *, search: str):
"""Play a track using the integrated queue."""
if not ctx.voice_client:
await ctx.author.voice.channel.connect(cls=pomice.Player)
player: pomice.Player = ctx.voice_client
results = await player.get_tracks(query=search, ctx=ctx)
if not results:
return await ctx.send("No results found.")
if isinstance(results, pomice.Playlist):
player.queue.extend(results.tracks)
await ctx.send(f"Added playlist **{results.name}** ({len(results.tracks)} tracks).")
else:
track = results[0]
player.queue.put(track)
await ctx.send(f"Added **{track.title}** to queue.")
if not player.is_playing:
await player.do_next()
@commands.command(name="history")
async def history(self, ctx, limit: int = 10):
"""Show recently played tracks (tracked automatically!)."""
player: pomice.Player = ctx.voice_client
if not player:
return await ctx.send("Not connected.")
if player.history.is_empty:
return await ctx.send("No tracks in history.")
tracks = player.history.get_last(limit)
embed = discord.Embed(title="🎵 Recently Played", color=discord.Color.blue())
for i, track in enumerate(tracks, 1):
embed.add_field(name=f"{i}. {track.title}", value=f"by {track.author}", inline=False)
await ctx.send(embed=embed)
@commands.command(name="stats")
async def queue_stats(self, ctx):
"""Show detailed queue statistics via integrated get_stats()."""
player: pomice.Player = ctx.voice_client
if not player:
return await ctx.send("Not connected.")
stats = player.get_stats()
summary = stats.get_summary()
embed = discord.Embed(title="📊 Queue Statistics", color=discord.Color.green())
embed.add_field(name="Tracks", value=summary["total_tracks"], inline=True)
embed.add_field(name="Duration", value=summary["total_duration_formatted"], inline=True)
# Who added the most?
top_requesters = stats.get_top_requesters(3)
if top_requesters:
text = "\n".join(f"{u.display_name}: {c} tracks" for u, c in top_requesters)
embed.add_field(name="Top Requesters", value=text, inline=False)
await ctx.send(embed=embed)
@commands.command(name="export")
async def export_queue(self, ctx, filename: str = "my_playlist.json"):
"""Export current integrated queue."""
player: pomice.Player = ctx.voice_client
if not player or player.queue.is_empty:
return await ctx.send("Queue is empty.")
pomice.PlaylistManager.export_queue(
player.queue,
f"playlists/{filename}",
name=f"{ctx.guild.name}'s Playlist",
)
await ctx.send(f"✅ Queue exported to `playlists/{filename}`")
@commands.command(name="sort")
async def sort_queue(self, ctx):
"""Sort the queue using integrated utilities."""
player: pomice.Player = ctx.voice_client
if not player or player.queue.is_empty:
return await ctx.send("Queue is empty.")
# Use SearchHelper to sort the queue list
sorted_tracks = pomice.SearchHelper.sort_by_title(list(player.queue))
player.queue.clear()
player.queue.extend(sorted_tracks)
await ctx.send("✅ Queue sorted alphabetically.")
@bot.event
async def on_ready():
print(f"{bot.user} is ready!")
if __name__ == "__main__":
print("Example script ready for use!")

View File

@ -35,3 +35,7 @@ from .queue import *
from .player import * from .player import *
from .pool import * from .pool import *
from .routeplanner import * from .routeplanner import *
from .history import *
from .queue_stats import *
from .playlist_manager import *
from .track_utils import *

View File

@ -70,7 +70,8 @@ class TrackInvalidPosition(PomiceException):
class TrackLoadError(PomiceException): class TrackLoadError(PomiceException):
"""There was an error while loading a track.""" """There was an error while loading a track."""
pass def __init__(self, message: str = "Sorry, I ran into trouble trying to load that track."):
super().__init__(message)
class FilterInvalidArgument(PomiceException): class FilterInvalidArgument(PomiceException):
@ -112,13 +113,17 @@ class QueueException(Exception):
class QueueFull(QueueException): class QueueFull(QueueException):
"""Exception raised when attempting to add to a full Queue.""" """Exception raised when attempting to add to a full Queue."""
pass def __init__(self, message: str = "Whoops! The queue is completely full right now."):
super().__init__(message)
class QueueEmpty(QueueException): class QueueEmpty(QueueException):
"""Exception raised when attempting to retrieve from an empty Queue.""" """Exception raised when attempting to retrieve from an empty Queue."""
pass def __init__(
self, message: str = "It looks like the queue is empty. There's no more music to play!",
):
super().__init__(message)
class LavalinkVersionIncompatible(PomiceException): class LavalinkVersionIncompatible(PomiceException):

View File

@ -110,10 +110,7 @@ class Equalizer(Filter):
@classmethod @classmethod
def boost(cls) -> "Equalizer": def boost(cls) -> "Equalizer":
"""Equalizer preset which boosts the sound of a track, """A lively preset that boosts both bass and highs, making the music feel more energetic and fun."""
making it sound fun and energetic by increasing the bass
and the highs.
"""
levels = [ levels = [
(0, -0.075), (0, -0.075),
@ -134,11 +131,16 @@ class Equalizer(Filter):
] ]
return cls(tag="boost", levels=levels) return cls(tag="boost", levels=levels)
@classmethod
def bass_boost_light(cls) -> "Equalizer":
"""A light touch for people who want a bit more bass without it becoming overwhelming."""
levels = [(0, 0.15), (1, 0.1), (2, 0.05)]
return cls(tag="bass_boost_light", levels=levels)
@classmethod @classmethod
def metal(cls) -> "Equalizer": def metal(cls) -> "Equalizer":
"""Equalizer preset which increases the mids of a track, """A heavy preset designed to bring out the intensity of metal and rock.
preferably one of the metal genre, to make it sound It boosts the mids and highs to create a fuller, stage-like sound experience.
more full and concert-like.
""" """
levels = [ levels = [
@ -161,6 +163,54 @@ class Equalizer(Filter):
return cls(tag="metal", levels=levels) return cls(tag="metal", levels=levels)
@classmethod
def pop(cls) -> "Equalizer":
"""A balanced preset that enhances vocals and adds a bit of 'pop' to the rhythm.
Perfect for mainstream hits.
"""
levels = [
(0, -0.02),
(1, -0.01),
(2, 0.08),
(3, 0.1),
(4, 0.15),
(5, 0.1),
(6, 0.05),
(7, 0.0),
(8, 0.0),
(9, 0.0),
(10, 0.05),
(11, 0.1),
(12, 0.15),
(13, 0.1),
(14, 0.05),
]
return cls(tag="pop", levels=levels)
@classmethod
def soft(cls) -> "Equalizer":
"""A gentle preset that smooths out harsh frequencies.
Ideal for acoustic tracks or when you just want a more relaxed listening experience.
"""
levels = [
(0, 0.0),
(1, 0.0),
(2, 0.0),
(3, -0.05),
(4, -0.1),
(5, -0.1),
(6, -0.05),
(7, 0.0),
(8, 0.05),
(9, 0.1),
(10, 0.1),
(11, 0.05),
(12, 0.0),
(13, 0.0),
(14, 0.0),
]
return cls(tag="soft", levels=levels)
@classmethod @classmethod
def piano(cls) -> "Equalizer": def piano(cls) -> "Equalizer":
"""Equalizer preset which increases the mids and highs """Equalizer preset which increases the mids and highs

196
pomice/history.py Normal file
View File

@ -0,0 +1,196 @@
from __future__ import annotations
from collections import deque
from typing import Deque
from typing import Iterator
from typing import List
from typing import Optional
from .objects import Track
__all__ = ("TrackHistory",)
class TrackHistory:
"""Track history manager for Pomice.
Keeps track of previously played tracks with a configurable maximum size.
Useful for implementing 'previous track' functionality and viewing play history.
"""
__slots__ = ("_history", "max_size", "_current_index")
def __init__(self, max_size: int = 100) -> None:
"""Initialize the track history.
Parameters
----------
max_size: int
Maximum number of tracks to keep in history. Defaults to 100.
"""
self.max_size = max_size
self._history: Deque[Track] = deque(maxlen=max_size)
self._current_index: int = -1
def __len__(self) -> int:
"""Return the number of tracks in history."""
return len(self._history)
def __bool__(self) -> bool:
"""Return True if history contains tracks."""
return bool(self._history)
def __iter__(self) -> Iterator[Track]:
"""Iterate over tracks in history (newest to oldest)."""
return reversed(self._history)
def __getitem__(self, index: int) -> Track:
"""Get a track at the given index in history.
Parameters
----------
index: int
Index of the track (0 = most recent)
"""
return self._history[-(index + 1)]
def __repr__(self) -> str:
return f"<Pomice.TrackHistory size={len(self._history)} max_size={self.max_size}>"
def add(self, track: Track) -> None:
"""Add a track to the history.
Parameters
----------
track: Track
The track to add to history
"""
self._history.append(track)
self._current_index = len(self._history) - 1
def get_last(self, count: int = 1) -> List[Track]:
"""Get the last N tracks from history.
Parameters
----------
count: int
Number of tracks to retrieve. Defaults to 1.
Returns
-------
List[Track]
List of the last N tracks (most recent first)
"""
if count <= 0:
return []
return list(reversed(list(self._history)[-count:]))
def get_previous(self) -> Optional[Track]:
"""Get the previous track in history.
Returns
-------
Optional[Track]
The previous track, or None if at the beginning
"""
if not self._history or self._current_index <= 0:
return None
self._current_index -= 1
return self._history[self._current_index]
def get_next(self) -> Optional[Track]:
"""Get the next track in history (when navigating backwards).
Returns
-------
Optional[Track]
The next track, or None if at the end
"""
if not self._history or self._current_index >= len(self._history) - 1:
return None
self._current_index += 1
return self._history[self._current_index]
def clear(self) -> None:
"""Clear all tracks from history."""
self._history.clear()
self._current_index = -1
def get_all(self) -> List[Track]:
"""Get all tracks in history.
Returns
-------
List[Track]
All tracks in history (most recent first)
"""
return list(reversed(self._history))
def search(self, query: str) -> List[Track]:
"""Search for tracks in history by title or author.
Parameters
----------
query: str
Search query (case-insensitive)
Returns
-------
List[Track]
Matching tracks (most recent first)
"""
query_lower = query.lower()
return [
track
for track in reversed(self._history)
if query_lower in track.title.lower() or query_lower in track.author.lower()
]
def get_unique_tracks(self) -> List[Track]:
"""Get unique tracks from history (removes duplicates).
Returns
-------
List[Track]
Unique tracks (most recent occurrence kept)
"""
seen = set()
unique = []
for track in reversed(self._history):
if track.track_id not in seen:
seen.add(track.track_id)
unique.append(track)
return unique
def get_by_requester(self, requester_id: int) -> List[Track]:
"""Get all tracks requested by a specific user.
Parameters
----------
requester_id: int
Discord user ID
Returns
-------
List[Track]
Tracks requested by the user (most recent first)
"""
return [
track
for track in reversed(self._history)
if track.requester and track.requester.id == requester_id
]
@property
def is_empty(self) -> bool:
"""Check if history is empty."""
return len(self._history) == 0
@property
def current(self) -> Optional[Track]:
"""Get the current track in navigation."""
if not self._history or self._current_index < 0:
return None
return self._history[self._current_index]

View File

@ -26,11 +26,11 @@ from .exceptions import TrackInvalidPosition
from .exceptions import TrackLoadError from .exceptions import TrackLoadError
from .filters import Filter from .filters import Filter
from .filters import Timescale from .filters import Timescale
from .history import TrackHistory
from .objects import Playlist from .objects import Playlist
from .objects import Track from .objects import Track
from .pool import Node from .queue import Queue
from .pool import NodePool from .queue_stats import QueueStats
from pomice.utils import LavalinkVersion
if TYPE_CHECKING: if TYPE_CHECKING:
from discord.types.voice import VoiceServerUpdate from discord.types.voice import VoiceServerUpdate
@ -154,6 +154,9 @@ class Player(VoiceProtocol):
"_log", "_log",
"_voice_state", "_voice_state",
"_player_endpoint_uri", "_player_endpoint_uri",
"queue",
"history",
"autoplay",
) )
def __call__(self, client: Client, channel: VoiceChannel) -> Player: def __call__(self, client: Client, channel: VoiceChannel) -> Player:
@ -191,6 +194,10 @@ class Player(VoiceProtocol):
self._player_endpoint_uri: str = f"sessions/{self._node._session_id}/players" self._player_endpoint_uri: str = f"sessions/{self._node._session_id}/players"
self.queue: Queue = Queue()
self.history: TrackHistory = TrackHistory()
self.autoplay: bool = False
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return (
f"<Pomice.player bot={self.bot} guildId={self.guild.id} " f"<Pomice.player bot={self.bot} guildId={self.guild.id} "
@ -247,7 +254,7 @@ class Player(VoiceProtocol):
@property @property
def is_paused(self) -> bool: def is_paused(self) -> bool:
"""Property which returns whether or not the player has a track which is paused or not.""" """Returns True if the music is currently paused."""
return self._is_connected and self._paused return self._is_connected and self._paused
@property @property
@ -358,6 +365,8 @@ class Player(VoiceProtocol):
event: PomiceEvent = getattr(events, event_type)(data, self) event: PomiceEvent = getattr(events, event_type)(data, self)
if isinstance(event, TrackEndEvent) and event.reason not in ("REPLACED", "replaced"): if isinstance(event, TrackEndEvent) and event.reason not in ("REPLACED", "replaced"):
if self._current:
self.history.add(self._current)
self._current = None self._current = None
event.dispatch(self._bot) event.dispatch(self._bot)
@ -763,3 +772,39 @@ class Player(VoiceProtocol):
if self._log: if self._log:
self._log.debug(f"Fast apply passed, now removing all filters instantly.") self._log.debug(f"Fast apply passed, now removing all filters instantly.")
await self.seek(self.position) await self.seek(self.position)
async def do_next(self) -> Optional[Track]:
"""Automatically picks the next track from the queue and plays it.
If the queue is empty and autoplay is on, it will search for recommended tracks.
Returns
-------
Optional[Track]
The track that's now playing, or None if we've run out of music.
"""
if self.queue.is_empty:
if self.autoplay and self._current:
recommendations = await self.get_recommendations(track=self._current)
if recommendations:
if isinstance(recommendations, Playlist):
track = recommendations.tracks[0]
else:
track = recommendations[0]
await self.play(track)
return track
return None
track = self.queue.get()
await self.play(track)
return track
def get_stats(self) -> QueueStats:
"""Get detailed statistics for the current player and queue.
Returns
-------
QueueStats
A QueueStats object containing detailed analytics.
"""
return QueueStats(self.queue)

304
pomice/playlist_manager.py Normal file
View File

@ -0,0 +1,304 @@
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .objects import Track
from .queue import Queue
__all__ = ("PlaylistManager",)
class PlaylistManager:
"""Manager for exporting and importing playlists.
Allows saving queue contents to JSON files and loading them back,
useful for persistent playlists and sharing.
"""
@staticmethod
def export_queue(
queue: Queue,
filepath: str,
*,
name: Optional[str] = None,
description: Optional[str] = None,
include_metadata: bool = True,
) -> None:
"""Export a queue to a JSON file.
Parameters
----------
queue: Queue
The queue to export
filepath: str
Path to save the JSON file
name: Optional[str]
Name for the playlist. Defaults to filename.
description: Optional[str]
Description for the playlist
include_metadata: bool
Whether to include requester and timestamp metadata. Defaults to True.
"""
path = Path(filepath)
if name is None:
name = path.stem
tracks_data = []
for track in queue:
track_dict = {
"title": track.title,
"author": track.author,
"uri": track.uri,
"identifier": track.identifier,
"length": track.length,
"is_stream": track.is_stream,
}
if include_metadata:
track_dict["requester_id"] = track.requester.id if track.requester else None
track_dict["requester_name"] = str(track.requester) if track.requester else None
track_dict["timestamp"] = track.timestamp
if track.thumbnail:
track_dict["thumbnail"] = track.thumbnail
if track.isrc:
track_dict["isrc"] = track.isrc
if track.playlist:
track_dict["playlist_name"] = track.playlist.name
tracks_data.append(track_dict)
playlist_data = {
"name": name,
"description": description,
"created_at": datetime.utcnow().isoformat(),
"track_count": len(tracks_data),
"total_duration": sum(t["length"] for t in tracks_data),
"tracks": tracks_data,
"version": "1.0",
}
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(playlist_data, f, indent=2, ensure_ascii=False)
@staticmethod
def import_playlist(filepath: str) -> Dict[str, Any]:
"""Import a playlist from a JSON file.
Parameters
----------
filepath: str
Path to the JSON file
Returns
-------
Dict[str, Any]
Dictionary containing playlist data:
- 'name': Playlist name
- 'description': Playlist description
- 'tracks': List of track data dictionaries
- 'track_count': Number of tracks
- 'total_duration': Total duration in milliseconds
- 'created_at': Creation timestamp
"""
path = Path(filepath)
with open(path, encoding="utf-8") as f:
data = json.load(f)
return data
@staticmethod
def export_track_list(
tracks: List[Track],
filepath: str,
*,
name: Optional[str] = None,
description: Optional[str] = None,
) -> None:
"""Export a list of tracks to a JSON file.
Parameters
----------
tracks: List[Track]
List of tracks to export
filepath: str
Path to save the JSON file
name: Optional[str]
Name for the playlist
description: Optional[str]
Description for the playlist
"""
path = Path(filepath)
if name is None:
name = path.stem
tracks_data = [
{
"title": track.title,
"author": track.author,
"uri": track.uri,
"identifier": track.identifier,
"length": track.length,
"thumbnail": track.thumbnail,
"isrc": track.isrc,
}
for track in tracks
]
playlist_data = {
"name": name,
"description": description,
"created_at": datetime.utcnow().isoformat(),
"track_count": len(tracks_data),
"total_duration": sum(t["length"] for t in tracks_data),
"tracks": tracks_data,
"version": "1.0",
}
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(playlist_data, f, indent=2, ensure_ascii=False)
@staticmethod
def get_track_uris(filepath: str) -> List[str]:
"""Get list of track URIs from a saved playlist.
Parameters
----------
filepath: str
Path to the JSON file
Returns
-------
List[str]
List of track URIs
"""
data = PlaylistManager.import_playlist(filepath)
return [track["uri"] for track in data["tracks"] if track.get("uri")]
@staticmethod
def merge_playlists(
filepaths: List[str],
output_path: str,
*,
name: Optional[str] = None,
description: Optional[str] = None,
remove_duplicates: bool = True,
) -> None:
"""Merge multiple playlists into one.
Parameters
----------
filepaths: List[str]
List of playlist file paths to merge
output_path: str
Path to save the merged playlist
name: Optional[str]
Name for the merged playlist
description: Optional[str]
Description for the merged playlist
remove_duplicates: bool
Whether to remove duplicate tracks (by URI). Defaults to True.
"""
all_tracks = []
seen_uris = set()
for filepath in filepaths:
data = PlaylistManager.import_playlist(filepath)
for track in data["tracks"]:
uri = track.get("uri", "")
if remove_duplicates:
if uri and uri in seen_uris:
continue
if uri:
seen_uris.add(uri)
all_tracks.append(track)
merged_data = {
"name": name or "Merged Playlist",
"description": description or f"Merged from {len(filepaths)} playlists",
"created_at": datetime.utcnow().isoformat(),
"track_count": len(all_tracks),
"total_duration": sum(t["length"] for t in all_tracks),
"tracks": all_tracks,
"version": "1.0",
}
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
with open(output, "w", encoding="utf-8") as f:
json.dump(merged_data, f, indent=2, ensure_ascii=False)
@staticmethod
def export_to_m3u(
tracks: List[Track],
filepath: str,
*,
name: Optional[str] = None,
) -> None:
"""Export tracks to M3U playlist format.
Parameters
----------
tracks: List[Track]
List of tracks to export
filepath: str
Path to save the M3U file
name: Optional[str]
Playlist name for the header
"""
path = Path(filepath)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write("#EXTM3U\n")
if name:
f.write(f"#PLAYLIST:{name}\n")
for track in tracks:
# Duration in seconds
duration = track.length // 1000
f.write(f"#EXTINF:{duration},{track.author} - {track.title}\n")
f.write(f"{track.uri}\n")
@staticmethod
def get_playlist_info(filepath: str) -> Dict[str, Any]:
"""Get basic information about a saved playlist without loading all tracks.
Parameters
----------
filepath: str
Path to the JSON file
Returns
-------
Dict[str, Any]
Dictionary with playlist metadata (name, track_count, duration, etc.)
"""
data = PlaylistManager.import_playlist(filepath)
return {
"name": data.get("name"),
"description": data.get("description"),
"track_count": data.get("track_count"),
"total_duration": data.get("total_duration"),
"created_at": data.get("created_at"),
"version": data.get("version"),
}

View File

@ -310,8 +310,8 @@ class Queue(Iterable[Track]):
return new_queue return new_queue
def clear(self) -> None: def clear(self) -> None:
"""Remove all items from the queue.""" """Wipes the entire queue clean, removing all tracks."""
self._queue.clear() self._queue = []
def set_loop_mode(self, mode: LoopMode) -> None: def set_loop_mode(self, mode: LoopMode) -> None:
""" """
@ -343,8 +343,40 @@ class Queue(Iterable[Track]):
self._loop_mode = None self._loop_mode = None
def shuffle(self) -> None: def shuffle(self) -> None:
"""Shuffles the queue.""" """Mixes up the entire queue in a random order."""
return random.shuffle(self._queue) random.shuffle(self._queue)
def move(self, from_index: int, to_index: int) -> None:
"""Moves a track from one spot in the queue to another.
Parameters
----------
from_index: int
The current position of the track (0-indexed).
to_index: int
Where you want to put the track.
"""
if from_index == to_index:
return
track = self._queue.pop(from_index)
self._queue.insert(to_index, track)
def remove_duplicates(self) -> int:
"""Looks through the queue and removes any tracks that appear more than once.
Returns the number of duplicate tracks removed.
"""
initial_count = len(self._queue)
seen_ids = set()
unique_queue = []
for track in self._queue:
if track.track_id not in seen_ids:
unique_queue.append(track)
seen_ids.add(track.track_id)
self._queue = unique_queue
return initial_count - len(self._queue)
def clear_track_filters(self) -> None: def clear_track_filters(self) -> None:
"""Clears all filters applied to tracks""" """Clears all filters applied to tracks"""
@ -372,3 +404,15 @@ class Queue(Iterable[Track]):
else: else:
new_queue = self._queue[index : self.size] new_queue = self._queue[index : self.size]
self._queue = new_queue self._queue = new_queue
def get_stats(self) -> pomice.QueueStats:
"""Get detailed statistics for this queue.
Returns
-------
QueueStats
A QueueStats object containing detailed analytics.
"""
from .queue_stats import QueueStats
return QueueStats(self)

272
pomice/queue_stats.py Normal file
View File

@ -0,0 +1,272 @@
from __future__ import annotations
from collections import Counter
from typing import Dict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .objects import Track
from .queue import Queue
__all__ = ("QueueStats",)
class QueueStats:
"""Advanced statistics for a Pomice Queue.
Provides detailed analytics about queue contents including duration,
requester statistics, and track distribution.
"""
def __init__(self, queue: Queue) -> None:
"""Initialize queue statistics.
Parameters
----------
queue: Queue
The queue to analyze
"""
self._queue = queue
@property
def total_duration(self) -> int:
"""Get total duration of all tracks in queue (milliseconds).
Returns
-------
int
Total duration in milliseconds
"""
return sum(track.length for track in self._queue)
@property
def average_duration(self) -> float:
"""Get average track duration in queue (milliseconds).
Returns
-------
float
Average duration in milliseconds, or 0.0 if queue is empty
"""
if self._queue.is_empty:
return 0.0
return self.total_duration / len(self._queue)
@property
def longest_track(self) -> Optional[Track]:
"""Get the longest track in the queue.
Returns
-------
Optional[Track]
The longest track, or None if queue is empty
"""
if self._queue.is_empty:
return None
return max(self._queue, key=lambda t: t.length)
@property
def shortest_track(self) -> Optional[Track]:
"""Get the shortest track in the queue.
Returns
-------
Optional[Track]
The shortest track, or None if queue is empty
"""
if self._queue.is_empty:
return None
return min(self._queue, key=lambda t: t.length)
def get_requester_stats(self) -> Dict[int, Dict[str, any]]:
"""Get statistics grouped by requester.
Returns
-------
Dict[int, Dict[str, any]]
Dictionary mapping user IDs to their stats:
- 'count': Number of tracks requested
- 'total_duration': Total duration of their tracks (ms)
- 'tracks': List of their tracks
"""
stats: Dict[int, Dict] = {}
for track in self._queue:
if not track.requester:
continue
user_id = track.requester.id
if user_id not in stats:
stats[user_id] = {
"count": 0,
"total_duration": 0,
"tracks": [],
"requester": track.requester,
}
stats[user_id]["count"] += 1
stats[user_id]["total_duration"] += track.length
stats[user_id]["tracks"].append(track)
return stats
def get_top_requesters(self, limit: int = 5) -> List[tuple]:
"""Get top requesters by track count.
Parameters
----------
limit: int
Maximum number of requesters to return. Defaults to 5.
Returns
-------
List[tuple]
List of (requester, count) tuples sorted by count (descending)
"""
requester_counts = Counter(track.requester.id for track in self._queue if track.requester)
# Get requester objects
stats = self.get_requester_stats()
return [
(stats[user_id]["requester"], count)
for user_id, count in requester_counts.most_common(limit)
]
def get_author_distribution(self) -> Dict[str, int]:
"""Get distribution of tracks by author.
Returns
-------
Dict[str, int]
Dictionary mapping author names to track counts
"""
return dict(Counter(track.author for track in self._queue))
def get_top_authors(self, limit: int = 10) -> List[tuple]:
"""Get most common authors in the queue.
Parameters
----------
limit: int
Maximum number of authors to return. Defaults to 10.
Returns
-------
List[tuple]
List of (author, count) tuples sorted by count (descending)
"""
author_counts = Counter(track.author for track in self._queue)
return author_counts.most_common(limit)
def get_stream_count(self) -> int:
"""Get count of streams in the queue.
Returns
-------
int
Number of streams
"""
return sum(1 for track in self._queue if track.is_stream)
def get_playlist_distribution(self) -> Dict[str, int]:
"""Get distribution of tracks by playlist.
Returns
-------
Dict[str, int]
Dictionary mapping playlist names to track counts
"""
distribution: Dict[str, int] = {}
for track in self._queue:
if track.playlist:
playlist_name = track.playlist.name
distribution[playlist_name] = distribution.get(playlist_name, 0) + 1
return distribution
def get_duration_breakdown(self) -> Dict[str, int]:
"""Get breakdown of tracks by duration categories.
Returns
-------
Dict[str, int]
Dictionary with counts for different duration ranges:
- 'short' (< 3 min)
- 'medium' (3-6 min)
- 'long' (6-10 min)
- 'very_long' (> 10 min)
"""
breakdown = {
"short": 0, # < 3 minutes
"medium": 0, # 3-6 minutes
"long": 0, # 6-10 minutes
"very_long": 0, # > 10 minutes
}
for track in self._queue:
duration_minutes = track.length / 60000 # Convert ms to minutes
if duration_minutes < 3:
breakdown["short"] += 1
elif duration_minutes < 6:
breakdown["medium"] += 1
elif duration_minutes < 10:
breakdown["long"] += 1
else:
breakdown["very_long"] += 1
return breakdown
def format_duration(self, milliseconds: int) -> str:
"""Format duration in milliseconds to human-readable string.
Parameters
----------
milliseconds: int
Duration in milliseconds
Returns
-------
str
Formatted duration (e.g., "1:23:45" or "5:30")
"""
seconds = milliseconds // 1000
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours}:{minutes:02d}:{seconds:02d}"
return f"{minutes}:{seconds:02d}"
def get_summary(self) -> Dict[str, any]:
"""Get a comprehensive summary of queue statistics.
Returns
-------
Dict[str, any]
Dictionary containing various queue statistics
"""
return {
"total_tracks": len(self._queue),
"total_duration": self.total_duration,
"total_duration_formatted": self.format_duration(self.total_duration),
"average_duration": self.average_duration,
"average_duration_formatted": self.format_duration(int(self.average_duration)),
"longest_track": self.longest_track,
"shortest_track": self.shortest_track,
"stream_count": self.get_stream_count(),
"unique_authors": len(self.get_author_distribution()),
"unique_requesters": len(self.get_requester_stats()),
"duration_breakdown": self.get_duration_breakdown(),
"loop_mode": self._queue.loop_mode,
"is_looping": self._queue.is_looping,
}
def __repr__(self) -> str:
return (
f"<Pomice.QueueStats tracks={len(self._queue)} "
f"duration={self.format_duration(self.total_duration)}>"
)

View File

@ -286,25 +286,16 @@ class Client:
# Fetch pages in rolling waves; yield promptly as soon as a wave completes. # Fetch pages in rolling waves; yield promptly as soon as a wave completes.
wave_size = self._playlist_concurrency * 2 wave_size = self._playlist_concurrency * 2
for i, offset in enumerate(remaining_offsets): remaining_offsets_list = list(remaining_offsets)
# Build wave
if i % wave_size == 0: for i in range(0, len(remaining_offsets_list), wave_size):
wave_offsets = list( wave_offsets = remaining_offsets_list[i : i + wave_size]
o for o in remaining_offsets if o >= offset and o < offset + wave_size results = await asyncio.gather(*[fetch(o) for o in wave_offsets])
) for page_tracks in results:
results = await asyncio.gather(*[fetch(o) for o in wave_offsets]) if not page_tracks:
for page_tracks in results: continue
if not page_tracks: for j in range(0, len(page_tracks), batch_size):
continue yield page_tracks[j : j + batch_size]
for j in range(0, len(page_tracks), batch_size):
yield page_tracks[j : j + batch_size]
# Skip ahead in iterator by adjusting enumerate drive (consume extras)
# Fast-forward the generator manually
for _ in range(len(wave_offsets) - 1):
try:
next(remaining_offsets) # type: ignore
except StopIteration:
break
async def get_recommendations(self, *, query: str) -> List[Track]: async def get_recommendations(self, *, query: str) -> List[Track]:
if not self._bearer_token or time.time() >= self._expiry: if not self._bearer_token or time.time() >= self._expiry:

405
pomice/track_utils.py Normal file
View File

@ -0,0 +1,405 @@
from __future__ import annotations
from typing import Callable
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .objects import Track
__all__ = ("TrackFilter", "SearchHelper")
class TrackFilter:
"""Advanced filtering utilities for tracks.
Provides various filter functions to find tracks matching specific criteria.
"""
@staticmethod
def by_duration(
tracks: List[Track],
*,
min_duration: Optional[int] = None,
max_duration: Optional[int] = None,
) -> List[Track]:
"""Filter tracks by duration range.
Parameters
----------
tracks: List[Track]
List of tracks to filter
min_duration: Optional[int]
Minimum duration in milliseconds
max_duration: Optional[int]
Maximum duration in milliseconds
Returns
-------
List[Track]
Filtered tracks
"""
result = tracks
if min_duration is not None:
result = [t for t in result if t.length >= min_duration]
if max_duration is not None:
result = [t for t in result if t.length <= max_duration]
return result
@staticmethod
def by_author(tracks: List[Track], author: str, *, exact: bool = False) -> List[Track]:
"""Filter tracks by author name.
Parameters
----------
tracks: List[Track]
List of tracks to filter
author: str
Author name to search for
exact: bool
Whether to match exactly. Defaults to False (case-insensitive contains).
Returns
-------
List[Track]
Filtered tracks
"""
if exact:
return [t for t in tracks if t.author == author]
author_lower = author.lower()
return [t for t in tracks if author_lower in t.author.lower()]
@staticmethod
def by_title(tracks: List[Track], title: str, *, exact: bool = False) -> List[Track]:
"""Filter tracks by title.
Parameters
----------
tracks: List[Track]
List of tracks to filter
title: str
Title to search for
exact: bool
Whether to match exactly. Defaults to False (case-insensitive contains).
Returns
-------
List[Track]
Filtered tracks
"""
if exact:
return [t for t in tracks if t.title == title]
title_lower = title.lower()
return [t for t in tracks if title_lower in t.title.lower()]
@staticmethod
def by_requester(tracks: List[Track], requester_id: int) -> List[Track]:
"""Filter tracks by requester.
Parameters
----------
tracks: List[Track]
List of tracks to filter
requester_id: int
Discord user ID
Returns
-------
List[Track]
Filtered tracks
"""
return [t for t in tracks if t.requester and t.requester.id == requester_id]
@staticmethod
def by_playlist(tracks: List[Track], playlist_name: str) -> List[Track]:
"""Filter tracks by playlist name.
Parameters
----------
tracks: List[Track]
List of tracks to filter
playlist_name: str
Playlist name to search for
Returns
-------
List[Track]
Filtered tracks
"""
playlist_lower = playlist_name.lower()
return [t for t in tracks if t.playlist and playlist_lower in t.playlist.name.lower()]
@staticmethod
def streams_only(tracks: List[Track]) -> List[Track]:
"""Filter to only include streams.
Parameters
----------
tracks: List[Track]
List of tracks to filter
Returns
-------
List[Track]
Only stream tracks
"""
return [t for t in tracks if t.is_stream]
@staticmethod
def non_streams_only(tracks: List[Track]) -> List[Track]:
"""Filter to exclude streams.
Parameters
----------
tracks: List[Track]
List of tracks to filter
Returns
-------
List[Track]
Only non-stream tracks
"""
return [t for t in tracks if not t.is_stream]
@staticmethod
def custom(tracks: List[Track], predicate: Callable[[Track], bool]) -> List[Track]:
"""Filter tracks using a custom predicate function.
Parameters
----------
tracks: List[Track]
List of tracks to filter
predicate: Callable[[Track], bool]
Function that returns True for tracks to include
Returns
-------
List[Track]
Filtered tracks
"""
return [t for t in tracks if predicate(t)]
class SearchHelper:
"""Helper utilities for searching and sorting tracks."""
@staticmethod
def search_tracks(
tracks: List[Track],
query: str,
*,
search_title: bool = True,
search_author: bool = True,
case_sensitive: bool = False,
) -> List[Track]:
"""Search tracks by query string.
Parameters
----------
tracks: List[Track]
List of tracks to search
query: str
Search query
search_title: bool
Whether to search in titles. Defaults to True.
search_author: bool
Whether to search in authors. Defaults to True.
case_sensitive: bool
Whether search is case-sensitive. Defaults to False.
Returns
-------
List[Track]
Matching tracks
"""
if not case_sensitive:
query = query.lower()
results = []
for track in tracks:
title = track.title if case_sensitive else track.title.lower()
author = track.author if case_sensitive else track.author.lower()
if search_title and query in title:
results.append(track)
elif search_author and query in author:
results.append(track)
return results
@staticmethod
def sort_by_duration(
tracks: List[Track],
*,
reverse: bool = False,
) -> List[Track]:
"""Sort tracks by duration.
Parameters
----------
tracks: List[Track]
List of tracks to sort
reverse: bool
If True, sort longest to shortest. Defaults to False.
Returns
-------
List[Track]
Sorted tracks
"""
return sorted(tracks, key=lambda t: t.length, reverse=reverse)
@staticmethod
def sort_by_title(
tracks: List[Track],
*,
reverse: bool = False,
) -> List[Track]:
"""Sort tracks alphabetically by title.
Parameters
----------
tracks: List[Track]
List of tracks to sort
reverse: bool
If True, sort Z to A. Defaults to False.
Returns
-------
List[Track]
Sorted tracks
"""
return sorted(tracks, key=lambda t: t.title.lower(), reverse=reverse)
@staticmethod
def sort_by_author(
tracks: List[Track],
*,
reverse: bool = False,
) -> List[Track]:
"""Sort tracks alphabetically by author.
Parameters
----------
tracks: List[Track]
List of tracks to sort
reverse: bool
If True, sort Z to A. Defaults to False.
Returns
-------
List[Track]
Sorted tracks
"""
return sorted(tracks, key=lambda t: t.author.lower(), reverse=reverse)
@staticmethod
def remove_duplicates(
tracks: List[Track],
*,
by_uri: bool = True,
by_title_author: bool = False,
) -> List[Track]:
"""Remove duplicate tracks from a list.
Parameters
----------
tracks: List[Track]
List of tracks
by_uri: bool
Remove duplicates by URI. Defaults to True.
by_title_author: bool
Remove duplicates by title+author combination. Defaults to False.
Returns
-------
List[Track]
List with duplicates removed (keeps first occurrence)
"""
seen = set()
result = []
for track in tracks:
if by_uri:
key = track.uri
elif by_title_author:
key = (track.title.lower(), track.author.lower())
else:
key = track.track_id
if key not in seen:
seen.add(key)
result.append(track)
return result
@staticmethod
def group_by_author(tracks: List[Track]) -> dict[str, List[Track]]:
"""Group tracks by author.
Parameters
----------
tracks: List[Track]
List of tracks to group
Returns
-------
dict[str, List[Track]]
Dictionary mapping author names to their tracks
"""
groups = {}
for track in tracks:
author = track.author
if author not in groups:
groups[author] = []
groups[author].append(track)
return groups
@staticmethod
def group_by_playlist(tracks: List[Track]) -> dict[str, List[Track]]:
"""Group tracks by playlist.
Parameters
----------
tracks: List[Track]
List of tracks to group
Returns
-------
dict[str, List[Track]]
Dictionary mapping playlist names to their tracks
"""
groups = {}
for track in tracks:
if track.playlist:
playlist_name = track.playlist.name
if playlist_name not in groups:
groups[playlist_name] = []
groups[playlist_name].append(track)
return groups
@staticmethod
def get_random_tracks(tracks: List[Track], count: int) -> List[Track]:
"""Get random tracks from a list.
Parameters
----------
tracks: List[Track]
List of tracks
count: int
Number of random tracks to get
Returns
-------
List[Track]
Random tracks (without replacement)
"""
import random
return random.sample(tracks, min(count, len(tracks)))