Monday, November 20, 2023

Distant Futures at SC 2023

 Grad student Barry Sly-Delgado presented his recent work on "Minimizing Data Movement Using Distant Futures" at the research poster session at Supercomputing 2023:


Futures are a widely used concept for organizing concurrent computations.  A Future is a variable that represents the result of a function call whose computation may still be pending.  When evaluated, the future blocks (if needed) until the result becomes available.  This allows for the easy construction of highly parallel programs.
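For example, using Python's standard concurrent.futures module (not TaskVine itself), the concept looks like this:

from concurrent.futures import ProcessPoolExecutor

def simulate(x):
    return x * x

with ProcessPoolExecutor() as pool:
    futures = [pool.submit(simulate, i) for i in range(10)]  # each call may still be pending
    results = [f.result() for f in futures]                  # result() blocks until the value is ready
    print(results)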

Barry developed a future-based execution model for the TaskVine workflow execution system.  This allows one to submit and chain function calls written in plain Python.  TaskVine then schedules the tasks throughout the cluster, providing a variety of data management services.

However, at very large scales, a natural bottleneck is the return of computed values back to the original manager.  To overcome this, we introduce the notion of Distant Futures, in which a value is not only pending in time but potentially resident on a remote node in the cluster.  Tasks requiring this value can then be scheduled to the same node, or the value can be transferred within the cluster, rather than being brought back to the manager.

Combining distant futures with asynchronous transfer provides significant benefits for applications that are bottlenecked in data transfer, as our results show:

For more details, check out the poster and short paper here:

Maximizing Data Utility at HPPSS/SC 2023

Thanh Son Phung presented Maximizing Data Utility for HPC Python Workflow Execution at the High Performance Python for Science at Scale workshop at Supercomputing 2023.

This paper describes how the Parsl workflow system integrates with the TaskVine workflow executor in order to run complex Python environments at scale.  Parsl provides a function-oriented programming model, generates a large number of tasks to be executed in a graph, and then passes the tasks down to TaskVine for execution on a cluster:
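As a rough sketch of the programming model (the TaskVineExecutor import path and constructor arguments below are assumptions based on recent Parsl releases and may vary by version):

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor   # assumed import path; check the Parsl docs

parsl.load(Config(executors=[TaskVineExecutor(label="taskvine")]))

@python_app
def multiply(x, y):
    return x * y

# Parsl hands each app invocation to TaskVine as a task and returns a future.
futures = [multiply(i, 2) for i in range(100)]
print(sum(f.result() for f in futures))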

A significant challenge in scaling up Python applications is managing the large body of dependencies, in the form of Python modules, system libraries, and so forth, that must be loaded every time a process starts.  At very large scales, the cost of pulling all of these resources from a shared parallel filesystem can become a significant bottleneck.  (Typically, parallel filesystems are optimized for the delivery of a few large files, rather than a large number of small files.)

TaskVine addresses the problem by exploiting the storage capability of each execution node, which is typically equipped with a local SSD for temporary storage.  The TaskVine workers pull data into local storage, and then share files with each other (at the direction of the manager), resulting in less pressure on the shared filesystem, and overall speedups that improve with scale. 
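As a rough sketch of this pattern (the method names and cache/peer-transfer options below follow the TaskVine documentation as we recall it, and the file names are illustrative):

import ndcctools.taskvine as vine

m = vine.Manager(9123)
m.enable_peer_transfers()          # let workers fetch cached files from each other

# A large software environment, declared once and cached on worker local storage.
env = m.declare_file("ml-env.tar.gz", cache=True)

for i in range(1000):
    t = vine.Task("tar xf ml-env.tar.gz && ./run-task.sh")   # illustrative command
    t.add_input(env, "ml-env.tar.gz")
    m.submit(t)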

This technique was evaluated on a machine learning application requiring a (quite common) 4.4GB of software, scaling up from 32 nodes x 16 cores to 256 nodes x 16 cores on a campus HPC cluster.  At small scales, TaskVine provides a modest improvement, but as scale increases, the separation becomes more distinct:

The distribution of task execution times (bottom row) shows that the TaskVine variants run tasks more consistently, with fast individual execution times, once data is distributed across the cluster.

For the full details, see our paper here:

Tuesday, November 14, 2023

TaskVine Paper at WORKS/SC 2023

Barry Sly-Delgado presented our overview paper on TaskVine at the Workshop on Workflows in Support of Large Scale Science at Supercomputing 2023 in Denver, Colorado.

TaskVine is a system for executing data intensive workflows on large clusters.  These workflows may consist of thousands to millions of individual tasks that are connected together in a graph structure like this:

When executing this sort of workflow in a cluster, the movement of data between tasks is often the primary bottleneck, especially if each item must flow back and forth between a shared filesystem.

The key idea of TaskVine is to exploit the local storage available on each node of a cluster to relieve much of the load placed on a shared filesystem.  A running TaskVine system consists of a manager process that coordinates data transfers between workers, like this:


A workflow in TaskVine is expressed in Python by declaring the data assets needed by the workflow as file objects, and then connecting them to the namespace of each task to execute:
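For example, a rough sketch in Python might look like the following (method names follow the TaskVine Python API, though details may vary by version, and the file names are placeholders):

import ndcctools.taskvine as vine

m = vine.Manager(9123)

software = m.declare_file("simulate.exe")                           # a local executable
dataset  = m.declare_url("http://server.edu/path/to/data.tar.gz")   # a remote data asset
result   = m.declare_temp()                                         # an output left in the cluster

t = vine.Task("./simulate.exe -i data.tar.gz -o output.txt")
t.add_input(software, "simulate.exe")      # mapped into the task sandbox under this name
t.add_input(dataset, "data.tar.gz")
t.add_output(result, "output.txt")

m.submit(t)

# Workers are launched on cluster nodes, e.g.:  vine_worker <manager-host> 9123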

When the tasks execute on each worker, the various data objects are cached on local storage, and then linked into the execution sandbox of each running task.  This provides the opportunity for data to be shared between tasks without moving data to/from the shared filesystem:


This paper gives three examples of applications constructed using TaskVine.  TopEFT is a physics data analysis application built using Coffea+TaskVine, Colmena-XTB is a molecular modeling application built using Parsl+TaskVine, and BGD is a model training application built using TaskVine alone.  Each of these codes is able to scale to 400-1000 nodes running on 1000-27000 cores total.

For all the details, please check out our paper here:


Tuesday, October 3, 2023

CCTools 7.7.0 Released

We are pleased to announce the release of version 7.7.0 of the Cooperative Computing Tools from the University of Notre Dame, including TaskVine, Work Queue, Makeflow, Parrot, Chirp, and other software.

This is a feature release that primarily improves the scalability and performance of TaskVine:

Main Features:

  • TaskVine workers now perform transfers asynchronously while communicating with the manager, by @BarrySlyDelgado
  • TaskVine Serverless function execution simplified and accelerated by @tphung3
  • Improved integration of Parsl and TaskVine by @tphung3
  • Improved scheduler performance in Work Queue and TaskVine by @colinthomas-z80
  • Multiple improvements to reliability at scale with physics applications by @btovar
  • Change to API for cancelling tasks: all tasks are now returned by vine_wait by @dthain
  • Change to serverless resource management: functions consume library resources by @dthain
  • Improved visualization by @JinZhou5042
  • New environment.yml for development dependencies by @thieber22

Full Changelog: nightly...release/7.7.0

Monday, July 24, 2023

CCTools 7.6.1 Released

We are pleased to announce the release of version 7.6.1
of the Cooperative Computing Tools from the University of Notre Dame,
including TaskVine, Work Queue, Makeflow, Parrot, Chirp, and other software.

Docs:  https://cctools.readthedocs.io/
Download: http://ccl.cse.nd.edu/software/download
Forum: http://ccl.cse.nd.edu/community/forum

This release introduces several bug fixes and minor improvements to the TaskVine workflow executor and Work Queue scheduler.

TaskVine:
- Fix bug in task scheduling that would result in manager hanging. (Colin Thomas)
- Fix bug in execution of Dask task graphs that would lead to a hang. (Ben Tovar)
- Fix bug in deployment of serverless LibraryTask. (Thanh Phung)
- Add option to specify Python package dependencies inline. (Barry Sly-Delgado)
- Add visualization of task graph and documentation. (Douglas Thain)

Work Queue:
- Fix bug in task scheduling that would result in manager hanging. (Colin Thomas)

Friday, July 7, 2023

CCTools 7.6.0 Released

We are pleased to announce the release of version 7.6.0
of the Cooperative Computing Tools from the University of Notre Dame,
including TaskVine, Work Queue, Makeflow, Parrot, Chirp, and other software.

- Docs: https://cctools.readthedocs.io
- Download: http://ccl.cse.nd.edu/software/download
- Forum: http://ccl.cse.nd.edu/community/forum

This release introduces a number of performance, usability, and
documentation improvements to the TaskVine workflow executor.

TaskVine:
- Integration with Parsl workflow system. (Thanh Phung)
- Integration with Dask workflow system (Ben Tovar)
- Optimized scheduling performance with large numbers of workers. (Colin Thomas)
- Enhanced dispatch performance for small tasks. (Colin Thomas)
- vine_submit_workers combined tool for submitting workers.  (Thanh Phung)
- Improved generation of file cache names (Barry Sly-Delgado)
- fetch_file interface to obtain cached files in the cluster.  (Douglas Thain)
- Worker-to-worker authentication for peer transfers.
- Updated documentation.  (All)

Work Queue:
- Enhanced dispatch performance for small tasks. (Colin Thomas)

Makeflow:
- Now supports TaskVine as an execution mode.  (Douglas Thain)

General:
- TCP catalog updates are performed in a background process to reduce latency.

Thanks goes to the contributors for many features, bug fixes, and tests:

Thanh Son Phung
Barry Sly-Delgado
Colin Thomas
Ben Tovar
Douglas Thain

Enjoy!


Friday, April 28, 2023

Intro to TaskVine at GCASR 2023

Prof. Thain gave the afternoon keynote (Data Intensive Computing with TaskVine) at the GCASR Workshop in Chicago on April 24th.  TaskVine is our latest workflow execution system, just released in March 2023.

The key idea of TaskVine is to exploit the local storage already embedded into clusters for use by complex workflows:


To accomplish this, each node runs a general purpose worker process that manages cached data under unique names, and runs each task within a private namespace:



You can write workflows directly to the TaskVine API, declaring files and tasks and connecting them together.  Or, TaskVine can be used as an executor under existing workflow systems:
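For example, our Dask integration lets an ordinary Dask graph be computed with TaskVine workers; the DaskVine class name and the scheduler=m.get hook below follow the TaskVine documentation as we recall it and may differ in detail:

import dask
import ndcctools.taskvine as vine

m = vine.DaskVine(9123)            # assumed class name for the Dask integration

@dask.delayed
def add(x, y):
    return x + y

graph = add(add(1, 2), add(3, 4))
print(graph.compute(scheduler=m.get))   # TaskVine schedules and runs the underlying tasks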



Learn more and try it out here: http://ccl.cse.nd.edu/software/taskvine



Monday, April 17, 2023

CCTools version 7.5.2 released

 

The Cooperative Computing Lab is pleased to announce the release of version 7.5.2 of the Cooperative Computing Tools including TaskVine, WorkQueue, Makeflow, Poncho, Chirp, JX, Parrot, and other software.

The software may be downloaded here:
http://ccl.cse.nd.edu/software/download

This is our first release of TaskVine, our new data intensive workflow execution system. TaskVine makes extensive use of in-cluster storage to accelerate workflows, so that data "grows like a vine" through the cluster. See our extensive documentation and examples to learn more!

Detailed items in this release:

  • [TaskVine] First official release (Everyone!)
  • [General] Change catalog update to TCP by default (D. Thain)
  • [Work Queue] Specify manager as an argument when using the factory in python (Ben Tovar)
  • [Work Queue] New dynamic resource allocation strategies (Thanh Phung)

Thanks goes to the contributors for many features, bug fixes, and tests:

  • Matthew Carbonaro
  • Jachob Dolak
  • Joseph Duggan
  • Kelci Mohrman
  • Thanh Son Phung
  • David Simonetti
  • Barry Sly-Delgado
  • Douglas Thain
  • Colin Thomas
  • Ben Tovar

Wednesday, February 22, 2023

TaskVine System Architecture

TaskVine is our newest framework for building large scale data intensive dynamic workflows.  This is the second in a series of posts giving a brief introduction to the system.

A TaskVine application consists of a manager and multiple workers running in a cluster.  The manager is a Python program (or C if you prefer) that defines the files and tasks making up a workflow, and gives them to the TaskVine library to run.  Both the files and the tasks are distributed to the workers and put together in order to produce the results.  As a general rule, data is left in place on the workers wherever possible, so that it can be consumed by later tasks, rather than bringing it back to the manager.


The workflow begins by declaring its input files.  Here, "file" is used generally and means any kind of data asset: a single file on the filesystem, a large directory tree, a tarball downloaded from an external URL, or even just a single string passed from the manager.  These files are pulled onto the worker nodes of the cluster as needed. Then, the manager defines tasks that consume those files.  Each task consumes one or more files, and produces one or more files.


A file declaration looks like one of these:


a = FileURL("http://server.edu/path/to/data.tar.gz")
b = FileLocal("/path/to/simulate.exe")
c = FileTemp()


And then a task declaration looks like this:


t = Task("simulate.exe -i input.dat")
t.add_input(b, "simulate.exe")
t.add_input(a, "input.dat")
t.add_output(c, "output.txt")


Tasks produce files which get consumed by more tasks until you have a large graph of the entire computation that might look like this:



To keep things clean, each task runs in a sandbox, where files are mapped in under names convenient to that task.  As tasks produce outputs, they generally stay on the worker where they were created until needed elsewhere. Later tasks that consume those outputs simply pull them from other workers as needed.  (Or, in the ideal case, run on the same node where the data already is.)


A workflow might run step by step like this:




You can see that popular files will be naturally replicated through the cluster, which is why we say the workflow "grows like a vine" as it runs. When final outputs from the workflow are needed, the manager pulls them back to the head node or sends them to another storage target, as directed.

A Preview of TaskVine

We have seemed a bit quiet in the Cooperative Computing Lab lately, as we have been focused on building TaskVine, our new system for executing dynamic data intensive workflows.

I am excited that TaskVine is up and running in the lab now!  We are working on refining a few technical issues and completing documentation and examples before making a general release.  I would like to share some of the details and philosophy of the system as a preview before general availability.

TaskVine builds upon our many years of experience building large scale workflow systems that manage millions of jobs running on thousands of nodes in clusters, clouds, and grids.  Some of our prior systems include Makeflow, which enabled the construction of large (static) task graphs to run on batch systems; Work Queue, which enabled the creation of applications that define and consume tasks dynamically; and Prune, which enabled the management of reproducible and shareable computations.  (And you can still use those systems if you like.)

TaskVine has some familiar concepts: it too allows the creation of large dynamically parallel distributed applications.  But where it really shines is in data management.


In TaskVine, data is a first-class citizen alongside tasks.  Large data sets, software packages, and software services are explicitly declared and put into a working cluster where they can be replicated and shared among nodes.  As an application runs, source data and derived results stay in the cluster and gradually creep from node to node, accelerating future tasks.  Even later workflows can take advantage of data prepared or produced by prior workflows.  We say that the workflow "grows like a vine" through the cluster.


 


The upshot is that TaskVine takes load off the facility's shared filesystem, which is often the bottleneck in scaling up large applications.  Big software stacks, reference datasets, and intermediate results are made accessible on the worker nodes in a disciplined way, rather than a thousand nodes pounding the same filesystem at once.


Of course, to make all this happen, there are a lot of fine details.  In the next few posts, I'll give you an overview of the system architecture, and the new capabilities that we are building out.


Tuesday, February 7, 2023

Landlord Container Paper in TPDS 2023

Our latest work on container management was recently accepted to IEEE TPDS: 

LANDLORD: Coordinating Dynamic Software Environments to Reduce Container Sprawl

This paper is the result of a continuing collaboration between the CCL at Notre Dame and the Parsl group at the University of Chicago, led by Dr. Kyle Chard.  Recent PhD grad Tim Shaffer led this work as part of a DOE Computational Science Graduate Fellowship, and current PhD student Thanh Phung joined the project and helped us view the problem from a clustering perspective.


The problem is this: a variety of online services (like Binder, FuncX, and others) generate container images from software environment specifications, such as a list of Conda packages.  These images are used to execute code on clusters: sometimes long-running code like simulations, but also sometimes short bits of code, maybe even a single Python function call.  If every user of the system asks for a slightly different software environment, then the system will quickly run out of space storing all of those large container images ("container sprawl").  So, we need some entity to manage the space and decide which containers to generate from packages, and which ones to delete:


We observe that multiple requests might be satisfied by the same container.  For example, these three jobs all express some constraints on packages A and B.  (And one of those jobs doesn't care about the version at all.)  If we figure out the overlap between those requests, we can use a single image to satisfy all three:

Going further, we can view this as an incremental online clustering problem.  At any given moment, there are a certain number of containers instantiated.  If a request matches one already, that's a hit and we just use it.  Otherwise that's a miss and we have two choices: either insert a brand new container image that matches the request exactly, or merge an existing container with some new packages in order to satisfy the request.  We decide what to do based on a single parameter alpha, which is a measure of distance between the request and the existing container.
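The sketch below illustrates this hit/merge/insert decision.  The similarity measure and the direction of the alpha threshold are chosen to match the behavior described in this post (small alpha merges aggressively into a few large images; large alpha yields many small, exact images); the paper's actual metric and bookkeeping differ in detail.

# Hypothetical sketch of the decision rule, not the paper's exact code.
def handle_request(request, containers, alpha):
    """request: set of required packages; containers: list of package sets."""
    # Hit: an existing container already provides everything requested.
    for c in containers:
        if request <= c:
            return c

    # Miss: find the most similar existing container by package overlap.
    def similarity(c):
        return len(request & c) / len(request | c)

    best = max(containers, key=similarity, default=None)

    if best is not None and similarity(best) >= alpha:
        best |= request              # merge: grow an existing image to cover the request
        return best

    new = set(request)               # insert: build a fresh image matching the request exactly
    containers.append(new)
    return new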


Ok, so now how do we pick alpha?  We evaluated this by running the Landlord algorithm through traces of software requests from high energy physics workloads and a public trace of Binder requests.

There are two extremes to avoid: if alpha is very small, then we end up with a small number of very large containers.  This is good for managing the shared cache, but bad because we have to keep moving very large containers out to the compute nodes.  (And they may not fit!)  On the other hand, if alpha is very large, then we end up with a large number of small containers.  This results in a lot of duplication of packages, so the shared cache fills up faster, but the images are small and easy to move around.


As in most computer systems, there isn't one single number that is the answer: rather, it is a tradeoff between desiderata.  Here, we see a sweet spot around alpha=0.8, and the good news is that the system has a broad "operational zone" in which the tradeoff is productive.

Sound interesting?  Check out the full paper: