
There are many popular Java frameworks that reflect on static type information to drive runtime behavior, and yet we would like to exploit these tools from Clojure, a dynamic language.

One such framework is Apache Beam which offers a powerful, unified programming model for distributed/big data processing.

While Clojure's Java interop support is excellent, when integrating with heavily-typed libraries such as Beam there are naturally some hurdles to overcome.

Can we get frameworks like Beam with a heavily statically-typed orientation to behave dynamically?

Consider Beam's “word count” example:

  public static class CountWords
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>>
           expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
         @ProcessElement
         public void processElement(
            @Element String element, OutputReceiver<String> receiver) {
               for (String word : element.split(" ")) {
                  receiver.output(word);
               }}}));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = 
                words.apply(Count.perElement());
      return wordCounts;
    }
  }

This is complex-looking code, but it shouldn't be: all it's doing is extracting words from sentences and counting their occurrences.

Specifically, this code consumes sentences in a Beam PCollection and splits them into words using a Beam PTransform (really just a simple mapping function) to produce another PCollection containing the results [1].

Beam's focus on static types is evident here; there are generic type parameters everywhere.

Let's wing it anyway and see if we can get this example going in Clojure.

In our first attempt we'll try a straightforward translation of the Java code, leaning on Clojure's proxy [2] facility to extend Beam's function class (DoFn<Input, Output>):

(def count-words
  (proxy [PTransform] []
    (expand [lines]
      (-> lines
          (.apply (ParDo/of
                   (proxy [DoFn] []
                     (processElement [line receiver]
                       (doseq [word (str/split line #" ")]
                         (.output receiver word))))))
          (.apply (Count/perElement))))))

This is not entirely satisfying: there is still quite a lot of fanfare here to express the most basic data process; yes we're in Clojure, but we're still doing things in a very Java-ish (so far, Beam-ish) way.

Still, if it works, it works, and perhaps that's good enough to call it a day on our interop attempt. Unfortunately [3], it doesn't work.

Our biggest problem is the omission of generic type parameters on our DoFn; DoFn's signature is DoFn<Input, Output>, and nowhere are we providing these parameters.

Beam fails at runtime without them:

Execution error (ClassCastException) at org.apache.beam.sdk.transforms.reflect.DoFnSignatures/parseSignature (DoFnSignatures.java:330).
java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType

Unfortunately Clojure isn't going to help us here; it gives us no mechanism for filling in generic type parameters [4].
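The reflection behavior behind that ClassCastException is easy to reproduce in plain Java. In this self-contained sketch (the class names are made up for illustration), a raw subclass's getGenericSuperclass() returns a plain Class, while a subclass that fixes the type parameters returns a ParameterizedType, which is exactly the cast Beam attempts:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for Beam's DoFn and our two subclassing styles.
class Fn<I, O> {}
class RawFn extends Fn {}                 // like our proxy: raw, no type args
class TypedFn extends Fn<String, Long> {} // like a javac-compiled subclass

public class GenericsDemo {
   public static void main(String[] args) {
      Type raw = RawFn.class.getGenericSuperclass();
      Type typed = TypedFn.class.getGenericSuperclass();
      // Beam casts this Type to ParameterizedType; only the typed subclass survives.
      System.out.println(raw instanceof ParameterizedType);   // false
      System.out.println(typed instanceof ParameterizedType); // true
   }
}
```

A proxy-generated class behaves like RawFn here: the type parameters simply aren't in the bytecode for Beam to find.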

Why does Beam want this information anyway?

Beam, as we've mentioned, is a distributed execution platform. It needs to distribute data produced by our functions (these Beam DoFn<> implementations that we provide) across the network to multiple machines.

And to do this, Beam must serialize our data, put it on the wire, and deserialize it again on the receiving end.

This ser/de occurs via a Beam “coder,” and which coder to use is inferred by Beam from the type signature of our DoFn functions.

So, for example, if we give Beam a DoFn<Order, Long> function that maps Order objects to customer ids (Long), Beam will reflect on these types and look up in a registry how to encode/decode values of these types.

The need for such indirection is arguable. And having multiple disparate ways—one way via static types, the other via runtime calls to .setCoder—of “telling” Beam the same thing (how to code objects) is (also, arguably) very complex.

Fortunately we can dodge these issues entirely by doing data processing in idiomatic Clojure fashion—that is, by representing our data via ad-hoc maps and standard Clojure data types. By doing so, we can use the same coder everywhere [5].

Still. Beam blows up without a generic type signature, so we have to give it something anyway.

Because none of Clojure's compilation facilities can render generic class signatures, there is no way around [6] giving Beam the parameters it requires: we must use javac.

But we only need to make a single such DoFn class, a dynamic one, and have it dispatch back to Clojure. We'll write it once and use it again and again to build Beam pipelines:

class DynamicDoFn extends DoFn<Object, Object> {
   private final Var fn;

   public DynamicDoFn(Var fn) {
      this.fn = fn;
   }

   @Setup public void setup() {
      synchronized (this) {
         Clojure.var("clojure.core", "require")
            .invoke(fn.ns.name);
      }
   }

   @ProcessElement public void processElement(ProcessContext context) {
      // Output every element of the seq returned by our Clojure function.
      for (ISeq seq = RT.seq(fn.invoke(context.element())); seq != null; seq = seq.next())
         context.output(seq.first());
   }
}

There we go. Now Beam has its type parameters, namely <Object, Object>—to stay dynamic we've used the most general type that Java allows, Object.

Interestingly Beam is happy just to have a type signature here even though it won't be able to infer a Coder from just an Object type.

Since our Coder will be nippy every time, let's make a helper function we can use to instantiate steps in our Beam pipeline:

(defn xform* [pcoll fn-var]
  (-> (.apply pcoll (ParDo/of (DynamicDoFn. fn-var)))
      (.setCoder (NippyCoder.))))

Now we're ready to build pipelines in Clojure. Our example now looks like this:

;; Our simple mapping function.
(defn split-line [line] (str/split line #" "))

;; Our data process.
(def count-words
  (proxy [PTransform] []
    (expand [lines]
      (-> lines
          (xform* #'split-line)
          (.apply (Count/perElement))))))

This is much better than the Java alternative.

Our split-line function could not be simpler and is REPL-ready, to boot.

With our nascent little “framework” (right now, just this xform* function) we can build complex Beam pipelines from simple Clojure functions.

To keep this post short, some of the implementation details of our DynamicDoFn will have to speak for themselves.

For one, why do we pass function vars instead of direct function references?

The short answer is that vars are Serializable.

Beam not only needs to distribute our data over the wire but it also needs to encode and decode our functions. DoFn is required to be Serializable.

When a var is deserialized on a Beam worker node, it rebinds to the referent function defined in the corresponding namespace. (There is much more to discuss and more power to enable here, which will be saved for a future post.)

We are just barely scratching the surface of what a Clojure-to-Beam integration will look like, but hopefully this post helps pave the way for others contending with Java interop in a land of libraries that lean on type reflection.

Following similar techniques described in this post, a highly simple and dynamic library for Beam can be built up, tapping into Beam's rich model for expressing data pipelines while offering Clojure's most powerful capabilities (ad-hoc maps, destructuring, a first-class REPL experience, inherent dynamism, and so on.) More on this soon.

One last interesting consequence of this effort is that much of the catalog of (ParDo-style) transforms that Beam offers out of the box is more or less obviated once we can compose pipelines in Clojure.

As a demonstrative example, let's assume we have a Beam PCollection of key-value pairs of words and counts and want to run this data process:

  • Filter out words that occur fewer than 10 times
  • Swap the key-value pair
  • Concatenate the key and value with a “:” separator

In Java, our best bet is to tap into Beam's custom transforms:

...
coll
   .apply(Filter.by(
      new SimpleFunction<KV<String, Long>, Boolean>() {
         @Override public Boolean apply(KV<String, Long> kv) {
            return kv.getValue() >= 10; }}))
   .apply(KvSwap.create())
   .apply(MapElements.via(
         new SimpleFunction<KV<Long, String>, String>() {
            @Override public String apply(KV<Long, String> kv) {
               return kv.getKey() + ":" + kv.getValue(); }}));

In Clojure we can do this without Beam's utilities and with arguably more clarity [7]:

(defn filter* [[k v]] (when (>= v 10) [k v]))
(defn swap* [[k v]] [v k])
(defn str* [[k v]] (str k ":" v))
...
(-> coll
   (xform* #'filter*)
   (xform* #'swap*)
   (xform* #'str*))

Even using Beam's built-in facilities in Java (and none of them in Clojure), the Clojure version turns out to be much simpler. This is one reason so many people love this language.

More to come!

#clojure


Footnotes

[1] These strange PCollection and PTransform abstractions are necessary because, unlike typical in-memory functions and collection data structures, these are explicitly distributed constructs.

[2] https://clojuredocs.org/clojure.core/proxy

[3] Or fortunately, depending on how you look at it. With a little maneuvering we'll see that we can have our cake (succinct, powerful Clojure code) and eat it too (have it actually work!).

[4] Nor does proxy necessarily need to support generic parameterization, though it could. Clojure's simplicity is part and parcel of its dynamism; we're looking to get out of the static-type game.

[5] For example, nippy would be a respectable choice. It's what Onyx uses.

[6] Short of lobbying Alex Miller to bring it to a future Clojure version, or writing a bytecode-generating, say, “super proxy” function that clones proxy but adds this capability.

[7] Our DynamicDoFn won't give us the key/value destructuring you see there, but it's close—and we can get there. More to come in future posts.

An Evolution: Crawl → Walk → Run (Class → Struct → Map)

Here’s an article on code architecture in Rails. It’s one of many of its kind:

Separating Data and Code in Rails Architecture

The idea presented in this article isn’t Rails-specific, though: it’s a realization arrived at by many developers regardless of their programming language. The idea, specifically, is that OO classes (which couple behavior—methods—to data) do not result in composable, readily reusable code.

The natural “solution” to this “problem” is to write functions not methods. Stealing directly from that article, in Ruby this looks something like the following:

# instead of this...
user.purchase_policy(params)

# ...you do this:
PolicyService.purchase_policy(:user => user, :params => params)

(Side note: this is the essential first step toward functional programming. There are more tools in the FP kitchen sink — e.g., immutability and first-class functions — but the core power move is this: put functions first, not methods.)

Anyway, the point of this particular post is to introduce another natural step that occurs in the general evolution away from OO.

After you’ve decoupled your behaviors and state, you’re left with a method-less data class like this:

class User
   attr_accessor :name, :age, :etc
end

And this is pretty good. However, the defensive programmer usually isn’t satisfied. To discourage others from adding methods back in, this programmer might jettison the class, replacing it like so:

User = Struct.new(:name, :age, :etc)

An improvement: we get the features we want, but this definition is more to the point and more method-resistant than its class counterpart.

But — and we’re getting closer to the point now — many functional programmers are not even satisfied with this. Coupling (minimizing it) and cohesion (maximizing it) are the pillars of good software design and the FP kids (the, say, Clojure variety) will claim that we’re still suffering on both of those counts.

The easiest way to demonstrate this without getting too abstract is to show what the alternative looks like when you pull things apart: i.e., when you pull the data-validation concern away from the data-definition concern. Specifically, what things look like when you express your validation completely (cohesively) and separately (decoupled) from your data’s form.

This is executable Ruby code:

# define what is a valid user...
validation_rules = lambda { |_|
   validates :name, presence: true
   validates_numericality_of :age, :greater_than => -1
}
 
...
 
 # define a user instance...
user_instance = 
   OpenStruct.valid(
      { :name => "Aaron", :age => 35 }, &validation_rules)
 
...
# ...finally, exploit our data for fun and profit:
user_instance.valid? # => true
user_instance.errors # => []
...

Note what’s happening here: we have specified our validation rules completely independently from our data object. The rules and data object can both be declared, exist and passed around independently. At any point, we can bring them back together: we can compose our validation specification with the data for the given use case at hand.

Now this sort of decoupling, like the separation of methods from classes above, gives more direct freedom and flexibility. Example: I can compose different validations with the same user data object depending on use case/context, which is something I cannot do fully with the standard class/Struct approach in which part of the “validation” is inextricably coupled to the data.
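To make the composition above concrete without any framework at all, here is a minimal pure-Ruby sketch (the rule and function names are made up for illustration): rules are standalone lambdas, the data is a plain hash, and the two are composed only at the point of use.

```ruby
# Hypothetical standalone validation rules: each takes data, returns errors.
presence_of_name = ->(h) { h[:name].to_s.empty? ? ["name can't be blank"] : [] }
adult_age        = ->(h) { h[:age].to_i >= 18 ? [] : ["age must be at least 18"] }

# Compose any set of rules with any piece of data, on demand.
validate = ->(data, *rules) { rules.flat_map { |rule| rule.call(data) } }

user = { name: "Aaron", age: 35 }
validate.call(user, presence_of_name, adult_age)  # => []
validate.call(user, presence_of_name)             # => [] (different rule set, same data)
validate.call({ age: 12 }, presence_of_name, adult_age)
# => ["name can't be blank", "age must be at least 18"]
```

The same user hash flows through whichever rule set the use case at hand calls for; neither the data nor the rules know about each other until composed.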

(Side note: Just because you get more freedom and flexibility by decoupling doesn’t mean that’s what’s always done or should be done. There is a lot of subjective and contentious partisanship in the developer world here that we don’t need to get caught on, at least in this post.)

As a last note, there is a ton of “Prior Art” here: Clojure and Clojure spec, to give just one example, strongly fall on the “decouple it!” side of the divide. (Clojure Spec is essentially a more feature-rich and explicitly decoupled version of ActiveModel::Validations. Incidentally it appears that somebody has made a Ruby clone of Clojure spec.)

You don’t have to take a pure stance on any of the above to get value out of these ideas, however.

ActiveModel::Validations wasn’t explicitly designed to be decoupled and used dynamically as shown above, but thanks to Ruby’s module dynamics it can be used that way fairly straightforwardly. And there’s one place where data is already naturally decoupled from validation: Sidekiq/queue worker messages. (And more generally: any place where you take data off of the wire and bring it into code; API method handlers are another example.)

Consider this Sidekiq worker, which has a perform method that receives messages from a work queue:

class MyWorker
 include Sidekiq::Worker
   
 def perform(payload)
   # ...do stuff with payload...
 end
end

When taking an object off of the wire like this, a very useful practice is “front-door validation”: either fail-fast or bail-fast before your worker process gets going and starts doing things like writing to the database. (It also makes debugging simpler, because you get immediate feedback that the failure happened right at the front door.)

Anyway — without a standardized way to express these front-door validations, you are left with limited, manual attempts at validation.
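Those manual attempts typically look like a hand-rolled guard at the top of perform. A hypothetical sketch (the helper name and keys are made up):

```ruby
# Hand-rolled front-door check: verify required keys are present and
# fail fast before any real work happens.
def assert_payload!(payload, *required_keys)
  missing = required_keys.reject { |k| payload.key?(k) }
  raise ArgumentError, "payload missing keys: #{missing.inspect}" unless missing.empty?
  payload
end

assert_payload!({ "external_id" => 42 }, "external_id")  # passes the payload through
# assert_payload!({}, "external_id")                     # raises ArgumentError
```

This works for presence checks, but every new constraint (types, ranges, enumerations) means more one-off guard code.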

A simpler, clearer, and much richer way to express these front-door validations would be to leverage the dynamic validations discussed above:

class MyWorker
   include Sidekiq::Worker

   VALIDATION = lambda { |_|
      validates :external_id, presence: true
      validates :type, inclusion: { in: ['fruit', 'veggie'] }
   }

   def perform(payload)
      valid_payload = OpenStruct.valid!(payload, &VALIDATION)
      # ...do stuff with valid_payload...
      Record.create!(external_id: valid_payload.external_id, ...)
   end
end

(Note: in this snippet OpenStruct.valid! raises; OpenStruct.valid would instead simply return the validated payload, on which the downstream code could call valid? and errors if it wanted to be more graceful about things.)

How do you use ActiveModel::Validations dynamically like this? One approach is to simply include it into any given instance’s eigenclass:

...
valid_payload = OpenStruct.new(payload)
valid_payload.singleton_class.include ActiveModel::Validations
class << valid_payload
   validates :name, presence: true
   validates_numericality_of :age, :greater_than => -1
end
...
valid_payload.valid?

That works. And the upside here is that there is no monkey-patching going on (although it could use a significant bit of syntactic cleanup). But the biggest issue with this (eigenclass) approach is performance: a benchmark of this approach has it at roughly 6 times slower than baseline, and this turns out to be entirely due to the cost of including ActiveModel::Validations each time you want to validate an instance.

This isn’t necessary. We can incur nearly no overhead by creating an include-able module like so:

module DynamicValidations
   def self.included(clazz)
      clazz.include(ActiveModel::Validations)
      clazz.extend(ClassMethods)
   end

   module ClassMethods
      def valid(*args, &block)
         obj = new(*args)
         # Attach the validation rules to this instance's eigenclass only.
         obj.singleton_class.class_eval(&block)
         obj
      end

      def valid!(*args, &block)
         validated = valid(*args, &block)
         raise ArgumentError, validated.errors.messages.to_s unless validated.valid?
         validated
      end
   end
end

The cool thing about this module is that it can be included into any type you wish to dynamically validate, not just OpenStruct; presumably plain ol’ Structs work too.

Now to get all of the OpenStruct validation examples shown above, we simply do:

OpenStruct.include DynamicValidations

And with this, concise, rich, dynamic validations are at our fingertips. For fun and profit, use them as your front-door validations or anywhere else you find valuable!

#ruby