Multi-Threaded Processor (Scatter Gather)

Multi-Threaded Processor

The Multi-Threaded Processor (MTP) processes data in parallel across multiple threads and offers both a simple and an advanced approach to scatter-gather operations.

Unlike a traditional "scatter-gather", MTP lets you separate loading from processing across threads and then await the results. This is especially useful when items are loaded from a remote source with latency, since processing can begin while later items are still loading.
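To make the separation of loading and processing concrete, here is a minimal sketch of the underlying producer/consumer idea using only the .NET standard library. It is not how MTP is implemented and does not use the Perigee API; the names SeparatedScatterGather and Run are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SeparatedScatterGather
{
    // Hypothetical helper: workers begin consuming while the caller is still loading items.
    public static List<(TIn Input, TOut Output)> Run<TIn, TOut>(
        IEnumerable<TIn> source, Func<TIn, TOut> process, int threadCount)
    {
        using var queue = new BlockingCollection<TIn>();
        var results = new ConcurrentBag<(TIn Input, TOut Output)>();

        // Start the workers first; each one blocks until an item arrives.
        var workers = Enumerable.Range(0, threadCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                    results.Add((item, process(item)));
            }))
            .ToArray();

        // Loading happens here, possibly slowly (for example, from a remote source).
        foreach (var item in source)
            queue.Add(item);

        queue.CompleteAdding(); // No more input; workers exit once the queue drains.
        Task.WaitAll(workers);  // Wait for the last item to finish processing.
        return results.ToList();
    }
}
```

Calling SeparatedScatterGather.Run(fruits, s => s.Length, 3) would process items on three workers as they are enumerated. Results come back in no particular order, and an exception thrown by the process function surfaces from Task.WaitAll in this sketch rather than being captured per item.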

An Example

PerigeeApplication.ApplicationNoInit("MTP", (c) => {

    c.Add("Multi-Threaded Processor", (ct, l) => {

        //Let's declare a list of fruits to work on
        var Fruits = new List<string>()
        {
            "Apples",
            "Bananas",
            "Cherries",
            "Dates",
            "Elderberries",
            "Figs",
            "Grapes",
            "Honeydew",
            "Indian Fig",
            "Jackfruit",
            "Kiwi",
            "Lemon",
            "Mango",
            "Nectarine",
            "Oranges",
            "Papaya",
            "Quince",
            "Raspberries",
            "Strawberries",
            "Tangerines",
            "Ugli Fruit",
            "Vanilla Bean",
            "Watermelon",
            "Xigua",
            "Yellow Passion Fruit"
        };

        l.LogInformation("--------============ Example 1 ============--------");
        //Example 1: Use the IEnumerable extension method for a simple "scatter gather" across a defined number of threads. In this case, 5 threads are used
        var mtpResult = Fruits.ParallelProcessMultiThread((s) =>
        {
            l.LogInformation("Processing {fruit}", s);
            Task.Delay(400).Wait(); //Artificial delay
            return s.GetHashCode();
        }, null, concurrency: 5);

        //Every object in this case is wrapped in a result object, giving you exceptions thrown, process time, and access to the input/output
        foreach (var rs in mtpResult)
        {
            l.LogInformation("Passed in {in}, got {out}, in {time}ms", rs.InData, rs.OutData, rs.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "");
        }

        //Example 2: Separate the loading and processing of the scatter gather across threads
        //  In this example, we're declaring the multi-threaded processor directly, along with the input type, output type, and the method to be called.
        //  Notice the 3 for ThreadCount? Only 3 threads are created to work on input items
        l.LogInformation("--------============ Example 2 ============--------");
        var mtp = new MultiThreadedProcessor<string, int>((s) =>
        {
            l.LogInformation("Processing {fruit}", s);
            return s.GetHashCode();
        }, ct, ThreadCount: 3, l);

        //To actively watch for processed items, attach to the event
        mtp.OnDataProcessed += (sender, args) =>
        {
            l.LogInformation("Passed in {in}, got {out}, in {time}ms", args.InData, args.OutData, args.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "");
        };

        //Now let's simulate a delayed loading of the items by using Enqueue
        foreach (var fruit in Fruits)
        {
            mtp.Enqueue(fruit);
            Task.Delay(Random.Shared.Next(50, 200)).Wait(); //Artificial random delay
        }

        //Once all of the items have been enqueued, wait for the last item to be processed
        mtp.AwaitProcessed(ct);


        //Done! Everything has been processed and we have successfully awaited a multi-threaded, separated "scatter gather"


        while (PerigeeApplication.delayOrCancel(1000, ct)) { };

    });

});

SDK

ParallelProcessMultiThread

Processes a collection of items in parallel using a specified processing function.

Parameters:

  • items (IEnumerable<TIn>): The collection of items to process.

  • Process (Func<TIn, TOut>): The function to process each item.

  • cancelToken (CancellationToken?, optional): An optional cancellation token to cancel the processing.

  • concurrency (int, optional): The number of concurrent threads to use. Default is 3.

  • Logger (ILogger, optional): An optional logger for logging.

Returns:

  • List<MultiProcessResult<TIn, TOut>>: A list of processing results.

Example:

var results = myItems.ParallelProcessMultiThread(item => ProcessItem(item));

Enqueue

Enqueues an item to be processed.

Parameters:

  • item (TIn): The item to be processed.

Example:

processor.Enqueue(myItem);

AwaitProcessed

Waits until all pending items have been processed. Optionally accepts a cancellation token or a time span to limit the wait.

Example:

processor.AwaitProcessed();

// With CancellationToken
processor.AwaitProcessed(cancellationToken);

// With TimeSpan
processor.AwaitProcessed(TimeSpan.FromMinutes(1));

Class Reference

MultiProcessResult<TIn, TOut>

A class that represents the result of processing an individual item.

InData

The input data that was processed.

OutData

The output data produced by the process.

Exception

The exception thrown during processing, if any.

ProcessTime

An optional timespan representing the processing time.
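The four members above are enough to build a simple success/failure report. The sketch below is only an illustration under stated assumptions: ResultStandIn is a hypothetical stand-in that mirrors the documented members (the real MultiProcessResult<TIn, TOut> ships with Perigee), ResultReport.Summarize is a hypothetical helper, and it assumes Exception is null on success and ProcessTime is null when no timing is available.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in exposing the four documented members.
// The real MultiProcessResult<TIn, TOut> ships with Perigee.
public sealed class ResultStandIn<TIn, TOut>
{
    public ResultStandIn(TIn inData, TOut outData, Exception exception, TimeSpan? processTime)
    {
        InData = inData;
        OutData = outData;
        Exception = exception;
        ProcessTime = processTime;
    }

    public TIn InData { get; }
    public TOut OutData { get; }
    public Exception Exception { get; }    // Assumed null when processing succeeded
    public TimeSpan? ProcessTime { get; }  // Assumed null when no timing is available
}

public static class ResultReport
{
    // Hypothetical helper: counts successes and failures and totals the recorded time.
    public static (int Succeeded, int Failed, TimeSpan Total) Summarize<TIn, TOut>(
        IEnumerable<ResultStandIn<TIn, TOut>> results)
    {
        int succeeded = 0, failed = 0;
        var total = TimeSpan.Zero;

        foreach (var r in results)
        {
            if (r.Exception != null) failed++; else succeeded++;
            total += r.ProcessTime ?? TimeSpan.Zero;
        }
        return (succeeded, failed, total);
    }
}
```

The same loop shape should carry over to the real result list, provided the assumptions about null values hold for the actual class.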
