From 89a496b42ff94536fc4aa1a8f41651f8bfd52944 Mon Sep 17 00:00:00 2001 From: Claire Date: Tue, 12 Dec 2023 11:39:21 +0100 Subject: [PATCH] Fix status edits not always being streamed to mentioned users (#28324) --- app/services/fan_out_on_write_service.rb | 13 +++++++++++ .../services/fan_out_on_write_service_spec.rb | 22 ++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index f2a79c9fc9..50b414bc52 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -74,6 +74,15 @@ class FanOutOnWriteService < BaseService LocalNotificationWorker.push_bulk(mentions) do |mention| [mention.account_id, mention.id, 'Mention', 'mention'] end + + next unless update? + + # This may result in duplicate update payloads, but this ensures clients + # are aware of edits to posts only appearing in mention notifications + # (e.g. private mentions or mentions by people they do not follow) + PushUpdateWorker.push_bulk(mentions.filter { |mention| subscribed_to_streaming_api?(mention.account_id) }) do |mention| + [mention.account_id, @status.id, "timeline:#{mention.account_id}:notifications", { 'update' => true }] + end end end @@ -162,4 +171,8 @@ class FanOutOnWriteService < BaseService def broadcastable? @status.public_visibility? && !@status.reblog? && !@account.silenced? end + + def subscribed_to_streaming_api?(account_id) + redis.exists?("subscribed:timeline:#{account_id}") || redis.exists?("subscribed:timeline:#{account_id}:notifications") + end end diff --git a/spec/services/fan_out_on_write_service_spec.rb b/spec/services/fan_out_on_write_service_spec.rb index 3b554f9ea3..f657f298de 100644 --- a/spec/services/fan_out_on_write_service_spec.rb +++ b/spec/services/fan_out_on_write_service_spec.rb @@ -6,11 +6,12 @@ RSpec.describe FanOutOnWriteService, type: :service do subject { described_class.new } let(:last_active_at) { Time.now.utc } - let(:status) { Fabricate(:status, account: alice, visibility: visibility, text: 'Hello @bob #hoge') } + let(:status) { Fabricate(:status, account: alice, visibility: visibility, text: 'Hello @bob @eve #hoge') } let!(:alice) { Fabricate(:user, current_sign_in_at: last_active_at).account } let!(:bob) { Fabricate(:user, current_sign_in_at: last_active_at, account_attributes: { username: 'bob' }).account } let!(:tom) { Fabricate(:user, current_sign_in_at: last_active_at).account } + let!(:eve) { Fabricate(:user, current_sign_in_at: last_active_at, account_attributes: { username: 'eve' }).account } before do bob.follow!(alice) @@ -109,5 +110,24 @@ RSpec.describe FanOutOnWriteService, type: :service do expect(redis).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) expect(redis).to_not have_received(:publish).with('timeline:public', anything) end + + context 'when handling status updates', :sidekiq_fake do + before do + subject.call(status) + + status.snapshot!(at_time: status.created_at, rate_limit: false) + status.update!(text: 'Hello @bob @eve #hoge (edited)') + status.snapshot!(account_id: status.account_id) + + redis.set("subscribed:timeline:#{eve.id}:notifications", '1') + + Sidekiq::Worker.clear_all + end + + it 'pushes the update to mentioned users through the notifications streaming channel' do + subject.call(status, update: true) + expect(PushUpdateWorker).to have_enqueued_sidekiq_job(anything, status.id, "timeline:#{eve.id}:notifications", { 'update' => true }) + end + end end end