From 120965eb0b1b0da6906bb242da50a77367defd96 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Mon, 12 Apr 2021 14:25:34 +0200 Subject: [PATCH] Change Web Push API deliveries to use request pooling (#16014) --- Gemfile | 2 +- Gemfile.lock | 2 +- app/models/web/push_subscription.rb | 91 +++++++++---------- app/workers/web/push_notification_worker.rb | 65 +++++++++++-- .../web/push_notification_worker_spec.rb | 48 ++++++++++ 5 files changed, 150 insertions(+), 58 deletions(-) create mode 100644 spec/workers/web/push_notification_worker_spec.rb diff --git a/Gemfile b/Gemfile index d4385f0145..cc77ee618b 100644 --- a/Gemfile +++ b/Gemfile @@ -94,7 +94,7 @@ gem 'tty-prompt', '~> 0.23', require: false gem 'twitter-text', '~> 3.1.0' gem 'tzinfo-data', '~> 1.2021' gem 'webpacker', '~> 5.2' -gem 'webpush' +gem 'webpush', '~> 0.3' gem 'webauthn', '~> 3.0.0.alpha1' gem 'json-ld' diff --git a/Gemfile.lock b/Gemfile.lock index b1fb350d48..1d3c245954 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -808,5 +808,5 @@ DEPENDENCIES webauthn (~> 3.0.0.alpha1) webmock (~> 3.12) webpacker (~> 5.2) - webpush + webpush (~> 0.3) xorcist (~> 1.1) diff --git a/app/models/web/push_subscription.rb b/app/models/web/push_subscription.rb index c407a7789b..7609b1bfc6 100644 --- a/app/models/web/push_subscription.rb +++ b/app/models/web/push_subscription.rb @@ -24,81 +24,80 @@ class Web::PushSubscription < ApplicationRecord validates :key_p256dh, presence: true validates :key_auth, presence: true - def push(notification) - I18n.with_locale(associated_user&.locale || I18n.default_locale) do - push_payload(payload_for_notification(notification), 48.hours.seconds) - end + delegate :locale, to: :associated_user + + def encrypt(payload) + Webpush::Encryption.encrypt(payload, key_p256dh, key_auth) + end + + def audience + @audience ||= Addressable::URI.parse(endpoint).normalized_site + end + + def crypto_key_header + p256ecdsa = vapid_key.public_key_for_push_header + + "p256ecdsa=#{p256ecdsa}" + end + + def authorization_header + jwt = JWT.encode({ aud: audience, exp: 24.hours.from_now.to_i, sub: "mailto:#{contact_email}" }, vapid_key.curve, 'ES256', typ: 'JWT') + + "WebPush #{jwt}" end def pushable?(notification) - data&.key?('alerts') && ActiveModel::Type::Boolean.new.cast(data['alerts'][notification.type.to_s]) + ActiveModel::Type::Boolean.new.cast(data&.dig('alerts', notification.type.to_s)) end def associated_user return @associated_user if defined?(@associated_user) - @associated_user = if user_id.nil? - session_activation.user - else - user - end + @associated_user = begin + if user_id.nil? + session_activation.user + else + user + end + end end def associated_access_token return @associated_access_token if defined?(@associated_access_token) - @associated_access_token = if access_token_id.nil? - find_or_create_access_token.token - else - access_token.token - end + @associated_access_token = begin + if access_token_id.nil? + find_or_create_access_token.token + else + access_token.token + end + end end class << self def unsubscribe_for(application_id, resource_owner) - access_token_ids = Doorkeeper::AccessToken.where(application_id: application_id, resource_owner_id: resource_owner.id, revoked_at: nil) - .pluck(:id) - + access_token_ids = Doorkeeper::AccessToken.where(application_id: application_id, resource_owner_id: resource_owner.id, revoked_at: nil).pluck(:id) where(access_token_id: access_token_ids).delete_all end end private - def push_payload(message, ttl = 5.minutes.seconds) - Webpush.payload_send( - message: Oj.dump(message), - endpoint: endpoint, - p256dh: key_p256dh, - auth: key_auth, - ttl: ttl, - ssl_timeout: 10, - open_timeout: 10, - read_timeout: 10, - vapid: { - subject: "mailto:#{::Setting.site_contact_email}", - private_key: Rails.configuration.x.vapid_private_key, - public_key: Rails.configuration.x.vapid_public_key, - } - ) - end - - def payload_for_notification(notification) - ActiveModelSerializers::SerializableResource.new( - notification, - serializer: Web::NotificationSerializer, - scope: self, - scope_name: :current_push_subscription - ).as_json - end - def find_or_create_access_token Doorkeeper::AccessToken.find_or_create_for( application: Doorkeeper::Application.find_by(superapp: true), - resource_owner: session_activation.user_id, + resource_owner: user_id || session_activation.user_id, scopes: Doorkeeper::OAuth::Scopes.from_string('read write follow push'), expires_in: Doorkeeper.configuration.access_token_expires_in, use_refresh_token: Doorkeeper.configuration.refresh_token_enabled? ) end + + def vapid_key + @vapid_key ||= Webpush::VapidKey.from_keys(Rails.configuration.x.vapid_public_key, Rails.configuration.x.vapid_private_key) + end + + def contact_email + @contact_email ||= ::Setting.site_contact_email + end end diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb index 46aeaa30b1..57f5b5c228 100644 --- a/app/workers/web/push_notification_worker.rb +++ b/app/workers/web/push_notification_worker.rb @@ -3,22 +3,67 @@ class Web::PushNotificationWorker include Sidekiq::Worker - sidekiq_options backtrace: true, retry: 5 + sidekiq_options queue: 'push', retry: 5 + + TTL = 48.hours.to_s + URGENCY = 'normal' def perform(subscription_id, notification_id) - subscription = ::Web::PushSubscription.find(subscription_id) - notification = Notification.find(notification_id) + @subscription = Web::PushSubscription.find(subscription_id) + @notification = Notification.find(notification_id) - subscription.push(notification) unless notification.activity.nil? - rescue Webpush::ResponseError => e - code = e.response.code.to_i + # Polymorphically associated activity could have been deleted + # in the meantime, so we have to double-check before proceeding + return unless @notification.activity.present? && @subscription.pushable?(@notification) - if (400..499).cover?(code) && ![408, 429].include?(code) - subscription.destroy! - else - raise e + payload = @subscription.encrypt(push_notification_json) + + request_pool.with(@subscription.audience) do |http_client| + request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client) + + request.add_headers( + 'Content-Type' => 'application/octet-stream', + 'Ttl' => TTL, + 'Urgency' => URGENCY, + 'Content-Encoding' => 'aesgcm', + 'Encryption' => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}", + 'Crypto-Key' => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}", + 'Authorization' => @subscription.authorization_header + ) + + request.perform do |response| + # If the server responds with an error in the 4xx range + # that isn't about rate-limiting or timeouts, we can + # assume that the subscription is invalid or expired + # and must be removed + + if (400..499).cover?(response.code) && ![408, 429].include?(response.code) + @subscription.destroy! + elsif !(200...300).cover?(response.code) + raise Mastodon::UnexpectedResponseError, response + end + end end rescue ActiveRecord::RecordNotFound true end + + private + + def push_notification_json + json = I18n.with_locale(@subscription.locale || I18n.default_locale) do + ActiveModelSerializers::SerializableResource.new( + @notification, + serializer: Web::NotificationSerializer, + scope: @subscription, + scope_name: :current_push_subscription + ).as_json + end + + Oj.dump(json) + end + + def request_pool + RequestPool.current + end end diff --git a/spec/workers/web/push_notification_worker_spec.rb b/spec/workers/web/push_notification_worker_spec.rb new file mode 100644 index 0000000000..5bc24f8886 --- /dev/null +++ b/spec/workers/web/push_notification_worker_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe Web::PushNotificationWorker do + subject { described_class.new } + + let(:p256dh) { 'BN4GvZtEZiZuqFxSKVZfSfluwKBD7UxHNBmWkfiZfCtgDE8Bwh-_MtLXbBxTBAWH9r7IPKL0lhdcaqtL1dfxU5E=' } + let(:auth) { 'Q2BoAjC09xH3ywDLNJr-dA==' } + let(:endpoint) { 'https://updates.push.services.mozilla.com/push/v1/subscription-id' } + let(:user) { Fabricate(:user) } + let(:notification) { Fabricate(:notification) } + let(:subscription) { Fabricate(:web_push_subscription, user_id: user.id, key_p256dh: p256dh, key_auth: auth, endpoint: endpoint, data: { alerts: { notification.type => true } }) } + let(:vapid_public_key) { 'BB37UCyc8LLX4PNQSe-04vSFvpUWGrENubUaslVFM_l5TxcGVMY0C3RXPeUJAQHKYlcOM2P4vTYmkoo0VZGZTM4=' } + let(:vapid_private_key) { 'OPrw1Sum3gRoL4-DXfSCC266r-qfFSRZrnj8MgIhRHg=' } + let(:vapid_key) { Webpush::VapidKey.from_keys(vapid_public_key, vapid_private_key) } + let(:contact_email) { 'sender@example.com' } + let(:ciphertext) { "+\xB8\xDBT}\x13\xB6\xDD.\xF9\xB0\xA7\xC8\xD2\x80\xFD\x99#\xF7\xAC\x83\xA4\xDB,\x1F\xB5\xB9w\x85>\xF7\xADr" } + let(:salt) { "X\x97\x953\xE4X\xF8_w\xE7T\x95\xC51q\xFE" } + let(:server_public_key) { "\x04\b-RK9w\xDD$\x16lFz\xF9=\xB4~\xC6\x12k\xF3\xF40t\xA9\xC1\fR\xC3\x81\x80\xAC\f\x7F\xE4\xCC\x8E\xC2\x88 n\x8BB\xF1\x9C\x14\a\xFA\x8D\xC9\x80\xA1\xDDyU\\&c\x01\x88#\x118Ua" } + let(:shared_secret) { "\t\xA7&\x85\t\xC5m\b\xA8\xA7\xF8B{1\xADk\xE1y'm\xEDE\xEC\xDD\xEDj\xB3$s\xA9\xDA\xF0" } + let(:payload) { { ciphertext: ciphertext, salt: salt, server_public_key: server_public_key, shared_secret: shared_secret } } + + describe 'perform' do + before do + allow_any_instance_of(subscription.class).to receive(:contact_email).and_return(contact_email) + allow_any_instance_of(subscription.class).to receive(:vapid_key).and_return(vapid_key) + allow(Webpush::Encryption).to receive(:encrypt).and_return(payload) + allow(JWT).to receive(:encode).and_return('jwt.encoded.payload') + + stub_request(:post, endpoint).to_return(status: 201, body: '') + + subject.perform(subscription.id, notification.id) + end + + it 'calls the relevant service with the correct headers' do + expect(a_request(:post, endpoint).with(headers: { + 'Content-Encoding' => 'aesgcm', + 'Content-Type' => 'application/octet-stream', + 'Crypto-Key' => 'dh=BAgtUks5d90kFmxGevk9tH7GEmvz9DB0qcEMUsOBgKwMf-TMjsKIIG6LQvGcFAf6jcmAod15VVwmYwGIIxE4VWE;p256ecdsa=' + vapid_public_key.delete('='), + 'Encryption' => 'salt=WJeVM-RY-F9351SVxTFx_g', + 'Ttl' => '172800', + 'Urgency' => 'normal', + 'Authorization' => 'WebPush jwt.encoded.payload', + }, body: "+\xB8\xDBT}\u0013\xB6\xDD.\xF9\xB0\xA7\xC8Ҁ\xFD\x99#\xF7\xAC\x83\xA4\xDB,\u001F\xB5\xB9w\x85>\xF7\xADr")).to have_been_made + end + end +end