# Multi-Threaded Processor (Scatter Gather)

## Multi-Threaded Processor

Multi-Threaded Processor allows you to efficiently process data in parallel across multiple threads, using both simple and advanced approaches for scatter-gather operations.

Unlike a traditional "scatter-gather", MTP allows you to separate the loading and processing across threads and then await the results. This is incredibly powerful when the loading is done from a remote source where latency is involved.

### An Example

```csharp
PerigeeApplication.ApplicationNoInit("MTP", (c) => {

    c.Add("Multi-Threaded Processor", (ct, l) => {

        //Let's declare a list of fruits to work on
        var Fruits = new List<string>()
        {
            "Apples",
            "Bananas",
            "Cherries",
            "Dates",
            "Elderberries",
            "Figs",
            "Grapes",
            "Honeydew",
            "Indian Fig",
            "Jackfruit",
            "Kiwi",
            "Lemon",
            "Mango",
            "Nectarine",
            "Oranges",
            "Papaya",
            "Quince",
            "Raspberries",
            "Strawberries",
            "Tangerines",
            "Ugli Fruit",
            "Vanilla Bean",
            "Watermelon",
            "Xigua",
            "Yellow Passion Fruit"
        };

        l.LogInformation("--------============ Example 1 ============--------");
        //Example 1: Using the IEnumerable extension method to simple "scatter gather" across a number of defined threads. In this case, 5 threads are used
        var mtpResult = Fruits.ParallelProcessMultiThread((s) =>
        {
            l.LogInformation("Processing {fruit}", s);
            Task.Delay(400).Wait(); //Artificial delay
            return s.GetHashCode();
        }, null, concurrency: 5);

        //Every object in this case is wrapped in a result object, giving you exceptions thrown, process time, and access to the input/output
        foreach (var rs in mtpResult)
        {
            l.LogInformation("Passed in {in}, got {out}, in {time}ms", rs.InData, rs.OutData, rs.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "");
        }

        //Example 2: Separate the loading and processing of the scatter gather across threads
        //  In this example, we're declaring the multi-threaded processor directly, along with the input type, output type, and the method to be called.
        //  Notice the 3 on the ThreadCount? This will only process and declare 3 threads to work on input items
        l.LogInformation("--------============ Example 2 ============--------");
        var mtp = new MultiThreadedProcessor<string, int>((s) =>
        {
            l.LogInformation("Processing {fruit}", s);
            return s.GetHashCode();
        }, ct, ThreadCount: 3, l);

        //To actively watch for processed items, attach to the event
        mtp.OnDataProcessed += (sender, args) =>
        {
            l.LogInformation("Passed in {in}, got {out}, in {time}ms", args.InData, args.OutData, args.ProcessTime?.TotalMilliseconds.ToString("N0") ?? "");
        };

        //Now let's simulate a delayed loading of the items by using Enqueue
        foreach (var fruit in Fruits)
        {
            mtp.Enqueue(fruit);
            Task.Delay(Random.Shared.Next(50, 200)).Wait(); //Artificial random delay
        }

        //Once all of the items have been enqueued, wait for the last item to be processed
        mtp.AwaitProcessed(ct);


        //Done! Everything has been processed and we have successfully awaited a multi-threaded, separated "scatter gather"


        while (PerigeeApplication.delayOrCancel(1000, ct)) { };

    });

});
```

## SDK

### ParallelProcessMultiThread

Processes a collection of items in parallel using a specified processing function.

**Parameters:**

* `items` (`IEnumerable<TIn>`): The collection of items to process.
* `Process` (`Func<TIn, TOut>`): The function to process each item.
* `cancelToken` (`CancellationToken?`, optional): An optional cancellation token to cancel the processing.
* `concurrency` (`int`, optional): The number of concurrent threads to use. Default is 3.
* `Logger` (`ILogger`, optional): An optional logger for logging.

**Returns:**

* `List<MultiProcessResult<TIn, TOut>>`: A list of processing results.

#### Example:

```csharp
var results = myItems.ParallelProcessMultiThread(item => ProcessItem(item));
```

### Enqueue

Enqueue an item to be processed.

**Parameters:**

* `item` (`TIn`): The item to be processed.

#### Example:

```csharp
processor.Enqueue(myItem);
```

###

### AwaitProcessed

Await until all pending executions have been processed. Optionally accepts a cancellation token or a time span to limit the wait.

**Example:**

```csharp
processor.AwaitProcessed();

// With CancellationToken
processor.AwaitProcessed(cancellationToken);

// With TimeSpan
processor.AwaitProcessed(TimeSpan.FromMinutes(1));
```

## Class Reference

### MultiProcessResult\<TIn, TOut>

A class that represents the result of processing an individual item.

#### InData

The input data that was processed.

#### OutData

The output data produced by the process.

#### Exception

An optional exception that occurred during processing.

#### ProcessTime

An optional timespan representing the processing time.
