
Recursive Parallel Iterators

@orxfun orxfun released this 02 Nov 20:31
· 228 commits to main since this release
a857613

Changes

Recursive Parallel Iterators

The IntoParIterRec trait can be used to create a recursive parallel iterator over an initial set of elements, which is useful when working with non-linear data structures such as trees and graphs.

Consider, for instance, a tree which is defined by the following node struct:

pub struct Node<T> {
    pub data: T,
    pub children: Vec<Node<T>>,
}
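For reference, the sequential version of this computation is a plain recursive traversal. The sketch below uses only the standard library and repeats the Node definition so it is self-contained; the sum_mapped helper and the sample tree are illustrative, not part of the crate:

```rust
pub struct Node<T> {
    pub data: T,
    pub children: Vec<Node<T>>,
}

/// Sequential reference: map each node's data and sum over the whole subtree.
fn sum_mapped<T: Copy>(root: &Node<T>, map: &impl Fn(T) -> u64) -> u64 {
    map(root.data)
        + root
            .children
            .iter()
            .map(|child| sum_mapped(child, map))
            .sum::<u64>()
}

fn main() {
    // A small tree: 1 -> (2, 3), 3 -> (4)
    let root = Node {
        data: 1u64,
        children: vec![
            Node { data: 2, children: vec![] },
            Node {
                data: 3,
                children: vec![Node { data: 4, children: vec![] }],
            },
        ],
    };
    // map doubles each value: 2 + 4 + 6 + 8 = 20
    let sum = sum_mapped(&root, &|x| 2 * x);
    println!("{sum}"); // prints 20
}
```

This is exactly the result the recursive parallel iterator computes, only without parallelism.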

Assume that we want to map the data of every node with map: impl Fn(T) -> u64 and compute the sum of the mapped values over all nodes descending from a root: &Node.

We can express this computation and execute it in parallel as follows:

fn extend<'a, T>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    queue.extend(&node.children);
}

[root].into_par_rec(extend).map(map).sum()

Instead of into_par, we use into_par_rec and provide the extend function as its argument. This function defines the recursive extension of the parallel iterator: every time we process a node, we first add its children to the queue. Queue holds the elements yet to be processed and exposes two growth methods to define the recursive extension: push and extend.
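The role of the extend function can be mimicked sequentially with a std VecDeque in place of the crate's Queue. The sketch below (std-only; the traverse helper is hypothetical, not the crate's API) shows the semantics: each popped element first enqueues its children, then is yielded to the rest of the pipeline:

```rust
use std::collections::VecDeque;

pub struct Node<T> {
    pub data: T,
    pub children: Vec<Node<T>>,
}

/// Sequential analogue of the recursive iterator: visit every node reachable
/// from `root`, letting `extend` decide which elements join the queue.
fn traverse<'a, T>(
    root: &'a Node<T>,
    extend: impl Fn(&&'a Node<T>, &mut VecDeque<&'a Node<T>>),
) -> Vec<&'a Node<T>> {
    let mut queue: VecDeque<&Node<T>> = VecDeque::from([root]);
    let mut visited = Vec::new();
    while let Some(node) = queue.pop_front() {
        extend(&node, &mut queue); // the recursive extension step
        visited.push(node);
    }
    visited
}

fn main() {
    let leaf = |d| Node { data: d, children: vec![] };
    let root = Node { data: 1u64, children: vec![leaf(2), leaf(3)] };
    let nodes = traverse(&root, |node, queue| queue.extend(&node.children));
    // nodes 1, 2, 3 doubled: 2 + 4 + 6 = 12
    let sum: u64 = nodes.iter().map(|n| 2 * n.data).sum();
    println!("{sum}"); // prints 12
}
```

The parallel version distributes the popped elements across threads, but the extension logic it runs is the same.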

Although we create the parallel iterator differently, we get a ParIter. Therefore, we have access to all features of a regular parallel iterator.

For instance, assume we want to filter nodes first. Further, instead of summing up the mapped values, we need to collect them in a vector. We can express this computation just as we would do on a linear data structure:

[root].into_par_rec(extend).filter(filter).map(map).collect()

For more details, you may see the parallelization_on_tree example.

Diagnostics

The ParallelExecutorWithDiagnostics executor has been added. Any parallel executor can be converted into one with diagnostics. This executor is meant for testing parallel computations and understanding how the workload is distributed across threads. During the parallel computation, it collects diagnostics about:

  • how many threads are used for the parallel computation
  • how many times each thread received a task
  • the average chunk size; i.e., the average number of tasks that each thread received
  • and finally, the explicit chunk sizes of the first task assignments.

These metrics are printed to stdout once the parallel computation completes; therefore, the executor is not meant to be used in production.
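To make the metrics concrete, the std-only sketch below (all names hypothetical, not the crate's API) derives the same numbers from a recorded list of chunk sizes per thread:

```rust
/// Hypothetical record of a run: for each thread, the chunk sizes it received, in order.
type ChunkLog = Vec<Vec<usize>>;

/// How many threads actually participated in the computation.
fn threads_used(log: &ChunkLog) -> usize {
    log.iter().filter(|chunks| !chunks.is_empty()).count()
}

/// Average number of tasks per assignment; 0.0 for an idle thread.
fn avg_chunk_size(chunks: &[usize]) -> f64 {
    chunks.iter().sum::<usize>() as f64 / chunks.len().max(1) as f64
}

fn main() {
    let log: ChunkLog = vec![vec![1, 1, 2, 4, 8], vec![1, 2, 4], vec![]];
    println!("threads used: {}", threads_used(&log)); // 2 of 3 threads received work
    for (t, chunks) in log.iter().enumerate() {
        println!(
            "thread {t}: {} assignments, avg chunk size {:.1}, first chunks {:?}",
            chunks.len(),
            avg_chunk_size(chunks),
            &chunks[..chunks.len().min(3)]
        );
    }
}
```

The growing chunk sizes in the sample log reflect a common pattern: runners often start with small chunks and increase them as the computation proceeds.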

Running a parallel computation with diagnostics is convenient; a single with_runner call enables it:

let sum = range
    .par()
    .with_runner(DefaultRunner::default().with_diagnostics()) // this line enables diagnostics
    .map(|x| x + 1)
    .filter(|x| x.is_multiple_of(2))
    .sum();

Related Issues

This does not exactly fix #104, but it provides a solution to it, probably with a different approach than intended. Please also see the related computation experiments.

Edit after second iteration:

Fixes #104

Thanks to @davidlattimore for suggestions and feedback on the API.