diff --git a/src/invidious.cr b/src/invidious.cr index e0bd0101..e0ec61a0 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -180,8 +180,9 @@ if CONFIG.popular_enabled Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB) end -CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32) -Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(CONNECTION_CHANNEL, CONFIG.database_url) +NOTIFICATION_CHANNEL = ::Channel(VideoNotification).new(32) +CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32) +Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(NOTIFICATION_CHANNEL, CONNECTION_CHANNEL, CONFIG.database_url) Invidious::Jobs.register Invidious::Jobs::ClearExpiredItemsJob.new diff --git a/src/invidious/channels/channels.cr b/src/invidious/channels/channels.cr index be739673..e76647eb 100644 --- a/src/invidious/channels/channels.cr +++ b/src/invidious/channels/channels.cr @@ -249,11 +249,7 @@ def fetch_channel(ucid, pull_all_videos : Bool) if was_insert LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions") - if CONFIG.enable_user_notifications - Invidious::Database::Users.add_notification(video) - else - Invidious::Database::Users.feed_needs_update(video) - end + NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video)) else LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Updated") end @@ -285,11 +281,7 @@ def fetch_channel(ucid, pull_all_videos : Bool) if Time.utc - video.published > 1.minute was_insert = Invidious::Database::ChannelVideos.insert(video) if was_insert - if CONFIG.enable_user_notifications - Invidious::Database::Users.add_notification(video) - else - Invidious::Database::Users.feed_needs_update(video) - end + NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video)) end end end diff --git a/src/invidious/database/users.cr b/src/invidious/database/users.cr index d54e6a76..4a3056ea 100644 --- a/src/invidious/database/users.cr +++ b/src/invidious/database/users.cr @@ -119,15 +119,15 @@ module Invidious::Database::Users # Update (notifs) # ------------------- - def add_notification(video : ChannelVideo) + def add_multiple_notifications(channel_id : String, video_ids : Array(String)) request = <<-SQL UPDATE users - SET notifications = array_append(notifications, $1), + SET notifications = array_cat(notifications, $1), feed_needs_update = true WHERE $2 = ANY(subscriptions) SQL - PG_DB.exec(request, video.id, video.ucid) + PG_DB.exec(request, video_ids, channel_id) end def remove_notification(user : User, vid : String) @@ -154,14 +154,14 @@ module Invidious::Database::Users # Update (misc) # ------------------- - def feed_needs_update(video : ChannelVideo) + def feed_needs_update(channel_id : String) request = <<-SQL UPDATE users SET feed_needs_update = true WHERE $1 = ANY(subscriptions) SQL - PG_DB.exec(request, video.ucid) + PG_DB.exec(request, channel_id) end def update_preferences(user : User) diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr index b445107b..b70e9ef4 100644 --- a/src/invidious/jobs/notification_job.cr +++ b/src/invidious/jobs/notification_job.cr @@ -1,8 +1,32 @@ +struct VideoNotification + getter video_id : String + getter channel_id : String + getter published : Time + + def_hash @channel_id, @video_id + + def ==(other) + video_id == other.video_id + end + + def self.from_video(video : ChannelVideo) : self + VideoNotification.new(video.id, video.ucid, video.published) + end + + def initialize(@video_id, @channel_id, @published) + end + + def clone : VideoNotification + VideoNotification.new(video_id.clone, channel_id.clone, published.clone) + end +end + class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob + private getter notification_channel : ::Channel(VideoNotification) private getter connection_channel : ::Channel({Bool, ::Channel(PQ::Notification)}) private getter pg_url : URI - def initialize(@connection_channel, @pg_url) + def initialize(@notification_channel, @connection_channel, @pg_url) end def begin @@ -10,6 +34,58 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) } + # hash of channels to their videos (id+published) that need notifying + to_notify = Hash(String, Set(VideoNotification)).new(->(hash : Hash(String, Set(VideoNotification)), key : String) { hash[key] = Set(VideoNotification).new }) + + # fiber to locally cache all incoming notifications (from pubsub webhooks and refresh channels job) + spawn do + begin + loop do + notification = notification_channel.receive + to_notify[notification.channel_id] << notification + end + end + end + # fiber to regularly persist all cached notifications + spawn do + loop do + begin + LOGGER.debug("NotificationJob: waking up") + cloned = to_notify.clone + to_notify.clear + + cloned.each do |channel_id, notifications| + if notifications.empty? + next + end + + LOGGER.info("NotificationJob: updating channel #{channel_id} with #{notifications.size} notifications") + if CONFIG.enable_user_notifications + video_ids = notifications.map { |n| n.video_id } + Invidious::Database::Users.add_multiple_notifications(channel_id, video_ids) + notifications.each do |n| + # Deliver notifications to `/api/v1/auth/notifications` + payload = { + "topic" => n.channel_id, + "videoId" => n.video_id, + "published" => n.published.to_unix, + }.to_json + PG_DB.exec("NOTIFY notifications, E'#{payload}'") + end + else + Invidious::Database::Users.feed_needs_update(channel_id) + end + end + + LOGGER.trace("NotificationJob: Done, sleeping") + rescue ex + LOGGER.error("NotificationJob: #{ex.message}") + end + sleep 1.minute + Fiber.yield + end + end + loop do action, connection = connection_channel.receive diff --git a/src/invidious/routes/feeds.cr b/src/invidious/routes/feeds.cr index e20a7139..14d3cdf8 100644 --- a/src/invidious/routes/feeds.cr +++ b/src/invidious/routes/feeds.cr @@ -425,16 +425,6 @@ module Invidious::Routes::Feeds next # skip this video since it raised an exception (e.g. it is a scheduled live event) end - if CONFIG.enable_user_notifications - # Deliver notifications to `/api/v1/auth/notifications` - payload = { - "topic" => video.ucid, - "videoId" => video.id, - "published" => published.to_unix, - }.to_json - PG_DB.exec("NOTIFY notifications, E'#{payload}'") - end - video = ChannelVideo.new({ id: id, title: video.title, @@ -450,11 +440,7 @@ module Invidious::Routes::Feeds was_insert = Invidious::Database::ChannelVideos.insert(video, with_premiere_timestamp: true) if was_insert - if CONFIG.enable_user_notifications - Invidious::Database::Users.add_notification(video) - else - Invidious::Database::Users.feed_needs_update(video) - end + NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video)) end end end