Parallel Processing Reference

Parallel Processing basics

Let's take a look at a few things we need to know before jumping into parallel processing libraries.

Ruleset

The rules for parallel processing are as follows. (Psst: don't get too lost in them, but reading through will help you understand how to design your parallel processing architecture.)

  • Only write to concurrent-safe types. Don't write to anything else from inside a parallel loop.

  • Don't call instantiated class methods in parallel (unless you KNOW you can). Prefer to write static methods when calling in parallel loops to avoid concurrency issues.

  • Use synchronization constructs, like locks or semaphores, to protect shared resources and avoid race conditions.

  • Ensure that the workload is balanced across all parallel tasks to maximize efficiency and minimize the chance of bottlenecks.

  • Be mindful of the trade-offs between parallelism and performance, as not all tasks will benefit from parallelization. In some cases, the overhead of creating and managing parallel tasks may outweigh any potential speedup.

  • Test your parallel code thoroughly to ensure it is working correctly and efficiently, as debugging parallel code can be more difficult than debugging sequential code.

  • Keep in mind potential issues with exception handling, as exceptions thrown in parallel tasks may be propagated and require proper handling in the calling code.

  • Consider the target hardware and environment when designing parallel code, as factors like the number of available cores and memory limitations can impact performance and scalability.
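A few of the rules above can be sketched with nothing but the standard library. This is an illustrative example, not Perigee code: the `RuleDemo` class and its method names are made up for this page, and it assumes .NET 6 or later for top-level statements. It shows a static, stateless work method, two ways of protecting a shared total (`Interlocked` and `lock`), and how exceptions thrown inside a parallel loop surface as an `AggregateException`.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] numbers = Enumerable.Range(1, 100_000).ToArray();

Console.WriteLine($"Interlocked total: {RuleDemo.SumWithInterlocked(numbers)}");
Console.WriteLine($"Lock total:        {RuleDemo.SumWithLock(numbers)}");
Console.WriteLine($"Failures caught:   {RuleDemo.CountFailures(numbers)}");

// Hypothetical helper class, for illustration only
public static class RuleDemo
{
    // Static and stateless: safe to call from any thread
    public static long Square(int n) => (long)n * n;

    // A single shared counter can be updated atomically without a lock
    public static long SumWithInterlocked(int[] input)
    {
        long total = 0;
        Parallel.ForEach(input, n => Interlocked.Add(ref total, Square(n)));
        return total;
    }

    // A lock protects the shared total; the actual work stays outside the lock
    public static long SumWithLock(int[] input)
    {
        long total = 0;
        object gate = new object();
        Parallel.ForEach(input, n =>
        {
            long squared = Square(n);
            lock (gate) { total += squared; }
        });
        return total;
    }

    // Exceptions thrown inside the loop are wrapped in an AggregateException.
    // The number of inner exceptions can vary between runs, because the loop
    // stops scheduling new iterations once one of them has failed.
    public static int CountFailures(int[] input)
    {
        try
        {
            Parallel.ForEach(input, n =>
            {
                if (n % 50_000 == 0) throw new InvalidOperationException($"Bad item {n}");
            });
            return 0;
        }
        catch (AggregateException ex)
        {
            return ex.InnerExceptions.Count;
        }
    }
}
```

Both totals should match a plain sequential sum. If they ever differ, something is writing to shared state without protection.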

Hashing

Hashing is the process of converting data into a fixed-size sequence of bytes, typically using a mathematical algorithm such as SHA or MD5. The output, called a hash, is a compact and practically unique representation of the input data (two different inputs can collide, but with a good algorithm this is extremely unlikely). A small change in the input data results in a completely different hash, which makes hashing useful for verifying data integrity, securely storing passwords, and creating unique identifiers for data.

Hash based data structures, such as hash tables or hash sets, provide a fast and efficient way to look up or verify existing data. These data structures use hashing to determine the location of an item in memory, allowing for quick access and retrieval even in large datasets. This is particularly useful in parallel processing, where multiple threads or processes may be accessing the same data simultaneously.

Perigee fully embraces hash structures as a way of providing better parallel processing for data sets.
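To make the two ideas above concrete, here is a small standalone sketch using only the .NET base library. It assumes .NET 6 or later (`SHA256.HashData` and `Convert.ToHexString` were added in .NET 5), and the `HashDemo` helper is a made-up name for illustration. It shows that a one-character change produces a completely different fixed-size hash, and that a hash-based set answers existence checks directly.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

string a = HashDemo.Sha256Hex("T-Bone");
string b = HashDemo.Sha256Hex("T-bone"); // only the case of one letter differs

Console.WriteLine(a);
Console.WriteLine(b);
Console.WriteLine($"Same length: {a.Length == b.Length}, same hash: {a == b}");
// prints "Same length: True, same hash: False"

// A HashSet uses each item's hash code to find it without scanning the whole set
var known = new HashSet<string> { "Rare", "Medium-Rare", "Well Done" };
Console.WriteLine(known.Contains("Rare")); // True
Console.WriteLine(known.Contains("Blue")); // False

// Hypothetical helper class, for illustration only
public static class HashDemo
{
    // Returns the SHA-256 hash of the input as a 64-character hex string
    public static string Sha256Hex(string input)
    {
        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(input));
        return Convert.ToHexString(hash);
    }
}
```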

Parallel write-safe structures

C# only has a few concurrent write-safe structures. You can find them under System.Collections.Concurrent.

The two primary concurrent classes we are interested in for this practice are:

  1. ConcurrentBag<T>

  2. ConcurrentDictionary<K,V>

Both of these structures provide a thread safe, concurrent way of writing data back during a parallel loop. We'll use both of these while processing data in parallel.
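As a quick illustration of writing to both structures from a parallel loop, here is a sketch that uses the standard `Parallel.ForEach` rather than any Perigee helper. `ConcurrentDemo` and `Collect` are names invented for this example, and it assumes .NET 6 or later for top-level statements.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

string[] words = { "rare", "flank", "rare", "t-bone", "flank", "rare" };

var (bag, counts) = ConcurrentDemo.Collect(words);

Console.WriteLine($"Bag holds {bag.Count} items");
foreach (var pair in counts)
    Console.WriteLine($"{pair.Key}: {pair.Value}");

// Hypothetical helper class, for illustration only
public static class ConcurrentDemo
{
    public static (ConcurrentBag<string> Bag, ConcurrentDictionary<string, int> Counts) Collect(string[] input)
    {
        var bag = new ConcurrentBag<string>();
        var counts = new ConcurrentDictionary<string, int>();

        Parallel.ForEach(input, word =>
        {
            // Unordered collection: any thread can add at any time
            bag.Add(word.ToUpperInvariant());

            // Insert 1 for a new key, otherwise increment the existing count
            counts.AddOrUpdate(word, 1, (key, current) => current + 1);
        });

        return (bag, counts);
    }
}
```

Note that a ConcurrentBag does not preserve input order, so treat its contents as an unordered set of results.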

Intro to Perigee Parallel

There are 3 main parallel concepts within Perigee:

  1. Parallel lookup processing: building hashed lookups for rapid existence checks and retrieval

  2. Parallel transformation: converting records into a new type

  3. Parallel iteration: looping without a defined output

1 - Parallel Lookup Processing

We have two classes for easy parallel lookups:

  1. GroupProcessor

  2. SingleProcessor

Group Processor

The GroupProcessor allows for parallel execution over a list, DataTable, or other enumerable input. The "Group" is because it will perform a "GroupBy" on the input data, so that one input key may point to multiple input items. Original input items are retained, and the "key" is used as a hashed lookup. Let's look at some data and an example:

TypeCategory,Name
SteakRarity, Rare
SteakRarity, Medium-Rare
SteakRarity, Well Done
SteakType, T-Bone
SteakType, Flank

If you were to use a GroupProcessor on the data above, it would look like this:

using Perigee.Helpers;

//Get data, usually from a file, a database, or elsewhere
FoodCategories[] FoodCats = new FoodCategories[]
{
    new FoodCategories { TypeCategory = "SteakRarity", Name = "Rare" },
    new FoodCategories { TypeCategory = "SteakRarity", Name = "Medium-Rare" },
    new FoodCategories { TypeCategory = "SteakRarity", Name = "Well Done" },
    new FoodCategories { TypeCategory = "SteakType", Name = "T-Bone" },
    new FoodCategories { TypeCategory = "SteakType", Name = "Flank" }
};

//Perform parallel lookup
var gpCategoryLookup = FoodCats.ParallelProcessToGroupProcessor(f => f.TypeCategory);

public class FoodCategories { public string TypeCategory { get; set; } public string Name { get; set; } }

Once the data has been processed, you can perform existence checks and data retrieval using hashed values. Both operations are safe to call from parallel code.

//Perform a lookup, using an internal hash
bool contains = gpCategoryLookup.Contains("SteakRarity");

if (contains) {

    //Get all SteakRarity values: Rare, Medium-Rare, Well Done
    List<FoodCategories> steakRarities = gpCategoryLookup["SteakRarity"];
}

Using our parallel execution loops to produce hashed data lookups has made various processes almost 500,000% faster in certain production applications.
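If the "group by key, then look up by hash" idea is new to you, the sketch below shows the same concept using LINQ's `ToLookup`. This is not how GroupProcessor is implemented (`ToLookup` runs sequentially, and `GroupLookupDemo` is a made-up name). It only illustrates the shape of the result: one key pointing to many original items, with lookups that never scan the full input.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var rows = new (string TypeCategory, string Name)[]
{
    ("SteakRarity", "Rare"),
    ("SteakRarity", "Medium-Rare"),
    ("SteakRarity", "Well Done"),
    ("SteakType", "T-Bone"),
    ("SteakType", "Flank"),
};

ILookup<string, string> byCategory = GroupLookupDemo.Build(rows);

// Hashed existence check: no scan over the input rows
Console.WriteLine(byCategory.Contains("SteakRarity")); // True

// One key returns every item that was grouped under it
Console.WriteLine(string.Join(", ", byCategory["SteakRarity"])); // Rare, Medium-Rare, Well Done

// ILookup returns an empty sequence for a key that does not exist
Console.WriteLine(byCategory["CheeseType"].Count()); // 0

// Hypothetical helper class, for illustration only
public static class GroupLookupDemo
{
    public static ILookup<string, string> Build(IEnumerable<(string TypeCategory, string Name)> rows)
        => rows.ToLookup(r => r.TypeCategory, r => r.Name);
}
```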

Single Processor

The SingleProcessor allows for parallel execution over a list, DataTable, or other enumerable input. The "Single" part means each key is expected to occur only once in the input data, so that one key points to exactly one input item. Original input items are retained, and the "key" is used as a hashed lookup. Let's look at some data and an example:

TypeCategory,Name
SteakRarity, Rare
SteakRarity, Medium-Rare
SteakRarity, Well Done
SteakType, T-Bone
SteakType, Flank

If you were to use a SingleProcessor on the data above, it would look like this:

using Perigee.Helpers;

//Get data, usually from a file, a database, or elsewhere
FoodCategories[] FoodCats = new FoodCategories[]
{
    new FoodCategories { TypeCategory = "SteakRarity", Name = "Rare" },
    new FoodCategories { TypeCategory = "SteakRarity", Name = "Medium-Rare" },
    new FoodCategories { TypeCategory = "SteakRarity", Name = "Well Done" },
    new FoodCategories { TypeCategory = "SteakType", Name = "T-Bone" },
    new FoodCategories { TypeCategory = "SteakType", Name = "Flank" }
};

var spNameLookup = FoodCats.ParallelProcessToSingleProcessor(f => f.Name);

public class FoodCategories { public string TypeCategory { get; set; } public string Name { get; set; } }

Once the data has been processed, you can perform existence checks and data retrieval using hashed values. Both operations are safe to call from parallel code.

//Perform a lookup, using an internal hash
bool contains = spNameLookup.Contains("T-Bone");
if (contains) {

    //Get the original class item 
    FoodCategories item = spNameLookup["T-Bone"];
}
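Since the SingleProcessor expects one item per key, it can be worth confirming that a candidate key really is unique before choosing it over the GroupProcessor. The sketch below is plain LINQ, not a Perigee API, and `KeyCheck.DuplicateKeys` is a hypothetical helper written for this example. How SingleProcessor itself reacts to duplicate keys is not covered on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var rows = new (string TypeCategory, string Name)[]
{
    ("SteakRarity", "Rare"),
    ("SteakRarity", "Medium-Rare"),
    ("SteakRarity", "Well Done"),
    ("SteakType", "T-Bone"),
    ("SteakType", "Flank"),
};

// Name is unique per row, so it suits a single-item lookup
Console.WriteLine(KeyCheck.DuplicateKeys(rows, r => r.Name).Count); // 0

// TypeCategory repeats, so it calls for a grouped lookup instead
Console.WriteLine(string.Join(", ", KeyCheck.DuplicateKeys(rows, r => r.TypeCategory)));
// SteakRarity, SteakType

// Hypothetical helper class, for illustration only
public static class KeyCheck
{
    // Returns every key that occurs more than once in the input
    public static List<TKey> DuplicateKeys<T, TKey>(IEnumerable<T> items, Func<T, TKey> keySelector)
        => items.GroupBy(keySelector)
                .Where(g => g.Count() > 1)
                .Select(g => g.Key)
                .ToList();
}
```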

A note on Mesh-Keys

Because of the nature of hashed values, you can use multiple inputs as a mesh key.

Option 1:

Supply the key as a delimited list. An example might be:

var gpPeople = People.ParallelProcessToGroupProcessor(f => $"{f.FirstName ?? "NULL"}|{f.LastName ?? "NULL"}");

This makes the hashed lookup a mesh key of FirstName and LastName, so retrieving those records requires both values to match.
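One practical consequence of a delimited mesh key is that the lookup side has to build the key exactly the same way as the load side. A small shared helper keeps the two in sync. The sketch below uses LINQ's `ToLookup` as a stand-in for the processor, and `MeshKey.Build` is a hypothetical helper, not part of Perigee.

```csharp
#nullable enable
using System;
using System.Linq;

var people = new (string? FirstName, string? LastName)[]
{
    ("Ada", "Lovelace"),
    ("Ada", null),
    ("Grace", "Hopper"),
};

// Load side: every record is keyed by FirstName|LastName
var byName = people.ToLookup(p => MeshKey.Build(p.FirstName, p.LastName));

// Lookup side: both parts must match for the key to be found
Console.WriteLine(byName.Contains(MeshKey.Build("Ada", "Lovelace"))); // True
Console.WriteLine(byName.Contains(MeshKey.Build("Ada", "Hopper")));   // False
Console.WriteLine(byName.Contains(MeshKey.Build("Ada", null)));       // True

// Hypothetical helper class, for illustration only
public static class MeshKey
{
    // Joins the parts with '|' and substitutes "NULL" for missing values
    public static string Build(params string?[] parts)
        => string.Join("|", parts.Select(p => p ?? "NULL"));
}
```

Pick a delimiter that cannot appear in the data itself. Otherwise two different combinations of values could produce the same key.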

Option 2:

Supply the key as a hash itself. The hash can be calculated from a database, or local depending on your needs. An example using a local hash function for SHA512:

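using System.Security.Cryptography;
using System.Text;

var hasher = SHA512.Create();

//Compute each hash sequentially: a single HashAlgorithm instance must not be shared across threads
foreach (var person in People) {
    person.hash = hasher.ComputeHash(Encoding.UTF8.GetBytes($"{person.FirstName ?? "NULL"}|{person.LastName ?? "NULL"}"));
}

var gpPeople = People.ParallelProcessToGroupProcessor(f => f.hash);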

Notice how we computed the hash outside of a parallel loop? The one caveat to local hashing is that values CANNOT be hashed in parallel on the same instance of a HashAlgorithm, because its instance methods are not thread safe. For this reason I would recommend, when possible, going with Option 1.
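If you do need to hash inside a parallel loop, one way to avoid sharing a HashAlgorithm instance is the static one-shot `SHA512.HashData` method, available in .NET 5 and later. The sketch below is an assumption-laden illustration rather than a Perigee feature: `ParallelHashDemo` is a made-up name, and the hash is stored as a hex string so it can be compared by value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;

string[] keys = { "Ada|Lovelace", "Grace|Hopper", "Alan|Turing" };

ConcurrentDictionary<string, string> hashes = ParallelHashDemo.HashAll(keys);

foreach (var pair in hashes)
    Console.WriteLine($"{pair.Key} => {pair.Value.Substring(0, 16)}...");

// Hypothetical helper class, for illustration only
public static class ParallelHashDemo
{
    public static ConcurrentDictionary<string, string> HashAll(string[] keys)
    {
        var result = new ConcurrentDictionary<string, string>();

        Parallel.ForEach(keys, key =>
        {
            // Static one-shot hashing: no HashAlgorithm instance is shared between threads
            byte[] hash = SHA512.HashData(Encoding.UTF8.GetBytes(key));

            // Hex string, so the hash compares by value when used as a key
            result[key] = Convert.ToHexString(hash);
        });

        return result;
    }
}
```

An alternative with the same effect is creating a separate hasher per thread or per iteration, at the cost of extra allocations.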

Hashing the mesh-key is really most useful in two conditions:

  1. The hash is pre-calculated on input data, especially when coming from a database or a different system

  2. You need to compute and add together multiple hashed keys into a single lookup, so that you can perform rapid existence checks on multiple types of mesh keys.

2 - Parallel Transformation Processing

This type of parallel processing is a bit different. Instead of purely processing data for rapid lookup and retrieval, we're taking the input data and transforming it to a new type, class, structure, or even multiple new items to store again later.

For this, we want to use a parallel loop that returns a concurrent safe class. For our example, let's use a concurrent bag:

using System.Collections.Concurrent;
using Perigee.Extensions;
using Perigee.Helpers;

FoodCategories[] FoodCats = new FoodCategories[]
{
   new FoodCategories { TypeCategory = "SteakRarity", Name = "Rare" },
   new FoodCategories { TypeCategory = "SteakRarity", Name = "Medium-Rare" },
   new FoodCategories { TypeCategory = "SteakRarity", Name = "Well Done" },
   new FoodCategories { TypeCategory = "SteakType", Name = "T-Bone" },
   new FoodCategories { TypeCategory = "SteakType", Name = "Flank" }
};

GroceryItem[] Groceries = new GroceryItem[] {
   new GroceryItem { TypeKey = "SteakRarity" },
   new GroceryItem { TypeKey = "SteakType" },
};

var gpCategoryLookup = FoodCats.ParallelProcessToGroupProcessor(f => f.TypeCategory);

ConcurrentBag<GroceryItem> Bag = Groceries.ParallelProcessToBag(f => {

   return new GroceryItem() { 
       Name = "GroupKey", 
       TypeKey = f.TypeKey, 
       rCount = gpCategoryLookup.Contains(f.TypeKey) ? gpCategoryLookup[f.TypeKey].Count : 0
   };
    
});

//Handy quick DataTable conversion
var DataTableFromResult = Bag.ToDataTable();

public class FoodCategories { public string TypeCategory { get; set; } public string Name { get; set; } }
public class GroceryItem { public int rCount { get; set; } public string TypeKey { get; set; } public string Name { get; set; } }

Bag now contains 2 items: one for SteakType with an rCount of 2, and one for SteakRarity with an rCount of 3. There's a lot of performance to gain when performing parallel operations on larger datasets.

P.S, Ready for dinner yet? I am...

3 - Parallel Iteration

This one is very similar to what's above. This time we aren't capturing a direct output, but rather sending items to a concurrent dictionary. We'll also introduce the ExceptionRows callback.

using System.Collections.Concurrent;
using Newtonsoft.Json;
using Perigee.Extensions;
using Perigee.Helpers;

FoodCategories[] FoodCats = new FoodCategories[]
{
   new FoodCategories { TypeCategory = "SteakRarity", Name = "Rare" },
   new FoodCategories { TypeCategory = "SteakRarity", Name = "Medium-Rare" },
   new FoodCategories { TypeCategory = "SteakRarity", Name = "Well Done" },
   new FoodCategories { TypeCategory = "SteakType", Name = "T-Bone" },
   new FoodCategories { TypeCategory = "SteakType", Name = "Flank" }
};

GroceryItem[] Groceries = new GroceryItem[] {
   new GroceryItem { TypeKey = "SteakRarity" },
   new GroceryItem { TypeKey = "SteakType" },
   new GroceryItem { TypeKey = "CheeseType" },
   new GroceryItem(),

};

var gpCategoryLookup = FoodCats.ParallelProcessToGroupProcessor(f => f.TypeCategory);

ConcurrentDictionary<GroceryItem, int> ReturnedDictionary = new ConcurrentDictionary<GroceryItem, int>();
Groceries.ParallelProcess(f => {

    if (string.IsNullOrEmpty(f.TypeKey)) throw new Exception("TypeKey is null or empty");
    int v = gpCategoryLookup.Contains(f.TypeKey) ? gpCategoryLookup[f.TypeKey].Count : 0;
    ReturnedDictionary.AddOrUpdate(f, v, (k, o) => v);
    
}, 
(exceptionRows) => {
    exceptionRows.ForEach(f => Console.WriteLine($"Item {JsonConvert.SerializeObject(f.Item)} had an exception thrown: {f.exception.Message}"));

    byte[] ExcRows = JsonCompress.Compress(exceptionRows);
    var excRowsDecomp = JsonCompress.DecompressList<ParallelProcessException<GroceryItem>>(ExcRows);
});

After running the code, you'll see one item logged to the console, as expected:

Item {"rCount":0,"TypeKey":null,"Name":null} had an exception thrown: TypeKey is null or empty

We've also created 3 items in the ReturnedDictionary, with each entry's Value being the number of items found in the grouped lookup (0 for CheeseType, which has no match).

Finally, I've added the JsonCompress class to serialize and deserialize the ParallelProcessException items, so that they can be stored, sent, or retrieved later.
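To show the general idea behind collecting failed rows instead of letting one bad item stop the loop, here is a standalone sketch built on the standard `Parallel.ForEach`. It is not Perigee's implementation of ParallelProcess or ExceptionRows. `SafeLoop.Run` is a hypothetical helper that catches each item's exception and returns the failures once the loop is done.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

string?[] typeKeys = { "SteakRarity", "SteakType", "CheeseType", null };

var failures = SafeLoop.Run(typeKeys, key =>
{
    if (string.IsNullOrEmpty(key)) throw new Exception("TypeKey is null or empty");
});

foreach (var (item, error) in failures)
    Console.WriteLine($"Item '{item ?? "<null>"}' had an exception thrown: {error.Message}");

// Hypothetical helper class, for illustration only
public static class SafeLoop
{
    // Runs body for every item in parallel and returns the items that threw
    public static List<(T Item, Exception Error)> Run<T>(IEnumerable<T> items, Action<T> body)
    {
        var failed = new ConcurrentQueue<(T Item, Exception Error)>();

        Parallel.ForEach(items, item =>
        {
            try
            {
                body(item);
            }
            catch (Exception ex)
            {
                // Record the failure and let the remaining items continue
                failed.Enqueue((item, ex));
            }
        });

        return failed.ToList();
    }
}
```

Keeping the original item next to its exception is what makes the failures easy to log, store, or retry later.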

Summary

We've looked at three different ways we can use parallel processing to boost the speed of our data conversion lookups and loops. We've taken a look at what hashing is and how to effectively use it. We've seen several examples of using the Single and Group processors. We also got to see how the ExceptionRows callback allows us to iterate over failed parallel execution items.


Last updated 1 year ago
