Consider a distributed application that looks like this: the manager creates an arbitrary number of tasks initially, new tasks are created as tasks complete, and the manager must perform a time-consuming accumulation step whenever a task returns. This style of program is common when using the Work Queue framework, and is used extensively by the Coffea data analysis framework for high energy physics.
Here is the problem: if the accumulation of results is complex, it may block the manager from sending tasks, receiving tasks, or performing other operations. When tasks finish spaced out in time, the manager can post-process each returned task and send a new one to the worker before the next task returns, so little performance is lost. The problem occurs when multiple workers finish tasks at similar times. The first worker to return its task goes idle and waits to receive another one. However, because the manager is busy with its accumulation step, the worker cannot receive a new task. As other workers finish, they also become idle, because the manager remains busy and can neither receive results from nor issue tasks to any of the idle workers.
One solution to this problem is to create an altered version of the inner logic of the work_queue_wait loop. The current version of work_queue_wait works roughly as follows: First, the manager polls all the current tasks to see if any have been retrieved, and if this is the case the manager breaks out of the loop and returns the completed task. Otherwise, the manager continuously attempts to receive a task from any worker, send a task to a worker that can receive one, and perform other operations like looking to connect more workers. This continues until the manager times out or successfully retrieves a task and breaks out of the work_queue_wait to return it.
The alteration is a relatively small change. In the current version, work_queue_wait breaks out of its loop as soon as a task is retrieved. In the altered version, the loop keeps running as long as each pass either retrieves a task, sends a task, or both. Once a pass neither retrieves nor sends a task, work_queue_wait exits and returns the first task that was retrieved. The advantage is that if multiple workers are waiting for a task, they will all be given work to do before work_queue_wait exits and the manager begins accumulating a result. The feature is enabled by calling work_queue_tune(q, "wait_retrieve_many", 1).
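The difference between the two exit conditions can be sketched in a few lines of Python. This is a simplified illustration, not the actual work_queue_wait source: the function name wait_once, its arguments, and the rule that a retrieval sends control straight back to the top of the loop in the original mode are all assumptions made for the sketch.

```python
from collections import deque

def wait_once(completed, ready, idle_workers, retrieve_many):
    """Toy model of one call to a wait loop (hypothetical, not Work Queue code).

    completed:    deque of finished tasks still sitting on workers
    ready:        deque of tasks waiting to be sent
    idle_workers: deque of workers able to accept a task
    Returns (retrieved_tasks, assignments).
    """
    retrieved = []
    assignments = []
    while True:
        # Original behavior: leave as soon as one task has been retrieved.
        if retrieved and not retrieve_many:
            break
        progress = False
        if completed:
            retrieved.append(completed.popleft())
            progress = True
            if not retrieve_many:
                continue  # assumed: go back to the top, which then exits
        if ready and idle_workers:
            assignments.append((ready.popleft(), idle_workers.popleft()))
            progress = True
        # Altered behavior: leave only when a pass made no progress.
        # (A real loop would keep polling until a timeout instead.)
        if not progress:
            break
    return retrieved, assignments

def demo(retrieve_many):
    # Two results are waiting, and two tasks could go to two idle workers.
    return wait_once(deque(["t1", "t2"]), deque(["t3", "t4"]),
                     deque(["w1", "w2"]), retrieve_many)

old_retrieved, old_sent = demo(False)
new_retrieved, new_sent = demo(True)
print(old_retrieved, old_sent)
print(new_retrieved, new_sent)
```

In this sketch the original mode returns with one result and leaves both idle workers without work, while the altered mode retrieves both results and hands out both waiting tasks before returning.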
The charts below show a synthetic performance benchmark of the altered work_queue_wait loop. The benchmark program has four parameters. The max tasks parameter determines the total number of tasks to be run, the concurrent tasks parameter determines how many workers can be working on tasks at any time, the task time parameter sets how long each task should take, and the post process time parameter sets how long the manager spends on post-processing every time a task returns. Each chart starts from a baseline of 100 max tasks, 5 concurrent tasks, 1 second task time, and 1 second post process time. There are four charts below, each varying one of the four parameters to show its effect on total workload time.
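To build intuition for how these four parameters interact, here is a minimal discrete-event model in Python. It is not the benchmark program used for the charts, and its numbers are not measurements. It assumes zero communication cost, identical task times, an application that submits one new task after each accumulation, and the simplified loop rules described above; the function name simulate and its arguments are hypothetical.

```python
import heapq

def simulate(max_tasks, concurrent_tasks, task_time, post_time, retrieve_many):
    """Return the total time of a toy workload (an assumed model, not Work Queue)."""
    idle = concurrent_tasks                   # workers with nothing to do
    ready = min(concurrent_tasks, max_tasks)  # submitted, not yet sent
    submitted = ready
    running = []                              # min-heap of task finish times
    retrieved = 0                             # results held by the manager
    done = 0                                  # results accumulated so far

    def wait(now):
        nonlocal idle, ready, retrieved
        while True:
            if retrieved and not retrieve_many:
                return now                    # original: exit on first result
            progress = False
            if running and running[0] <= now:
                heapq.heappop(running)        # receive one finished task
                retrieved += 1
                idle += 1
                progress = True
                if not retrieve_many:
                    continue
            if ready and idle:
                ready -= 1                    # send one task to an idle worker
                idle -= 1
                heapq.heappush(running, now + task_time)
                progress = True
            if progress:
                continue
            if retrieved:
                return now                    # altered: exit once nothing moves
            if not running:
                raise RuntimeError("nothing left to wait for")
            now = running[0]                  # sleep until the next task finishes

    now = 0.0
    while done < max_tasks:
        now = wait(now)
        retrieved -= 1
        now += post_time                      # manager is busy accumulating
        done += 1
        if submitted < max_tasks:
            submitted += 1                    # a completion creates a new task
            ready += 1
    return now

base_old = simulate(100, 5, 1, 1, retrieve_many=False)
base_new = simulate(100, 5, 1, 1, retrieve_many=True)
print(base_old, base_new)
```

Under these assumptions the baseline configuration takes 120 seconds with the original loop and 101 seconds with the altered loop. In the altered case the manager is never idle after the first second, so the total is bounded by the 100 seconds of post-processing, which is consistent with the observation that post-processing-heavy workloads gain little.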
Overall, it appears that the performance gain from the new wait_retrieve_many option grows as tasks take longer to complete and as the number of tasks increases. Workloads that require a significant amount of post-processing do not benefit much from wait_retrieve_many, because they are still mostly bound by the total amount of post-processing required.
Applying wait_retrieve_many to Coffea also gives promising results. As seen below, across 4 trials, the example Coffea program run with the work_queue executor takes about 60 seconds to complete on 10 workers. Enabling the wait_retrieve_many feature results in a 20% improvement in execution time for the entire application. This feature is now enabled by default in the WQ executor for Coffea.