Atille

Ruby has several good-looking schedulers available. In its standard library, the Thread and Fiber classes are fine but rather low-level; still, they are a pretty good base for building a queue-based system.

Here is the initial problem:

  • The software must be written in Ruby
  • This software runs a scheduler
  • The scheduler launches several jobs at different intervals
  • Each job returns a resulting value
  • Each resulting value is sent somewhere in the cloud.

This article only covers the last two steps.

Potential solution

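dry-events is a Ruby library implementing the publish/subscribe pattern. It allows us to register several events (used here as queues), subscribe to them and publish new messages to them. Since each queue is independent from the others, we can use this mechanism to define priorities and implementation logic. Note that dry-events runs listeners synchronously, in the publishing thread: concurrency, if needed, has to come from the standard Thread class.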
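To see what publish and subscribe actually do, here is a minimal sketch using only the standard library. MiniPublisher is a hypothetical class that models the three calls used in this article (register_event, subscribe, publish); it is not the dry-events implementation, but as far as I know dry-events also calls its listeners in the publishing thread, before publish returns.

```ruby
# MiniPublisher is a hypothetical, stdlib-only model of the three calls
# used in this article. It is NOT how dry-events is implemented.
class MiniPublisher
  Event = Struct.new(:id, :payload)

  def initialize
    @listeners = {} # queue name => list of listener blocks
  end

  def register_event(id)
    @listeners[id] ||= []
    self
  end

  def subscribe(id, &block)
    raise ArgumentError, "unregistered event: #{id}" unless @listeners.key?(id)
    @listeners[id] << block
    self
  end

  # Every listener runs right here, in the caller's thread, before
  # publish returns: nothing is queued for later.
  def publish(id, payload = {})
    raise ArgumentError, "unregistered event: #{id}" unless @listeners.key?(id)
    @listeners[id].each { |listener| listener.call(Event.new(id, payload)) }
    self
  end
end

bus = MiniPublisher.new
bus.register_event("queue1")
seen = []
bus.subscribe("queue1") { |event| seen << [event.id, Thread.current] }
bus.publish("queue1", message: "hello world")
p seen == [["queue1", Thread.current]] # prints true
```

The listener has already run, on the same thread, by the time publish returns; this is why "independent" queues are not automatically concurrent ones.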

First steps

We need a Queues class that creates all the different queues and exposes them through a single object instance. From a hypothetical class called “App”, we need to be able to instantiate this Queues object and pass it to the jobs that need it.
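The hand-off from App to the jobs can be sketched as plain constructor injection: App builds one queues object and passes it to every job it creates. Only App comes from the text; SumJob, the job.success payload keys and RecordingQueues (a stub that records publish calls so the example runs without the dry-events gem) are hypothetical.

```ruby
# Stub standing in for the real Queues class: it only records publishes.
class RecordingQueues
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(queue, payload = {})
    @published << [queue, payload]
    self
  end
end

# Hypothetical job: computes a value and reports it on a queue.
class SumJob
  def initialize(queues, numbers)
    @queues = queues
    @numbers = numbers
  end

  def run
    @queues.publish("job.success", job: self.class.name, result: @numbers.sum)
  end
end

# App owns a single queues object and injects it into each job.
class App
  attr_reader :queues

  def initialize(queues = RecordingQueues.new)
    @queues = queues
  end

  def jobs
    [SumJob.new(@queues, [1, 2, 3]), SumJob.new(@queues, [10, 20])]
  end

  def run
    jobs.each(&:run)
  end
end

app = App.new
app.run
app.queues.published.each { |queue, payload| puts "#{queue}: #{payload}" }
```

With the real class, the stub would be replaced by Queues.new, once that class registers a job.success queue; the jobs themselves would not change.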

# lib/queues.rb
require "dry/events/publisher"

class Queues
  # Turns Queues into a dry-events publisher identified as :queue
  include Dry::Events::Publisher[:queue]

  def initialize
    @queues = %w(queue1 queue2 queue3)
    _create_queues
    _listen_queues
  end

  protected

  # Registers one event per queue name; publishing to an
  # unregistered name raises an error
  def _create_queues
    @queues.each do |queue|
      register_event(queue)
    end
  end

  # Attaches a logging listener to every queue
  def _listen_queues
    @queues.each do |queue|
      subscribe(queue) do |event|
        puts "%s | (%s) : %s" % [
          Time.now.to_s, event.id, event.payload.to_s]
      end
    end
  end
end

Here, this class is pretty straightforward:

  • we require the dry-events library
  • we define the wanted queue names
  • we create them (I prefix protected methods with _)
  • we listen to them, waiting for events.

This last step is interesting. For each queue listed in the @queues variable (queue1, queue2 and queue3), we subscribe to it, and each message received is passed to the block as an event. A dry-events event has an id, which is the queue name, and a payload, which is a hash of whatever we want.
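To make the event structure concrete, here is a stand-in that keeps only the two readers used in this article, id and payload, together with the same log format as _listen_queues. The Struct-based Event and the log_line helper are assumptions for illustration; real events are created by dry-events itself.

```ruby
# Hypothetical stand-in for a dry-events event: id is the queue name,
# payload is the hash given to publish.
Event = Struct.new(:id, :payload)

# Same format string as the listener in _listen_queues; `now` is a
# parameter only so the output can be pinned in tests.
def log_line(event, now = Time.now)
  "%s | (%s) : %s" % [now.to_s, event.id, event.payload.to_s]
end

event = Event.new("queue1", { message: "hello world" })
puts log_line(event)
puts event.payload[:message] # prints hello world
```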

Load this class

Now that the class is written, we need to load it from the application. The following app is minimal, but it shows the required logic.

# app.rb
require_relative "lib/queues"

queues = Queues.new
# Every listener subscribed to queue1 runs before publish returns
queues.publish('queue1', message: "hello world")

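Running this piece of code should display something like this (the timestamp will differ, and Ruby 3.4 and later print the payload as {message: "hello world"}):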

2020-02-10 09:59:25 +0100 | (queue1) : {:message=>"hello world"}

Going further

Now that this proof of concept works, we can go further. Since each queue is independent, we can add subscribe calls that handle specific tasks whenever a message is published to a queue.

Still in lib/queues.rb:

def initialize
  # ...
  # Must run after _create_queues: subscribing to an unregistered
  # queue raises an error
  _set_routines
end

# ...

def _set_routines
  subscribe('queue1') do |event|
    puts "Activated routine"
  end
end

Here is what happens when running this application:

  • the hello world message is published to queue1
  • the routine defined above “sees” that a new event appeared in queue1
  • it displays an additional message, “Activated routine”.
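Both listeners on queue1 fire in the order they were subscribed, which is why the timestamped line comes before “Activated routine”. Here is a stdlib-only model of that sequence. OrderedBus is a hypothetical class, not dry-events: it keeps one array of listeners per queue, so they run in subscription order (which, as I understand it, is also how dry-events behaves), and Hash#fetch makes it refuse subscriptions to unregistered queues.

```ruby
# Hypothetical synchronous bus: one ordered listener list per queue.
class OrderedBus
  def initialize
    @listeners = {}
  end

  def register_event(id)
    @listeners[id] ||= []
  end

  def subscribe(id, &block)
    @listeners.fetch(id) << block # KeyError if id was never registered
  end

  def publish(id, payload = {})
    @listeners.fetch(id).each { |listener| listener.call(id, payload) }
  end
end

output = []
bus = OrderedBus.new
bus.register_event("queue1")
# Subscribed first, like the logger in _listen_queues
bus.subscribe("queue1") { |id, payload| output << "(#{id}) : #{payload[:message]}" }
# Subscribed second, like the routine in _set_routines
bus.subscribe("queue1") { |_id, _payload| output << "Activated routine" }

bus.publish("queue1", message: "hello world")
puts output
# (queue1) : hello world
# Activated routine

begin
  bus.subscribe("queue4") { |_id, _payload| }
rescue KeyError => e
  puts "refused: #{e.message}"
end
```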

This example is deliberately simple and straightforward, so let's push the idea further.

Continue the exploration

As described in this article's first paragraph, I need to launch several jobs at intervals, collect their return values and publish them somewhere. With this in mind, I can build a stateful application based on queues.

@queues = %w(
job.started
job.pending
job.failed
job.success

update.started
update.pending
update.failed
update.success)

In this queue definition, the job. prefix refers to the jobs I want to execute through my scheduler, and the update. prefix refers to the operation I want to perform when a job succeeds.
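These eight queues behave like the states of a small state machine. The transition table below is a hypothetical illustration, not code from this article: the update.* transitions follow the routines shown further down, while the job.pending to job.started order is my own assumption, mirroring the update.* queues.

```ruby
# Hypothetical transition table: each queue lists the queues a message
# may move to next.
TRANSITIONS = {
  "job.pending"    => %w(job.started),
  "job.started"    => %w(job.success job.failed),
  "job.success"    => %w(update.pending),
  "job.failed"     => [],
  "update.pending" => %w(update.started),
  "update.started" => %w(update.success update.failed),
  "update.success" => [],
  "update.failed"  => []
}.freeze

# True when every consecutive pair of queues is an allowed transition.
def valid_path?(path)
  path.each_cons(2).all? { |from, to| TRANSITIONS.fetch(from).include?(to) }
end

puts valid_path?(%w(job.pending job.started job.success update.pending update.started update.success)) # prints true
puts valid_path?(%w(job.failed update.pending)) # prints false
```

Keeping the allowed moves in one frozen hash makes it easy to check, in a test, that a routine never publishes to a queue it should not reach.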

With these queues, I can already implement some basic logic, as seen before:

# lib/queues.rb
# ...
def _set_routines
  # A successful job hands its event over to the update pipeline
  subscribe('job.success') do |event|
    publish('update.pending', event: event)
  end

  subscribe('update.pending') do |event|
    # ...
    publish('update.started', event: event)
  end

  # ExternalLibrary stands for whatever client sends results to the cloud
  subscribe('update.started') do |event|
    if ExternalLibrary.update(event)
      publish('update.success', event: event)
    else
      publish('update.failed', event: event)
    end
  end

  subscribe('update.success') do
    puts "Successful update"
  end

  subscribe('update.failed') do
    puts "Update failed."
  end
end

In short: when a job succeeds, its event is sent to another queue, update.pending. That routine does its work, then moves the operation to update.started, which tries to perform the update: if it fails, the event goes to update.failed; if it succeeds, it goes to update.success. Each of these two queues prints its own message.
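The whole pipeline can be simulated without the gem. In this sketch PipelineBus is a hypothetical synchronous publisher (not dry-events), and the `updater` lambda stands in for ExternalLibrary.update, assumed to be any callable returning true or false; each queue records its name in a trace so both paths can be compared.

```ruby
# Hypothetical synchronous publisher with a fixed set of queues.
class PipelineBus
  def initialize(names)
    @listeners = names.map { |name| [name, []] }.to_h
  end

  def subscribe(name, &block)
    @listeners.fetch(name) << block
  end

  def publish(name, payload = {})
    @listeners.fetch(name).each { |listener| listener.call(payload) }
  end
end

# Wires the same routines as _set_routines; `updater` replaces
# ExternalLibrary.update.
def build_pipeline(updater, trace)
  bus = PipelineBus.new(%w(job.success update.pending update.started update.success update.failed))
  bus.subscribe("job.success") do |payload|
    trace << "job.success"
    bus.publish("update.pending", payload)
  end
  bus.subscribe("update.pending") do |payload|
    trace << "update.pending"
    bus.publish("update.started", payload)
  end
  bus.subscribe("update.started") do |payload|
    trace << "update.started"
    bus.publish(updater.call(payload) ? "update.success" : "update.failed", payload)
  end
  bus.subscribe("update.success") { |_payload| trace << "update.success" }
  bus.subscribe("update.failed") { |_payload| trace << "update.failed" }
  bus
end

ok_trace = []
build_pipeline(->(_payload) { true }, ok_trace).publish("job.success", result: 42)
failed_trace = []
build_pipeline(->(_payload) { false }, failed_trace).publish("job.success", result: 42)

puts ok_trace.join(" -> ")
# job.success -> update.pending -> update.started -> update.success
puts failed_trace.join(" -> ")
# job.success -> update.pending -> update.started -> update.failed
```

Injecting the updater makes the failure branch testable without a real cloud service.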

Conclusion

This approach is interesting for several reasons:

  • complex logic can be implemented easily
  • applicative middlewares can be created by redirecting messages to another queue (a logger, for example)
  • queues are decoupled by design; dry-events itself runs listeners synchronously, so non-blocking behaviour requires handing the work to threads
  • queue definitions are stateless, which makes deployment and tests easier.
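Since listeners run in the publishing thread, the non-blocking part has to come from somewhere else. One option, sketched here with the standard Thread and Thread::Queue classes, is one worker thread per queue: publish only enqueues and returns at once. AsyncQueues is a hypothetical class, not part of dry-events, and it assumes all handlers are subscribed before the first publish.

```ruby
# Hypothetical asynchronous queues: one inbox and one worker per name.
class AsyncQueues
  def initialize(names)
    @handlers = names.map { |name| [name, []] }.to_h
    @inboxes = {}
    @workers = names.map do |name|
      inbox = Thread::Queue.new
      @inboxes[name] = inbox
      Thread.new do
        # pop blocks until a message arrives and returns nil once the
        # inbox is closed and drained, which ends this worker's loop
        while (payload = inbox.pop)
          @handlers[name].each { |handler| handler.call(payload) }
        end
      end
    end
  end

  # Register every handler before the first publish
  def subscribe(name, &block)
    @handlers.fetch(name) << block
  end

  # Non-blocking: only pushes the payload onto the queue's inbox
  def publish(name, payload = {})
    @inboxes.fetch(name) << payload
  end

  # Closes every inbox and waits until the workers have drained them
  def shutdown
    @inboxes.each_value(&:close)
    @workers.each(&:join)
  end
end

results = Thread::Queue.new
queues = AsyncQueues.new(%w(job.success update.pending))
queues.subscribe("job.success") { |payload| results << [:job, payload[:result]] }
queues.subscribe("update.pending") { |payload| results << [:update, payload[:result]] }

queues.publish("job.success", result: 1)    # returns immediately
queues.publish("update.pending", result: 2) # handled on another thread
queues.shutdown

collected = []
collected << results.pop until results.empty?
p collected.sort # prints [[:job, 1], [:update, 2]]
```

Messages on one queue keep their FIFO order, but there is no ordering between queues, hence the sort. If a handler raises, its worker thread dies and shutdown re-raises that error when joining it.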
