
Transaction Coordinator


Last updated 2 years ago

What is the Coordinator?

The Coordinator is like having a group of small robots that you design to each perform a specific task. These robots communicate with each other and make sure their tasks are completed before passing data to the next one.

They are designed to be as reliable as possible and can restart or shut down without losing any data.

The coordinator works like a message queue: you add a message and it gets processed a short time later. The message is immediately saved to disk in a compressed format and all the robots use this data packet to keep track of their progress. This is what makes the system so fault-tolerant.

The coordinator uses a first-in-first-out method to process messages and can handle many messages at once because it is multi-threaded.

If the system crashes, the coordinator is smart enough to resume from where it left off.

The coordinator also handles two-way data synchronization with a remote source, making sure that each step is replicated and stored on the remote server.

A Demo?

To see demos of the coordinator, check out the demo page!

Remote Synchronization and Logging

Every bot's state is replicated to the remote source (usually a database). This allows the Coordinator to resume transactions that have yet to complete. It also gives your company the ability to view, query, and report on business-critical tasks.

Many fields are logged and tracked during this process: everything from the initial payload that was queued, to the statuses and responses from REST APIs, exceptions thrown, modification times, and timestamps.

Queueing Process

Because the transaction log is synchronized in two directions, you're able to queue items both from the SDK and by creating new header records in the remote source.

The Coordinator polls its remote source for unprocessed transactions and will pick them up if they are not in an "Error" state.

Re-queueing

In a healthy system, a transaction only needs to be queued once: all steps succeed and the header is marked "Complete". When a system inevitably goes down and an item's retries have been exhausted, the header will still be in a "Processing" state and will be dequeued.

If an item gets dequeued before it is completed, it will be re-queued again shortly on the next call to fetch the remote source's pending transactions. This process is explained in more detail in the data handler section below.

Headers, Items, and Data Objects

Header

The Header is what uniquely identifies a transaction. Without a header, there is nothing to process.

The header contains the InitialDataObject, which is the "message" or "payload" of the queued transaction. This can be anything you want, JSON, a string, a custom class, or an array of bytes. It's serialized and compressed during the process.

Headers are identified by a mesh key:

  1. An ID property, which is a string.

  2. A ReplayID property, which is an integer (default 1).

When creating a new transaction, you need to create a new ID. We highly recommend meshing together data from wherever the process starts to create your ID. If you create a transaction with an already-processed ID, the coordinator is smart enough to pull that header from the remote system; since it will likely be completed, nothing will occur.

A few examples of this would be:

  • From a message queue: $"{QueueHost}{QueueName}{ID}".

  • From a database row: $"{ServerHost}{Schema}{TableName}{RowID}".

  • From an auto incrementing ID source.

  • Implement your own ID system.
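As a sketch of the mesh-key approach, the snippet below builds an ID from a database row and queues it. The QueueTransaction helper name and the variable values are assumptions for illustration only; use whatever enqueue call your coordinator exposes.

```csharp
// Build a deterministic mesh key from the data's origin, so the same
// source row always yields the same transaction ID and cannot be
// double-queued (a completed header with this ID would simply be reused).
string serverHost = "sql01.internal";   // illustrative values
string schema     = "dbo";
string tableName  = "Orders";
int    rowID      = 88213;

string meshID = $"{serverHost}{schema}{tableName}{rowID}";

// Queue the row's payload as the InitialDataObject.
// QueueTransaction is a hypothetical name, not a confirmed SDK method.
coordinator.QueueTransaction(meshID, orderRowAsJson);
```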

Items

The items exist as children to the header. You can think of them as logical steps in the process.

Items are executed in order, and each contains a status field. Only when an item is a success does processing move on to the next one.

Items contain many properties to help identify and report on their progress. You can view when each item started, how many times it has been retried, and information about request and response times if it represents an HTTP request.

Items themselves contain DataObjects. You can assign these data objects to be whatever you like, but they represent the completed result of the item's step.

When using the built-in helper methods for executing REST calls, all of the properties and fields for an item are filled out with the results of that request.

Data Objects

The SDK contains methods for accessing the DataObjects at runtime. You can retrieve the current DataObject, a previous item's DataObject, and the InitialDataObject from the header.

Replay Events

As mentioned above, all transactions are replicated to a remote source, like a database. This provides the unique ability to "Replay" transactions at a later date by simply supplying the transaction ID.

The coordinator will automatically pull that transaction from the remote source, create a new transaction with the given ID and a new ReplayID, and assign the InitialDataObject from the first time it was run.

This allows you to completely run through all of the logic for a given transaction again at a later date.

Imagine for a moment that one of your third-party brokers informs you they had a system error and lost all of your order data from today. This would create a major issue for any other linear system.

Since you have transaction replay at your fingertips, it would only take a few lines of code to replay all of today's transactions and all of your orders would be back where they should be.

You would save countless hours!
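Sketched with the GetHeaderIDs lookup from the data handler interface (shown later on this page), replaying today's transactions could look roughly like this. The Replay call on the coordinator is an assumed name for illustration:

```csharp
// Find every completed header modified today.
var todays = handler.GetHeaderIDs(
    DateTimeOffset.Now.Date,  // start of today, inclusive
    null,                     // end date: defaults to "now"
    completeOnly: true,
    IncludeReplays: false);

// Replay each transaction by ID. "Replay" is a hypothetical method name;
// the coordinator assigns a new ReplayID and reuses the stored
// InitialDataObject from the original run.
foreach (var header in todays)
    coordinator.Replay(header.ID);
```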

Multi-threaded queue

By default, all coordinators are configured to be multi-threaded, meaning that incoming transactions can be placed on different threads and executed at the same time. You can configure this during the coordinator's allocation.

c.AddTransactionCoordinator("OrderProcess", memHandler, (ct, l, process) =>
{
    //Process code here
}, IsMultithreaded: true, ThreadCount: 10);

Multistep helper

The multistep helper is the easiest way to use a coordinator. It simplifies the logic and handles communication with the assigned data handler. Unless you know you need to do something very specific, we recommend this as the primary way of writing a coordinator function.

Defining a MultiStep

The MultiStep processor is part of the process callback when creating a new coordinator.

To define a MultiStep transaction, you supply the list of steps that will execute and match that list of steps with callback methods.

(process) => {
    process.MultiStep(
        new string[] { "GetAccount", "POSTDetails" }, 
        MultiStepOptions.RequireInternet,
        (GetAccount) => {},
        (POSTDetails) => {}
    );
}

Each section will be called in order of the array of steps. In the above example, GetAccount should be the first callback as it is the first step, and POSTDetails should be the second callback as it is second in the list.

Multistep Options

The options that can be supplied are related to how a transaction is processed, what conditions are required, and the number of times a header or item can be retried.

There are .Default and .RequireInternet options that should suit most needs. If you need to customize further, take one of these defaults and override any of the following properties.

var opts = MultiStepOptions.Default; // or .RequireInternet

//Item and header retry counts. 
opts.MaxItemExecutionCount = 2;
opts.MaxHeaderExecutionCount = 20;

//Default redelivery time if requesting a redelivery
opts.DefaultRedeliveryTime = TimeSpan.FromSeconds(5);

//If true, require internet connectivity to proceed with processing this request.
opts.RequiresInternetConnectivity = true;

//If true, when the header is completed, all the dataobjects of the items will be NULLED out to reduce data clutter
opts.CleanDataObjectsOnFinalCompletion = true;

Retry Limits

There are configurable limits to how many times a given header or item can be retried before falling out of process scope.

For headers

The header has its own retry count. When the header has been queued and there is an attempt to process it, the header's execution count is incremented by 1.

When the header can't successfully complete all of its child items, or there is a call to stop processing, the header may exhaust all of its allowable retries.

When the header is out of allowable retries, its status is set to Error and no more automatic requeuing will occur. When the header is still allowed to retry later, its status is left as Processing; the remote synchronization data source defines a period of time that must pass before a transaction still in Processing status is reintroduced to the process queue.

For Items

Items' retry limits act a bit differently than headers'. They are a retry limit per queued process: every time a header is enqueued, each item has N retries before being automatically dequeued.

This important distinction allows for many retries of a header, and even more retries of each individual item. For example, with MaxHeaderExecutionCount = 20 and MaxItemExecutionCount = 2, a single item could be attempted up to 40 times across the header's lifetime.

The Status

Both headers and items have a status property, and they mean slightly different things during the process. The three available statuses are:

  • Processing

  • Complete

  • Error

By default, all new headers and items are in a Processing status.

For headers

If a header's status is Processing, the coordinator is allowed to enqueue the transaction and work on the steps, as long as the retry limits have not been exhausted.

If the header status is set to Complete, there will be no more enqueuing or automatic retries, as the header is now completed.

If a header status is set to Error, it will also no longer enqueue. You may later flip the status back to Processing if you want to retry or re-enqueue the transaction.

For Items

If an item's status is Processing or Error, the MultiStep is allowed to continue retrying the item.

If an item's status is set to Complete, it will no longer retry or attempt to process that item. The MultiStep processor will move on to the next logical item in the defined steps, or complete the header if all child items are now also Complete.

The callback

Every callback in a multi-step has an SDK of calls you're able to use. They provide a way to interact with the ongoing transaction, perform lookups, request redelivery, or end processing.

(GetAccount) => {
    GetAccount.Update();
    GetAccount.Redeliver();
},

DataHandler

The DataHandler is the interfaced class that synchronizes remote transactions. You can access any of its underlying methods through the handler:

Order.DataHandler;

Update()

Update is how you push data back to the file system as well as the remote source. You can force an update at any point during the process if you want to synchronize data before something happens.

Update is called automatically throughout the process, and calling it isn't necessary unless you're implementing something custom.

//Update an order local and remote, default
Order.Update();

//Skip updating remote, use this if you only want to save state local before attempting to do something that might cause an exception or error
Order.Update(false);

Reading info from the objects

You can read the initial data object, a previous step's data object, or the current data object, or even use the DataHandler to request other headers:

//Get the current data object from this step
GetAccount.GetDataObjectAs<account>();

//Get the data object from last step
GetAccount.GetDataObjectFromLastStepAs<account>();

//Get the initial data object from the header
GetAccount.GetInitialDataObjectAs<account>();

//Using the DataHandler, get a different header and its initial data object
GetAccount.DataHandler.GetHeaderByID("abc", 2).GetInitialDataObjectAs<account>();

Performing http requests

The easiest way to execute a request is the Execute<>() call. It performs two update processes, one before sending the request and one after, so that both the request and response times get synchronized even in the event of a full system crash.

  • It automatically has a built-in retry, and you can specify how many times it will retry the same request.

  • It will log the request and response values if available (and the option is not turned off).

  • It will log all of the request properties (uri, timestamp, verb, body), then call Update().

  • It will then log all of the response properties (uri, timestamp, status code, body).

    • If the Execute call is used without the generic parameter, as in .Execute() (instead of .Execute<T>()), the response will only be available in the ResponseBody property if logging is enabled.

  • If the response contains a deserialized body, it will be set as the current step's DataObject, and Update() is called one more time now that the response properties are filled out.

  • If the request status code was a success, the Item is set to Complete; if it was not, the Item is set to Error.

//Define and use a client, and request.
//This will result in : https://localhost:7216/order?order=123
var client = new RestClient("https://localhost:7216");
var rq = new RestRequest("/order", Method.Get).AddParameter("order", "123");

//Execute the request
var response = Order.Execute<account>(client, rq);

//Check results
if (response.IsTimeout())
{
    //Nothing to deserialize here
}
else if (response.IsSuccessful)
{
    //Get the deserialized account object from the response data
    account acc = response.Data;
}

You can specify other options, such as not deserializing any response data, turning off request and response logging, and how many built-in retries it performs, by setting those parameters in the .Execute() call.

Because the .Execute() call handles everything necessary, you can set it as the only call in a given MultiStep step. Everything necessary is now set to either move onto the next step, or this step will be retried if it failed.

(Order) => {
   Order.Execute<account>(client, 
      new RestRequest("/order", Method.Get)
         .AddParameter("order", "123"));
},
(Verify) => {
   account acc = Verify.GetDataObjectFromLastStepAs<account>();
}

Stop processing

StopProcessing() will stop all current item cycling and end the transaction in the state you specify. If it is left in a Processing state, it will get picked back up on the next call to get remote pending transactions.

Order.StopProcessing(TransactionStatus.Pending);
Order.StopProcessing(TransactionStatus.Error);
Order.StopProcessing(TransactionStatus.Completed);

In-Place Redelivery

The redelivery mechanism halts the current thread and redelivers the item after the delay you request. Because it blocks a running thread, use it intentionally and only when necessary; it's most useful when you know a short delay will allow an item to succeed.

Redelivery does not increment the execution count.

//Will redeliver in 10 seconds
Order.Redeliver(TimeSpan.FromSeconds(10));

//Will redeliver in the default redelivery time set by the options
Order.Redeliver();

Current Process count

The current process count is how many times the current item has been retried within this queued transaction. Every time a header gets dequeued, this count resets to 0. It lets you check how many retries have occurred since the header was queued, whether for the first time or again.

If you want to see how many times an item has been tried in total, check the property ExecutionCount instead.

//Current process count
Order.CurrentProcessCount;

//Total execution count
Order.Item.ExecutionCount;

SDK

If you want to take full control of the transaction coordinator, you can bypass the MultiStep process and execute calls manually, as shown below. You are responsible for marking the item and header statuses, retrying items or processes yourself, creating items (steps), and calling Update().

(ct, l, process) => {
    
    //Check if it's a replay
    bool isAReplay = process.IsReplay;
    
    //Get the initial data object
    account ido = process.Header.GetInitialDataObjectAs<account>();
    
    //Use the header to get or create an item
    var orderStep = process.Header.AddOrGetItem("Order");
    
    //Check the item status
    if (orderStep.Status != TransactionStatus.Completed)
    {
        if (orderStep.ExecutionCount < 10)
        {
            orderStep.Execute(client, request);
        }
    }
    
    //Verify there are two steps and they are completed
    if (process.Header.AreItemsCompleted(2))
    {
        //Clean transaction data on completion
        process.DataHandler.CleanTransaction(process.Header.ID, process.Header.ReplayID);
    }
    
    //call update
    process.Update();
}

ITranasctionCoordinatorDataHandler

This is the interface tied to a coordinator that allows it to communicate with a remote synchronization source. It defines methods to retrieve header data and replay information, and it defines the period of time that must pass before re-queueing a transaction.

/// <summary>
/// Retrieve the header by ID, or NULL if it does not exist
/// </summary>
/// <param name="ID">The transaction ID</param>
/// <param name="ReplayID">A replay ID, 1 by default</param>
/// <returns>A <see cref="TransactionHeader"/> or NULL if it does not exist</returns>
public TransactionHeader GetHeaderByID(string ID, int ReplayID = 1);

/// <summary>
/// Synchronize the entire tree back to the remote source
/// </summary>
/// <param name="transactionHeader">The header to sync</param>
/// <returns></returns>
public bool SyncRemoteTransaction(TransactionHeader transactionHeader);

/// <summary>
/// Get the pending transactions
/// </summary>
/// <param name="CoordinatorName">Name of the coordinator</param>
/// <returns></returns>
public List<TransactionHeader> GetPendingTransactions(string CoordinatorName);

/// <summary>
/// Clean the transaction, this is the final call before local cleanup.<br/>
/// <para>* if <paramref name="CleanDataObjects"/> is true, <b>This should null out the <see cref="TransactionItem.DataObject"/> </b>, and allows specific handlers to perform any final cleanup required by the specific handler</para>
/// </summary>
/// <param name="ID">The transaction ID</param>
/// <param name="ReplayID">A replay ID, 1 by default</param>
/// <param name="CleanDataObjects">If true, null out <see cref="TransactionItem.DataObject"/></param>
public void CleanTransaction(string ID, int ReplayID = 1, bool CleanDataObjects = true);

/// <summary>
/// Set a transaction to a specific status, helpful for requeuing an errored transaction, or failing a transaction that is no longer valid.
/// </summary>
/// <param name="ID">The transaction ID</param>
/// <param name="ReplayID">A replay ID, 1 by default</param>
/// <param name="status">The status to assign</param>
public void SetTransactionStatus(string ID, int ReplayID = 1, TransactionStatus status = TransactionStatus.Completed);

/// <summary>
/// Get the latest replayID, or NULL if it does not exist
/// </summary>
/// <param name="ID">ID of the transaction to lookup</param>
/// <returns></returns>
public int? GetLastReplayID(string ID);

/// <summary>
/// Retrieve all IDs from the source where the last modified date is between two dates
/// </summary>
/// <param name="startDateInclusive">Start date to search for</param>
/// <param name="endDateInclusive">The end date, or now if not supplied</param>
/// <param name="Queue">Optional queue filter</param>
/// <param name="completeOnly">If true, only completed transactions will be returned</param>
/// <param name="IncludeReplays">If true, include replays as well</param>
/// <returns></returns>
public List<TransactionHeader> GetHeaderIDs(DateTimeOffset startDateInclusive, DateTimeOffset? endDateInclusive, string Queue = "", bool completeOnly = true, bool IncludeReplays = false);
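To see the shape of an implementation before reading the full examples, here is a stripped-down, dictionary-backed sketch of a few of the interface methods. It assumes the SDK's TransactionHeader and TransactionStatus types and is for illustration only:

```csharp
using System.Collections.Generic;
using System.Linq;

// Illustrative only: a real handler persists to a database and filters
// pending transactions by queue name and the requeue delay.
public class DictionaryDataHandler
{
    private readonly Dictionary<(string ID, int ReplayID), TransactionHeader> _store = new();

    public TransactionHeader GetHeaderByID(string ID, int ReplayID = 1) =>
        _store.TryGetValue((ID, ReplayID), out var h) ? h : null;

    public bool SyncRemoteTransaction(TransactionHeader transactionHeader)
    {
        // "Remote" here is just the dictionary; a real handler writes to a database.
        _store[(transactionHeader.ID, transactionHeader.ReplayID)] = transactionHeader;
        return true;
    }

    public List<TransactionHeader> GetPendingTransactions(string CoordinatorName) =>
        _store.Values.Where(h => h.Status == TransactionStatus.Processing).ToList();

    public int? GetLastReplayID(string ID)
    {
        var replays = _store.Keys.Where(k => k.ID == ID).Select(k => k.ReplayID).ToList();
        return replays.Count > 0 ? replays.Max() : (int?)null;
    }

    // CleanTransaction, SetTransactionStatus, and GetHeaderIDs omitted for brevity.
}
```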

There are two examples of this handler and their code listed below.

Memory Handler Code

The "Memory" version of this handler (TransactionSource_Memory.cs) is for testing and debugging only. It is not suitable for a production system.

SQL Handler Code

For MSSQL, see TransactionSource_MSSQL.cs.

