Wednesday, February 22, 2023

TaskVine System Architecture

TaskVine is our newest framework for building large scale data intensive dynamic workflows.  This is the second in a series of posts giving a brief introduction to the system.

A TaskVine application consists of a manager and multiple workers running in a cluster.  The manager is a Python program (or C if you prefer) that defines the files and tasks making up a workflow, and gives them to the TaskVine library to run.  Both the files and the tasks are distributed to the workers and put together in order to produce the results.  As a general rule, data is left in place on the workers wherever possible, so that it can be consumed by later tasks, rather than bringing it back to the manager.

The workflow begins by declaring the input files needed by the workflow.  Here, "file" is used generally and means any kind of data asset: a single file on the filesystem, a large directory tree, a tarball downloaded from an external URL, or even just a single string passed from the manager.  These files are pulled into the worker nodes on the cluster as needed.  Then, the manager defines tasks that consume those files.  Each task consumes one or more files, and produces one or more files.

A file declaration looks like one of these:

a = FileURL("")

b = FileLocal("/path/to/simulate.exe")

c = FileTemp()

And then a task declaration looks like this:

t = Task("simulate.exe -i input.dat")
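To make the link between files and tasks concrete, here is a small self-contained sketch.  It does not use the TaskVine library: the File and Task classes below are toy stand-ins, and the "analyze" command and the out.dat and summary.txt names are invented for illustration.  It only shows how declaring inputs and outputs is enough to work out which task must run first.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter  # standard library, Python 3.9+


@dataclass(frozen=True)
class File:
    """Toy stand-in for a declared data asset (not the TaskVine class)."""
    label: str


@dataclass
class Task:
    """Toy stand-in for a task: a command plus named inputs and outputs."""
    command: str
    inputs: dict = field(default_factory=dict)   # sandbox name -> File
    outputs: dict = field(default_factory=dict)  # sandbox name -> File


def run_order(tasks):
    """Return task commands in an order that respects file dependencies."""
    # Record which task produces each file.
    producer = {}
    for t in tasks:
        for f in t.outputs.values():
            producer[f] = t.command
    # A task depends on the producers of its inputs (if any task produces them).
    graph = {
        t.command: {producer[f] for f in t.inputs.values() if f in producer}
        for t in tasks
    }
    return list(TopologicalSorter(graph).static_order())


exe = File("simulate.exe")
data = File("input.dat")
result = File("intermediate result")
summary = File("summary")

simulate = Task("simulate.exe -i input.dat > out.dat",
                inputs={"simulate.exe": exe, "input.dat": data},
                outputs={"out.dat": result})
analyze = Task("analyze out.dat > summary.txt",
               inputs={"out.dat": result},
               outputs={"summary.txt": summary})

# The tasks are listed out of order on purpose; the file links sort them out.
order = run_order([analyze, simulate])
print(order)
```

The point of the sketch is that nobody states "simulate runs before analyze" directly; the ordering falls out of which files each task consumes and produces.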




Tasks produce files which get consumed by more tasks until you have a large graph of the entire computation that might look like this:

To keep things clean, each task runs in a sandbox, where files are mapped in under names convenient to that task.  As tasks produce outputs, those outputs generally stay on the worker where they were created until needed elsewhere.  Later tasks that consume those outputs simply pull them from other workers as needed.  (Or, in the ideal case, run on the same node where the data already is.)
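As a rough illustration of the sandbox idea, the sketch below builds a private directory for one task and maps a cached object into it under the name the task expects.  This is not how the TaskVine worker is implemented: the cache layout, the object name "file-8f3a", and the use of symbolic links (with a copy as a fallback) are all assumptions made for the example.

```python
import shutil
import tempfile
from pathlib import Path


def build_sandbox(cache_dir, mapping):
    """Create a fresh directory and map cached objects in under task-local names.

    mapping: task-local name -> object name in the cache (hypothetical layout).
    """
    sandbox = Path(tempfile.mkdtemp(prefix="sandbox-"))
    for local_name, cached_name in mapping.items():
        source = Path(cache_dir) / cached_name
        target = sandbox / local_name
        try:
            # Link rather than copy, so the cached object is shared, not duplicated.
            target.symlink_to(source)
        except OSError:
            # Fall back to a copy where symlinks are unavailable.
            shutil.copy(source, target)
    return sandbox


# Demo: one cached object with an opaque name, seen by the task as "input.dat".
cache = Path(tempfile.mkdtemp(prefix="cache-"))
(cache / "file-8f3a").write_text("1 2 3\n")

sandbox = build_sandbox(cache, {"input.dat": "file-8f3a"})
names = sorted(p.name for p in sandbox.iterdir())
contents = (sandbox / "input.dat").read_text()
print(names, repr(contents))

# Clean up the demo directories.
shutil.rmtree(sandbox)
shutil.rmtree(cache)
```

Because the task only ever sees its own names, two tasks can both refer to "input.dat" while actually reading different cached objects.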

A workflow might run step by step like this:

You can see that popular files will be naturally replicated through the cluster, which is why we say the workflow "grows like a vine" as it runs. When final outputs from the workflow are needed, the manager pulls them back to the head node or sends them to another storage target, as directed.
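The replication effect can be mimicked with a toy simulation.  The sketch below is not the TaskVine scheduler: it assumes each worker runs one task per round, that the scheduler prefers idle workers already holding a task's inputs, and that a missing file is copied from a peer replica if one exists and from the manager otherwise.  The worker names and the "ref.db" file are made up.

```python
def run_round(tasks, workers, holders):
    """Assign at most one task per worker and record the transfers needed.

    tasks:   list of (task name, list of input file names)
    workers: list of worker names, all idle at the start of the round
    holders: dict file name -> set of workers holding a replica (updated in place)
    Returns a list of (file, source, destination) transfers.
    """
    idle = list(workers)
    transfers = []
    for _name, inputs in tasks:
        if not idle:
            break  # remaining tasks would wait for a later round
        # Prefer the idle worker that already holds the most inputs.
        chosen = max(idle, key=lambda w: sum(w in holders.get(f, set()) for f in inputs))
        idle.remove(chosen)
        for f in inputs:
            replicas = holders.setdefault(f, set())
            if chosen not in replicas:
                # Pull from a peer if any replica exists, else from the manager.
                source = min(replicas) if replicas else "manager"
                transfers.append((f, source, chosen))
                replicas.add(chosen)
    return transfers


workers = ["w1", "w2", "w3"]
holders = {}

round1 = run_round([("t1", ["ref.db"]), ("t2", ["ref.db"])], workers, holders)
round2 = run_round([("t3", ["ref.db"]), ("t4", ["ref.db"]), ("t5", ["ref.db"])],
                   workers, holders)

print(round1)   # first copy comes from the manager, the second from a peer
print(round2)   # only the worker without a replica needs a transfer
print(sorted(holders["ref.db"]))
```

In this toy run the manager ships the popular file exactly once; every later copy comes from a worker that already has it, and tasks that land on a worker with a replica need no transfer at all.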

A Preview of TaskVine

We have seemed a bit quiet in the Cooperative Computing Lab lately, as we have been focused on building TaskVine, our new system for executing dynamic data intensive workflows.

I am excited that TaskVine is up and running in the lab now!  We are working on refining a few technical issues and completing documentation and examples before making a general release.  I would like to share some of the details and philosophy of the system as a preview before general availability.

TaskVine builds upon our many years of experience building large scale workflow systems that manage millions of jobs running on thousands of nodes in clusters, clouds, and grids.  Some of our prior systems include Makeflow, which enabled the construction of large (static) task graphs to run on batch systems; Work Queue, which enabled the creation of applications that define and consume tasks dynamically; and Prune, which enabled the management of reproducible and shareable computations.  (And you can still use those systems if you like.)

TaskVine has some familiar concepts: it too allows the creation of large dynamically parallel distributed applications.  But where it really shines is in data management.

In TaskVine, data is a first-class citizen alongside tasks.  Large data sets, software packages, and software services are explicitly declared and put into a working cluster where they can be replicated and shared among nodes.  As an application runs, source data and derived results stay in the cluster and gradually creep from node to node, accelerating future tasks.  Even later workflows can take advantage of data prepared or produced by prior workflows.  We say that the workflow "grows like a vine" through the cluster.


The upshot is that TaskVine takes load off of the facility shared filesystem, which is often the bottleneck in scaling up large applications.  Big software stacks, reference datasets, and intermediate steps are made accessible on the worker nodes in a disciplined way, rather than a thousand nodes pounding the same filesystem at once.

Of course, to make all this happen, there are a lot of fine details.  In the next few posts, I'll give you an overview of the system architecture, and the new capabilities that we are building out.

Tuesday, February 7, 2023

Landlord Container Paper in TPDS 2023

Our latest work on container management was recently accepted to IEEE TPDS: 

LANDLORD: Coordinating Dynamic Software Environments to Reduce Container Sprawl

This paper is the result of a continuing collaboration between the CCL at Notre Dame and the Parsl group at University of Chicago, led by Dr. Kyle Chard.  Recent PhD grad Tim Shaffer led this work as part of a DOE Computational Science Graduate Fellowship, and current PhD student Thanh Phung joined the project and helped us to view the problem from a clustering perspective.

The problem is this: a variety of online services (like Binder, FuncX, and others) generate container images from software environment specifications, like a list of Conda packages.  These images are used to execute code on clusters: sometimes long-running code like simulations, but also sometimes short bits of code, maybe even a single Python function call.  If every user of the system asks for a slightly different software environment, then the system will quickly run out of space from those large container environments ("container sprawl").  So, we need some entity to manage the space and decide what containers to generate from packages, and which ones to delete:

We observe that multiple requests might be satisfied by the same container.  For example, these three jobs all express some constraints on packages A and B.  (And one of those jobs doesn't care about the version at all.)  If we figure out the overlap between those requests, we can just use a single image to satisfy all three:

Going further, we can view this as an incremental online clustering problem.  At any given moment, there are a certain number of containers instantiated.  If a request matches one already, that's a hit and we just use it.  Otherwise that's a miss and we have two choices: either insert a brand new container image that matches the request exactly, or merge an existing container with some new packages in order to satisfy the request.  We decide what to do based on a single parameter alpha, which is a measure of distance between the request and the existing container.
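The hit, insert, or merge decision can be sketched in a few lines.  This is an illustration under stated assumptions, not the algorithm from the paper: requests and images are treated as plain sets of package names (version constraints and conflicts are ignored), closeness is measured with Jaccard similarity, and a miss is merged into the closest image when that similarity is at least alpha.  The direction of the comparison is chosen so that a small alpha merges aggressively, matching the extremes described in this post; the function names are invented.

```python
def jaccard_similarity(a, b):
    """Size of the intersection over size of the union (1.0 for two empty sets)."""
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def place_request(images, request, alpha):
    """Serve one request against the current images (a list of frozensets).

    Returns ("hit" | "merge" | "insert", index of the image used).
    The list is updated in place on a merge or an insert.
    """
    request = frozenset(request)

    # Hit: some existing image already contains every requested package.
    for i, image in enumerate(images):
        if request <= image:
            return "hit", i

    # Miss: find the closest existing image.
    best, best_sim = None, -1.0
    for i, image in enumerate(images):
        sim = jaccard_similarity(request, image)
        if sim > best_sim:
            best, best_sim = i, sim

    # Merge if the closest image is similar enough, otherwise insert a new one.
    if best is not None and best_sim >= alpha:
        images[best] = images[best] | request
        return "merge", best
    images.append(request)
    return "insert", len(images) - 1


images = []
steps = [
    place_request(images, {"A", "B"}, alpha=0.5),       # nothing cached yet
    place_request(images, {"A"}, alpha=0.5),            # covered by {A, B}
    place_request(images, {"A", "B", "C"}, alpha=0.5),  # similarity 2/3, merged
    place_request(images, {"X", "Y"}, alpha=0.5),       # nothing in common
]
print(steps)
print([sorted(image) for image in images])
```

Under these assumptions, alpha=0 merges every miss into the closest image, so the cache tends toward a few large images, while alpha=1 never merges (only an identical set reaches similarity 1, and that is already a hit), so every miss adds a new small image.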

Ok, so now how do we pick alpha?  We evaluated this by running the Landlord algorithm through traces of software requests from high energy physics workloads and a public trace of Binder requests.

There are two extremes to avoid: if alpha is very small, then we end up with a small number of very large containers.  This is good for managing the shared cache, but bad because we have to keep moving very large containers out to the compute nodes.  (And they may not fit!)  On the other hand, if alpha is very large, then we end up with a large number of small containers.  This results in a lot of duplication of packages, so the shared cache fills up faster, but the images are small and easy to move around.

As in most computer systems, there isn't one single number that is the answer: rather, it is a tradeoff between desiderata.  Here, the sweet spot is around alpha=0.8, and the good news is that the system has a broad "operational zone" in which the tradeoff is productive.

Sound interesting?  Check out the full paper: