Clojure, Beam, and Dodging Static Types

Many popular Java frameworks reflect on static type information to drive runtime behavior; we would like to exploit these tools from Clojure, a dynamic language.

One such framework is Apache Beam which offers a powerful, unified programming model for distributed/big data processing.

While Clojure's Java interop support is excellent, when integrating with heavily-typed libraries such as Beam there are naturally some hurdles to overcome.

Can we get frameworks like Beam with a heavily statically-typed orientation to behave dynamically?

Consider Beam's “word count” example:

  public static class CountWords
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>>
           expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(
         new DoFn<String, String>() {
            @ProcessElement
            public void processElement(
               @Element String element, OutputReceiver<String> receiver) {
                  for (String word : element.split(" ")) {
                     receiver.output(word);
                  }}}));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = 
                words.apply(Count.perElement());
      return wordCounts;
    }
  }

This is complex-looking code but it shouldn't be: all this is doing is extracting words from sentences and counting their occurrences.

Specifically, this code consumes sentences in a Beam PCollection, splits them up into words using a Beam PTransform (a simple mapping function, really) to produce another PCollection containing the results1.
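For comparison, the same computation in plain, in-memory Java (no Beam involved) takes only a few lines. This sketch uses java.util.stream rather than anything Beam-specific:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCount {
    // Split each line into words and tally occurrences.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of("a b a", "b a")));
    }
}
```

The complexity in the Beam version isn't the word counting; it is the distributed plumbing wrapped around it.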

Beam's focus on static types is evident here; there are generic type parameters everywhere.

Let's wing it anyway and see if we can get this example going in Clojure.

In our first attempt we'll try a straightforward translation of the Java code, leaning on Clojure's proxy2 facility to extend Beam's function type (the abstract class DoFn<Input, Output>):

(def count-words
  (proxy [PTransform] []
    (expand [lines]
      (-> lines
          (.apply (ParDo/of
                   (proxy [DoFn] []
                     (processElement [line receiver]
                       (doseq [word (str/split line #" ")]
                         (.output receiver word))))))
          (.apply (Count/perElement))))))

This is not entirely satisfying: there is still quite a lot of fanfare here to express the most basic of data processes. Yes, we're in Clojure, but we're still doing things in a very Java-ish (or rather, Beam-ish) way.

Still, if it works, it works, and perhaps that's good enough to call it a day on our interop attempt. Unfortunately3 it doesn't work.

Our biggest problem is the omission of generic type parameters to our DoFn; the type signature for DoFn is <Input, Output> and nowhere are we providing these parameter values.

Beam fails at runtime without them:

Execution error (ClassCastException) at org.apache.beam.sdk.transforms.reflect.DoFnSignatures/parseSignature (DoFnSignatures.java:330).
java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType

Unfortunately Clojure isn't going to help us here; it doesn't give us any mechanism for filling in generic type parameters4.
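To see concretely what Beam's reflection is choking on, here is a self-contained sketch (using a stand-in Fn class, not Beam's actual DoFn): a subclass that fixes the type arguments reports a ParameterizedType for its generic superclass, while a subclass of the raw type (effectively what proxy generates) reports only a plain Class, so a cast like the one in parseSignature fails.

```java
import java.lang.reflect.ParameterizedType;

public class GenericSuperclassDemo {
    abstract static class Fn<I, O> {}

    // An anonymous subclass that fixes the type arguments; the generic
    // signature is preserved in the class file.
    static final Fn<String, Long> TYPED = new Fn<String, Long>() {};

    // A subclass of the raw type, as proxy effectively produces.
    @SuppressWarnings("rawtypes")
    static final Fn RAW = new Fn() {};

    static boolean isParameterized(Object fn) {
        return fn.getClass().getGenericSuperclass() instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        // Beam casts the generic superclass to ParameterizedType; for the
        // raw subclass that cast throws the ClassCastException we saw above.
        System.out.println(isParameterized(TYPED)); // true
        System.out.println(isParameterized(RAW));   // false
    }
}
```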

Why does Beam want this information anyway?

Beam as we've mentioned is a distributed execution platform. It needs to distribute data produced by our functions (these Beam DoFn<> implementations that we provide) across the network to multiple machines.

And to do this Beam must serialize our data, put it on the wire, and deserialize it again on the receiving end.

This ser/de occurs via a Beam “coder”; which coder to use is inferred by Beam from the type signature of our DoFn functions.

So for example if we give Beam a DoFn<Order, Long> function that maps Order objects to customer ids (Long), Beam will reflect on these types and look up in a registry how to encode/decode values of these types.

The need for such indirection is arguable. And having multiple disparate ways—one way via static types, the other via runtime calls to .setCoder—of “telling” Beam the same thing (how to code objects) is (also, arguably) very complex.

Fortunately we can dodge these issues entirely by doing data processing in idiomatic Clojure fashion—that is, by representing our data via ad-hoc maps and standard Clojure data types. By doing so, we can use the same coder everywhere5.
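To illustrate the one-coder idea in miniature, here is a sketch that uses JDK serialization as a stand-in for nippy, and a bare encode/decode pair rather than Beam's actual Coder class. Any Serializable value, maps included, goes through the same two functions; no per-type inference is needed:

```java
import java.io.*;
import java.util.Map;

public class UniversalCoder {
    // One encode/decode pair for any Serializable value: maps, lists,
    // strings, numbers. Checked exceptions are wrapped for brevity.
    static byte[] encode(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    static Object decode(byte[] bytes) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of("word", "beam", "count", 42L);
        System.out.println(record.equals(decode(encode(record)))); // true
    }
}
```

Nippy plays this same role for Clojure data, with a far better wire format than JDK serialization.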

Still. Beam blows up without a generic type signature, so we have to give it something anyway.

Because none of Clojure's compilation facilities have the ability to render class signatures, there is no other way around6 giving Beam the generic parameters it requires; we must use javac.

But we only need to make a single such DoFn class, a dynamic one, and have it dispatch back to Clojure. We'll write it once and use it again and again to build Beam pipelines:

import clojure.java.api.Clojure;
import clojure.lang.ISeq;
import clojure.lang.RT;
import clojure.lang.Var;
import org.apache.beam.sdk.transforms.DoFn;

class DynamicDoFn extends DoFn<Object, Object> {
   private final Var fn;

   public DynamicDoFn(Var fn) {
      this.fn = fn;
   }

   @Setup public void setup() {
      synchronized (this) {
         Clojure.var("clojure.core", "require")
            .invoke(fn.ns.name);
      }
   }

   @ProcessElement public void processElement(ProcessContext context) {
      // Treat the fn's return value as a seq of zero or more outputs.
      for (ISeq seq = RT.seq(fn.invoke(context.element())); seq != null; seq = seq.next())
         context.output(seq.first());
   }
}

There we go. Now Beam has its type parameters, namely <Object, Object>—to stay dynamic we've used the most general type that Java allows, Object.

Interestingly Beam is happy just to have a type signature here even though it won't be able to infer a Coder from just an Object type.

Since our Coder will be nippy every time, let's make a helper function we can use to instantiate steps in our Beam pipeline:

(defn xform* [pcoll fn-var]
  (-> (.apply pcoll (ParDo/of (DynamicDoFn. fn-var)))
      (.setCoder (NippyCoder.))))

Now we're ready to build pipelines in Clojure. Our example now looks like this:

;; Our simple mapping function.
(defn split-line [line] (str/split line #" "))

;; Our data process.
(def count-words
  (proxy [PTransform] []
    (expand [lines]
      (-> lines
          (xform* #'split-line)
          (.apply (Count/perElement))))))

This is much better than the Java alternative.

Our split-line function could not be simpler and is REPL-ready, to boot.

With our nascent little “framework” (right now, just this xform* function) we can build complex Beam pipelines from simple Clojure functions.

To keep this post short, some of the implementation details of our DynamicDoFn will have to speak for themselves.

For one, why do we pass function vars rather than direct function references?

The short answer is that vars are Serializable.

Beam not only needs to distribute our data over the wire but it also needs to encode and decode our functions. DoFn is required to be Serializable.

When a var is deserialized on a Beam worker node, it rebinds to the referent function defined in the corresponding namespace. (There is much more to discuss and more power to enable here, which will be saved for a future post.)
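A rough analog of this rebinding in plain Java, with a registry standing in for Clojure namespaces (VarRef, REGISTRY, and roundTrip are hypothetical names for illustration, not Beam or Clojure API): only the name travels over the wire, and the function is re-resolved on the receiving side.

```java
import java.io.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

public class VarRefDemo {
    // A registry standing in for Clojure namespaces: name -> function.
    static final Map<String, UnaryOperator<Object>> REGISTRY = new ConcurrentHashMap<>();

    // Serializable because it carries only a name, not the function itself.
    static class VarRef implements Serializable {
        final String name;
        VarRef(String name) { this.name = name; }
        Object invoke(Object arg) { return REGISTRY.get(name).apply(arg); }
    }

    // Simulate shipping the reference across the wire and back.
    static VarRef roundTrip(VarRef ref) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new ObjectOutputStream(bytes).writeObject(ref);
            return (VarRef) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // "Define" a function under a name, as a namespace would.
        REGISTRY.put("upper", s -> ((String) s).toUpperCase());
        // The deserialized reference rebinds to the registered function.
        System.out.println(roundTrip(new VarRef("upper")).invoke("beam")); // BEAM
    }
}
```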

We are just barely scratching the surface of what a Clojure to Beam integration will look like, but hopefully this post is helping pave the way for others contending with Java interop in a land of libraries that leverage type reflection.

Following similar techniques described in this post, a highly simple and dynamic library for Beam can be built up, tapping into Beam's rich model for expressing data pipelines while offering Clojure's most powerful capabilities (ad-hoc maps, destructuring, a first-class REPL experience, inherent dynamism, and so on.) More on this soon.

One last interesting consequence of this effort is that many of the Java (ParDo-style) transforms that Beam offers out of the box are more or less obviated once we can compose pipelines in Clojure.

As a demonstrative example, let's assume we have a Beam PCollection of key-value pairs of words and counts, and we want to keep only the words occurring more than ten times, swap each key and value, and render each pair as a string.

In Java, we would do best to tap into Beam's custom transforms:

...
coll
   .apply(Filter.by(
      new SimpleFunction<KV<String, Long>, Boolean>() {
         @Override public Boolean apply(KV<String, Long> kv) {
            return kv.getValue() > 10; }}))
   .apply(KvSwap.create())
   .apply(MapElements.via(
         new SimpleFunction<KV<Long, String>, String>() {
            @Override public String apply(KV<Long, String> kv) {
                 return kv.getKey() + ":" + kv.getValue(); }}));

In Clojure we can do this without Beam utilities and with arguably more clarity7:

(defn filter* [[k v]] (when (> v 10) [k v]))
(defn swap* [[k v]] [v k])
(defn str* [[k v]] (str k ":" v))
...
(-> coll
    (xform* #'filter*)
    (xform* #'swap*)
    (xform* #'str*))

Even with Java leaning on Beam's built-in facilities (and Clojure using none of them), the Clojure version turns out to be much simpler. This is one reason why so many people love this language.

More to come!

#clojure


Footnotes

1 These strange PCollection and PTransform abstractions are necessary because unlike typical in-memory functions and collection data structures these are explicitly distributed constructs.

2 https://clojuredocs.org/clojure.core/proxy

3 Or fortunately, depending on how you look at it. Because with a little maneuvering we'll see that we can have our cake (succinct, powerful Clojure code) and eat it too (have it actually work!).

4 Nor do we necessarily need proxy to support generic parameterization, though it could. Clojure's simplicity is part and parcel of its dynamism; we're looking to get out of this static-time type game.

5 For example, nippy would be a respectable choice. It's what Onyx uses.

6 Short of lobbying Alex Miller to bring it to a Clojure version, or writing a byte-code generating, say “super proxy”, function that clones proxy but adds this capability.

7 Our DynamicDoFn won't give us the key/value destructuring you see there, but it's close—and we can get there. More to come in future posts.