Friday, December 10, 2021

Tuning High Throughput Task Dispatch in Coffea

Consider a distributed application that looks like this: the manager creates an arbitrary number of tasks initially, new tasks are created as existing tasks complete, and the manager must perform a time-consuming accumulation step whenever a task returns. This style of program is common when using the Work Queue framework, and is used extensively by the Coffea data analysis framework for high energy physics.

Here is the problem: if the accumulation of results is complex, it may block the manager from sending tasks, receiving tasks, or performing other operations. When tasks finish spaced out in time, the manager can post process each task and send a new one to the worker, resulting in minimal performance loss. The problem occurs when multiple workers finish tasks at similar times. The first worker to return its task goes idle and waits to receive another task; however, because the manager is busy with its accumulation step, the worker cannot receive a new task. As other workers finish, they also become idle, since the manager remains busy and cannot receive tasks from, or issue tasks to, any of the returned/idle workers.

One solution to this problem is to create an altered version of the inner logic of the work_queue_wait loop. The current version of work_queue_wait works roughly as follows: first, the manager polls all current tasks to see if any have been retrieved, and if so it breaks out of the loop and returns the completed task. Otherwise, the manager continuously attempts to receive a task from any worker, send a task to any worker that can accept one, and perform other operations such as connecting more workers. This continues until the manager times out or successfully retrieves a task and breaks out of work_queue_wait to return it.

The alteration is a relatively small change. Instead of breaking out of work_queue_wait as soon as a task is retrieved, the manager keeps looping as long as a task is retrieved, a task is sent out, or both. Once no task is retrieved and none is sent, the work_queue_wait loop exits and returns the first task that was retrieved. The advantage is that if multiple workers are waiting for a task, they will all be given work to do before the work_queue_wait loop exits and the manager begins accumulating a result. The feature is enabled by calling work_queue_tune(q, "wait_retrieve_many", 1).
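
The difference between the two loop structures can be sketched in Python-style pseudocode. This is only a schematic sketch: the helper callables stand in for the real C internals of work_queue_wait and are not part of any actual API.

def wait_standard(receive_task, send_tasks, timed_out):
    # Original behavior: return as soon as any single task is retrieved.
    while not timed_out():
        task = receive_task()        # try to receive one completed task
        send_tasks()                 # dispatch tasks to any idle workers
        if task:
            return task
    return None

def wait_retrieve_many(receive_task, send_tasks, timed_out):
    # Altered behavior: keep looping while progress is made (a task was
    # retrieved or sent), so every idle worker receives new work before
    # the manager returns and begins its accumulation step.
    first = None
    while not timed_out():
        task = receive_task()
        sent = send_tasks()          # True if any task was dispatched
        if task and first is None:
            first = task
        if first and not task and not sent:
            return first             # no further progress: hand back the first result
    return first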

The charts below show a synthetic performance benchmark of this altered work_queue_wait loop. The benchmark program has four parameters: the max tasks parameter determines the total number of tasks to be run, the concurrent tasks parameter determines how many workers can be working on tasks at any time, the task time parameter sets how long each task should take, and the post process time parameter defines how long the manager spends post processing every time a task returns. Each chart starts from a baseline of 100 max tasks, 5 concurrent tasks, a 1 second task time, and a 1 second post process time. There are four charts below, each varying one of the four parameters to show its effect on total workload time.
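
A rough sketch of such a benchmark using the Work Queue Python bindings is shown below. The parameter values mirror the baseline above; the port number and the overall structure of the script are illustrative assumptions rather than the actual benchmark code.

import time
import work_queue as wq

MAX_TASKS        = 100   # total number of tasks to run
CONCURRENT_TASKS = 5     # tasks in flight at any one time
TASK_TIME        = 1     # seconds each task sleeps
POST_PROCESS     = 1     # seconds of manager-side post processing per result

q = wq.WorkQueue(9123)               # illustrative port number
q.tune("wait_retrieve_many", 1)      # enable the new dispatch behavior

submitted = 0
completed = 0

def submit_one():
    global submitted
    q.submit(wq.Task("sleep %d" % TASK_TIME))
    submitted += 1

for _ in range(CONCURRENT_TASKS):
    submit_one()

while completed < MAX_TASKS:
    t = q.wait(5)
    if t:
        completed += 1
        time.sleep(POST_PROCESS)     # simulate the accumulation step
        if submitted < MAX_TASKS:
            submit_one()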

Overall, workloads with longer-running tasks, and with more of those tasks, see the largest performance gain from the new wait_retrieve_many option. Tasks that require a significant amount of post processing do not benefit much from wait_retrieve_many, because they remain mostly bound by the total amount of post processing required.

Applying wait_retrieve_many to Coffea also has promising results. As seen below, in a 4-trial run, the example Coffea program run with the work_queue executor takes about 60 seconds to complete on 10 workers. Enabling the wait_retrieve_many feature results in a 20% improvement in execution time for the entire application. This feature is now enabled by default in the Work Queue executor for Coffea.


- David Simonetti, undergraduate student in the CCL

Scalable Molecular Dynamics with Work Queue at UT-Austin

The Biomolecular Engineering Lab at UT-Austin routinely requires large scale molecular dynamics for predicting ligand-protein binding affinity.  The lab makes use of the Cooperative Computing Tools to build and run a variety of distributed applications on their 124 node, 75 GPU cluster.  Custom Work Queue applications are run on the cluster for months at a time to generate large amounts of ab-initio data to parameterize the AMOEBA model for small molecules, and perform single-point computations via Poltype 2.  In addition, the lab makes use of the ForceBalance application built on Work Queue for liquid property fitting for Van der Waals parameter refinement.


Friday, November 5, 2021

JX Language: REPL Tool and Dot Operator

Undergraduate student Jack Rundle has been making improvements to the JX language used throughout the CCTools package for expressing workflows, database queries, and other structured information.


First, we added a new command line tool, jx_repl, which provides an interactive REPL environment for working with the JX language.

In addition to standard JX evaluation, the tool also reserves a number of symbols in the context that act as commands when entered (e.g. "help", "quit", etc.).  A full guide for the REPL is outlined in the CCTools documentation.  One interesting feature is that both the input expression and the output of each line are stored throughout the program's life cycle.  Previous input expressions can be referenced via "in_#" and the associated output via "out_#".  Furthermore, JX will resolve symbols in input expressions, which may themselves include references to "out_#".

Next, we provide support for a new operator in JX: the “dot” operator, which resembles anaphoric macros in Lisp.  The dot operator is placed after an expression (A) and before a function call (B); JX evaluates it by inserting the expression as the first parameter of the function, i.e. B(A).  For functions with multiple parameters, the remaining parameters simply shift over.  For example:

BEFORE: len([1,2,3,4]) # 4
AFTER: [1,2,3,4].len() # 4

BEFORE: like("abc", "a.+") # true
AFTER: "abc".like("a.+") # true

BEFORE: format("ceil(%f) -> %d", 9.1, 10) # "ceil(9.1) -> 10"
AFTER: "ceil(%f) -> %d".format(9.1, 10) # "ceil(9.1) -> 10"

BEFORE: len(project(select([{"a": 1}, {"a": 2}], a>0), a)) # 2
AFTER: [{"a": 1}, {"a": 2}].select(a>0).project(a).len() # 2

In order to make this work, we did have to swap the parameter order for three functions: project(), select(), and like().  However, we can now query the global catalog server with database-like queries:

fetch("http://catalog.cse.nd.edu:9097/query.json").select(type=="wq_master").project([name,tasks_total_cores])

Yields:

[
    ["earth.crc.nd.edu",7373],
    ["hallofa.ps.uci.edu",15],
    ["hpc-services1.oit.uci.edu",2],
    ["vm65-195.iplantcollaborative.org",1460],
    ...
]


Thursday, November 4, 2021

PONCHO Toolkit for Portable Python


PONCHO is a lightweight, Python-based toolkit that allows users to synthesize environments from a concise, human-readable JSON file containing the information required to build a self-contained Conda virtual environment for executing scientific applications on distributed systems. PONCHO is composed of three parts: poncho_package_analyze, poncho_package_create, and poncho_package_run.

poncho_package_analyze performs a static analysis of the dependencies used within a Python application. The output is a JSON file listing those dependencies.

poncho_package_analyze application.py spec.json

This will give you a dependency file like this:

{
    "conda": {
        "channels": [
            "defaults",
            "conda-forge"
        ],
        "packages": [
            "ndcctools=7.3.0",
            "parsl=1.1.0"
        ]
    },
    "pip": [
        "topcoffea"
    ]
}


Then if needed, you can manually add other kinds of code and data dependencies like this:

{
    "git": {
        "DATA_DIR": {
            "remote": "http://.../repo.git"
        }
    },
    "http": {
        "REFERENCE_DB": {
            "type": "file",
            "url": "https://.../example.dat"
        }
    }
}

poncho_package_create allows users to create an environment from a JSON specification file. The specification may include Conda packages, Pip packages, remote Git repos, and arbitrary files accessible via HTTPS.  The environment is then packaged into a tarball.

poncho_package_create spec.json env.tar.gz

poncho_package_run unpacks and activates an environment, then executes a given command within it. Any Git repos or files specified within the environment are exposed as environment variables.

poncho_package_run -e env.tar.gz python application.py  
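
Putting the three tools together, an end-to-end run looks roughly like this (the file names are the ones used above):

# 1. Statically analyze the application to produce a dependency specification.
poncho_package_analyze application.py spec.json

# 2. Optionally edit spec.json by hand to add git/http dependencies.

# 3. Build a self-contained environment tarball from the specification.
poncho_package_create spec.json env.tar.gz

# 4. Unpack and activate the environment, then run the application inside it.
poncho_package_run -e env.tar.gz python application.py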

This programmable interface allows us to take a Python application and easily move it from place to place within a cluster. It is in production with the Coffea data analysis application and with the Parsl workflow system when using Work Queue as an execution system.

The poncho tools can be found in the latest release of the Cooperative Computing Tools.


WORKS Paper: Adaptive Resource Allocation for Heterogeneous Tasks in Dynamic Workflows

CCL graduate student Thanh Son Phung will be presenting his recent work on managing dynamic tasks at the WORKS workshop at Supercomputing 2021:

Dynamic workflows are emerging as the preferred class of workflow management systems due to the flexibility, convenience, and performance they offer users. They allow users to generate tasks automatically and programmatically at run time, abstract away the gory implementation details, and retain the intrinsic benefit of parallelism from the underlying distributed systems. The figure below shows the full picture of the transitions from logical task generation to actual task deployment and execution in the Colmena-XTB workflow.

However, from a systems developer's standpoint, the dynamic nature of task generation poses a significant problem in terms of resource management: what quantity of resources should we allocate for a newly generated task? The figure below shows the memory consumption of tasks over time in the Colmena-XTB workflow.

As demonstrated, tasks vary significantly in their memory consumption (from 2 GB to 30 GB). A large allocation decreases the probability of task failure due to under-allocation, but increases the potential waste of resources, since a task may consume only a small portion of its allocation. A small allocation has the opposite effects.

We observe that task allocation can be automated and improved considerably by grouping tasks with similar consumption: a task scheduler can use the recorded consumption of completed tasks to allocate resources for ready tasks. The figure below visually shows our allocation strategy, where each task is first allocated the value of the lower (blue) line and, upon failure due to under-allocation, is re-allocated the value of the upper line.
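
As a rough illustration of this retry strategy (not the exact algorithm from the paper), the allocation logic for one task category might look like the following sketch, where the threshold values and the run_task callable are hypothetical placeholders:

class MemoryExceeded(Exception):
    # Raised when a task exceeds its memory allocation (placeholder).
    pass

FIRST_ALLOC_GB = 6     # hypothetical: covers the bulk of tasks in this category
MAX_ALLOC_GB   = 30    # hypothetical: largest consumption observed in the category

def run_with_retry(task, run_task):
    # run_task(task, memory_gb) is a placeholder that dispatches a task with a
    # given memory limit and raises MemoryExceeded if the task outgrows it.
    try:
        # First attempt: a modest allocation that fits most tasks and wastes little.
        return run_task(task, memory_gb=FIRST_ALLOC_GB)
    except MemoryExceeded:
        # Under-allocated: retry with the category maximum so the task completes.
        return run_task(task, memory_gb=MAX_ALLOC_GB)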

We evaluated our strategies on seven datasets of resource consumption and observed a substantial improvement in resource allocation efficiency. In detail, the average task consumption efficiency under our allocation strategies ranges from 16.1% to 93.9%, with a mean of 62.1%.

Read the full paper here:

Thanh Son Phung, Logan Ward, Kyle Chard, and Douglas Thain, Not All Tasks Are Created Equal: Adaptive Resource Allocation for Heterogeneous Tasks in Dynamic Workflows, WORKS Workshop on Workflows at Supercomputing, November 2021.


Monday, August 9, 2021

New PythonTask Interface in Work Queue

The most recent version of Work Queue supports two different categories of tasks.  Standard Tasks describe a Unix command line and corresponding files, just as before.  The new PythonTask describes a Python function invocation and corresponding arguments:

def my_sum(x, y):
    # imports needed by the function body must appear inside it,
    # since the function is serialized and executed remotely
    import math
    return x + y

task = wq.PythonTask(my_sum, 10, 20)
queue.submit(task)
When a task is returned, the function's return value is available as task.output:
task = queue.wait(5)
if task:
    print("task {} completed with result {}".format(task.id,task.output))
Underneath, a PythonTask serializes the desired function and arguments and turns them into a standard task that can be executed remotely, using all of the existing capabilities of Work Queue.  And so, a PythonTask can be given a resource allocation, time limits, tags, and everything else needed to manage distributed execution:
task.specify_cores(4)
task.specify_gpus(1)
task.specify_tag("hydrazine")
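
A minimal end-to-end sketch, assuming the work_queue Python module and an arbitrary manager port, might look like this:

import work_queue as wq

def my_sum(x, y):
    return x + y

q = wq.WorkQueue(9123)               # illustrative port; workers connect here

t = wq.PythonTask(my_sum, 10, 20)
t.specify_cores(1)
q.submit(t)

while not q.empty():
    t = q.wait(5)
    if t:
        print("task {} completed with result {}".format(t.id, t.output))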
Thanks to new CCL graduate student Barry Sly-Delgado for adding this new capability to Work Queue!  See the full documentation.


Monday, August 2, 2021

Harnessing HPC at User Level for High Energy Physics

Ben Tovar presented some recent work at the (virtual) CHEP 2021 conference:  "Harnessing HPC Resources for CMS Jobs Using a Virtual Private Network".

The future computing needs of the Compact Muon Solenoid (CMS) experiment will require making use of national HPC facilities.  These facilities have substantial computational power, but for a variety of reasons, are not set up to allow network access from computational nodes out to the Internet.  This presents a barrier for CMS analysis workloads, which expect to make use of wide area data federations (like XROOTD) and global filesystems (like CVMFS) in order to execute.

In this paper, Ben demonstrates a prototype that creates a user-level virtual private network (VPN) that is dynamically deployed alongside a running analysis application.  The trick here is to make the whole thing work without requiring any root-level privileges, because that simply isn't possible at an HPC facility.  The solution brings together a variety of technologies -- namespaces, openconnect, slirp4netns, ld_preload -- in order to provide a complete user-level solution.

The performance of this setup is a bit surprising: when running a single worker node, the throughput of a single file transfer is substantially lower when tunneled (196 Mbps) compared to native performance (872 Mbps).  However, when running twenty or so workers, the tunneled solution achieves the same aggregate bandwidth as native (872 Mbps).  The most likely explanation is that tunneling a TCP connection over another TCP connection incurs a substantial start-up penalty while both stacks perform slow start.

You can try the solution yourself here:

https://github.com/cooperative-computing-lab/userlevel-vpn-tun-tap 


New CCL Swag!

Check out the new CCL swag! Send us a brief note of how you use Work Queue or Makeflow to scale up your computational work, and we'll send you some laptop stickers and other items as a small thank-you.





Thursday, July 29, 2021

CCTools Version 7.3.0 released

The Cooperative Computing Lab is pleased to announce the release of version 7.3.0 of the Cooperative Computing Tools including Parrot, Chirp, JX, Makeflow, WorkQueue, and other software.

The software may be installed from here: http://ccl.cse.nd.edu/software/download

This is a minor release with some new features and bug fixes. Among them:

- [WorkQueue] PythonTask to directly execute Python functions as WorkQueue tasks. (Barry Sly-Delgado)
- [WorkQueue] Fix max mode allocation to work as a high-water mark when dispatching tasks. (Ben Tovar)
- [WorkQueue] Reworked documentation in https://cctools.readthedocs.io. (Douglas Thain)
- [WorkQueue] API to show summary of workers connected. (David Simonetti)
- [WorkQueue] Adds --wall-time limit to workers. (Thanh Son Phung)
- [Resource Monitor] Time is now reported in seconds, rather than microseconds. (Ben Tovar)
- [JX] jx_repl tool for JX language exploration. (Jack Rundle)


Thanks goes to the contributors for many features, bug fixes, and tests:

- Jack Rundle
- Barry Sly-Delgado
- Thanh Son Phung
- Tim Shaffer
- David Simonetti
- Douglas Thain
- Ben Tovar

Please send any feedback to the CCTools discussion mailing list:

http://ccl.cse.nd.edu/community/forum

Enjoy!

Wednesday, April 21, 2021

Lightweight Function Paper at IPDPS

Tim Shaffer, a Ph.D. student in the CCL, will be presenting a paper "Lightweight Function Monitors for Fine-Grained Management in Large Scale Python Applications" at the International Parallel and Distributed Processing Symposium (IPDPS) in May 2021.  This work is the result of a collaboration between our group and the Parsl workflow team at the University of Chicago, led by Kyle Chard.

Emerging large scale science applications may consist of a large number of dynamic tasks to be run across a large number of workers.  When written in a Python-oriented framework like Parsl or FuncX, those tasks are not heavyweight Unix processes, but rather lightweight invocations of individual functions.  Running these functions at large scale presents two distinct challenges:


1 - The precise software dependencies needed by the function must be made available at each worker node.  These dependencies must be chosen accurately: too few, and the function won't work; too many, and the cost of distribution is too high.  We show a method for determining, distributing, and caching the exact dependencies needed at runtime, without user intervention.

2 - The right number of functions must be "packed" into large worker nodes that may have hundreds of cores and many GB of memory.  Too few, and the system is badly underutilized; too many, and performance will suffer or invocations will crash.  We show an automatic method for monitoring and predicting the resources consumed by categories of functions.  This results in resource allocation that is much more efficient than an unmanaged approach, and is very close to an "oracle" that predicts perfectly.


The techniques shown in this paper are integrated into the Parsl workflow system from U-Chicago, and the Work Queue distributed execution framework from Notre Dame, both of which are open source software supported by the NSF CSSI program.

Citation:
  • Tim Shaffer, Zhuozhao Li, Ben Tovar, Yadu Babuji, TJ Dasso, Zoe Surma, Kyle Chard, Ian Foster, and Douglas Thain, Lightweight Function Monitors for Fine-Grained Management in Large Scale Python Applications, IEEE International Parallel & Distributed Processing Symposium, May, 2021. 

Ph.D. Defense - Nathaniel Kremer-Herman

Congratulations to Dr. Kremer-Herman, who successfully defended his Ph.D. dissertation "Log Discovery, Log Custody, and the Web Inspired Approach for Open Distributed Systems Troubleshooting".  His work created TLQ (Troubleshooting via Log Queries), a system that enables structured queries over distributed data logged by independent components, including workflows, batch systems, and application file access.  Prof. Kremer-Herman recently began a faculty position at Hanover College in Indiana.  Congrats!

Thursday, February 18, 2021

CCTools 7.2.0 released

The Cooperative Computing Lab is pleased to announce the release of version 7.2.0 of the Cooperative Computing Tools including Parrot, Chirp, JX, Makeflow, WorkQueue, and other software.

The software may be downloaded here:
http://ccl.cse.nd.edu/software/download

This is a minor release with some new features and bug fixes. Among them:

  • [Batch] Improved GPU handling with HTCondor. (Douglas Thain)
  • [WorkQueue] Resource usage report with work_queue_status per task and worker. (Thanh Son Phung)
  • [WorkQueue] Improved GPU handling. (Douglas Thain, Tim Shaffer)
  • [WorkQueue] Assign tasks to specific GPUs with CUDA_VISIBLE_DEVICES. (Douglas Thain)
  • [Makeflow] Several fixes for sge_submit_makeflow. (Ben Tovar)


Thanks goes to the contributors for many features, bug fixes, and tests:

  • Ben Tovar
  • Douglas Thain
  • Nathaniel Kremer-Herman
  • Thanh Son Phung
  • Tim Shaffer


Please send any feedback to the CCTools discussion mailing list:

http://ccl.cse.nd.edu/community/forum

Enjoy!