diff --git a/ADVANCED_FEATURES.md b/ADVANCED_FEATURES.md index 9087755..e816bc0 100644 --- a/ADVANCED_FEATURES.md +++ b/ADVANCED_FEATURES.md @@ -367,21 +367,21 @@ class Music(commands.Cog): def __init__(self, bot): self.bot = bot self.history = pomice.TrackHistory(max_size=100) - + @commands.command() async def stats(self, ctx): """Show queue statistics.""" player = ctx.voice_client stats = pomice.QueueStats(player.queue) summary = stats.get_summary() - + await ctx.send( f"**Queue Stats**\n" f"Tracks: {summary['total_tracks']}\n" f"Duration: {summary['total_duration_formatted']}\n" f"Streams: {summary['stream_count']}" ) - + @commands.command() async def export(self, ctx): """Export queue to file.""" @@ -392,18 +392,18 @@ class Music(commands.Cog): name=f"{ctx.guild.name}'s Queue" ) await ctx.send('✅ Queue exported!') - + @commands.command() async def filter_long(self, ctx): """Show tracks longer than 5 minutes.""" player = ctx.voice_client tracks = list(player.queue) - + long_tracks = pomice.TrackFilter.by_duration( tracks, min_duration=300000 # 5 minutes ) - + await ctx.send(f'Found {len(long_tracks)} long tracks!') ``` diff --git a/NEW_FEATURES_SUMMARY.md b/NEW_FEATURES_SUMMARY.md index a883e03..423f4b9 100644 --- a/NEW_FEATURES_SUMMARY.md +++ b/NEW_FEATURES_SUMMARY.md @@ -126,18 +126,18 @@ class Music(commands.Cog): def __init__(self, bot): self.bot = bot self.history = pomice.TrackHistory(max_size=100) - + @commands.Cog.listener() async def on_pomice_track_end(self, player, track, _): # Add to history when track ends self.history.add(track) - + @commands.command() async def stats(self, ctx): """Show queue statistics.""" stats = pomice.QueueStats(ctx.voice_client.queue) summary = stats.get_summary() - + await ctx.send( f"**Queue Stats**\n" f"📊 Tracks: {summary['total_tracks']}\n" @@ -145,19 +145,19 @@ class Music(commands.Cog): f"📡 Streams: {summary['stream_count']}\n" f"👥 Unique Requesters: {summary['unique_requesters']}" ) - + @commands.command() async def history(self, ctx, limit: int = 10): """Show recently played tracks.""" recent = self.history.get_last(limit) - + tracks_list = '\n'.join( f"{i}. {track.title} by {track.author}" for i, track in enumerate(recent, 1) ) - + await ctx.send(f"**Recently Played:**\n{tracks_list}") - + @commands.command() async def export(self, ctx): """Export current queue.""" diff --git a/examples/advanced_features.py b/examples/advanced_features.py index c0d014c..fbe7926 100644 --- a/examples/advanced_features.py +++ b/examples/advanced_features.py @@ -7,14 +7,15 @@ This example demonstrates: - Playlist Export/Import - Track Filtering and Search """ - import asyncio + import discord from discord.ext import commands + import pomice # Initialize bot -bot = commands.Bot(command_prefix='!', intents=discord.Intents.all()) +bot = commands.Bot(command_prefix="!", intents=discord.Intents.all()) class AdvancedMusic(commands.Cog): @@ -23,7 +24,7 @@ class AdvancedMusic(commands.Cog): def __init__(self, bot): self.bot = bot self.pomice = pomice.NodePool() - + # Track history for each guild self.track_histories = {} @@ -31,10 +32,10 @@ class AdvancedMusic(commands.Cog): """Start Lavalink nodes.""" await self.pomice.create_node( bot=self.bot, - host='127.0.0.1', - port='3030', - password='youshallnotpass', - identifier='MAIN' + host="127.0.0.1", + port="3030", + password="youshallnotpass", + identifier="MAIN", ) @commands.Cog.listener() @@ -42,10 +43,10 @@ class AdvancedMusic(commands.Cog): """Add track to history when it ends.""" if player.guild.id not in self.track_histories: self.track_histories[player.guild.id] = pomice.TrackHistory(max_size=100) - + self.track_histories[player.guild.id].add(track) - @commands.command(name='play') + @commands.command(name="play") async def play(self, ctx, *, search: str): """Play a track.""" if not ctx.voice_client: @@ -55,133 +56,111 @@ class AdvancedMusic(commands.Cog): results = await player.get_tracks(query=search, ctx=ctx) if not results: - return await ctx.send('No results found.') + return await ctx.send("No results found.") if isinstance(results, pomice.Playlist): await player.queue.put(results.tracks) - await ctx.send(f'Added playlist **{results.name}** with {len(results.tracks)} tracks.') + await ctx.send(f"Added playlist **{results.name}** with {len(results.tracks)} tracks.") else: track = results[0] await player.queue.put(track) - await ctx.send(f'Added **{track.title}** to queue.') + await ctx.send(f"Added **{track.title}** to queue.") if not player.is_playing: await player.do_next() - @commands.command(name='history') + @commands.command(name="history") async def history(self, ctx, limit: int = 10): """Show recently played tracks.""" if ctx.guild.id not in self.track_histories: - return await ctx.send('No history available.') + return await ctx.send("No history available.") history = self.track_histories[ctx.guild.id] - + if history.is_empty: - return await ctx.send('No tracks in history.') + return await ctx.send("No tracks in history.") tracks = history.get_last(limit) - - embed = discord.Embed( - title='🎵 Recently Played', - color=discord.Color.blue() - ) - + + embed = discord.Embed(title="🎵 Recently Played", color=discord.Color.blue()) + for i, track in enumerate(tracks, 1): - embed.add_field( - name=f'{i}. {track.title}', - value=f'by {track.author}', - inline=False - ) - + embed.add_field(name=f"{i}. {track.title}", value=f"by {track.author}", inline=False) + await ctx.send(embed=embed) - @commands.command(name='stats') + @commands.command(name="stats") async def queue_stats(self, ctx): """Show detailed queue statistics.""" if not ctx.voice_client: - return await ctx.send('Not connected to voice.') + return await ctx.send("Not connected to voice.") player = ctx.voice_client stats = pomice.QueueStats(player.queue) summary = stats.get_summary() - embed = discord.Embed( - title='📊 Queue Statistics', - color=discord.Color.green() - ) - + embed = discord.Embed(title="📊 Queue Statistics", color=discord.Color.green()) + + embed.add_field(name="Total Tracks", value=summary["total_tracks"], inline=True) embed.add_field( - name='Total Tracks', - value=summary['total_tracks'], - inline=True + name="Total Duration", value=summary["total_duration_formatted"], inline=True, ) embed.add_field( - name='Total Duration', - value=summary['total_duration_formatted'], - inline=True + name="Average Duration", value=summary["average_duration_formatted"], inline=True, ) - embed.add_field( - name='Average Duration', - value=summary['average_duration_formatted'], - inline=True - ) - - if summary['longest_track']: + + if summary["longest_track"]: embed.add_field( - name='Longest Track', + name="Longest Track", value=f"{summary['longest_track'].title} ({stats.format_duration(summary['longest_track'].length)})", - inline=False + inline=False, ) - + # Duration breakdown - breakdown = summary['duration_breakdown'] + breakdown = summary["duration_breakdown"] embed.add_field( - name='Duration Breakdown', + name="Duration Breakdown", value=f"Short (<3min): {breakdown['short']}\n" - f"Medium (3-6min): {breakdown['medium']}\n" - f"Long (6-10min): {breakdown['long']}\n" - f"Very Long (>10min): {breakdown['very_long']}", - inline=False + f"Medium (3-6min): {breakdown['medium']}\n" + f"Long (6-10min): {breakdown['long']}\n" + f"Very Long (>10min): {breakdown['very_long']}", + inline=False, ) - + # Top requesters top_requesters = stats.get_top_requesters(3) if top_requesters: - requesters_text = '\n'.join( - f'{i}. {req.display_name}: {count} tracks' + requesters_text = "\n".join( + f"{i}. {req.display_name}: {count} tracks" for i, (req, count) in enumerate(top_requesters, 1) ) - embed.add_field( - name='Top Requesters', - value=requesters_text, - inline=False - ) - + embed.add_field(name="Top Requesters", value=requesters_text, inline=False) + await ctx.send(embed=embed) - @commands.command(name='export') - async def export_queue(self, ctx, filename: str = 'playlist.json'): + @commands.command(name="export") + async def export_queue(self, ctx, filename: str = "playlist.json"): """Export current queue to a file.""" if not ctx.voice_client: - return await ctx.send('Not connected to voice.') + return await ctx.send("Not connected to voice.") player = ctx.voice_client - + if player.queue.is_empty: - return await ctx.send('Queue is empty.') + return await ctx.send("Queue is empty.") try: pomice.PlaylistManager.export_queue( player.queue, - f'playlists/{filename}', + f"playlists/{filename}", name=f"{ctx.guild.name}'s Playlist", - description=f'Exported from {ctx.guild.name}' + description=f"Exported from {ctx.guild.name}", ) - await ctx.send(f'✅ Queue exported to `playlists/{filename}`') + await ctx.send(f"✅ Queue exported to `playlists/{filename}`") except Exception as e: - await ctx.send(f'❌ Error exporting queue: {e}') + await ctx.send(f"❌ Error exporting queue: {e}") - @commands.command(name='import') + @commands.command(name="import") async def import_playlist(self, ctx, filename: str): """Import a playlist from a file.""" if not ctx.voice_client: @@ -190,11 +169,11 @@ class AdvancedMusic(commands.Cog): player = ctx.voice_client try: - data = pomice.PlaylistManager.import_playlist(f'playlists/{filename}') - + data = pomice.PlaylistManager.import_playlist(f"playlists/{filename}") + # Get URIs and search for tracks - uris = [track['uri'] for track in data['tracks'] if track.get('uri')] - + uris = [track["uri"] for track in data["tracks"] if track.get("uri")] + added = 0 for uri in uris: try: @@ -208,76 +187,73 @@ class AdvancedMusic(commands.Cog): added += 1 except: continue - + await ctx.send(f'✅ Imported {added} tracks from `{data["name"]}`') - + if not player.is_playing: await player.do_next() - - except FileNotFoundError: - await ctx.send(f'❌ Playlist file `{filename}` not found.') - except Exception as e: - await ctx.send(f'❌ Error importing playlist: {e}') - @commands.command(name='filter') + except FileNotFoundError: + await ctx.send(f"❌ Playlist file `{filename}` not found.") + except Exception as e: + await ctx.send(f"❌ Error importing playlist: {e}") + + @commands.command(name="filter") async def filter_queue(self, ctx, filter_type: str, *, value: str): """Filter queue by various criteria. - + Examples: !filter author Imagine Dragons !filter duration 180000-300000 (3-5 minutes in ms) !filter title Thunder """ if not ctx.voice_client: - return await ctx.send('Not connected to voice.') + return await ctx.send("Not connected to voice.") player = ctx.voice_client queue_tracks = list(player.queue) - if filter_type == 'author': + if filter_type == "author": filtered = pomice.TrackFilter.by_author(queue_tracks, value) - elif filter_type == 'title': + elif filter_type == "title": filtered = pomice.TrackFilter.by_title(queue_tracks, value) - elif filter_type == 'duration': + elif filter_type == "duration": # Parse duration range (e.g., "180000-300000") - if '-' in value: - min_dur, max_dur = map(int, value.split('-')) + if "-" in value: + min_dur, max_dur = map(int, value.split("-")) filtered = pomice.TrackFilter.by_duration( - queue_tracks, - min_duration=min_dur, - max_duration=max_dur + queue_tracks, min_duration=min_dur, max_duration=max_dur, ) else: - return await ctx.send('Duration format: min-max (in milliseconds)') + return await ctx.send("Duration format: min-max (in milliseconds)") else: - return await ctx.send('Valid filters: author, title, duration') + return await ctx.send("Valid filters: author, title, duration") if not filtered: - return await ctx.send('No tracks match the filter.') + return await ctx.send("No tracks match the filter.") embed = discord.Embed( - title=f'🔍 Filtered Results ({len(filtered)} tracks)', - color=discord.Color.purple() + title=f"🔍 Filtered Results ({len(filtered)} tracks)", color=discord.Color.purple(), ) - + for i, track in enumerate(filtered[:10], 1): stats = pomice.QueueStats(player.queue) embed.add_field( - name=f'{i}. {track.title}', - value=f'by {track.author} - {stats.format_duration(track.length)}', - inline=False + name=f"{i}. {track.title}", + value=f"by {track.author} - {stats.format_duration(track.length)}", + inline=False, ) - + if len(filtered) > 10: - embed.set_footer(text=f'Showing 10 of {len(filtered)} results') - + embed.set_footer(text=f"Showing 10 of {len(filtered)} results") + await ctx.send(embed=embed) - @commands.command(name='search_history') + @commands.command(name="search_history") async def search_history(self, ctx, *, query: str): """Search through play history.""" if ctx.guild.id not in self.track_histories: - return await ctx.send('No history available.') + return await ctx.send("No history available.") history = self.track_histories[ctx.guild.id] results = history.search(query) @@ -287,63 +263,59 @@ class AdvancedMusic(commands.Cog): embed = discord.Embed( title=f'🔍 History Search: "{query}"', - description=f'Found {len(results)} tracks', - color=discord.Color.gold() + description=f"Found {len(results)} tracks", + color=discord.Color.gold(), ) - + for i, track in enumerate(results[:10], 1): - embed.add_field( - name=f'{i}. {track.title}', - value=f'by {track.author}', - inline=False - ) - + embed.add_field(name=f"{i}. {track.title}", value=f"by {track.author}", inline=False) + if len(results) > 10: - embed.set_footer(text=f'Showing 10 of {len(results)} results') - + embed.set_footer(text=f"Showing 10 of {len(results)} results") + await ctx.send(embed=embed) - @commands.command(name='sort') - async def sort_queue(self, ctx, sort_by: str = 'duration'): + @commands.command(name="sort") + async def sort_queue(self, ctx, sort_by: str = "duration"): """Sort the queue. - + Options: duration, title, author """ if not ctx.voice_client: - return await ctx.send('Not connected to voice.') + return await ctx.send("Not connected to voice.") player = ctx.voice_client - + if player.queue.is_empty: - return await ctx.send('Queue is empty.') + return await ctx.send("Queue is empty.") queue_tracks = list(player.queue) - if sort_by == 'duration': + if sort_by == "duration": sorted_tracks = pomice.SearchHelper.sort_by_duration(queue_tracks) - elif sort_by == 'title': + elif sort_by == "title": sorted_tracks = pomice.SearchHelper.sort_by_title(queue_tracks) - elif sort_by == 'author': + elif sort_by == "author": sorted_tracks = pomice.SearchHelper.sort_by_author(queue_tracks) else: - return await ctx.send('Valid options: duration, title, author') + return await ctx.send("Valid options: duration, title, author") # Clear and refill queue player.queue._queue.clear() for track in sorted_tracks: await player.queue.put(track) - await ctx.send(f'✅ Queue sorted by {sort_by}') + await ctx.send(f"✅ Queue sorted by {sort_by}") @bot.event async def on_ready(): - print(f'{bot.user} is ready!') - await bot.get_cog('AdvancedMusic').start_nodes() + print(f"{bot.user} is ready!") + await bot.get_cog("AdvancedMusic").start_nodes() # Add cog bot.add_cog(AdvancedMusic(bot)) # Run bot -bot.run('YOUR_BOT_TOKEN') +bot.run("YOUR_BOT_TOKEN") diff --git a/pomice/history.py b/pomice/history.py index b55b22d..23a9e3f 100644 --- a/pomice/history.py +++ b/pomice/history.py @@ -13,7 +13,7 @@ __all__ = ("TrackHistory",) class TrackHistory: """Track history manager for Pomice. - + Keeps track of previously played tracks with a configurable maximum size. Useful for implementing 'previous track' functionality and viewing play history. """ @@ -22,7 +22,7 @@ class TrackHistory: def __init__(self, max_size: int = 100) -> None: """Initialize the track history. - + Parameters ---------- max_size: int @@ -46,7 +46,7 @@ class TrackHistory: def __getitem__(self, index: int) -> Track: """Get a track at the given index in history. - + Parameters ---------- index: int @@ -59,7 +59,7 @@ class TrackHistory: def add(self, track: Track) -> None: """Add a track to the history. - + Parameters ---------- track: Track @@ -70,12 +70,12 @@ class TrackHistory: def get_last(self, count: int = 1) -> List[Track]: """Get the last N tracks from history. - + Parameters ---------- count: int Number of tracks to retrieve. Defaults to 1. - + Returns ------- List[Track] @@ -87,7 +87,7 @@ class TrackHistory: def get_previous(self) -> Optional[Track]: """Get the previous track in history. - + Returns ------- Optional[Track] @@ -95,13 +95,13 @@ class TrackHistory: """ if not self._history or self._current_index <= 0: return None - + self._current_index -= 1 return self._history[self._current_index] def get_next(self) -> Optional[Track]: """Get the next track in history (when navigating backwards). - + Returns ------- Optional[Track] @@ -109,7 +109,7 @@ class TrackHistory: """ if not self._history or self._current_index >= len(self._history) - 1: return None - + self._current_index += 1 return self._history[self._current_index] @@ -120,7 +120,7 @@ class TrackHistory: def get_all(self) -> List[Track]: """Get all tracks in history. - + Returns ------- List[Track] @@ -130,12 +130,12 @@ class TrackHistory: def search(self, query: str) -> List[Track]: """Search for tracks in history by title or author. - + Parameters ---------- query: str Search query (case-insensitive) - + Returns ------- List[Track] @@ -143,13 +143,14 @@ class TrackHistory: """ query_lower = query.lower() return [ - track for track in reversed(self._history) + track + for track in reversed(self._history) if query_lower in track.title.lower() or query_lower in track.author.lower() ] def get_unique_tracks(self) -> List[Track]: """Get unique tracks from history (removes duplicates). - + Returns ------- List[Track] @@ -165,19 +166,20 @@ class TrackHistory: def get_by_requester(self, requester_id: int) -> List[Track]: """Get all tracks requested by a specific user. - + Parameters ---------- requester_id: int Discord user ID - + Returns ------- List[Track] Tracks requested by the user (most recent first) """ return [ - track for track in reversed(self._history) + track + for track in reversed(self._history) if track.requester and track.requester.id == requester_id ] diff --git a/pomice/playlist_manager.py b/pomice/playlist_manager.py index b28d18e..930842d 100644 --- a/pomice/playlist_manager.py +++ b/pomice/playlist_manager.py @@ -18,7 +18,7 @@ __all__ = ("PlaylistManager",) class PlaylistManager: """Manager for exporting and importing playlists. - + Allows saving queue contents to JSON files and loading them back, useful for persistent playlists and sharing. """ @@ -33,7 +33,7 @@ class PlaylistManager: include_metadata: bool = True, ) -> None: """Export a queue to a JSON file. - + Parameters ---------- queue: Queue @@ -48,60 +48,60 @@ class PlaylistManager: Whether to include requester and timestamp metadata. Defaults to True. """ path = Path(filepath) - + if name is None: name = path.stem - + tracks_data = [] for track in queue: track_dict = { - 'title': track.title, - 'author': track.author, - 'uri': track.uri, - 'identifier': track.identifier, - 'length': track.length, - 'is_stream': track.is_stream, + "title": track.title, + "author": track.author, + "uri": track.uri, + "identifier": track.identifier, + "length": track.length, + "is_stream": track.is_stream, } - + if include_metadata: - track_dict['requester_id'] = track.requester.id if track.requester else None - track_dict['requester_name'] = str(track.requester) if track.requester else None - track_dict['timestamp'] = track.timestamp - + track_dict["requester_id"] = track.requester.id if track.requester else None + track_dict["requester_name"] = str(track.requester) if track.requester else None + track_dict["timestamp"] = track.timestamp + if track.thumbnail: - track_dict['thumbnail'] = track.thumbnail - + track_dict["thumbnail"] = track.thumbnail + if track.isrc: - track_dict['isrc'] = track.isrc - + track_dict["isrc"] = track.isrc + if track.playlist: - track_dict['playlist_name'] = track.playlist.name - + track_dict["playlist_name"] = track.playlist.name + tracks_data.append(track_dict) - + playlist_data = { - 'name': name, - 'description': description, - 'created_at': datetime.utcnow().isoformat(), - 'track_count': len(tracks_data), - 'total_duration': sum(t['length'] for t in tracks_data), - 'tracks': tracks_data, - 'version': '1.0', + "name": name, + "description": description, + "created_at": datetime.utcnow().isoformat(), + "track_count": len(tracks_data), + "total_duration": sum(t["length"] for t in tracks_data), + "tracks": tracks_data, + "version": "1.0", } - + path.parent.mkdir(parents=True, exist_ok=True) - with open(path, 'w', encoding='utf-8') as f: + with open(path, "w", encoding="utf-8") as f: json.dump(playlist_data, f, indent=2, ensure_ascii=False) @staticmethod def import_playlist(filepath: str) -> Dict[str, Any]: """Import a playlist from a JSON file. - + Parameters ---------- filepath: str Path to the JSON file - + Returns ------- Dict[str, Any] @@ -114,10 +114,10 @@ class PlaylistManager: - 'created_at': Creation timestamp """ path = Path(filepath) - - with open(path, 'r', encoding='utf-8') as f: + + with open(path, encoding="utf-8") as f: data = json.load(f) - + return data @staticmethod @@ -129,7 +129,7 @@ class PlaylistManager: description: Optional[str] = None, ) -> None: """Export a list of tracks to a JSON file. - + Parameters ---------- tracks: List[Track] @@ -142,53 +142,53 @@ class PlaylistManager: Description for the playlist """ path = Path(filepath) - + if name is None: name = path.stem - + tracks_data = [ { - 'title': track.title, - 'author': track.author, - 'uri': track.uri, - 'identifier': track.identifier, - 'length': track.length, - 'thumbnail': track.thumbnail, - 'isrc': track.isrc, + "title": track.title, + "author": track.author, + "uri": track.uri, + "identifier": track.identifier, + "length": track.length, + "thumbnail": track.thumbnail, + "isrc": track.isrc, } for track in tracks ] - + playlist_data = { - 'name': name, - 'description': description, - 'created_at': datetime.utcnow().isoformat(), - 'track_count': len(tracks_data), - 'total_duration': sum(t['length'] for t in tracks_data), - 'tracks': tracks_data, - 'version': '1.0', + "name": name, + "description": description, + "created_at": datetime.utcnow().isoformat(), + "track_count": len(tracks_data), + "total_duration": sum(t["length"] for t in tracks_data), + "tracks": tracks_data, + "version": "1.0", } - + path.parent.mkdir(parents=True, exist_ok=True) - with open(path, 'w', encoding='utf-8') as f: + with open(path, "w", encoding="utf-8") as f: json.dump(playlist_data, f, indent=2, ensure_ascii=False) @staticmethod def get_track_uris(filepath: str) -> List[str]: """Get list of track URIs from a saved playlist. - + Parameters ---------- filepath: str Path to the JSON file - + Returns ------- List[str] List of track URIs """ data = PlaylistManager.import_playlist(filepath) - return [track['uri'] for track in data['tracks'] if track.get('uri')] + return [track["uri"] for track in data["tracks"] if track.get("uri")] @staticmethod def merge_playlists( @@ -200,7 +200,7 @@ class PlaylistManager: remove_duplicates: bool = True, ) -> None: """Merge multiple playlists into one. - + Parameters ---------- filepaths: List[str] @@ -216,34 +216,34 @@ class PlaylistManager: """ all_tracks = [] seen_uris = set() - + for filepath in filepaths: data = PlaylistManager.import_playlist(filepath) - - for track in data['tracks']: - uri = track.get('uri', '') - + + for track in data["tracks"]: + uri = track.get("uri", "") + if remove_duplicates: if uri and uri in seen_uris: continue if uri: seen_uris.add(uri) - + all_tracks.append(track) - + merged_data = { - 'name': name or 'Merged Playlist', - 'description': description or f'Merged from {len(filepaths)} playlists', - 'created_at': datetime.utcnow().isoformat(), - 'track_count': len(all_tracks), - 'total_duration': sum(t['length'] for t in all_tracks), - 'tracks': all_tracks, - 'version': '1.0', + "name": name or "Merged Playlist", + "description": description or f"Merged from {len(filepaths)} playlists", + "created_at": datetime.utcnow().isoformat(), + "track_count": len(all_tracks), + "total_duration": sum(t["length"] for t in all_tracks), + "tracks": all_tracks, + "version": "1.0", } - + output = Path(output_path) output.parent.mkdir(parents=True, exist_ok=True) - with open(output, 'w', encoding='utf-8') as f: + with open(output, "w", encoding="utf-8") as f: json.dump(merged_data, f, indent=2, ensure_ascii=False) @staticmethod @@ -254,7 +254,7 @@ class PlaylistManager: name: Optional[str] = None, ) -> None: """Export tracks to M3U playlist format. - + Parameters ---------- tracks: List[Track] @@ -266,39 +266,39 @@ class PlaylistManager: """ path = Path(filepath) path.parent.mkdir(parents=True, exist_ok=True) - - with open(path, 'w', encoding='utf-8') as f: - f.write('#EXTM3U\n') + + with open(path, "w", encoding="utf-8") as f: + f.write("#EXTM3U\n") if name: - f.write(f'#PLAYLIST:{name}\n') - + f.write(f"#PLAYLIST:{name}\n") + for track in tracks: # Duration in seconds duration = track.length // 1000 - f.write(f'#EXTINF:{duration},{track.author} - {track.title}\n') - f.write(f'{track.uri}\n') + f.write(f"#EXTINF:{duration},{track.author} - {track.title}\n") + f.write(f"{track.uri}\n") @staticmethod def get_playlist_info(filepath: str) -> Dict[str, Any]: """Get basic information about a saved playlist without loading all tracks. - + Parameters ---------- filepath: str Path to the JSON file - + Returns ------- Dict[str, Any] Dictionary with playlist metadata (name, track_count, duration, etc.) """ data = PlaylistManager.import_playlist(filepath) - + return { - 'name': data.get('name'), - 'description': data.get('description'), - 'track_count': data.get('track_count'), - 'total_duration': data.get('total_duration'), - 'created_at': data.get('created_at'), - 'version': data.get('version'), + "name": data.get("name"), + "description": data.get("description"), + "track_count": data.get("track_count"), + "total_duration": data.get("total_duration"), + "created_at": data.get("created_at"), + "version": data.get("version"), } diff --git a/pomice/queue_stats.py b/pomice/queue_stats.py index 3bf613f..8ef852d 100644 --- a/pomice/queue_stats.py +++ b/pomice/queue_stats.py @@ -15,14 +15,14 @@ __all__ = ("QueueStats",) class QueueStats: """Advanced statistics for a Pomice Queue. - + Provides detailed analytics about queue contents including duration, requester statistics, and track distribution. """ def __init__(self, queue: Queue) -> None: """Initialize queue statistics. - + Parameters ---------- queue: Queue @@ -33,7 +33,7 @@ class QueueStats: @property def total_duration(self) -> int: """Get total duration of all tracks in queue (milliseconds). - + Returns ------- int @@ -44,7 +44,7 @@ class QueueStats: @property def average_duration(self) -> float: """Get average track duration in queue (milliseconds). - + Returns ------- float @@ -57,7 +57,7 @@ class QueueStats: @property def longest_track(self) -> Optional[Track]: """Get the longest track in the queue. - + Returns ------- Optional[Track] @@ -70,7 +70,7 @@ class QueueStats: @property def shortest_track(self) -> Optional[Track]: """Get the shortest track in the queue. - + Returns ------- Optional[Track] @@ -82,7 +82,7 @@ class QueueStats: def get_requester_stats(self) -> Dict[int, Dict[str, any]]: """Get statistics grouped by requester. - + Returns ------- Dict[int, Dict[str, any]] @@ -92,53 +92,51 @@ class QueueStats: - 'tracks': List of their tracks """ stats: Dict[int, Dict] = {} - + for track in self._queue: if not track.requester: continue - + user_id = track.requester.id if user_id not in stats: stats[user_id] = { - 'count': 0, - 'total_duration': 0, - 'tracks': [], - 'requester': track.requester + "count": 0, + "total_duration": 0, + "tracks": [], + "requester": track.requester, } - - stats[user_id]['count'] += 1 - stats[user_id]['total_duration'] += track.length - stats[user_id]['tracks'].append(track) - + + stats[user_id]["count"] += 1 + stats[user_id]["total_duration"] += track.length + stats[user_id]["tracks"].append(track) + return stats def get_top_requesters(self, limit: int = 5) -> List[tuple]: """Get top requesters by track count. - + Parameters ---------- limit: int Maximum number of requesters to return. Defaults to 5. - + Returns ------- List[tuple] List of (requester, count) tuples sorted by count (descending) """ - requester_counts = Counter( - track.requester.id for track in self._queue if track.requester - ) - + requester_counts = Counter(track.requester.id for track in self._queue if track.requester) + # Get requester objects stats = self.get_requester_stats() return [ - (stats[user_id]['requester'], count) + (stats[user_id]["requester"], count) for user_id, count in requester_counts.most_common(limit) ] def get_author_distribution(self) -> Dict[str, int]: """Get distribution of tracks by author. - + Returns ------- Dict[str, int] @@ -148,12 +146,12 @@ class QueueStats: def get_top_authors(self, limit: int = 10) -> List[tuple]: """Get most common authors in the queue. - + Parameters ---------- limit: int Maximum number of authors to return. Defaults to 10. - + Returns ------- List[tuple] @@ -164,7 +162,7 @@ class QueueStats: def get_stream_count(self) -> int: """Get count of streams in the queue. - + Returns ------- int @@ -174,24 +172,24 @@ class QueueStats: def get_playlist_distribution(self) -> Dict[str, int]: """Get distribution of tracks by playlist. - + Returns ------- Dict[str, int] Dictionary mapping playlist names to track counts """ distribution: Dict[str, int] = {} - + for track in self._queue: if track.playlist: playlist_name = track.playlist.name distribution[playlist_name] = distribution.get(playlist_name, 0) + 1 - + return distribution def get_duration_breakdown(self) -> Dict[str, int]: """Get breakdown of tracks by duration categories. - + Returns ------- Dict[str, int] @@ -202,34 +200,34 @@ class QueueStats: - 'very_long' (> 10 min) """ breakdown = { - 'short': 0, # < 3 minutes - 'medium': 0, # 3-6 minutes - 'long': 0, # 6-10 minutes - 'very_long': 0 # > 10 minutes + "short": 0, # < 3 minutes + "medium": 0, # 3-6 minutes + "long": 0, # 6-10 minutes + "very_long": 0, # > 10 minutes } - + for track in self._queue: duration_minutes = track.length / 60000 # Convert ms to minutes - + if duration_minutes < 3: - breakdown['short'] += 1 + breakdown["short"] += 1 elif duration_minutes < 6: - breakdown['medium'] += 1 + breakdown["medium"] += 1 elif duration_minutes < 10: - breakdown['long'] += 1 + breakdown["long"] += 1 else: - breakdown['very_long'] += 1 - + breakdown["very_long"] += 1 + return breakdown def format_duration(self, milliseconds: int) -> str: """Format duration in milliseconds to human-readable string. - + Parameters ---------- milliseconds: int Duration in milliseconds - + Returns ------- str @@ -238,33 +236,33 @@ class QueueStats: seconds = milliseconds // 1000 minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) - + if hours > 0: return f"{hours}:{minutes:02d}:{seconds:02d}" return f"{minutes}:{seconds:02d}" def get_summary(self) -> Dict[str, any]: """Get a comprehensive summary of queue statistics. - + Returns ------- Dict[str, any] Dictionary containing various queue statistics """ return { - 'total_tracks': len(self._queue), - 'total_duration': self.total_duration, - 'total_duration_formatted': self.format_duration(self.total_duration), - 'average_duration': self.average_duration, - 'average_duration_formatted': self.format_duration(int(self.average_duration)), - 'longest_track': self.longest_track, - 'shortest_track': self.shortest_track, - 'stream_count': self.get_stream_count(), - 'unique_authors': len(self.get_author_distribution()), - 'unique_requesters': len(self.get_requester_stats()), - 'duration_breakdown': self.get_duration_breakdown(), - 'loop_mode': self._queue.loop_mode, - 'is_looping': self._queue.is_looping, + "total_tracks": len(self._queue), + "total_duration": self.total_duration, + "total_duration_formatted": self.format_duration(self.total_duration), + "average_duration": self.average_duration, + "average_duration_formatted": self.format_duration(int(self.average_duration)), + "longest_track": self.longest_track, + "shortest_track": self.shortest_track, + "stream_count": self.get_stream_count(), + "unique_authors": len(self.get_author_distribution()), + "unique_requesters": len(self.get_requester_stats()), + "duration_breakdown": self.get_duration_breakdown(), + "loop_mode": self._queue.loop_mode, + "is_looping": self._queue.is_looping, } def __repr__(self) -> str: diff --git a/pomice/track_utils.py b/pomice/track_utils.py index 1884230..9554c21 100644 --- a/pomice/track_utils.py +++ b/pomice/track_utils.py @@ -13,7 +13,7 @@ __all__ = ("TrackFilter", "SearchHelper") class TrackFilter: """Advanced filtering utilities for tracks. - + Provides various filter functions to find tracks matching specific criteria. """ @@ -25,7 +25,7 @@ class TrackFilter: max_duration: Optional[int] = None, ) -> List[Track]: """Filter tracks by duration range. - + Parameters ---------- tracks: List[Track] @@ -34,26 +34,26 @@ class TrackFilter: Minimum duration in milliseconds max_duration: Optional[int] Maximum duration in milliseconds - + Returns ------- List[Track] Filtered tracks """ result = tracks - + if min_duration is not None: result = [t for t in result if t.length >= min_duration] - + if max_duration is not None: result = [t for t in result if t.length <= max_duration] - + return result @staticmethod def by_author(tracks: List[Track], author: str, *, exact: bool = False) -> List[Track]: """Filter tracks by author name. - + Parameters ---------- tracks: List[Track] @@ -62,7 +62,7 @@ class TrackFilter: Author name to search for exact: bool Whether to match exactly. Defaults to False (case-insensitive contains). - + Returns ------- List[Track] @@ -70,14 +70,14 @@ class TrackFilter: """ if exact: return [t for t in tracks if t.author == author] - + author_lower = author.lower() return [t for t in tracks if author_lower in t.author.lower()] @staticmethod def by_title(tracks: List[Track], title: str, *, exact: bool = False) -> List[Track]: """Filter tracks by title. - + Parameters ---------- tracks: List[Track] @@ -86,7 +86,7 @@ class TrackFilter: Title to search for exact: bool Whether to match exactly. Defaults to False (case-insensitive contains). - + Returns ------- List[Track] @@ -94,21 +94,21 @@ class TrackFilter: """ if exact: return [t for t in tracks if t.title == title] - + title_lower = title.lower() return [t for t in tracks if title_lower in t.title.lower()] @staticmethod def by_requester(tracks: List[Track], requester_id: int) -> List[Track]: """Filter tracks by requester. - + Parameters ---------- tracks: List[Track] List of tracks to filter requester_id: int Discord user ID - + Returns ------- List[Track] @@ -119,34 +119,31 @@ class TrackFilter: @staticmethod def by_playlist(tracks: List[Track], playlist_name: str) -> List[Track]: """Filter tracks by playlist name. - + Parameters ---------- tracks: List[Track] List of tracks to filter playlist_name: str Playlist name to search for - + Returns ------- List[Track] Filtered tracks """ playlist_lower = playlist_name.lower() - return [ - t for t in tracks - if t.playlist and playlist_lower in t.playlist.name.lower() - ] + return [t for t in tracks if t.playlist and playlist_lower in t.playlist.name.lower()] @staticmethod def streams_only(tracks: List[Track]) -> List[Track]: """Filter to only include streams. - + Parameters ---------- tracks: List[Track] List of tracks to filter - + Returns ------- List[Track] @@ -157,12 +154,12 @@ class TrackFilter: @staticmethod def non_streams_only(tracks: List[Track]) -> List[Track]: """Filter to exclude streams. - + Parameters ---------- tracks: List[Track] List of tracks to filter - + Returns ------- List[Track] @@ -173,14 +170,14 @@ class TrackFilter: @staticmethod def custom(tracks: List[Track], predicate: Callable[[Track], bool]) -> List[Track]: """Filter tracks using a custom predicate function. - + Parameters ---------- tracks: List[Track] List of tracks to filter predicate: Callable[[Track], bool] Function that returns True for tracks to include - + Returns ------- List[Track] @@ -202,7 +199,7 @@ class SearchHelper: case_sensitive: bool = False, ) -> List[Track]: """Search tracks by query string. - + Parameters ---------- tracks: List[Track] @@ -215,7 +212,7 @@ class SearchHelper: Whether to search in authors. Defaults to True. case_sensitive: bool Whether search is case-sensitive. Defaults to False. - + Returns ------- List[Track] @@ -223,17 +220,17 @@ class SearchHelper: """ if not case_sensitive: query = query.lower() - + results = [] for track in tracks: title = track.title if case_sensitive else track.title.lower() author = track.author if case_sensitive else track.author.lower() - + if search_title and query in title: results.append(track) elif search_author and query in author: results.append(track) - + return results @staticmethod @@ -243,14 +240,14 @@ class SearchHelper: reverse: bool = False, ) -> List[Track]: """Sort tracks by duration. - + Parameters ---------- tracks: List[Track] List of tracks to sort reverse: bool If True, sort longest to shortest. Defaults to False. - + Returns ------- List[Track] @@ -265,14 +262,14 @@ class SearchHelper: reverse: bool = False, ) -> List[Track]: """Sort tracks alphabetically by title. - + Parameters ---------- tracks: List[Track] List of tracks to sort reverse: bool If True, sort Z to A. Defaults to False. - + Returns ------- List[Track] @@ -287,14 +284,14 @@ class SearchHelper: reverse: bool = False, ) -> List[Track]: """Sort tracks alphabetically by author. - + Parameters ---------- tracks: List[Track] List of tracks to sort reverse: bool If True, sort Z to A. Defaults to False. - + Returns ------- List[Track] @@ -310,7 +307,7 @@ class SearchHelper: by_title_author: bool = False, ) -> List[Track]: """Remove duplicate tracks from a list. - + Parameters ---------- tracks: List[Track] @@ -319,7 +316,7 @@ class SearchHelper: Remove duplicates by URI. Defaults to True. by_title_author: bool Remove duplicates by title+author combination. Defaults to False. - + Returns ------- List[Track] @@ -327,7 +324,7 @@ class SearchHelper: """ seen = set() result = [] - + for track in tracks: if by_uri: key = track.uri @@ -335,22 +332,22 @@ class SearchHelper: key = (track.title.lower(), track.author.lower()) else: key = track.track_id - + if key not in seen: seen.add(key) result.append(track) - + return result @staticmethod def group_by_author(tracks: List[Track]) -> dict[str, List[Track]]: """Group tracks by author. - + Parameters ---------- tracks: List[Track] List of tracks to group - + Returns ------- dict[str, List[Track]] @@ -367,12 +364,12 @@ class SearchHelper: @staticmethod def group_by_playlist(tracks: List[Track]) -> dict[str, List[Track]]: """Group tracks by playlist. - + Parameters ---------- tracks: List[Track] List of tracks to group - + Returns ------- dict[str, List[Track]] @@ -390,18 +387,19 @@ class SearchHelper: @staticmethod def get_random_tracks(tracks: List[Track], count: int) -> List[Track]: """Get random tracks from a list. - + Parameters ---------- tracks: List[Track] List of tracks count: int Number of random tracks to get - + Returns ------- List[Track] Random tracks (without replacement) """ import random + return random.sample(tracks, min(count, len(tracks)))