Skip to content

Commit

Permalink
Add embeded message to notifications youtube videos
Browse files Browse the repository at this point in the history
- Add the method to send notifications when the event is received by the Youtube server
- Create prettify embed to send notifications message in guilds subscribed
- Change youtube webhook api route to get the new video infos and pass to service to send new video notifications messages
  • Loading branch information
rukasudev committed Aug 21, 2024
1 parent d83ef5e commit a0dbcb8
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 5 deletions.
76 changes: 76 additions & 0 deletions app/services/notifications_youtube_video.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import random
from typing import Any, Dict

import discord
Expand All @@ -7,6 +8,7 @@
from app.constants import LogTypes as logconstants
from app.data.notifications_youtube_video import (
count_youtube_video_subscription_by_guilds,
find_guilds_by_youtuber,
)
from app.services import cache
from app.services.moderations import (
Expand All @@ -25,6 +27,80 @@ async def manager(interaction: discord.Interaction, guild_id: str):
interaction, constants.NOTIFICATIONS_YOUTUBE_VIDEO_KEY, cogs
)

def send_youtube_video_notification(video_id: str, channel_id: str) -> None:
youtuber_info = bot.youtube.get_channel_info(channel_id)
video_info = bot.youtube.get_video_info(video_id)
youtuber_user = youtuber_info.get("customUrl").replace("@", "")

guilds_data = find_guilds_by_youtuber(youtuber_user)

logger.info(
f"Starting to send notifications for youtube video: **{youtuber_user}**",
log_type=logconstants.COMMAND_INFO_TYPE,
)

count = 0
for guild_data in guilds_data:
guild = bot.get_guild(int(guild_data.get("guild_id")))

notifications = guild_data.get("notifications").get("values")
for notification in notifications:
youtuber = notification.get("youtuber").get("value")
if youtuber != youtuber_user:
continue

message = compose_notification_message(notification, youtuber, video_id)
channel = guild.get_channel(int(notification.get("channel").get("value")))

embed = create_video_notification_embed(video_info, youtuber_info)

bot.loop.create_task(channel.send(content=message, embed=embed))
count += 1

logger.info(
f"Notifications sent for youtuber **{youtuber_user}** new video in {count} guilds",
log_type=logconstants.COMMAND_INFO_TYPE,
)

def create_video_notification_embed(video_info: Dict[str, Any], youtuber_info: Dict[str, Any]) -> discord.Embed:
video_link = f"https://www.youtube.com/watch?v={video_info.get('id')}"
video_thumbnail = video_info.get("thumbnails").get("maxres") or video_info.get("thumbnails").get("high")

description = youtuber_info.get("description")
profile_picture = youtuber_info.get("thumbnails").get("high").get("url")

embed = discord.Embed(
title=video_info.get("title"),
description=description.split("\n")[0],
url=video_link,
color=discord.Color.red(),
)

video_description = video_info.get("description").split("\n")[0]
video_tags = ", ".join(video_info.get("tags")[:3])

embed.set_thumbnail(url=profile_picture)
embed.set_image(url=video_thumbnail.get("url"))
embed.add_field(name="Description", value=video_description, inline=True)
embed.add_field(name="Tags", value=video_tags, inline=True)

return embed

def compose_notification_message(notification: Dict[str, Any], youtuber: str, video_id: str) -> str:
messages = notification.get("notification_messages").get("value")
video_link = f"https://www.youtube.com/watch?v={video_id}"
random_message = random.choice(messages.split(";")).lstrip()

return parse_streamer_message(random_message, youtuber, video_link)

def parse_streamer_message(message: str, youtuber: str, video_link: str) -> str:
if "{video_link}" not in message.lower():
message += "\n{video_link}"

message = message.format(youtuber=youtuber, video_link=video_link)

return message

def subscribe_youtube_new_video(interaction: discord.Interaction, form_responses: Dict[str, Any]):
for response in form_responses[0].get("value"):
youtuber = response.get("youtuber").get("value")
Expand Down
27 changes: 22 additions & 5 deletions app/webhooks/youtube.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
import re

from flask import request

from app import logger
from app.constants import LogTypes as logconstants
from app.webhooks import webhooks

pattern_video_id = r'<yt:videoId>(.*?)</yt:videoId>'
pattern_channel_id = r'<yt:channelId>(.*?)</yt:channelId>'

@webhooks.route('/youtube', methods=['GET', 'POST'])
def youtube_webhook():
# from app import bot

challenge = request.args.get('hub.challenge')

if (challenge):
if challenge:
return challenge

logger.info(f'Received Youtube webhook: {request.data}', log_type=logconstants.COMMAND_INFO_TYPE)
data = request.data.decode('utf-8')

logger.info(f'Received Youtube webhook: {data}', log_type=logconstants.COMMAND_INFO_TYPE)

match_video_id = re.search(pattern_video_id, data)
match_channel_id = re.search(pattern_channel_id, data)

if not match_video_id or not match_channel_id:
logger.error('Invalid Youtube webhook data', log_type=logconstants.COMMAND_INFO_TYPE)
return 'Invalid request data', 403

from app.services.notifications_youtube_video import send_youtube_video_notification

logger.info(f'Video ID: {match_video_id}, Channel ID: {match_channel_id}')

send_youtube_video_notification(match_video_id, match_channel_id)

return 'Processed', 204
return 'Webhook processed', 204

0 comments on commit a0dbcb8

Please sign in to comment.