diff --git a/lib/action_subscriber/bunny/subscriber.rb b/lib/action_subscriber/bunny/subscriber.rb index d4a1ebd..69419fd 100644 --- a/lib/action_subscriber/bunny/subscriber.rb +++ b/lib/action_subscriber/bunny/subscriber.rb @@ -33,6 +33,7 @@ def auto_pop! :message_id => nil, :routing_key => delivery_info.routing_key, :queue => queue.name, + :middleware => route.middleware } env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) enqueue_env(route.threadpool, env) @@ -57,6 +58,7 @@ def auto_subscribe! :message_id => properties.message_id, :routing_key => delivery_info.routing_key, :queue => queue.name, + :middleware => route.middleware } env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) enqueue_env(route.threadpool, env) @@ -72,7 +74,7 @@ def enqueue_env(threadpool, env) logger.info "RECEIVED #{env.message_id} from #{env.queue}" threadpool.async(env) do |env| ::ActiveSupport::Notifications.instrument "process_event.action_subscriber", :subscriber => env.subscriber.to_s, :routing_key => env.routing_key, :queue => env.queue do - ::ActionSubscriber.config.middleware.call(env) + env.middleware.call(env) end end end diff --git a/lib/action_subscriber/march_hare/subscriber.rb b/lib/action_subscriber/march_hare/subscriber.rb index a6be0fd..c820ab9 100644 --- a/lib/action_subscriber/march_hare/subscriber.rb +++ b/lib/action_subscriber/march_hare/subscriber.rb @@ -29,6 +29,7 @@ def auto_pop! :message_id => metadata.message_id, :routing_key => metadata.routing_key, :queue => queue.name, + :middleware => route.middleware } env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) enqueue_env(route.threadpool, env) @@ -54,6 +55,7 @@ def auto_subscribe! :message_id => metadata.message_id, :routing_key => metadata.routing_key, :queue => queue.name, + :middleware => route.middleware } env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) enqueue_env(route.threadpool, env) @@ -73,7 +75,7 @@ def enqueue_env(threadpool, env) logger.info "RECEIVED #{env.message_id} from #{env.queue}" threadpool.async(env) do |env| ::ActiveSupport::Notifications.instrument "process_event.action_subscriber", :subscriber => env.subscriber.to_s, :routing_key => env.routing_key, :queue => env.queue do - ::ActionSubscriber.config.middleware.call(env) + env.middleware.call(env) end end end diff --git a/lib/action_subscriber/middleware.rb b/lib/action_subscriber/middleware.rb index 883762d..6c33c1f 100644 --- a/lib/action_subscriber/middleware.rb +++ b/lib/action_subscriber/middleware.rb @@ -3,11 +3,12 @@ require "action_subscriber/middleware/error_handler" require "action_subscriber/middleware/router" require "action_subscriber/middleware/runner" +require "action_subscriber/middleware/stack" module ActionSubscriber module Middleware def self.initialize_stack - builder = ::Middleware::Builder.new(:runner_class => ::ActionSubscriber::Middleware::Runner) + builder = ::ActionSubscriber::Middleware::Stack.new(:runner_class => ::ActionSubscriber::Middleware::Runner) builder.use ErrorHandler builder.use Decoder diff --git a/lib/action_subscriber/middleware/env.rb b/lib/action_subscriber/middleware/env.rb index bef420c..5528b6f 100644 --- a/lib/action_subscriber/middleware/env.rb +++ b/lib/action_subscriber/middleware/env.rb @@ -13,7 +13,8 @@ class Env :message_id, :routing_key, :queue, - :subscriber + :subscriber, + :middleware ## # @param subscriber [Class] the class that will handle this message @@ -38,6 +39,16 @@ def initialize(subscriber, encoded_payload, properties) @queue = properties.fetch(:queue) @routing_key = properties.fetch(:routing_key) @subscriber = subscriber + @middleware = properties.fetch(:middleware) { ::ActionSubscriber.config.middleware.forked } + end + + #allow env to be get/set from outside, like rack middleware allows + def [](key) + instance_variable_get(:"@#{key}") + end + + def []=(key, value) + instance_variable_set(:"@#{key}", value) end def acknowledge diff --git a/lib/action_subscriber/middleware/stack.rb b/lib/action_subscriber/middleware/stack.rb new file mode 100644 index 0000000..95a4a20 --- /dev/null +++ b/lib/action_subscriber/middleware/stack.rb @@ -0,0 +1,13 @@ +require 'middleware/builder' + +module ActionSubscriber + module Middleware + class Stack < ::Middleware::Builder + def forked + forked_stack = self.class.new(:runner_class => @runner_class) + forked_stack.instance_variable_set(:@stack, @stack.dup) + forked_stack + end + end + end +end diff --git a/lib/action_subscriber/route.rb b/lib/action_subscriber/route.rb index cf7cd05..8753b9d 100644 --- a/lib/action_subscriber/route.rb +++ b/lib/action_subscriber/route.rb @@ -2,13 +2,14 @@ module ActionSubscriber class Route attr_reader :acknowledgements, :action, - :durable, + :durable, :exchange, :prefetch, :queue, :routing_key, :subscriber, - :threadpool + :threadpool, + :middleware def initialize(attributes) @acknowledgements = attributes.fetch(:acknowledgements) @@ -20,6 +21,7 @@ def initialize(attributes) @routing_key = attributes.fetch(:routing_key) @subscriber = attributes.fetch(:subscriber) @threadpool = attributes.fetch(:threadpool) { ::ActionSubscriber::Threadpool.pool(:default) } + @middleware = attributes.fetch(:middleware) { ::ActionSubscriber.config.middleware.forked } end def acknowledgements? @@ -29,5 +31,10 @@ def acknowledgements? def queue_subscription_options { :manual_ack => acknowledgements? } end + + delegate :use, :to => :middleware + delegate :insert, :to => :middleware + delegate :insert_after, :to => :middleware + delegate :insert_before, :to => :middleware end end diff --git a/lib/action_subscriber/router.rb b/lib/action_subscriber/router.rb index ce4e540..5793e13 100644 --- a/lib/action_subscriber/router.rb +++ b/lib/action_subscriber/router.rb @@ -2,6 +2,7 @@ module ActionSubscriber class Router def self.draw_routes(&block) router = self.new + ::ActiveSupport.run_load_hooks(:action_subscriber_routes, router) router.instance_eval(&block) router.routes end @@ -39,11 +40,22 @@ def resource_name(route_settings) route_settings[:subscriber].name.underscore.gsub(/_subscriber/, "").to_s end - def route(subscriber, action, options = {}) + def stack(name, &block) + stacks[name] ||= ::ActionSubscriber.config.middleware.forked.instance_eval(&block) + end + + def stacks + @stacks ||= {} + end + + def route(subscriber, action, options = {}, &block) route_settings = DEFAULT_SETTINGS.merge(options).merge(:subscriber => subscriber, :action => action) route_settings[:routing_key] ||= default_routing_key_for(route_settings) route_settings[:queue] ||= default_queue_for(route_settings) - routes << Route.new(route_settings) + route_settings[:middleware] = stacks[options[:stack]] if options.key?(:stack) + _route = Route.new(route_settings) + _route.instance_eval(&block) if block_given? + routes << _route end def routes diff --git a/routing.md b/routing.md index 246205a..a554692 100644 --- a/routing.md +++ b/routing.md @@ -38,9 +38,26 @@ The `route` method supports the following options: * `exchange` specify which exchange you expect messages to be published to (default `"events"`) * This is the equivalent of calling `exchange :actions` in your subscriber * `publisher` this will prefix your queue and routing key with the publishers name - * This is the equivalent of puting `publisher :foo` in your subscriber + * This is the equivalent of putting `publisher :foo` in your subscriber * `queue` specifies which queue you will subscribe to rather than letting ActionSubscriber infer it from the name of the subscriber and action * `routing_key` specifies the routing key that will be bound to your queue +* `stack` Lets you use a custom middleware stack you already defined using the stack method + +## Middleware Stacks + +This give you the ability to build and apply a middleware stack on a per route basis. + +``` ruby +::ActionSubscriber.draw_routes do + stack :resourceful do + use ParseResourcePayload + use LoadResource + end + + default_routes_for ::UserSubscriber + route ::NotificationSubscriber, :created, :publisher => :newman, :exchange => :events, :stack => :resourceful +end +```

Footnotes

diff --git a/spec/lib/action_subscriber/middleware/env_spec.rb b/spec/lib/action_subscriber/middleware/env_spec.rb index 2bdcaa6..334fb8e 100644 --- a/spec/lib/action_subscriber/middleware/env_spec.rb +++ b/spec/lib/action_subscriber/middleware/env_spec.rb @@ -25,6 +25,19 @@ specify { expect(subject.routing_key).to eq(properties[:routing_key]) } specify { expect(subject.queue).to eq(properties[:queue]) } + describe "#[]" do + it "gets instance variable" do + expect(subject[:action]).to eq :created + end + end + + describe "#[]=" do + it "sets instance variable" do + subject[:whatever] = :something + expect(subject[:whatever]).to eq :something + end + end + describe "#acknowledge" do it "sends an acknowledgement to rabbitmq" do expect(channel).to receive(:ack).with(properties[:delivery_tag], false) diff --git a/spec/lib/action_subscriber/middleware/stack_spec.rb b/spec/lib/action_subscriber/middleware/stack_spec.rb new file mode 100644 index 0000000..5417561 --- /dev/null +++ b/spec/lib/action_subscriber/middleware/stack_spec.rb @@ -0,0 +1,15 @@ +describe ActionSubscriber::Middleware::Stack do + subject { ::ActionSubscriber::Middleware.initialize_stack } + + context "#forked" do + let(:forked_stack) { subject.forked } + + it "duplicates the stack without modifying original" do + class A; end; + forked_stack.use(A) + expect(forked_stack.instance_variable_get(:@stack).object_id).to_not eq subject.instance_variable_get(:@stack).object_id + expect(forked_stack.instance_variable_get(:@stack).map(&:first)).to include(A) + expect(subject.instance_variable_get(:@stack).map(&:first)).to_not include(A) + end + end +end diff --git a/spec/lib/action_subscriber/router_spec.rb b/spec/lib/action_subscriber/router_spec.rb index 24eeee8..f8e013c 100644 --- a/spec/lib/action_subscriber/router_spec.rb +++ b/spec/lib/action_subscriber/router_spec.rb @@ -139,4 +139,28 @@ def dim; end expect(routes.last.subscriber).to eq(SparkleSubscriber) expect(routes.last.queue).to eq("alice.tommy.sparkle.dim") end + + it "can define a stack of middleware and use on a per route basis" do + class FakeMiddleware + def initialize(app) + @app = app + end + + def call(env) + env + end + end + + routes = described_class.draw_routes do + stack :fake_stack do + use FakeMiddleware + end + + route FakeSubscriber, :foo, :stack => :fake_stack + route FakeSubscriber, :bar + end + + expect(routes.first.middleware.instance_variable_get(:@stack).last.first).to eq(FakeMiddleware) + expect(routes.last.middleware.instance_variable_get(:@stack).last.first).to_not eq(FakeMiddleware) + end end