countdown-bot

A Discord bot that runs countdown games and generates analytics
git clone https://git.ashermorgan.net/countdown-bot/
Log | Files | Refs | README

commit a927965321796cb93d2c7ff5fc7eefdf2ed805f3
parent bc248b6486c484c1ae0d227beefccdb99e35bdd4
Author: Asher Morgan <59518073+ashermorgan@users.noreply.github.com>
Date:   Sat, 27 Apr 2024 11:00:16 -0700

Remove sqlalchemy

Diffstat:
MREADME.md | 2+-
Mcountdown_bot/__main__.py | 6++----
Mcountdown_bot/analyticsCog.py | 18++++++++----------
Mcountdown_bot/bot.py | 14+++++---------
Mcountdown_bot/botUtilities.py | 104+++++++++++++------------------------------------------------------------------
Dcountdown_bot/models.py | 570-------------------------------------------------------------------------------
Mcountdown_bot/utilitiesCog.py | 8+++-----
Mrequirements.txt | 1-
8 files changed, 36 insertions(+), 687 deletions(-)

diff --git a/README.md b/README.md @@ -15,7 +15,7 @@ A Discord bot that facilitates countdowns and generates detailed countdown analy ``` TOKEN=... PREFIX=! - DATABASE=sqlite:///data.sqlite3 + DATABASE=postgresql://... LOG_FILE=log.txt LOG_LEVEL=INFO ``` diff --git a/countdown_bot/__main__.py b/countdown_bot/__main__.py @@ -6,7 +6,6 @@ import psycopg # Import modules from .bot import CountdownBot -from .models import getSessionMaker # Load settings load_dotenv() @@ -21,9 +20,8 @@ logging.basicConfig( ) # Connect to database -databaseSessionMaker = getSessionMaker(os.environ.get("DATABASE")) -db_connection = psycopg.connect(os.environ.get("DATABASE2"), row_factory=psycopg.rows.dict_row) +db_connection = psycopg.connect(os.environ.get("DATABASE"), row_factory=psycopg.rows.dict_row) # Run bot -bot = CountdownBot(databaseSessionMaker, [os.environ.get("PREFIX", "!")], db_connection) +bot = CountdownBot(db_connection, [os.environ.get("PREFIX", "!")]) bot.run(os.environ.get("TOKEN")) diff --git a/countdown_bot/analyticsCog.py b/countdown_bot/analyticsCog.py @@ -10,15 +10,13 @@ import re import tempfile # Import modules -from .botUtilities import COLORS, getContextCountdown, getUsername, getContributor, CommandError, CountdownNotFound, getContextCountdown2 -from .models import POINT_RULES +from .botUtilities import COLORS, POINT_RULES, CommandError, CountdownNotFound, getUsername, getContributor, getContextCountdown class Analytics(commands.Cog): - def __init__(self, bot, databaseSessionMaker, db_connection): + def __init__(self, bot, db_connection): self.bot = bot - self.databaseSessionMaker = databaseSessionMaker self.db_connection = db_connection @@ -48,7 +46,7 @@ class Analytics(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() @@ -151,7 +149,7 @@ class Analytics(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() @@ -227,7 +225,7 @@ class Analytics(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() @@ -322,7 +320,7 @@ class Analytics(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() @@ -404,7 +402,7 @@ class Analytics(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() @@ -482,7 +480,7 @@ class Analytics(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() diff --git a/countdown_bot/bot.py b/countdown_bot/bot.py @@ -6,17 +6,15 @@ import logging # Import modules from . import analyticsCog, utilitiesCog -from .botUtilities import addMessage, COLORS, CountdownNotFound, ContributorNotFound, CommandError, getCountdown, getPrefix -from .models import EmptyCountdownError +from .botUtilities import addMessage, COLORS, CountdownNotFound, ContributorNotFound, CommandError, getPrefix class CountdownBot(commands.Bot): - def __init__(self, databaseSessionMaker, prefixes, db_connection): + def __init__(self, db_connection, prefixes): # Set properties - self.databaseSessionMaker = databaseSessionMaker - self.prefixes = prefixes self.db_connection = db_connection + self.prefixes = prefixes self.logger = logging.getLogger(__name__) # Get intents @@ -29,8 +27,8 @@ class CountdownBot(commands.Bot): async def setup_hook(self): - await self.add_cog(utilitiesCog.Utilities(self, self.databaseSessionMaker, self.db_connection)) - await self.add_cog(analyticsCog.Analytics(self, self.databaseSessionMaker, self.db_connection)) + await self.add_cog(utilitiesCog.Utilities(self, self.db_connection)) + await self.add_cog(analyticsCog.Analytics(self, self.db_connection)) @@ -91,8 +89,6 @@ class CountdownBot(commands.Bot): embed.description = f"Countdown not found" elif (isinstance(error.original, ContributorNotFound)): embed.description = f"Contributor not found: `{error.original.args[0]}`" - elif (isinstance(error.original, EmptyCountdownError)): - embed.description = f"The countdown is empty" elif (isinstance(error.original, CommandError)): embed.description = error.original.args[0] else: diff --git a/countdown_bot/botUtilities.py b/countdown_bot/botUtilities.py @@ -2,10 +2,6 @@ import discord import re -# Import modules -from .models import Countdown, Message - - COLORS = { "error": 0xD52C42, @@ -27,6 +23,22 @@ class CountdownNotFound(Exception): +# The rules for awarding leaderboard points +POINT_RULES = { + "r1": ("First Number", 0), + "r2": ("1000s", 1000), + "r3": ("1001s", 500), + "r4": ("200s", 200), + "r5": ("201s", 100), + "r6": ("100s", 100), + "r7": ("101s", 50), + # "r8": ("Prime Numbers", 15), + "r8": ("Odd Numbers", 12), + "r9": ("Even Numbers", 10), +} + + + async def getUsername(bot, id): """ Get a username from a user ID @@ -49,29 +61,6 @@ async def getUsername(bot, id): -async def getNickname(bot, server, id): - """ - Get a user's nickname in a server - - Parameters - ---------- - bot : commands.Bot - The bot - server : int - The server ID - id : int - The user ID - - Returns - ------- - str - The nickname - """ - - return (await (bot.get_guild(server)).fetch_member(id)).nick or await getUsername(bot, id) - - - async def getContributor(bot, countdown, text): """ Get the ID of the countdown contributor refered to by a string @@ -103,27 +92,6 @@ async def getContributor(bot, countdown, text): -def getCountdown(session, id): - """ - Get a countdown object - - Parameters - ---------- - session : sqlalchemy.orm.Session - The database session to use - id : int - The countdown id - - Returns - ------- - Countdown - The Countdown - """ - - return session.query(Countdown).filter(Countdown.id == id).first() - - - def isCountdown(cur, id): """ Determine whether a channel is a countdown @@ -147,45 +115,7 @@ def isCountdown(cur, id): -def getContextCountdown(session, ctx): - """ - Get the most relevant countdown to a certain context - - Parameters - ---------- - session : sqlalchemy.orm.Session - The database session to use - ctx : discord.ext.commands.Context - The context - - Returns - ------- - Countdown - The countdown - - Raises - ------ - CountdownNotFound - If a matching countdown cannot be found - """ - - if (isinstance(ctx.channel, discord.channel.TextChannel)): - # Countdown channel - countdown = getCountdown(session, ctx.channel.id) - if (countdown): return countdown - - # Server with countdown channel: get first countdown in this server that use the current prefix - countdown = session.query(Countdown).filter(Countdown.server_id == ctx.channel.guild.id and ctx.prefix in [x.value for x in Countdown.prefixes]).first() - if (countdown): return countdown - - if (isinstance(ctx.channel, discord.channel.DMChannel)): - # DM with user who has contributed to a countdown: get the first countdown they ever contributed to - firstMessage = session.query(Message).filter(Message.author_id == ctx.author.id).order_by(Message.timestamp).first() - if (firstMessage): return firstMessage.countdown - - raise CountdownNotFound() - -def getContextCountdown2(cur, ctx): +def getContextCountdown(cur, ctx): """ Get the most relevant countdown to a certain context diff --git a/countdown_bot/models.py b/countdown_bot/models.py @@ -1,570 +0,0 @@ -# Import dependencies -from datetime import datetime, timedelta -import math -from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey -from sqlalchemy.orm import relationship, sessionmaker -from sqlalchemy.ext.declarative import declarative_base - - - -Base = declarative_base() - - - -def getSessionMaker(location): - """ - Create a sessionmaker from a database URI - - Parameters - ---------- - location : str - The location of the database - """ - - engine = create_engine(location) - Base.metadata.create_all(bind=engine) - return sessionmaker(bind=engine) - - - -# The rules for awarding leaderboard points -POINT_RULES = { - "r1": ("First Number", 0), - "r2": ("1000s", 1000), - "r3": ("1001s", 500), - "r4": ("200s", 200), - "r5": ("201s", 100), - "r6": ("100s", 100), - "r7": ("101s", 50), - # "r8": ("Prime Numbers", 15), - "r8": ("Odd Numbers", 12), - "r9": ("Even Numbers", 10), -} - - - -# Error classes -class EmptyCountdownError(Exception): - """Raised when an action cannot be completed because the countdown is empty""" - pass - -class MessageNotAllowedError(Exception): - """Raised when someone posts twice in a row""" - pass - -class MessageIncorrectError(Exception): - """Raised when someone posts an incorrect number""" - pass - - - -class Countdown(Base): - """ - A Discord countdown - - Attributes - ---------- - id : int - The countdown's ID - server_id : int - The countdown's server's ID - timezone : float - The countdown's UTC offset (in hours) - prefixes : list - The countdown's command prefixes - reactions : list - The countdown's custom reactions - messages : list - The messages in the countdown - """ - - __tablename__ = "countdown" - - id = Column(Integer, primary_key=True) - server_id = Column(Integer) - timezone = Column(Float) - prefixes = relationship("Prefix", back_populates="countdown", cascade="all, delete-orphan") - reactions = relationship("Reaction", back_populates="countdown", cascade="all, delete-orphan") - messages = relationship("Message", back_populates="countdown", cascade="all, delete-orphan") - - def addMessage(self, message): - """ - Add a message to the countdown - - Parameters - ---------- - message : Message - The message object - - Raises - ------ - MessageNotAllowedError - If the author posted the last message - MessageIncorrectError - If the message content is incorrect - """ - - if (len(self.messages) != 0 and message.author_id == self.messages[-1].author_id): - raise MessageNotAllowedError() - elif (len(self.messages) != 0 and message.number + 1 != self.messages[-1].number): - raise MessageIncorrectError() - else: - self.messages += [message] - - def getTimezone(self): - """ - Get the timezone as a string - - Returns - ------- - str - The timezone string - """ - - # Get tiemzone - if (self.timezone >= 0): result = f"UTC+{self.timezone}" - else: result = f"UTC-{abs(self.timezone)}" - - # Remove ".0" from the end - if (self.timezone % 1 == 0): result = result[:-2] - - # Return timezone string - return result - - def contributors(self): - """ - Get countdown contributor statistics - - Returns - ------- - list - A list of contributor statistics - """ - - # Make sure countdown has started - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Get contributors - authors = list(set([x.author_id for x in self.messages])) - - # Get contributions - contributors = [] - for author in authors: - contributors += [{ - "author":author, - "contributions":len([x for x in self.messages if x.author_id == author]), - }] - - # Sort contributors by contributions - contributors = sorted(contributors, key=lambda x: x["contributions"], reverse=True) - - # Return contributors - return contributors - - def historicalContributors(self): - """ - Get countdown contributor statistics over time - - Returns - ------- - dict - A dictionary of historical contributor statistics - """ - - # Make sure countdown has at least two messages - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Get contributors - contributors = self.contributors() - - # Get countdown total - total = self.messages[0].number - - # Initialize result dictionary - result = {} - for contributor in contributors: - result[contributor["author"]] = [{"progress":0, "percentage":0, "total":0}] - - # Populate result dictionary - for message in self.messages: - for author in result: - if (author == message.author_id): - result[author] += [{"progress":(total - message.number), "percentage":(result[author][-1]["total"] + 1)/(total - message.number + 1), "total":result[author][-1]["total"] + 1}] - else: - result[author] += [{"progress":(total - message.number), "percentage":(result[author][-1]["total"] + 0)/(total - message.number + 1), "total":result[author][-1]["total"] + 0}] - - # Return result - return result - - def eta(self, period=timedelta(days=1)): - """ - Get countdown eta statistics - - Parameters - ---------- - period : timedelta - The period size (the default is 1 day) - - Returns - ------- - list - The countdown eta statistics - """ - - # Make sure countdown has at least two messages - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Initialize period data - periodEnd = self.messages[0].timestamp + timedelta(hours=self.timezone) + period - lastMessage = 0 - - # Initialize result and add first data point - data = [[self.messages[0].timestamp + timedelta(hours=self.timezone)], [self.messages[0].timestamp + timedelta(hours=self.timezone)]] - - # Calculate timestamp for last data point - if (self.messages[-1].number == 0): - end = self.messages[-1].timestamp + timedelta(hours=self.timezone) - else: - end = datetime.utcnow() + timedelta(hours=self.timezone) - - # Add data points - while (periodEnd < end): - # Advance to last message in period - while (lastMessage+1 < len(self.messages) and self.messages[lastMessage+1].timestamp + timedelta(hours=self.timezone) < periodEnd): - lastMessage += 1 - - # Calculate data - rate = (self.messages[0].number - self.messages[lastMessage].number) / ((periodEnd - (self.messages[0].timestamp + timedelta(hours=self.timezone))) / timedelta(days=1)) - eta = periodEnd + timedelta(days=self.messages[lastMessage].number/rate) - data[0] += [periodEnd] - data[1] += [eta] - - # Advance to next period - periodEnd += period - - # Add last data point - data[0] += [end] - data[1] += [self.progress()["eta"]] - - # Return eta data - return data - - def heatmap(self, user=None): - """ - Get a heatmap of when countdown messages are sent - - Parameters - ---------- - user : int - The ID of the specific user to generate the heatmap for (the default is None) - - Returns - ------- - list - A 7x24 2D array containing the heatmap - """ - - # Make sure countdown has started - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Initialize result matrix - result = [[0 for i in range(24)] for j in range(7)] - - for message in self.messages: - if (user != None and message.author_id != user): continue - - # Apply timezone offset - timestamp = message.timestamp + timedelta(hours=self.timezone) - - # Get time and weekday - dayOfWeek = timestamp.weekday() # 0-6, 0=Monday - timeOfDay = timestamp.hour # 0-23 - - # Make Sunday the first day of the week - dayOfWeek = (dayOfWeek + 1) % 7 - - # Add data to result matrix - result[dayOfWeek][timeOfDay] += 1 - - # Return result matrix - return result - - def leaderboard(self): - """ - Get countdown leaderboard - - Returns - ------- - list - The leaderboard - """ - - # Make sure countdown has started - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Get list of prime numbers - curTest = 5 - search = 1 - primes = [2, 3] - while curTest < self.messages[0].number: - if curTest%(primes[search]) == 0: - curTest = curTest + 2 - search = 1 - else: - if primes[search] > math.sqrt(curTest): - primes.append(curTest) - curTest = curTest + 2 - search = 1 - else: - search = search + 1 - - # Calculate contributor points - points = {} - for message in self.messages: - if (message.author_id not in points): - points[message.author_id] = { - "author": message.author_id, - "breakdown": { - "1000s": 0, - "1001s": 0, - "200s": 0, - "201s": 0, - "100s": 0, - "101s": 0, - "Prime Numbers": 0, - "Odd Numbers": 0, - "Even Numbers": 0, - "First Number": 0, - }, - } - if (message.number == self.messages[0].number): points[message.author_id]["breakdown"]["First Number"] += 1 - elif (message.number % 1000 == 0): points[message.author_id]["breakdown"]["1000s"] += 1 - elif (message.number % 1000 == 1): points[message.author_id]["breakdown"]["1001s"] += 1 - elif (message.number % 200 == 0): points[message.author_id]["breakdown"]["200s"] += 1 - elif (message.number % 200 == 1): points[message.author_id]["breakdown"]["201s"] += 1 - elif (message.number % 100 == 0): points[message.author_id]["breakdown"]["100s"] += 1 - elif (message.number % 100 == 1): points[message.author_id]["breakdown"]["101s"] += 1 - elif (message.number in primes): points[message.author_id]["breakdown"]["Prime Numbers"] += 1 - elif (message.number % 2 == 1): points[message.author_id]["breakdown"]["Odd Numbers"] += 1 - else: points[message.author_id]["breakdown"]["Even Numbers"] += 1 - - # Create ranked leaderboard - leaderboard = [] - for contributor in points.values(): - contributor["contributions"] = sum(contributor["breakdown"].values()) - contributor["points"] = sum([contributor["breakdown"][x] * POINT_RULES[x] for x in contributor["breakdown"]]) - leaderboard += [contributor] - leaderboard = sorted(leaderboard, key=lambda x: x["points"], reverse=True) - return leaderboard - - def longestBreak(self): - """ - Get the longest countdown break - - Returns - ------- - dict - A dictionary containing information about the longest countdown break - """ - - # Make sure countdown has started - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Calculate longest break - breaks = [] - for i in range(0, len(self.messages) - 1): - breaks += [self.messages[i+1].timestamp - self.messages[i].timestamp] - if (self.messages[-1].number == 0): - breaks += [timedelta(seconds=0)] - else: - breaks += [datetime.utcnow() - self.messages[-1].timestamp] - longestBreak = max(breaks) - index = breaks.index(longestBreak) - start = self.messages[index].timestamp + timedelta(hours=self.timezone) - if (index == len(self.messages) - 1): - end = datetime.utcnow() + timedelta(hours=self.timezone) - else: - end = self.messages[index + 1].timestamp + timedelta(hours=self.timezone) - - # Return statistics - return { - 'duration': longestBreak, - 'start': start, - 'end': end, - } - - def progress(self): - """ - Get countdown progress statistics - - Returns - ------- - dict - A dictionary containing countdown progress statistics - """ - - # Make sure countdown has started - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Get basic statistics - total = self.messages[0].number - current = self.messages[-1].number - percentage = (total - current) / total * 100 - start = self.messages[0].timestamp - - # Get rate statistics - if (len(self.messages) > 1 and self.messages[-1].number == 0): - # The countdown has already finished - rate = (total - current)/((self.messages[-1].timestamp - self.messages[0].timestamp) / timedelta(days=1)) - eta = self.messages[-1].timestamp - elif (len(self.messages) > 1): - # The countdown is still going - rate = (total - current)/((datetime.utcnow() - self.messages[0].timestamp) / timedelta(days=1)) - eta = datetime.utcnow() + timedelta(days=current/rate) - else: - # Only 1 message in the countdown, can't compute real rate or eta - rate = 0 - eta = datetime.utcnow() + timedelta(days=1) - - # Get list of progress - progress = [{"time":x.timestamp, "progress":x.number} for x in self.messages] - - # Return stats - return { - "total": total, - "current": current, - "percentage": percentage, - "progress": progress, - "start": start, - "rate": rate, - "eta": eta, - } - - def speed(self, period=timedelta(days=1)): - """ - Get countdown speed statistics - - Parameters - ---------- - periodLength : timedelta - The period size (the default is 1 day) - - Returns - ------- - list - The countdown speed statistics - """ - - # Make sure countdown has started - if (len(self.messages) == 0): - raise EmptyCountdownError() - - # Calculate speed statistics - data = [[], []] - periodStart = datetime(2018, 1, 1) # Starts on Monday, Jan 1st - for message in self.messages: - # If data point isn't in the current period - while (message.timestamp + timedelta(hours=self.timezone) - period >= periodStart): - periodStart += period - - # Add new period if needed - if (len(data[0]) == 0 or data[0][-1] != periodStart): - data[0] += [periodStart] - data[1] += [0] - - # Otherwise add the latest diff to the current period - data[1][-1] += 1 - - # Return speed statistics - return data - - - -class Prefix(Base): - """ - A command prefix for a countdown - - Attributes - ---------- - id : int - The prefix's ID - countdown_id : int - The prefix's countdown's ID - countdown : Countdown - The prefix's countdown - value : string - The command prefix - """ - - __tablename__ = "prefix" - - id = Column(Integer, primary_key=True) - countdown_id = Column(Integer, ForeignKey("countdown.id")) - countdown = relationship("Countdown", back_populates="prefixes") - value = Column(String) - - - -class Reaction(Base): - """ - A custom countdown reaction - - Attributes - ---------- - id : int - The reaction's ID - countdown_id : int - The prefix's countdown's ID - countdown : Countdown - The prefix's countdown - number : int - The number that the reaction applies to - value : string - The reaction - """ - - __tablename__ = "reaction" - - id = Column(Integer, primary_key=True) - countdown_id = Column(Integer, ForeignKey("countdown.id")) - countdown = relationship("Countdown", back_populates="reactions") - number = Column(Integer) - value = Column(String) - - - -class Message(Base): - """ - A countdown message - - Attributes - ---------- - id : int - The message's ID - countdown_id : int - The message's countdown's ID - countdown : Countdown - The message's countdown - author_id : int - The message's author's ID - timestamp : datetime.datetime - The message's timestamp - number : int - The message's number - """ - - __tablename__ = "message" - - id = Column(Integer, primary_key=True) - countdown_id = Column(Integer, ForeignKey("countdown.id")) - countdown = relationship("Countdown", back_populates="messages") - author_id = Column(Integer) - timestamp = Column(DateTime) - number = Column(Integer) diff --git a/countdown_bot/utilitiesCog.py b/countdown_bot/utilitiesCog.py @@ -3,15 +3,13 @@ import discord from discord.ext import commands # Import modules -from .botUtilities import COLORS, CommandError, isCountdown, getContextCountdown, getCountdown, loadCountdown, getContextCountdown2, CountdownNotFound -from .models import Countdown, Prefix, Reaction +from .botUtilities import COLORS, CommandError, CountdownNotFound, isCountdown, loadCountdown, getContextCountdown class Utilities(commands.Cog): - def __init__(self, bot, databaseSessionMaker, db_connection): + def __init__(self, bot, db_connection): self.bot = bot - self.databaseSessionMaker = databaseSessionMaker self.db_connection = db_connection self.bot.remove_command("help") @@ -70,7 +68,7 @@ class Utilities(commands.Cog): with self.db_connection.cursor() as cur: # Get countdown channel - countdown = getContextCountdown2(cur, ctx) + countdown = getContextCountdown(cur, ctx) if not countdown: raise CountdownNotFound() diff --git a/requirements.txt b/requirements.txt @@ -2,4 +2,3 @@ discord matplotlib psycopg2 python-dotenv -sqlalchemy