aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--app/workers/activitypub/delivery_worker.rb2
-rw-r--r--app/workers/activitypub/processing_worker.rb2
-rw-r--r--app/workers/concerns/exponential_backoff.rb18
-rw-r--r--config/initializers/sidekiq.rb5
-rw-r--r--lib/argument_deduplication.rb26
-rw-r--r--lib/argument_deduplication/argument.rb49
-rw-r--r--lib/argument_deduplication/client.rb25
-rw-r--r--lib/argument_deduplication/server.rb38
8 files changed, 163 insertions, 2 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
index d9153132b..cd265a20d 100644
--- a/app/workers/activitypub/delivery_worker.rb
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -8,7 +8,7 @@ class ActivityPub::DeliveryWorker
STOPLIGHT_FAILURE_THRESHOLD = 10
STOPLIGHT_COOLDOWN = 60
- sidekiq_options queue: 'push', retry: 16, dead: false
+ sidekiq_options queue: 'push', retry: 16, dead: false, deduplicate_arguments: 0
HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb
index 4d06ad079..3fc97fcd1 100644
--- a/app/workers/activitypub/processing_worker.rb
+++ b/app/workers/activitypub/processing_worker.rb
@@ -3,7 +3,7 @@
class ActivityPub::ProcessingWorker
include Sidekiq::Worker
- sidekiq_options backtrace: true, retry: 8
+ sidekiq_options backtrace: true, retry: 8, deduplicate_arguments: 1
def perform(actor_id, body, delivered_to_account_id = nil, actor_type = 'Account')
case actor_type
diff --git a/app/workers/concerns/exponential_backoff.rb b/app/workers/concerns/exponential_backoff.rb
index f2b931e33..3a4ebbc98 100644
--- a/app/workers/concerns/exponential_backoff.rb
+++ b/app/workers/concerns/exponential_backoff.rb
@@ -4,6 +4,24 @@ module ExponentialBackoff
extend ActiveSupport::Concern
included do
+ # # | Next retry backoff | Total waiting time
+ # ---|--------------------|--------------------
+ # 1 | 32 | 32
+ # 2 | 320 | 352
+ # 3 | 1022 | 1374
+ # 4 | 3060 | 4434
+ # 5 | 6778 | 11212
+ # 6 | 16088 | 27300
+ # 7 | 37742 | 65042
+ # 8 | 53799 | 118841
+ # 9 | 105677 | 224518
+ # 10 | 129972 | 354490
+ # 11 | 270226 | 624716
+ # 12 | 301127 | 925843
+ # 13 | 287711 | 1213554
+ # 14 | 616223 | 1829777
+ # 15 | 607261 | 2437038
+ # 16 | 1161984 | 3599022
sidekiq_retry_in do |count|
15 + 10 * (count**4) + rand(10 * (count**4))
end
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index c1327053d..ac96de85d 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -1,6 +1,7 @@
# frozen_string_literal: true
require_relative '../../lib/mastodon/sidekiq_middleware'
+require_relative '../../lib/argument_deduplication'
Sidekiq.configure_server do |config|
config.redis = REDIS_SIDEKIQ_PARAMS
@@ -10,13 +11,16 @@ Sidekiq.configure_server do |config|
end
config.server_middleware do |chain|
+ chain.add ArgumentDeduplication::Server
chain.add SidekiqUniqueJobs::Middleware::Server
end
config.client_middleware do |chain|
+ chain.add ArgumentDeduplication::Client
chain.add SidekiqUniqueJobs::Middleware::Client
end
+ ArgumentDeduplication.configure(config)
SidekiqUniqueJobs::Server.configure(config)
end
@@ -24,6 +28,7 @@ Sidekiq.configure_client do |config|
config.redis = REDIS_SIDEKIQ_PARAMS
config.client_middleware do |chain|
+ chain.add ArgumentDeduplication::Client
chain.add SidekiqUniqueJobs::Middleware::Client
end
end
diff --git a/lib/argument_deduplication.rb b/lib/argument_deduplication.rb
new file mode 100644
index 000000000..f271b6f96
--- /dev/null
+++ b/lib/argument_deduplication.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+require_relative './argument_deduplication/argument'
+require_relative './argument_deduplication/server'
+require_relative './argument_deduplication/client'
+
+module ArgumentDeduplication
+ class CorruptedArgumentError < ::RuntimeError; end
+
+ PREFIX = 'argument_store'
+
+ # The time-to-live is based on the maximum amount of time
+ # a job can possibly spend in the retry queue, assuming
+ # the exponential backoff algorithm and a maximum number
+ # of 16 retries. It is intended as a safe-guard against
+ # any arguments being orphaned due to interruptions.
+ TTL = 50.days.to_i
+
+ DEATH_HANDLER = ->(job) {
+ Argument.new(job['args'][job['deduplicate_arguments']]).pop! if job['deduplicate_arguments']
+ }.freeze
+
+ def self.configure(config)
+ config.death_handlers << DEATH_HANDLER
+ end
+end
diff --git a/lib/argument_deduplication/argument.rb b/lib/argument_deduplication/argument.rb
new file mode 100644
index 000000000..60be6bca2
--- /dev/null
+++ b/lib/argument_deduplication/argument.rb
@@ -0,0 +1,49 @@
+# frozen_string_literal: true
+
+module ArgumentDeduplication
+ class Argument
+ def self.from_value(value)
+ new(Digest::SHA256.base64digest(value), value)
+ end
+
+ attr_reader :content_hash, :value
+
+ def initialize(content_hash, value)
+ @content_hash = content_hash
+ @value = value
+ end
+
+ def push!
+ with_redis do |redis|
+ redis.multi do |transaction|
+ transaction.set("#{PREFIX}:value:#{content_hash}", value, ex: TTL)
+ transaction.incr("#{PREFIX}:refcount:#{content_hash}")
+ transaction.expire("#{PREFIX}:refcount:#{content_hash}", TTL)
+ end
+ end
+ end
+
+ def pop!
+ with_redis do |redis|
+ redis.decr("#{PREFIX}:refcount:#{content_hash}")
+
+ redis.watch("#{PREFIX}:refcount:#{content_hash}") do
+ if redis.get("#{PREFIX}:refcount:#{content_hash}").to_i <= 0
+ redis.multi do |transaction|
+ transaction.del("#{PREFIX}:refcount:#{content_hash}")
+ transaction.del("#{PREFIX}:value:#{content_hash}")
+ end
+ else
+ redis.unwatch
+ end
+ end
+ end
+ end
+
+ private
+
+ def with_redis(&block)
+ Sidekiq.redis(&block)
+ end
+ end
+end
diff --git a/lib/argument_deduplication/client.rb b/lib/argument_deduplication/client.rb
new file mode 100644
index 000000000..b4c6f5999
--- /dev/null
+++ b/lib/argument_deduplication/client.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module ArgumentDeduplication
+ class Client
+ include Sidekiq::ClientMiddleware
+
+ def call(_worker, job, _queue, _redis_pool)
+ process_arguments!(job)
+ yield
+ end
+
+ private
+
+ def process_arguments!(job)
+ return unless job['deduplicate_arguments']
+
+ argument_index = job['deduplicate_arguments']
+ argument = Argument.from_value(job['args'][argument_index])
+
+ argument.push!
+
+ job['args'][argument_index] = argument.content_hash
+ end
+ end
+end
diff --git a/lib/argument_deduplication/server.rb b/lib/argument_deduplication/server.rb
new file mode 100644
index 000000000..0da4d1c62
--- /dev/null
+++ b/lib/argument_deduplication/server.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+module ArgumentDeduplication
+ class Server
+ include Sidekiq::ServerMiddleware
+
+ def call(_worker, job, _queue)
+ argument = process_argument!(job)
+
+ yield
+
+ # If the job completes successfully, we can remove
+ # the argument from the store. If there is an exception,
+ # the job will be retried, so we can't remove the argument
+ # from the store yet. When retries are exhausted, or when
+ # retries are disabled for the worker, the configured death
+ # handler will remove it.
+
+ argument&.pop!
+ end
+
+ private
+
+ def process_argument!(job)
+ return unless job['deduplicate_arguments']
+
+ argument_index = job['deduplicate_arguments']
+ content_hash = job['args'][argument_index]
+ value = Sidekiq.redis { |redis| redis.get("#{PREFIX}:value:#{content_hash}") }
+
+ raise CorruptedArgumentError, "The argument for hash #{content_hash} could not be found" if value.nil?
+
+ job['args'][argument_index] = value
+
+ Argument.new(content_hash, value)
+ end
+ end
+end