October 4, 2018
Distributed, Incremental Dataflow Processing in the Cloud with Reflow
Many data processing tasks involve piecing together many different software packages in increasingly complex configurations. Called workflows or pipelines, these data processing tasks are often managed by workflow management systems that require users to construct explicit dependency graphs between many components; these management systems in turn rely on some underlying cluster computing fabric to distribute load.
In this talk, we’ll present Reflow, a domain specific language and runtime system designed for such data processing tasks. Reflow treats “workflow programming” like any other programming tasks: instead of declaring explicit dependency graphs, Reflow users use functional programming to directly specify their data pipelines.
Reflow presents a vertically integrated environment for data processing, comprising: a strong data model based on dataflow semantics; built-in cluster management and job scheduling; fully incremental computing; and a module system that promotes modern software engineering practices like testing and modularity.
We’ll talk about how Reflow’s approach allowed us to implement a powerful data processing system with comparatively little code and few dependencies. Particularly we’ll discuss how the co-design of Reflow’s language, runtime, and cluster management systems resulted in a simple and comprehensible systems design. We’ll then discuss how Reflow supports our large-scale data processing environment.
Marius Eriksen is a principal software engineer at GRAIL, working primarily on large-scale data processing infrastructure. Prior to GRAIL, Marius was a principal engineer at Twitter, where he was the primary author of Finagle, a library similar in spirit to our own Async library that is at the heart of many of Twitter's services. He's a long-time devotee of functional programming, having written a geospatial database in Haskell for a company called Mixer Labs which was subsequently acquired by Twitter, and since then having worked extensively in Scala, a mixed functional/oo language based on the JVM. More recently, his work on Strato (Twitter) and Reflow (GRAIL) has brought ideas in functional and incremental programming to new domains. Marius is also an old-time OpenBSD kernel hacker, among other things having ported VMWare kernel support to OpenBSD.
Thanks for coming out tonight. As Ron mentioned, I’m currently at a company called the GRAIL. Part of what we’re trying to do is develop effectively an assay that allows to detect cancer at an early stage. This is a surprisingly computational intensive problem and because of that we needed to invest fairly heavily in computational infrastructure as well. One of the sort of ubiquitous components of data infrastructure work, especially in bioinformatics, but also in many other domains, is this notion of data workflow systems. So, let’s dig in a little bit about what I mean by that so we’re all on the same page.
These systems are often similar to the kinds of systems we use for software builds, but they have slightly different constraints and dynamics. For example, most of the work, most of the jobs that we run in data processing systems can take days or hours, not minutes, stuff that you might expect from a software build system. The volume of data is often much, much higher. We can go with terabytes and maybe even petabytes of data, not gigabytes or megabytes that you might have in a software build system. Also, data workflow systems tend to require dynamism, they tend to be able to have to make decisions about data processing based on the actual input data or the output of certain stages whereas build systems tend to be much more static.
Another distinction that I think is important to make is that, when I’m talking about these data workflow systems, we’re talking about file-grained systems. These are systems where you’re piecing together external tools that do some part of a computation and in effect will form a dependency graph, and not these kind of file-grained data processing systems that could do for Spark and others like that. These sort of systems are ubiquitous in so-called ETL workloads and are very, very common in the kind of bioinformatic workloads that we do at GRAIL and independently as well.
If we survey the existing landscape of workflow systems, we find that there’s a lot of them. There seems to be sort of startup project for most bioinformatics startups to create our own certain workflow systems, but when you sort of inspect that more deeply, you find that most of the existing systems out there are fairly thin frontends, that perhaps exposed in sort of embedded ESL and Python. Or maybe a way of declaratively specifying build pipelines and something like YAML and then they tend to target backend systems like Kubernetes, or AWS batch, or some combinations. Usually, there’s not a lot of coherence in these systems in the sense that they’re all very, very different, though the commonality they tend to have is that you have to sort of specify manual dependency graphs and it tends to be fairly kludgy to build.
We made a few observations about the state of the art. The first is that really most of the existing systems out there were entirely too mechanical. What I mean by that is that, in order to specify a build pipeline, you would often have to explicitly piece together a dependency graph to arrive at the result that you desire. Also, these systems lack the data model. They were effectively just a way of coordinating execution dependencies between tasks, but the actual inputs and outputs of those tasks were invisible to the actual workflow processing systems themselves. As we’ll see a little later, there is a very, very important distinction and one that gets us a lot of leverage down the road if we take a different approach.
Finally, we made the observation that workflows are actually really just programs. The fact that we’re creating these dependency graphs is a little bit like programming an assembly language. It’s not the graph itself… the graph itself rather isn’t the artifact that we’re interested in, it’s really just an incidental implementation detail. And what we really want to do is we just want to program a data processing task that we wish to perform. So, we decided to sort of consider this and start over again and see what we could arrive at.
What is Reflow? Well, Reflow is a functional language. It has static typed, mostly inferred. It has a modular system, so we can apply modern sort of software engineering practices to the work that we do. Of course, it has to be able to compose external tools. This is really the reason for its very accessible, right? The language allows you to seamlessly compose external tools that you specify. Reflow does impose a data model, so it requires the individual tasks within the workflows, within the program, so it’s referentially transparent. That is an important point down the road that we’ll get into later.
Reflow can also… or rather is purely incremental in the sense that, when you ask Reflow to compute something, it will always compute the smallest set of computations that it has to do in order to reconcile what it is that you want to compute and what has been computed before. And that’s really, really important also for these kinds of workloads and a direct effect of defining and specifying a strict data model.
Reflow is also implicitly parallel. It can parallelize any task that can be parallelized. It does so through data flow semantics, and we’ll get to that later as well. And then, finally, one thing that we really, really wanted to make sure, or one property that we found to be really, really important is that Reflow sort of just work. It should be a tool where you can write a program and you can simply run it, and whatever resources are required to run that program should be dynamically provisioned by Reflow. Reflow actually implements its own cluster computing manager for that reason. That might seem a little strange, but actually you can get away with a very, very simple solution we shall also get into in a little bit.
This is a modern part, right? You can’t get away without showing some sort of hello, world. Now my variance on this today is sort of a hello, bioinformatics world. I would say that one of the sort of most basic things that we do in bioinformatics is something called sequence alignment. Now from a high level view, sequence alignment is really taking a couple of files, processing it often for a long period of time and then returning some result. So, this is how you would do this in Reflow.
As you can see, this looks like an ordinary program in many ways. It has the kind of normal affordances: we have value bindings, and defining a function, we are creating file references. These are the kinds of things you would see in ordinary programming. This is effectively a new philosophy. Now Reflow’s sort of magic ingredient is this notion of being able to work [inaudible 7:29], which is what you’re seeing this in this exec expression. And we’ll get into that a lot later, but the thing to note for now is that the exec expression integrates seamlessly with the rest of the environments. I’m referring here to things that are inside my lexical scope and I’m programming just like I might in an ordinary language.
Let’s get a little bit into why we decided to go, take this approach rather. Again, I think, I’ll probably stress this three or four more times in this talk, if there’s one thing that I want you to take away, it’s that workflows are really just programs, we shouldn’t really think of them as dependency graphs or anything else. We really are just programming some data processing tasks. That was sort of one of the principal motivations for why we decided to pursue Reflow.
Another really important thing is that we wanted to impose a strong data model that gives us a lot of leverage down the road. And we’ll also get into the implications of that in just a little bit. Another really, really important thing to us was that we wanted to make Reflow very, very simple to use, both in terms of using it as a tool to program these data processing task, but also in terms of operations. In 2018, we have effectively API’s to our data centers and it seems like we should be able to make use of those to effectively capsulate infrastructure concerns in this runtime. Right? Reflow’s runtime sort of expands to the data center, if you will. And we’ll also get into it a little bit later.
Then, finally, another observation that we surveyed down the road is that data really deserves an API. A lot of what we do with data processing, in a sort of traditional sense, is on a very ad hoc basis. Maybe you’re processing a bunch of files and you put them over there, and you have to keep track of where those outputs are, and perhaps use those inputs somewhere else, and so on and so forth. Reflow allows us to effectively put an API on that data because it’s their model and because of a few other things that we’ll get to as well. So, I think these are sort of the primary motivations for why we decided to create yet another system that I think is different in many important ways from the existing systems out there.
What were our goals? First, again, we want to make sure that data engineering tasks could make use of effectively tools of a modern software engineer. We want to be able to write modules that are well-encapsulated. We want to test them. We want to be able to reuse them across tasks, things like this. Again, we want to make sure that cluster computing in Reflow is a seamless process. Reflow really should just work. You should be able to run Reflow and it should do its thing and you should get the results. Just as you would run, say, a Python interpreter, a Python script, really there should be no difference.
Another important goal for us was to have a high degree of safety. Reflow has primarily achieved through type safety. Especially in these long running tasks, it’s a real big bummer just to discover that you have a typo five hours into a data processing task. So the static type safety sort of takes on a somewhat elevated importance because of this. We also wanted to make sure that Reflow exposes a very strong data model, that permits us to perform [inaudible 11:04] computation. And, of course, we’ll get into that later as well. Also, we want to make sure that Reflow had a minimal dependency footprint. So that we could easily import it from different cloud providers as an example. We keep things really, really simple.
Even with those goals in mind, why do you have to build a new system? Why do you always have to build new things? Well, in this case, I think that these properties are really quite fundamental, you can’t really bolt them on to an existing system. They are fundamental assumptions that are made in the data model, in the runtime, and [inaudible 11:37] computing system, that really you just can’t bolt on to existing systems.
Another observation that I’d like to make, and I hope to impress upon you today, is that by being able to co-design the language in the runtime, it really can make some of those much, much simpler than the sum of its parts. Because we’re able to kind of tackle this problem from a sort of more holistic point of view, or a very integrated point of view, I should say, we can build a system that has just much, much reduced systems complexity. Finally, again kind of harping on this point again, but being able to provide effectively what our APIs [inaudible 12:14] data gives us a lot of leverage down the road. And that’s actually surprised us in many ways.
Before we go any further, I want to give you, almost a silly demo, but one that gives you a bit of a feel for what it is like to use Reflow packet because it’s sort of hard to… It’s a little bit hard to put it into words, I guess, the experience of using a system like this. This is a Reflow program that’s perhaps the simplest program that you could write. Just my screen, I could make it bigger. All this is doing is computing the string hello world in the most convoluted and most expensive way possible, which is inside of Ubuntu docker image. What this is saying, this is the saying, well, execute this command echo hello world, place the output of that echo hello world, hello world, and out. What is out? Out is the declare output file of this exec.
For this task, we’re going to use the Ubuntu operating system. So, this is just a docker image, if you’re familiar with this. Ubuntu is sort of globally meaningful name in docker. I’m going to reserve a gigabyte of RAM and a single CPU. Okay. Clearly a bonkers way to say hello world, but nevertheless we’re going to do that. What happens when I run this in Reflow? Well, what we’ll see happen is that… Actually, this was cached because I have done this before. The command is right here. Right there. What happens if I run it now? Well, it’s decided I need to compute it, so it now goes out on the ec2 spot market and it’s now made a spot bid for ec5.large instance type. It’s currently awaiting fulfillment for that. Again, I told you, don’t ever compute hello world this way, but this is a special Jane Street special. Now it’s giving this instance, we have an instance now. Now it’s waiting for the system to boot, become available. Any second now we’ll be able to run this very important command on that system.
But again, one thing I want you to sort of take away is that all I did to do this was to run Reflow run in Reflow structure. All I have in my environments are connections, nothing else. Reflow takes care of the rest. It’s provisioned in this infrastructure, asking you to run this command. When it finally goes through, okay, it’s fine already. It should be here. There we go. And the results in Reflow, files are represented by the digest of the confidence of those files. This is just an abbreviation of that. So, we can now actually go and see what the outcome of that is. Okay. So there we go. That’s Reflow. All right.
I hope this is going to… This is probably the last time that I’ll tell you guys this, workflows as ordinary programs. Let me go into a little bit more detail about that right now. What I mean by that is that we really want the affordances of ordinary programming, at least if you consider ordinary programming functioning for them. We want to have abstraction [inaudible 15:58] abstractions. We want to be able define functions and modules and things like this. We also want to have static type checking for the reasons that I explained earlier. Reflow has a very simple locally inferred structural type system. This sort of gives you typing without too much typing; that’s nice. We, of course, want to be able to seamlessly integrate all the concept of third-party tools that you might want to use in the computation task. We want to kind of hit the right power to complexity balance. And we want to make sure that Reflow is so powerful enough to express most of the things that you might want to do in workflow programming. But maybe it shouldn’t be Turing complete, right? We want to make sure that we’re sort being fairly principled about what the sweet spot for Reflow is.
Another thing that’s really, really important to us, and particularly because we started out as a team of one, which is myself, is I really wanted to focus on simplicity and really make simplicity one of the kind of primary side goals of Reflow itself. When I say simplicity, I mean not just the language and views and things like this, but on the implementation. And, finally, we wanted to minimize the set of external dependencies that we depend on so that Reflow remains sort of flexible and portable so we can put them in different models, different providers and different backend initiatives.
Reflow also has blocks. This allows you to group together a set of bindings that are local to that block and whose lexical scope is limited to that block. Blocks always end in some expression, which is the result of that block. Again very familiar from other modern languages. Reflow conditionals are also expressions, and so we always have to have both branches. In this particular case, it’s taking some lists and inspecting the size of that list. If it’s one, then I’m just returning it; if it’s not one, then I’m merging all the elements and some merged value.
Now what I’ve shown you so far is not unlike other modern programming languages that you might encounter today. We sort of definitely borrowed a lot from what might consider modern programming language practices. Now where Reflow is special and, the only thing that distinguishes it from all the languages is this notion of an exec. This is how Reflow invokes external tools. The way exec expressions are constructed is that they may return a tuple of values. Those values may have types either file or directories, and so the command that’s run inside of these execs can populate those files and directories.
Again, things inside of execs can interpolate directly from its lexical environment. In this particular case, I might have some file that’s just declared in my environment and I can use that file directly in the exec. But in this particular case, I’m placing, I’m just copying this decompress file effectively to this output. When this is actually run by the runtime, this effectively materializes to what is an effective random path. We can’t rely on that path being particularly name or shape. The same is true of the output. So this gives is now a way to abstract away these identifiers and file names and things like this from the program.
Execs can also have multiple outputs as well as inputs, of course. In this particular case, I might have a decompression program and perhaps… that’s actually bad example, it should probably say “compress,” because you can specify level for decompression. But anyhow, level is an integer and I could pass that in like it’s interpolated as an integer. In this particular case, I’m outputting both an output directory as well as a log file. The type of this expression is this tuple, the directory and the file. All right.
Reflow has a number of affordances to sort of allow for brief, for more succinct code, but I think in a way that doesn’t compromise clarity. Generally speaking, things like where you have some sort of label argument, like a record or an exec or a modular instantiation, or get into even function invocation. You can omit the label as the label is the same as the identifier doesn’t mark. For example, constructing the directory of ABC is equivalent to constructing this record. And the same is true of this structure. This is again borrowed from a lot of other modern programming languages.
Another really important task that we often need to perform in data processing workflows is to be able to operate over lists or collections of things. The primary way to do this in Reflow is through comprehensions. And this is borrowed directly from Erlang, Python, Haskell, right. There’s all these same kinds of features. In this particular case, I might have a function that aligns a pair of input files. It doesn’t really matter what this does, maybe I have a list of samples, the sample has a name and a set of files that are the sequence files for that sample. Maybe also a list of inputs that are the sequence files. What this comprehension computes effectively is mapping this alignment function over those inputs. But the nice thing about comprehensions is that they allow you to kind of mix the structuring with iteration. This is very nice and succinct way to express a modern computational tasks over collections of things and make maps, list, or whatever else.
In this second case, I’m taking all the samples, taking out the files and then all the files, in this case, it’s a list from here, and then for all the files in that list aligning them in turn, and so it gives you a way to kind of unwrap multiple layers of collections. You can also intersperse filters in comprehension as well. Precisely what this does, except it filters out names that are named. You know, bad example. I mean, it filters any arbitrary expression, it can be something much more complicated.
That’s really it for the core language. Those are the basic affordances, the basic constructs that you get to construct these [inaudible 24:31]. Now, as I mentioned, Reflow has a module system and the module system is also very reasoned. Basically, every file in Reflow has its own module. Each file or each module can declare a set of parameters and those parameters are required when the module is instantiated. I take it back a little bit. If there is no default value provided for the parameter, it’s required; otherwise the default may be used instead.
In this particular case, we have a very simple module that just catenates some greeting. That’s specified as a parameter with a string that’s passed in as an argument to function. So, this module exports function hello. I should also note that we take a hint from Go and identifiers that begin with a capital letter are explored from a module, identifiers that do not begin with capital letter are in different module. It’s just a way of controlling this coding module. So, we have this hello module and then we have a main module in this case. It instantiates the hello module with a greeting that’s again a parameter of the main module, the [inaudible 00:25:49]. And then, finally, pause the hello function on the hello module.
The other interesting thing that’s illustrated by this example is that, if I have an identifier capital Main, Reflow run can take this module and run it and it will evaluate whatever expression capital Main evaluates to. The other interesting thing to note here is that, when I run a module from the command line, I can supply module parameters through the command line by just regular means of flags. This gives us a way to kind of have a fairly seamless integration between invoking and instantiating and running modules from the command line or other systems as well as being able to instantiate them from other modules to form a module parameter.
Another important thing that is provided by modules is that they allow you to document your code. Again, as in Go, comments that are adjacent to identifiers are taken to the documentation for those identifiers. So if I document my code in this way, I can run Reflow doc on this module and it shows me all the export identifiers and all the parameters as well as this documentation. Another thing to note here is that Reflow doc also shows the fill in for type. You can see here, for example, this greet function returns a value of text string. And that’s not something that I had to type in, those are inferred for me.
This is really where the rubber hits the road in terms of giving data an API, and we’ll get into more implications in this later, but I want to sort of show you an example upfront. As an example, you might have some module that computes something on that, as an example. In our case, case and sample that it could be always [inaudible 00:28:02]. And maybe I have a different module that knows how to compute some inputs for a particular sample, maybe it knows where those inputs live and has three, or maybe it looks something up in the database. And the only thing this module does is it aligns the data for the sample and then it also indexes this aligned data. But… Yeah?
Yes, correct. I’m sorry, yeah, there should be input in R1 and input in R2. Good catch. Right.
So, what you have in effect here is a set of identifiers that are associated with the module instantiation that actually describe this data. What makes this really, really powerful, as we’ll see later on, is the valuation semantics that Reflow has that allows you to treat this as you call on things, the expression names the data and vice versa.
Again, I want to kind of emphasize the fact that modules provide typing, documentation, compositionality, I can instantiate modules from other modules, I can introspect them. Also again, and we’ll get into a little bit later on, the technical details of how this works and why it is the case, the values that are exported by module are also names in the sense that they really do provide a stable identifier of a particular result. This is due to the referential transparency that I talked about earlier.
Let’s talk a little bit about how Reflow actually accomplishes this primarily through the means of evaluation, how it evaluates programs. There’s really three properties of evaluation in Reflow that together lead to purely incremental evaluation semantics. The first thing is that evaluation is lazy. Meaning that all the expressions are evaluated based on whether or not they’re needed, not based on how they’re sequenced as an example. Also evaluation follows data flow. Meaning that expressions that do not have data dependencies between them can be evaluated independently, it can be evaluated in parallel. And, finally, because we have referential transparency, we can memorize all reductions in evaluation and reuse them. Together these lead to incremental evaluation semantics, which we’ll get to in a minute.
Let’s give a few examples just to know this more clearly. In this particular example, we see that we have reconstructing a tuple of a string and a file. The file in this case is expensive to compute. It’s the ridiculously expensive hello world that I showed you earlier effectively. The string is obviously just a constant that specifying directly is very cheap to compute. So, I can take this tuple and I can do things with it, I can restructure it and take out the string and return the string.
In this case, the actual exec was never actually needed, even though by sort of inspecting this code, if you expect normals or sequential execution semantics, you would reason that this would have to be executed. But it’s actually not because of laziness. This allows for a great deal of… It enhances modularity in many different ways because it doesn’t really matter how expensive it is to compute a value. I can pass it around regardless and it’s only actually computed if it’s needed.
One perhaps clearer way to demonstrate this property is in this way. Maybe in this case I have some sort of index that I’m using for again for my alignment task and computing them might be very, very expensive. Maybe I have some dynamic behavior, so maybe I use the index only if the length of the file that I’m aligning is above a certain threshold; otherwise I can do it without an index. Now as a user I can just again can align with my index and the dynamic behavior of this alignment threshold determines whether or not an index is actually needed or not.
So, this sort of gives you I think a flavor of how laziness really enhances compositionality because it allows you to not have to reason about the implications of not computing the values in the same way that you would otherwise do. And I would say actually that laziness I think has been overplayed in sort of regular programming, but I think in workflow programming it’s a really, really nice property. Because the ratio on expensive tasks and cheap tasks is so dramatic that it actually becomes a really, really nice thing to be able to rely on.
All right. Let’s move onto dataflow evaluation. What dataflow evaluation allows us to do is effectively again express these computations, and so, in this case, I’m aligning all the repairs from a list of pairs. And as we can see, there are no data dependencies between the individual alignments here. Reflow can take advantage of that and run all of them in parallel. In fact, if you run this in Reflow, they will all be done in parallel because there exists no dependencies between, there’s no implied sequencing in the language.
The same holds of course outside of branches as well. There’s not a special feature comprehension that’s core to how evaluations perform. So, in this case, I might export both the aligned data in this case, which would be computed in parallel, but that would also be computed in parallel with this expression, which also doesn’t… Different computation in the sense that it’s in pairs. Again wherever there’s an induction or an expression that do ot have that data dependencies between them, they will be computer in parallel. In fact, users can’t even express parallelism. There’s no mechanism in the language for doing so. And that’s kind of core.
Finally, let’s talk a little bit about the implications of referential transparency and memoization. First of all, referential transparency allows us to effectively treat the result of an expression the same as the expression itself. For example, if I were to do this, which admittedly would be silly, Reflow would compute that only once, because it’s able to canonicalize these expressions underneath the code. We don’t need to actually do it twice. In this case, I might compute this and then if I later go on and compute this, which would make use of the same index, you see index is interpolated here in the exec, the index will only compute it once because it was memoized in the previous one. So, this is obviously core to incremental evaluation.
It’s really, really important actually for a lot of the kind of tasks that are typical in these data processes in workflows. Common things like building indices, or building models of references, it’s very commonplace to do things like compute them in one stage and then use them in another stage. With Reflow, you don’t really need to make that distinction because memoization is automatic, it’s already cached for you. You can simply refer to the computation that computes the index model or reference or what have you and if it’s already computed, that’s already computed. You don’t need to worry about it, right?
The really nice thing about that, of course, is that this allows Reflow to track all the dependencies. So, if any of the input data changes and the index does need to be recomputed, it will. This is really important to how Reflow makes incremental evaluation possible. Let’s make this a little bit more concrete also. Here’s a very common type of workflow that you would encounter in lots of data processing tasks. Maybe I have a sample and I want to extract a bunch of features from that sample, and then give it a bunch of features, one for each sample, I mean I’m trying to train a model and then give it a model and a different set of samples, I want to evaluate the model.
I defined three number functions of those things, then I have a training set, and the test set that are hopefully destruct. I can build my model by using the comprehension, in this case, to extract all the features and then training the model based on those. And then I can evaluate the model directly using my testing samples. So, it can be considered as a very short example.
Let’s consider what happens when we add, for example, an example to our training set. Well, in this case, Reflow would have to recompute the feature extraction just for that additional sample, then of course it has to recompute the model. The same is true of evaluation. If I changed, if I deleted an example from my testing set, Reflow will only have to recompute the evaluation. This is of course expensive code as well. So, if the code for, say, training changes, Reflow can reuse all of the extracted feature files, it has to rerun the training and also the evaluation. So, Reflow keeps track of all these dependencies for you and implements incremental evaluation.
Let’s have a slightly more exciting demo for you guys. We’re going to create a dog montage, highly parallel doc computing. All right. Let’s say, we’re going to have a module that does a very simple thing, very silly thing perhaps. It takes a bunch of input files, which are huge jpeg’s of dogs, I’m going to find it in a minute, because of certain computation. What I wanted to do is I want to resize each of them to sort of thumbnail size and then make a dog montage. I’m a software engineer and I want modularize my workflow, and so I created a dogs module. My dogs module, all it does is just export a list of dogs that are dog thumbnails. The way that’s implemented is the module takes a parameter for where the sort of original size dogs are located and using ImageMagick in a single CPU in 500 megabytes of memory. I’m resizing them down to 50 by 50 pixel jpeg image. Again very, very simple. So, that’s what my dogs module does.
Now, of course, we want to build a montage and so what this montage does is define a function that can create a montage from the set of images again, again just using ImageMagick, and there’s all sort of way of making montages and what have you. I’m now instantiating my dog’s module and then I’m creating a montage of my dogs. If I run this now, what you’ll notice, and we’ll go through the rigmarole as it did before, and it’ll again have to actually create a cluster.
Reflow actually does keep instances alive for some time. So, if you reuse them, say, within five or 10 minutes, it will generally be there. But I’ve talked for more than that amount of time since the last demo, so it’s perfect to create a new cluster. Now what you’ll see is it’s actually creating a much bigger cluster for us because I have hundreds of dog images and now Reflow will parallelize that for us. Again highly parallel doc computing. You can see now, I guess, it’s instantiated five different instances. There are different states of readiness. It’s doing it all in parallel for us. I guess it’s waiting to log 120 resizes.
Again, I want to emphasize that what Reflow is doing here, I’m not giving it anything other than my AWS credentials, and what it’s doing is that it’s going out on the spot market trying to determine what the best bid price to making those bids. You can see that it’s trying to probe for EC2 capacity in order to try to shape what those bids actually look like. That finally when it actually gets to it, now you see that it’s actually running all these resizes. The bulk of the time here, of course is just taken to provisioning the infrastructure. If I run this again, there will be more or less instant because that cluster is already there.
So, now you can see that it’s completed any of my dogs resizes. Once it finishes all of them, now you can see that it’s busy making the montage. And that was it. The montage output is this. Let’s see what it… I tried to arrive to it. All right. Let’s see what we have here. And I’ll show you my screen. All right, there we go. Dogs.
Of course, I could now go and change, if I now go back and change the way the montage is done, like for example remove border and run it again, it should be much, much faster because it’s already done all the resizing, it should now be able to, and also has a cluster, so it can go out and create new instances, you can see that these complete quite quickly. It’s just now transferring all the images that were already computed and it could compute a new montage for us.
I don’t know if this will actually be any different, but as you can see I’m just now modifying the workflow. It took much, much shorter amount of time because it was able to reuse most of the computation that was done in the previous step. I guess this looks like to be in a slightly different order, I guess. Anyway, there you go, two dog montages computed by your cloud data processing system in a highly parallel fashion.
All right. Let’s talk a little bit about Reflow’s implementation. Because I think it’s important to understand how the code design of the Reflow language and the runtime allows us to keep things very, very simple. It seems like Reflow’s doing a lot but it’s able to do so in a highly leveraged way that keeps the actual system very, very simple. Again, it’s really, really important for us that we built the system whose semantics and data model really accommodated for a very, very simple implementation. We’re close to Bell Labs here, it’s very much sort of in the spirit of Worse is Better.
One of the ways in which we did this was to make sure that our API surfaces to external components is very, very small. So, we consciously eschewed complicated features or sophisticated features of systems like it’ll be as DynamoDB, and we kept things very, very simple. Again both in terms of just keeping the system simple, but also to make it much more portable than it might otherwise be. Reflow itself is built around a very small set of core abstractions that are very, very flexible and now you’re using in different ways. Again this is all in the service of trying to minimize the overall systems complexity of Reflow and keep things very, very simple.
Now this is a very big picture, sort of overview of the various components within Reflow. It kind of highlights the important components. First of all, the actual evaluator, the kind of command and control center, is running whenever you’re running the Reflow binary place. If Reflow run on my laptop, the evaluator itself was running on my laptop. Now when Reflow does need to invoke external binaries, it will go and provision cloud infrastructure in order to do so, in order to do so in a highly parallel fashion.
The way Reflow manages this is through an idea called an alloc. An alloc is short for a resource allocation. Each alloc has a docker container attached to it, as well as a repository of files. Repositories and files in Reflow are all named by Hash. So, the API to repository is basically the system of the contents of that or puts this content with the [inaudible 00:46:06]. The important thing is that the things encapsulated by an alloc, the set of containers running inside the alloc as well as the repository live and die with the alloc. So, if alloc disappears so do the resources that were instantiated about that alloc.
When Reflow needs more additional cluster resources it asks its cluster manager to create a new alloc, which has an API that allows to run these containers and also to transfer data between them. For example, in the montage example, it might have instantiated five or 10 different allocs in order to do parallel processing, and now I could transfer all the results of those into a single alloc to compose the montage. That’s done directly by instructing any other repositories in this alloc to transfer a set of files to the repository in that alloc.
Other important implementations of repositories are some external durable ones. In the case of AWS, we use s3 to store the cache and that actually is done in exactly the same interface. It allows us to again kind of seamlessly transfer data between these durable external repositories, as well as the ones that were inside these allocs. Finally, there’s just really one more component which ties everything together, which is this notion of an association table. This effectively stores a source key, a type, and destination key. This allows us to track, say, which values are associated with which execs after they’re being cached.
Reflow itself actually has two different evaluators. It evaluates AST simply by walking directly into a flow graph, and then the flow graph is evaluated separately. This allows us to separate the concerns of data flow in concurrency from the semantics and bookkeeping of the kind of service language. The flow graph really just has two kinds of nodes. One is these exec nodes that encode these run-to-completion executions. And it could be internal, which is to internalize something some data, for example, this might be stored in s3. The docker exec runs a command inside of the docker container with dependencies on, I noticed that might have internalized rather than externalized. And then, finally, extern externalizes data from Reflow and allows us to copy them out to other systems like [inaudible 00:48:44].
The mechanism that intermediates these two, or mediates actually, I should say, is this notion of the continuation nodes. A continuation node in the flow graph basically allows the flow graph evaluator to call back into AST evaluator to produce a new subgraph. I’ll give you an example on how that works in a second.
The way incremental evaluation is performed in Reflow is that, first, the flow graph is evaluated top-down until you get a cache and then bottom up from there. I think this would be a lot clearer in an example that I have coming up. Here’s how Reflow will evaluate a small portion of the previous computation. If you remember, the first step of our dog computation problem was we had this list or directory of dogs and we want to resize each of them. So, when the AST evaluator evaluates this, it will evaluate it into a flow graph that looks roughly like this, to internalize this list of dogs and then it needs to call back into the AST evaluator effective continuation node in order to process that list of dogs.
So flow evaluator now starts by looking at the root node and computing its cache key and, if there’s a cache miss then it continues down. Same here. And finally, we don’t have a cache yet for the intern, it has to be computed. Once this is computed, call back into the continuation node, which now again will replace a subgraph with the next step, which in this case is to resize each of those images.
So now the flow graph looks something like this. Again, we have a set of executions that can now be done in parallel, and then a continuation node that needs to be evaluated when those are done. Again, Reflow goes and does top-down evaluation. So it interrogates the continuation of computes a cache key port, locks it up and cache. In this case, we didn’t have a cache yet and so it has to go further down.
So, in this case, maybe we had three caches, so we already had resized versions of three of our dog images. The two of them were still to be done, right? In this case, we have now to actually run those and then go up again to you continue, and then to root which now replaces the subgraph with the montage. So, now we have the results from all the individual resizes, and again it does the same thing again. And finally, executing the montage and then we’re done. So, that’s a very, very simple example of how the sort of top-down then bottom-up evaluation semantics interact, and how that achieves both concurrency as well as incremental evaluation through its cache look up mechanisms.
All right. I want to talk about one final thing before I go, which is somewhat more speculative. It’s a kind of recent area of work that we’re looking into, kind of again really I think giving, exploring this notion of using Reflow modules as data APIs. Again, what referential transparency gives us is effectively that an expression can be construed as a stable name for some value. That’s what referential transparency gives us because it allows us to substitute that expression for the value of that expression.
Module, of course, contains both of those things. It allows us to now compute a key that is stable again for the value of that expression; and also compute the expression itself because it is just the AST. If we sort of take this a step further, we can think about putting those keys into a namespace, which is what we call a dataspace.
But before I get there, there’s one more piece of infrastructure that Reflow provides that’s very useful in this case. Everything that I’ve showed you has been sort of interactively running these concrete scripts that live on disk and Reflow run dog montage [inaudible 00:53:13]. Of course, those are not necessarily staple artifacts. If I refer to, for example, the Ubuntu image, that image could change over time. And so, if I run it today and then tomorrow if that Ubuntu image has changed, we have to recompute those results.
What bundles allow us to do is effectively freeze a module. What I mean by that is fairly fundamental in the sense that when I freeze a module, I’m packaging that higher dependency graph of modules into a single binary artifact. I’m also producing all the external dependencies, and so resolving things docker images and so on. Bundle modules are also just first class modules. I can run them, I can document them, I can instantiate them to other modules. So, this now gives us a name that is truly stable, stable for us to use. This now refers to a computation for eternity instead of for whenever the world changes from underneath.
What are dataspaces? Well, dataspaces is really just a mapping of symbolic names to bundles. Let’s say, in this case, I have a bundled Reflow module, that maybe that’s some whole genome sequencing stuff, and I could mount that to some portion of the namespace. Now this name is meaningful just like any other module name. Again, I can dock it, I can run it, I can introspect it, things like this.
I make a claim now that these are really the data APIs that we’ve been looking for. They are well typed, we have modularity, we have documentation, stable names. Now a really interesting thing about this is that we can actually really heavily lean on the type system. We can install a policy in the namespace saying, “Well, if I overwrite a name, if I update a name, the updated module has to be a subtype of the module that’s already there.” This will help us guarantee that we can’t break any consumer’s log module. So, it’s really provides some of the interesting properties like that as you start thinking about data having an API. Of course, everything is still fully incremental. I can instantiate these modules and compute their values and the totality of the evaluation is still fully incremental.
I encourage you to use Reflow. It’s available open source on GitHub. Of course, we use it very heavily at GRAIL. It’s what does all of our compute work effectively. Some other organizations like the Tranzact [inaudible 00:55:42] Institute are using it, as well as other few small biotech startups. Again, Reflow really tries to be a sort of complete integrated solution. But really you just have to bring your credentials and it just works, at least if I’ve done my job.
With that, I’m happy to take any questions and I’ll also be around later. We can create sort of parallel discussion. Yeah.
Earlier you said that Reflow always does the smallest amount of work. So how do you define smallest [inaudible 00:56:32]?
Okay. So the question was I made this claim that Reflow always computes the smallest set of work that has to do in order to compute the desired output. The observation was, well, how do you define that given that you might have more work to do than you have available resources? So, there’s internal queuing, right? So what I really should be clear about is that it’s the total amount of work that’s done. Of course, there may be limits on the amount of parallelism that you can exploit in the underlying infrastructure and Reflow will, of course, queue things if it can’t get more parallels.
But it doesn’t decide dynamically between those object size, file size, or anything like that?
No… Yeah. I actually have some interesting observation though, because it’s something that we have been thinking about in the sense that if you want to minimize intern latency you probably want to prioritize work that will unlock more work, and things like that. Though we haven’t really done any work in that realm yet.
Do you have branching?
All right. The question was do we have branching? No. We have external in there [inaudible 00:57:49]. There’s… Where was it? It definitely has external… Where was it? Right there. There we go. There’s an external for you. Yeah, we’ve got branches. There we go.
Thank you for that presentation, very interesting work. You mentioned that you use as much work as possible, but what if one of the underlying processes change their output without the sort of change like… there’s a bunch of version updates and suddenly you have a new version of whatever application?
Okay. The question was what happens if the underlying image, for example, changes. Well, Reflow keeps track of that so we can resolve an image being into it’s stable digest that has affected the digest of the contents of that image. And that is what we use in the cache key and so, if it does change, it will recompute it right. That’s one of the principal advantages of using bundles for sort of productions that you can truly freeze the computation of that. Yeah.
[Inaudible 00:58:48] when you under specified the resources that are required for a particular task?
Okay, great question. The question was what happens if you under specified the resources that are required for a task. Reflow is happy to over subscribe. That typically works pretty well for CPU, sometimes for disk, the Achilles heel is memory where you can… right? Reflow does try to effectively classify boom errors properly so that it rerun those tasks, perhaps increase in the number of times dynamically. Over time I would like to move towards model where the resource requirements are treated as hints not as requirements and instead use much more dynamic profiling data in order to make smarter decisions about how things are packed and how things are run. But I guess the real answer is that panels are just fine, it’s just that you might do unnecessary work.
We’ve done a lot of work on parallel incremental computation systems and one thing we run into a lot is the difficulty of applying efficient algorithms, we’re really kind of saturating the system. I think maybe one difference is we’re thinking about big graphs with large nodes. I guess is maybe the issue that we feel has very small graphs [inaudible 01:00:14].
Yeah. It’s actually a great question. I was actually working on something relating to this thing this afternoon. The question was, well, at Jane Street we’ve done a lot of work, sort of making increments computation on a large graphs very efficient. And, yes, so it’s two answers. One is that, generally speaking, the graphs in Reflow are probably comparatively smaller. That being said, there’s a couple of bioinformatics at GRAIL that always know how to create huge graphs using Reflow. And so we have had to make it more sophisticated overtime, so in the first iteration of Reflow, it was the dumbest most naive evaluator possible and I used to reverse the graph every time there’s a change in order to understand which nodes needed the computing, which nodes are ready.
Now it does that entirely incremental and keep effectively every node, it keeps a list of nodes that are used during and when it’s completed and have a reference counters and things like that. And so the cost of evaluation is proportional to the amount of parallelism in the graph, not the graph size. But again probably even so we’re still talking about comparatively smaller graph [inaudible 01:01:29]. Yeah.
[Inaudible 01:01:30] of the docker image in question but would it be able to function in itself [inaudible 01:01:36]?
Yeah. So, the question was what if you have, say, a function that has a branch and condition that is conditioned on some dynamic thing of your environment like timeline or random generator. So, the answer to that is that’s actually possible for Reflow to express that. This is part of how we under parallel the language a little bit more to make sure that is always correct. So, yeah, it can do that is the answer. Yes.
There’s no way in Reflow to get the contents of a file as a first class value in the evaluative language. So files are picked to…
Well, yes, correct. This is a part of imposing a data model, right? So, the underlying assumption is that, given precisely the same inputs, we fix the environments and we fix the inputs. The computation task has to produce semantically the same output. So we do have… You can’t produce different… Sometimes it’s fine as long as it’s not semantic. So that is something that we, yeah, definitely not impose on the user. In fact, we actually not allowed to [inaudible 01:03:12]. Yeah.
Have you considered or have any interest in [inaudible 01:03:21] attached with the clouds [inaudible 01:03:22]?
Yes. The question was we consider enriching the types in Reflow itself to basically allow for stronger type safety. Yes, definitely. That’s actually something that we are planning on doing. The first version that will be very similar to polymorphic variance in OCaml where to just kind of declare a thing and so we can now [inaudible 01:03:43]. For example, you can just declare that this is a CSV file with these columns or this is a bat file with this restriction or headers.
How are you going to deal with exceptional cases and what are the nodes? If you have sort of files [inaudible 01:04:09], how are you going to deal with that?
Right. The question was how do we deal with exceptions and failures? Right now there isn’t in the language any failure handling mechanism. So those will propagate and [inaudible 01:04:22] the whole path. We have experiments that would effectively keep going mode so that we [inaudible 01:04:31] what work best concurrently ongoing while that happens. But right now there’s no failure handling mechanism in the language itself. If something fails, it will fail. That being said, Reflow does handle runtime failures. For example, the boom errors that I mentioned earlier. If you have a runtime thing like you’re running on memory, you’re running out of disk, it will attempt to rerun those tasks perhaps in a different mode or if you are concurrent [inaudible 01:04:58]. But if there’s an application failure, then there’s a failure, and that’s it. Yup.
What about the databases? Which database works for us [inaudible 01:05:07] relational, nonrelational? Which database works well with Reflow?
Yeah. The question was what kind of databases work well with Reflow. Reflow itself doesn’t make any assumptions about that. Reflow itself as a matter of implementation uses DynamoDB to store these instantiations. But in terms of inputs and outputs, it doesn’t really… Well, there’s really two answers to this. First, if you can imagine that in typical sort of ETL workloads where you might be processing some lab data, it might be a [inaudible 01:05:49], it’s sorting up data into a database. Right now we do have some experimental support to sort of enable that, but right now in Reflow we tightly control the execution environment. One of the ways in which we do that is disallow network access.
But there’s actually a way for you to circumvent that. For example, use this to load data into big [inaudible 01:06:14] inside the script. But, yeah, in principle there’s no… Reflow sort of converses in files and directories in bulk data and of course you can run things in bulk in databases but it doesn’t treat that any differently over kind of execution measurement.
Okay. Let’s hold the questions there. We are going to, right after, head downstairs and anyone who wants to join we have some space downstairs and we have some great food down at Seymour’s right after the speak. So, let’s thank the speaker and head there.