Multi-Threaded Processor (Scatter Gather)
Multi-Threaded Processor (MTP) lets you process data in parallel across multiple threads. It offers both a simple and an advanced approach to scatter-gather operations.
Unlike a traditional scatter-gather, MTP lets you separate loading from processing: items are handed to worker threads as they are loaded, and you await the results once loading is finished. This is especially useful when items are loaded from a remote source with noticeable latency, because processing can overlap with loading instead of waiting for it.
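To make the idea of separating loading from processing concrete, here is a minimal sketch of the underlying producer/consumer pattern using only the .NET base class library (BlockingCollection<T> and Task.Run). It is not Perigee's implementation; the three workers, the short fruit list, and the use of string length as the "work" are illustrative assumptions. The loop plays the role of a slow remote loader, and the workers pick up each item as soon as it is added.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Queue shared between the loader (producer) and the worker threads (consumers).
using var queue = new BlockingCollection<string>();
var results = new ConcurrentBag<(string In, int Out)>();

// Start 3 workers that process items as soon as they are queued.
var workers = Enumerable.Range(0, 3).Select(_ => Task.Run(() =>
{
    // GetConsumingEnumerable blocks until an item is available and
    // finishes once CompleteAdding has been called and the queue is empty.
    foreach (var item in queue.GetConsumingEnumerable())
        results.Add((item, item.Length)); // string length stands in for real work
})).ToArray();

// Simulate slow loading from a remote source; workers start on each item immediately.
foreach (var fruit in new[] { "Apples", "Bananas", "Cherries", "Dates", "Figs" })
{
    queue.Add(fruit);
    Thread.Sleep(50); // artificial latency
}

// Signal that loading is finished, then wait for the workers to drain the queue.
queue.CompleteAdding();
Task.WaitAll(workers);

foreach (var (input, output) in results.OrderBy(r => r.In))
    Console.WriteLine($"{input} -> {output}");
```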
An Example
PerigeeApplication.ApplicationNoInit("MTP", (c) => {
    c.Add("Multi-Threaded Processor", (ct, l) => {
        //Declare a list of fruits to work on
        var Fruits = new List<string>()
        {
            "Apples",
            "Bananas",
            "Cherries",
            "Dates",
            "Elderberries",
            "Figs",
            "Grapes",
            "Honeydew",
            "Indian Fig",
            "Jackfruit",
            "Kiwi",
            "Lemon",
            "Mango",
            "Nectarine",
            "Oranges",
            "Papaya",
            "Quince",
            "Raspberries",
            "Strawberries",
            "Tangerines",
            "Ugli Fruit",
            "Vanilla Bean",
            "Watermelon",
            "Xigua",
            "Yellow Passion Fruit"
        };

        l.LogInformation("--------============ Example 1 ============--------");
        //Example 1: Use the IEnumerable extension method to perform a simple "scatter gather" across a fixed number of threads. In this case, 5 threads are used.
        //Passing ct (rather than null) lets processing stop when the application is cancelled.
        var mtpResult = Fruits.ParallelProcessMultiThread((s) =>
        {
            l.LogInformation("Processing {fruit}", s);
            Task.Delay(400).Wait(); //Artificial delay
            return s.GetHashCode();
        }, ct, concurrency: 5);

        //Each item is wrapped in a result object that exposes any exception thrown, the processing time, and the input/output
        foreach (var rs in mtpResult)
        {
            l.LogInformation("Passed in {in}, got {out}, in {time}ms", rs.InData, rs.OutData, rs.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "");
        }

        //Example 2: Separate the loading and processing of the scatter gather across threads.
        //Here the MultiThreadedProcessor is declared directly, along with the input type, output type, and the method to call for each item.
        //ThreadCount: 3 means only 3 threads are created to work on input items.
        l.LogInformation("--------============ Example 2 ============--------");
        var mtp = new MultiThreadedProcessor<string, int>((s) =>
        {
            l.LogInformation("Processing {fruit}", s);
            return s.GetHashCode();
        }, ct, ThreadCount: 3, l);

        //To watch items as they are processed, attach to the event
        mtp.OnDataProcessed += (sender, args) =>
        {
            l.LogInformation("Passed in {in}, got {out}, in {time}ms", args.InData, args.OutData, args.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "");
        };

        //Simulate delayed loading of the items by calling Enqueue with a random pause between items
        foreach (var fruit in Fruits)
        {
            mtp.Enqueue(fruit);
            Task.Delay(Random.Shared.Next(50, 200)).Wait(); //Artificial random delay
        }

        //Once all of the items have been enqueued, wait for the last item to be processed
        mtp.AwaitProcessed(ct);

        //Done! Everything has been processed and we have successfully awaited a multi-threaded, separated "scatter gather"
        //Idle in 1-second intervals until the application is cancelled
        while (PerigeeApplication.delayOrCancel(1000, ct)) { }
    });
});
SDK
ParallelProcessMultiThread
Processes a collection of items in parallel using a specified processing function.
Parameters:
items (IEnumerable<TIn>): The collection of items to process.
Process (Func<TIn, TOut>): The function to process each item.
cancelToken (CancellationToken?, optional): An optional cancellation token to cancel the processing.
concurrency (int, optional): The number of concurrent threads to use. Default is 3.
Logger (ILogger, optional): An optional logger for logging.
Returns:
List<MultiProcessResult<TIn, TOut>>: A list of processing results.
Example:
var results = myItems.ParallelProcessMultiThread(item => ProcessItem(item));
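The behaviour described for ParallelProcessMultiThread (bounded concurrency, one wrapped result per item) can be approximated with Parallel.For and a MaxDegreeOfParallelism limit. The sketch below is an assumption-based stand-in, not Perigee's code: ProcessAll is a hypothetical local function, and it returns tuples shaped like MultiProcessResult instead of the real class.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for ParallelProcessMultiThread: processes every item using at most
// `concurrency` threads and returns one result per input, in input order.
List<(TIn InData, TOut OutData, Exception? Exception, TimeSpan ProcessTime)> ProcessAll<TIn, TOut>(
    IReadOnlyList<TIn> items, Func<TIn, TOut> process, int concurrency = 3, CancellationToken ct = default)
{
    var buffer = new (TIn InData, TOut OutData, Exception? Exception, TimeSpan ProcessTime)[items.Count];
    var options = new ParallelOptions { MaxDegreeOfParallelism = concurrency, CancellationToken = ct };

    Parallel.For(0, items.Count, options, i =>
    {
        var sw = Stopwatch.StartNew();
        // Each index is written by exactly one thread, so no locking is needed.
        try { buffer[i] = (items[i], process(items[i]), null, sw.Elapsed); }
        catch (Exception ex) { buffer[i] = (items[i], default(TOut)!, ex, sw.Elapsed); }
    });

    return buffer.ToList();
}

// Usage: the empty string triggers a DivideByZeroException that is captured, not thrown.
var fruitResults = ProcessAll(new[] { "Apples", "Kiwi", "" }, s => 10 / s.Length, concurrency: 2);
foreach (var r in fruitResults)
    Console.WriteLine(r.Exception is null
        ? $"{r.InData} -> {r.OutData} in {r.ProcessTime.TotalMilliseconds:N0}ms"
        : $"'{r.InData}' failed: {r.Exception.Message}");
```

Writing each result into its own array slot keeps the output in input order without locking; whether Perigee's method preserves input order is not stated in this reference.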
Enqueue
Enqueues an item to be processed.
Parameters:
item (TIn): The item to be processed.
Example:
processor.Enqueue(myItem);
AwaitProcessed
Blocks until all pending items have been processed. Optionally accepts a cancellation token or a time span to limit the wait.
Example:
processor.AwaitProcessed();
// With CancellationToken
processor.AwaitProcessed(cancellationToken);
// With TimeSpan
processor.AwaitProcessed(TimeSpan.FromMinutes(1));
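One way to implement the kind of wait described for AwaitProcessed is a pending-item counter paired with a ManualResetEventSlim that is set whenever the counter drops to zero. This is a hedged sketch using only .NET primitives; Enqueue, AwaitProcessed, AwaitProcessedUntil and AwaitProcessedFor here are hypothetical local functions that only mirror the documented names and overloads, and Perigee may work differently.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical pending-work tracker: Enqueue increments a counter, each finished item
// decrements it, and the "idle" event is set whenever the counter returns to zero.
int pending = 0;
object gate = new();
var idle = new ManualResetEventSlim(initialState: true);
var processed = new ConcurrentBag<string>();

void Enqueue(string item)
{
    lock (gate) { pending++; idle.Reset(); }
    Task.Run(() =>
    {
        Thread.Sleep(20); // stand-in for real work
        processed.Add(item.ToUpperInvariant());
        lock (gate) { if (--pending == 0) idle.Set(); }
    });
}

// Counterparts of the three documented forms: wait indefinitely, until cancelled, or up to a time limit.
void AwaitProcessed() => idle.Wait();
void AwaitProcessedUntil(CancellationToken ct) => idle.Wait(ct); // throws OperationCanceledException if cancelled first
bool AwaitProcessedFor(TimeSpan timeout) => idle.Wait(timeout);  // false if items are still pending at the deadline

foreach (var fruit in new[] { "Figs", "Kiwi", "Mango" })
    Enqueue(fruit);
bool done = AwaitProcessedFor(TimeSpan.FromSeconds(5));
Console.WriteLine($"Finished in time: {done}, processed: {processed.Count}");

// More items can be enqueued after a wait; the tracker simply resets.
Enqueue("Lemon");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
AwaitProcessedUntil(cts.Token);
AwaitProcessed(); // returns immediately: nothing is pending
Console.WriteLine($"Processed: {string.Join(", ", processed)}");
```

The lock keeps each counter update together with its event Reset or Set, so a finishing item cannot mark the tracker idle while a newly enqueued item is still pending.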
Class Reference
MultiProcessResult<TIn, TOut>
A class that represents the result of processing an individual item.
InData: The input data that was processed.
OutData: The output data produced by the process.
Exception: An optional exception that occurred during processing.
ProcessTime: An optional timespan representing the processing time.
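To illustrate how a per-item result with these four fields can be produced and consumed, the sketch below times each call with a Stopwatch and catches any exception instead of letting it escape. Run is a hypothetical helper, and the tuple only mirrors the documented MultiProcessResult properties; it is not the actual class.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

// Hypothetical wrapper mirroring the documented MultiProcessResult fields as a tuple.
(TIn InData, TOut OutData, Exception? Exception, TimeSpan? ProcessTime) Run<TIn, TOut>(TIn input, Func<TIn, TOut> process)
{
    var sw = Stopwatch.StartNew();
    try
    {
        var output = process(input);
        return (input, output, null, sw.Elapsed);
    }
    catch (Exception ex)
    {
        // On failure OutData stays at its default value and the exception is kept for inspection.
        return (input, default(TOut)!, ex, sw.Elapsed);
    }
}

var results = new[] { "42", "7", "not a number" }.Select(s => Run(s, x => int.Parse(x))).ToList();

// Separate successes from failures by checking the Exception field.
var failures = results.Where(r => r.Exception is not null).ToList();
var total = results.Where(r => r.Exception is null).Sum(r => r.OutData);

Console.WriteLine($"Sum of successes: {total}");
foreach (var f in failures)
    Console.WriteLine($"'{f.InData}' failed with {f.Exception!.GetType().Name}");
```

In this sketch, because failures are captured per item, one bad input does not abort the rest of the batch; the caller decides afterwards whether to retry, log, or rethrow.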