Multi-Threaded Processor (Scatter Gather)
Multi-Threaded Processor
An Example
PerigeeApplication.ApplicationNoInit("MTP", (c) => {
c.Add("Multi-Threaded Processor", (ct, l) => {
    // Sample input: one fruit name for each letter from A through Y.
    List<string> fruitNames = new()
    {
        "Apples",
        "Bananas",
        "Cherries",
        "Dates",
        "Elderberries",
        "Figs",
        "Grapes",
        "Honeydew",
        "Indian Fig",
        "Jackfruit",
        "Kiwi",
        "Lemon",
        "Mango",
        "Nectarine",
        "Oranges",
        "Papaya",
        "Quince",
        "Raspberries",
        "Strawberries",
        "Tangerines",
        "Ugli Fruit",
        "Vanilla Bean",
        "Watermelon",
        "Xigua",
        "Yellow Passion Fruit"
    };

    l.LogInformation("--------============ Example 1 ============--------");

    // Example 1: one-call "scatter gather" through the IEnumerable extension method.
    // The callback runs once per fruit, with concurrency set to 5.
    var gathered = fruitNames.ParallelProcessMultiThread((item) =>
    {
        l.LogInformation("Processing {fruit}", item);
        Task.Delay(400).Wait(); // Artificial delay standing in for real work
        return item.GetHashCode();
    }, null, concurrency: 5);

    // Each entry is a result wrapper exposing the input, the output and the process time.
    foreach (var result in gathered)
    {
        // Empty string when no process time was recorded.
        var elapsedMs = result.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "";
        l.LogInformation("Passed in {in}, got {out}, in {time}ms", result.InData, result.OutData, elapsedMs);
    }

    // Example 2: loading and processing are decoupled.
    // The processor is constructed directly with its input type (string), output type (int)
    // and the callback to run per item. ThreadCount: 3 asks for three worker threads.
    l.LogInformation("--------============ Example 2 ============--------");
    var processor = new MultiThreadedProcessor<string, int>((item) =>
    {
        l.LogInformation("Processing {fruit}", item);
        return item.GetHashCode();
    }, ct, ThreadCount: 3, l);

    // Subscribe to the event to observe each item as soon as it has been processed.
    processor.OnDataProcessed += (_, e) =>
    {
        var elapsedMs = e.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "";
        l.LogInformation("Passed in {in}, got {out}, in {time}ms", e.InData, e.OutData, elapsedMs);
    };

    // Simulate items trickling in: enqueue one at a time with a random 50-199 ms pause.
    foreach (var name in fruitNames)
    {
        processor.Enqueue(name);
        Task.Delay(Random.Shared.Next(50, 200)).Wait(); // Artificial random delay
    }

    // Everything is enqueued; block until the last item has been processed.
    processor.AwaitProcessed(ct);

    // Done. Keep looping in one-second steps for as long as delayOrCancel returns true
    // (presumably until ct is cancelled - confirm against the SDK).
    while (PerigeeApplication.delayOrCancel(1000, ct))
    {
    }
});
});
SDK
ParallelProcessMultiThread
Example:
Enqueue
Example:
AwaitProcessed
Class Reference
MultiProcessResult<TIn, TOut>
InData
OutData
Exception
ProcessTime
Last updated

