From 11cbe49ffc2729f9d40588c2e270a89f328cd6dd Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 8 Nov 2016 01:32:34 +0100 Subject: [PATCH] ProcessFeedService refactor --- app/services/process_feed_service.rb | 380 ++++++++++++--------------- 1 file changed, 166 insertions(+), 214 deletions(-) diff --git a/app/services/process_feed_service.rb b/app/services/process_feed_service.rb index e1125cf0..7ec46cac 100644 --- a/app/services/process_feed_service.rb +++ b/app/services/process_feed_service.rb @@ -2,257 +2,209 @@ class ProcessFeedService < BaseService ACTIVITY_NS = 'http://activitystrea.ms/spec/1.0/'.freeze THREAD_NS = 'http://purl.org/syndication/thread/1.0'.freeze - # Create local statuses from an Atom feed - # @param [String] body Atom feed - # @param [Account] account Account this feed belongs to - # @return [Enumerable] created statuses def call(body, account) xml = Nokogiri::XML(body) - update_remote_profile_service.call(xml.at_xpath('/xmlns:feed/xmlns:author'), account) unless xml.at_xpath('/xmlns:feed').nil? - xml.xpath('//xmlns:entry').reverse_each.map { |entry| process_entry(account, entry) }.compact + + update_author(xml, account) + process_entries(xml, account) end private - def process_entry(account, entry) - return unless [:note, :comment, :activity].include? object_type(entry) - - status = Status.find_by(uri: activity_id(entry)) - - # If we already have a post and the verb is now "delete", we gotta delete it and move on! - if !status.nil? && verb(entry) == :delete - delete_post!(status) - return - end - - return unless status.nil? - - status = Status.new(uri: activity_id(entry), url: activity_link(entry), account: account, text: content(entry), created_at: published(entry), updated_at: updated(entry)) - - if verb(entry) == :share - add_reblog!(entry, status) - elsif verb(entry) == :post - if thread_id(entry).nil? - add_post!(entry, status) - else - add_reply!(entry, status) - end - else - return - end - - # If we added a status, go through accounts it mentions and create respective relations - # Also record all media attachments for the status and for the reblogged status if present - unless status.new_record? - record_remote_mentions(status, entry.xpath('./xmlns:link[@rel="mentioned"]')) - record_remote_mentions(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:link[@rel="mentioned"]')) if status.reblog? - - if status.reblog? - ProcessHashtagsService.new.call(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:category').map { |category| category['term'] }) - else - ProcessHashtagsService.new.call(status, entry.xpath('./xmlns:category').map { |category| category['term'] }) - end - - process_attachments(entry, status) - process_attachments(entry.xpath('./activity:object', activity: ACTIVITY_NS), status.reblog) if status.reblog? - - Rails.logger.debug "Queuing remote status #{status.id} for distribution" - DistributionWorker.perform_async(status.id) - return status - end + def update_author(xml, account) + return if xml.at_xpath('/xmlns:feed').nil? + UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed/xmlns:author'), account) end - def record_remote_mentions(status, links) - return if status.local? + def process_entries(xml, account) + xml.xpath('//xmlns:entry').reverse_each.map { |entry| ProcessEntry.new.call(entry, account) }.compact + end - # Here we have to do a reverse lookup of local accounts by their URL! - # It's not pretty at all! I really wish all these protocols sticked to - # using acct:username@domain only! It would make things so much easier - # and tidier + class ProcessEntry + def call(xml, account) + @account = account + @xml = xml - links.each do |mention_link| - href_val = mention_link.attribute('href').value + return if skip_unsupported_type? - next if href_val == 'http://activityschema.org/collection/public' - - href = Addressable::URI.parse(href_val) - - if TagManager.instance.local_domain?(href.host) - # A local user is mentioned - mentioned_account = Account.find_local(href.path.gsub('/users/', '')) - - unless mentioned_account.nil? - mentioned_account.mentions.where(status: status).first_or_create(status: status) - NotificationMailer.mention(mentioned_account, status).deliver_later unless mentioned_account.blocking?(status.account) - end - else - # What to do about remote user? - # This is kinda dodgy because URLs could change, we don't index them - mentioned_account = Account.find_by(url: href.to_s) - - if mentioned_account.nil? - mentioned_account = FetchRemoteAccountService.new.call(href) - end - - unless mentioned_account.nil? - mentioned_account.mentions.where(status: status).first_or_create(status: status) - end + case verb + when :post, :share + return create_status + when :delete + return delete_status end end - end - def process_attachments(entry, status) - return if status.local? + private - entry.xpath('./xmlns:link[@rel="enclosure"]').each do |enclosure_link| - next if enclosure_link.attribute('href').nil? + def create_status + Rails.logger.debug "Creating remote status #{id}" + status = status_from_xml(@xml) - media = MediaAttachment.where(status: status, remote_url: enclosure_link.attribute('href').value).first - - next unless media.nil? - - begin - media = MediaAttachment.new(account: status.account, status: status, remote_url: enclosure_link.attribute('href').value) - media.file_remote_url = enclosure_link.attribute('href').value - media.save - rescue Paperclip::Errors::NotIdentifiedByImageMagickError - Rails.logger.debug "Error saving attachment from #{enclosure_link.attribute('href').value}" - next + if verb == :share + original_status = status_from_xml(xml.at_xpath('.//activity:object', activity: ACTIVITY_NS)) + status.reblog = original_status end - end - end - def add_post!(_entry, status) - status.save! - end - - def add_reblog!(entry, status) - status.reblog = find_original_status(entry, target_id(entry)) - - if status.reblog.nil? - status.reblog = fetch_remote_status(entry) - end - - if !status.reblog.nil? status.save! - NotificationMailer.reblog(status.reblog, status.account).deliver_later if status.reblog.local? && !status.reblog.account.blocking?(status.account) - end - end - - def add_reply!(entry, status) - status.thread = find_original_status(entry, thread_id(entry)) - status.save! - - if status.thread.nil? && !thread_href(entry).nil? - ThreadResolveWorker.perform_async(status.id, thread_href(entry)) - end - end - - def delete_post!(status) - remove_status_service.call(status) - end - - def find_original_status(_xml, id) - return nil if id.nil? - - if TagManager.instance.local_id?(id) - Status.find(TagManager.instance.unique_tag_to_local_id(id, 'Status')) - else - Status.find_by(uri: id) - end - end - - def fetch_remote_status(xml) - username = xml.at_xpath('./activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:author/xmlns:name').content - url = xml.at_xpath('./activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:author/xmlns:uri').content - domain = Addressable::URI.parse(url).host - account = Account.find_remote(username, domain) - - if account.nil? - account = follow_remote_account_service.call("#{username}@#{domain}") + Rails.logger.debug "Queuing remote status #{status.id} (#{id}) for distribution" + DistributionWorker.perform_async(status.id) + status end - status = Status.new(account: account, uri: target_id(xml), text: target_content(xml), url: target_url(xml), created_at: published(xml), updated_at: updated(xml)) - status.thread = find_original_status(xml, thread_id(xml)) - - if status.save && status.thread.nil? && !thread_href(xml).nil? - ThreadResolveWorker.perform_async(status.id, thread_href(xml)) + def delete_status + Rails.logger.debug "Deleting remote status #{id}" + status = Status.find_by(uri: id) + RemoveStatusService.new.call(status) unless status.nil? + nil end - status - rescue Goldfinger::Error, HTTP::Error - nil - end + def skip_unsupported_type? + !([:post, :share, :delete].include?(verb) && [:activity, :note, :comment].include?(type)) + end - def published(xml) - xml.at_xpath('./xmlns:published').content - end + def status_from_xml(entry) + # Return early if status already exists in db + status = find_status(id(entry)) + return status unless status.nil? - def updated(xml) - xml.at_xpath('./xmlns:updated').content - end + status = Status.create!({ + uri: id(entry), + url: url(entry), + account: account?(entry) ? find_or_resolve_account(acct(entry)) : @account, + text: content(entry), + created_at: published(entry), + }) - def content(xml) - xml.at_xpath('./xmlns:content').try(:content) - end + if thread?(entry) + status.thread = find_or_resolve_status(status, *thread(entry)) + end - def thread_id(xml) - xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('ref').value - rescue - nil - end + mentions_from_xml(status, entry) + hashtags_from_xml(status, entry) + media_from_xml(status, entry) - def thread_href(xml) - xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('href').value - rescue - nil - end + status + end - def target_id(xml) - xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:id').content - rescue - nil - end + def find_or_resolve_account(acct) + FollowRemoteAccountService.new.call(acct) + end - def activity_id(xml) - xml.at_xpath('./xmlns:id').content - end + def find_or_resolve_status(parent, uri, url) + status = find_status(uri) + ThreadResolveWorker.perform_async(parent.id, url) if status.nil? - def activity_link(xml) - xml.at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value - rescue - '' - end + status + end - def target_content(xml) - xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:content').content - end + def find_status(uri) + if TagManager.instance.local_id?(uri) + local_id = TagManager.instance.unique_tag_to_local_id(uri, 'Status') + return Status.find(local_id) + end - def target_url(xml) - xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value - end + Status.find_by(uri: uri) + end - def object_type(xml) - xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym - rescue - :activity - end + def mentions_from_xml(parent, xml) + processed_account_ids = [] - def verb(xml) - xml.at_xpath('./activity:verb', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym - rescue - :post - end + xml.xpath('./xmlns:link[@rel="mentioned"]').each do |link| + next if link['href'] == 'http://activityschema.org/collection/public' - def follow_remote_account_service - @follow_remote_account_service ||= FollowRemoteAccountService.new - end + url = Addressable::URI.parse(link['href']) - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new - end + mentioned_account = if TagManager.instance.local_domain?(url.host) + Account.find_local(url.path.gsub('/users/', '')) + else + Account.find_by(url: link['href']) || FetchRemoteAccountService.new.call(link['href']) + end - def remove_status_service - @remove_status_service ||= RemoveStatusService.new + next if mentioned_account.nil? || processed_account_ids.include?(mentioned_account.id) + + if mentioned_account.local? + # Send notifications + NotificationMailer.mention(mentioned_account, parent).deliver_later unless mentioned_account.blocking?(parent.account) + end + + mentioned_account.mentions.where(status: parent).first_or_create(status: parent) + + # So we can skip duplicate mentions + processed_account_ids << mentioned_account.id + end + end + + def hashtags_from_xml(parent, xml) + tags = xml.xpath('./xmlns:category').map { |category| category['term'] } + ProcessHashtagsService.new.call(parent, tags) + end + + def media_from_xml(parent, xml) + xml.xpath('./xmlns:link[@rel="enclosure"]').each do |link| + next unless link['href'] + + media = MediaAttachment.where(status: parent, remote_url: link['href']).first_or_initialize(account: parent.account, status: parent, remote_url: link['href']) + + begin + media.file_remote_url = link['href'] + media.save + rescue Paperclip::Errors::NotIdentifiedByImageMagickError + next + end + end + end + + def id(xml = @xml) + xml.at_xpath('./xmlns:id').content + end + + def verb(xml = @xml) + raw = xml.at_xpath('./activity:verb', activity: ACTIVITY_NS).content + raw.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym + rescue + :post + end + + def type(xml = @xml) + raw = xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS).content + raw.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym + rescue + :activity + end + + def url(xml = @xml) + link = xml.at_xpath('./xmlns:link[@rel="alternate"]') + link['href'] + end + + def content(xml = @xml) + xml.at_xpath('./xmlns:content').content + end + + def published(xml = @xml) + xml.at_xpath('./xmlns:published').content + end + + def thread?(xml = @xml) + !xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).nil? + end + + def thread(xml = @xml) + thr = xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS) + [thr['ref'], thr['href']] + end + + def account?(xml = @xml) + !xml.at_xpath('./xmlns:author').nil? + end + + def acct(xml = @xml) + username = xml.at_xpath('./xmlns:author/xmlns:name').content + url = xml.at_xpath('./xmlns:author/xmlns:uri').content + domain = Addressable::URI.parse(url).host + + "#{username}@#{domain}" + end end end