Transaction Coordinator
The Coordinator is like having a group of small robots that you design to each perform a specific task. These robots communicate with each other and make sure their tasks are completed before passing data to the next one.
They are designed to be as reliable as possible and can restart or shut down without losing any data.
The coordinator works like a message queue: you add a message and it gets processed a short time later. The message is immediately saved to disk in a compressed format and all the robots use this data packet to keep track of their progress. This is what makes the system so fault-tolerant.
The coordinator uses a first-in-first-out method to process messages and can handle many messages at once because it is multi-threaded.
If the system crashes, the coordinator is smart enough to resume from where it left off.
The coordinator also handles two-way data synchronization with a remote source, making sure that each step is replicated and stored on the remote server.
To see demos of the coordinator, check out the demo page!
Every bot's state is replicated to the remote source (usually a database). This allows the Coordinator to resume transactions that have yet to be completed. It also provides your company the ability to view, query and report on business critical tasks.
There are many fields logged and tracked during this process: everything from the initial payload that was queued, to the statuses and responses from REST APIs, exceptions thrown, modification times, and timestamps.
Due to the nature of a two-way synchronized transaction log, you're able to queue items both from the SDK and by creating new header records in the remote source.
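As a rough sketch, queuing from the SDK might look like the following. The `coordinator.Queue` call and the `OrderPayload` type are illustrative assumptions, not the SDK's confirmed API:

```csharp
// Illustrative sketch only: Queue(...) and OrderPayload are assumed names.
var payload = new OrderPayload { OrderNumber = 1042, Symbol = "ABC" };

// Mesh an ID from where the process starts (see the ID examples below).
string id = $"{serverHost}{schema}{tableName}{payload.OrderNumber}";

// The payload is serialized, compressed, and saved to disk immediately.
coordinator.Queue(id, payload);
```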
The Coordinator is smart enough to poll its remote source for unprocessed transactions and will pick them up if they are not in an "Error" state.
In a healthy system, a transaction only needs to be queued once: all steps succeed and the header is marked "Complete". When a system inevitably goes down and we've exhausted all of an item's retries, the header will still be in a "Processing" state and will be dequeued.
If an item gets dequeued before it is completed, it will be re-queued shortly, on the next call to fetch the remote source's pending transactions. This process is explained in more detail below in the ITransactionCoordinatorDataHandler section.
The Header is what uniquely identifies a transaction. Without a header, there is nothing to process.
The header contains the InitialDataObject, which is the "message" or "payload" of the queued transaction. This can be anything you want: JSON, a string, a custom class, or an array of bytes. It's serialized and compressed during the process.
Headers are identified by a mesh key:
ID property, which is a string.
ReplayID property, which is an integer (default 1).
When creating a new transaction, you need to create a new ID. We highly recommend meshing together data from where the process starts to create your ID. If you create a transaction with an already-processed ID, the coordinator is smart enough to pull that header from the remote system and use it; it will likely be complete, and nothing will occur.
A few examples of this would be:
From a message queue: $"{QueueHost}{QueueName}{ID}".
From a database row: $"{ServerHost}{Schema}{TableName}{RowID}".
From an auto-incrementing ID source.
Implement your own ID system.
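A sketch of the database-row pattern above; the helper name is ours, but the interpolation follows the $"{ServerHost}{Schema}{TableName}{RowID}" example:

```csharp
// Illustrative sketch of meshing a deterministic ID from a database row.
// The same row always produces the same ID, so re-queuing a row that was
// already processed becomes a harmless no-op.
static string MeshDatabaseRowId(string serverHost, string schema,
                                string tableName, long rowId)
{
    return $"{serverHost}{schema}{tableName}{rowId}";
}
```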
The items exist as children of the header. You can think of them as logical steps in the process.
Items are executed in order and contain a status field. Only when an item succeeds does processing move on to the next.
Items contain many properties to help identify and report on their progress. You can view when each item started, how many times it has been retried, and information about request and response times if it represents an HTTP request.
Items themselves contain DataObjects. You can assign these data objects to be whatever you like, but they represent the completed result of the item's step.
When using the built-in helper methods for executing REST calls, all of the properties and fields for an item are filled out with the results of that request.
The SDK contains methods for accessing the DataObjects at runtime. You can retrieve the current DataObject, a previous item's DataObject, and the InitialDataObject from the header.
As mentioned above, all transactions are replicated to a remote source, like a database. This provides the unique ability to "Replay" transactions at a later date by simply supplying the transaction ID.
The coordinator will automatically pull that transaction from the remote source, create a new transaction with the given ID, a new ReplayID, and assign the InitialDataObject from the first time it was run.
This allows you to completely run through all of the logic for a given transaction again at a later date.
Imagine for a moment that one of your third-party brokers informs you they had a system error and lost all your order data from today. This would create a major issue for any other linear-based system.
Since you have transaction replay at your fingertips, it would only take a few lines of code to replay all of today's transactions and all of your orders would be back where they should be.
You would save countless hours!
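The broker recovery above might look like the sketch below. `coordinator.Replay` is an assumed method name; the behavior it comments on (new ReplayID, original InitialDataObject) is from this documentation:

```csharp
// Illustrative sketch: replay every lost transaction by its ID.
// coordinator.Replay is an assumed name, not the SDK's confirmed API.
foreach (string id in todaysTransactionIds)
{
    // Creates a new transaction with the same ID, a new ReplayID,
    // and the InitialDataObject from the original run.
    coordinator.Replay(id);
}
```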
By default, all coordinators are configured to be multi-threaded, meaning that incoming transactions can be put on different threads and executed at the same time. You can configure this during the coordinator's allocation.
The MultiStep helper is the easiest way to use a coordinator. It simplifies the logic and handles the communication with the assigned data handler. Unless you know you need to do something very specific, we recommend it as the primary way of writing a coordinator function.
You may also step through the process manually; that is shown below in the SDK section.
The MultiStep processor is part of the process callback when creating a new coordinator.
To define a MultiStep transaction, you need to supply a list of steps to execute, and match that list of steps with a callback method.
Each section will be called in the order of the array of steps. In the above example, GetAccount should be the first callback, as it is the first step, and POSTDetails should be the second callback, as it is second in the list.
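One way this pairing of steps to callbacks could look; the constructor shape and delegate signature here are assumptions, while the step names match the example above:

```csharp
// Illustrative sketch only: the TransactionCoordinator constructor and the
// (step, sdk) callback signature are assumed, not the SDK's confirmed API.
var steps = new[] { "GetAccount", "POSTDetails" };

var coordinator = new TransactionCoordinator(steps, async (step, sdk) =>
{
    switch (step)
    {
        case "GetAccount":   // first step, so the first callback invoked
            await GetAccount(sdk);
            break;
        case "POSTDetails":  // second step, invoked once GetAccount completes
            await POSTDetails(sdk);
            break;
    }
});
```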
To see the callback SDK, jump here!
The options that can be supplied are related to how a transaction is processed, what conditions are required, and the number of times a header or item can be retried.
There are .Default and .RequireInternet options that should suit most needs. If you need to customize further, allocate a class, start from the default, and override any of the following properties.
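A sketch of that customization; aside from .Default and .RequireInternet, the property names below are assumptions modeled on the Retry Limits section:

```csharp
// Illustrative sketch: start from the default options and override a few
// properties. HeaderRetryLimit and ItemRetryLimit are assumed names.
var options = CoordinatorOptions.Default;
options.RequireInternet = true;   // wait for connectivity before processing
options.HeaderRetryLimit = 10;    // attempts before the header is set to Error
options.ItemRetryLimit = 3;       // attempts per queued process, per item
```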
To understand the retry counts, see the below section: Retry Limits.
There are configurable limits to how many times a given header or item can be retried before falling out of process scope.
The header has its own retry count. When the header has been queued and there is an attempt to process it, the header's execution count is incremented by 1.
When the header can't successfully complete all of the child items, or there is a call to stop processing, the header might exhaust all of its allowable retries.
When the header is out of allowable retries, the header status is set to Error. This means no more automatic requeuing will occur.
When the header is allowed to retry again later, the header status is left alone, leaving it as Processing. The remote synchronization data source defines a period of time that must pass before a transaction still in Processing status is requeued and reintroduced to the process queue.
Items' retry limits behave a bit differently than headers'. They are a retry limit per queued process. This means that for every time a header is enqueued, the item has N retries before it automatically gets dequeued.
This important distinction allows for many retries of a header, and even more retries of each individual item.
Both headers and items have a status property, and they mean slightly different things during the process. The three available statuses are:
Processing
Complete
Error
By default, all new headers and items are in a Processing status.
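The three statuses could be modeled as a simple enum. This is a conceptual sketch, not the SDK's actual type:

```csharp
// Conceptual sketch of the statuses described above (assumed type name).
public enum TransactionStatus
{
    Processing, // default for new headers and items; eligible for (re)queuing
    Complete,   // terminal success; no further enqueuing or retries
    Error       // terminal failure; can be flipped back to Processing to retry
}
```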
If a header status is Processing, it allows the coordinator to enqueue the transaction and work on the steps as long as it has not exhausted the Retry Limits.
If the header status is set to Complete, there will be no more enqueuing or automatic retry, as the header is now completed.
If a header status is set to Error, it will also no longer enqueue, and is now in an Error state. You may later flip the status back to Processing if you want to retry, or re-enqueue the transaction.
If an item's status is Processing OR Error, the MultiStep is allowed to continue retrying the item.
If an item's status is set to Complete, it will no longer retry or attempt to process this item. The MultiStep processor will move on to the next logical item in the steps defined, or complete the header if all child items are now also Complete.
Every callback in a MultiStep exposes an SDK of calls you're able to use. They provide a way to interact with the ongoing transaction, perform lookups, request redelivery, or end processing.
The DataHandler is the interfaced class to synchronize remote transactions. See more below!
You can access any of the underlying methods by accessing the handler.
Update is how you push data back to the file system as well as the remote source. You can force an update at any point during the process if you want to synchronize data before something happens.
Update is called automatically throughout the process, and calling it isn't necessary unless you're implementing something custom.
You can read the initial data object, previous data objects, the current data object, or even use the DataHandler to request other headers.
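Inside a step callback, those lookups might look like the sketch below. The method names and generic shape are assumptions; only the three kinds of data object come from these docs:

```csharp
// Illustrative sketch: accessor names are assumed, not the SDK's real API.
var initial = sdk.GetInitialDataObject<OrderPayload>();  // from the header
var account = sdk.GetDataObject<Account>("GetAccount");  // a previous step's result
var current = sdk.GetCurrentDataObject<PostResult>();    // this step's result, if set
```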
The easiest way to execute a request is the Execute<>() call. It performs two update processes, one before sending the request and one after, so that both the request and response times get synchronized even in the event of a full system crash.
It automatically has a built-in retry, and you can specify how many times it will retry the same request.
It will log the request and response values if available (and the option is not turned off).
It will log all of the request properties (uri, timestamp, verb, body), then call Update().
It will then log all of the response properties (uri, timestamp, status code, body).
If the response contains a deserialized body, it will be set as the current step's DataObject.
If the Execute call is used without the generic parameter, as .Execute() (instead of .Execute<T>()), then the response will only be available in the ResponseBody property, if logging is enabled.
If the request status code was a success, the Item is set to Complete; if it was not, the Item is set to Error.
Update() is then called one more time, as the response properties are now filled out.
You can specify other options, such as not deserializing any response data, turning off request and response logging, and how many built-in retries it performs, by setting those parameters in the .Execute() call.
Because the .Execute() call handles everything necessary, you can set it as the only call in a given MultiStep step. Everything is then in place either to move on to the next step, or to retry this step if it failed.
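For example, a step whose only job is one REST call might consist of a single Execute call. The argument shape and response type here are assumptions; only Execute<T>() itself is named by these docs:

```csharp
// Illustrative sketch: AccountResponse and the request arguments are assumed.
// Execute<T>() updates before and after the request, logs the request and
// response properties, and sets the item's status from the result.
await sdk.Execute<AccountResponse>(
    HttpMethod.Get, "https://api.example.com/accounts/42");
// On success: item -> Complete, deserialized body -> this step's DataObject.
// On failure: item -> Error, and the step is retried within its limits.
```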
StopProcessing() will stop all current item cycling and end the transaction in the state you specify. If it is left in a Processing state, it will get picked back up on the next call to get remote pending transactions.
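A sketch of ending a transaction early; passing a status value is an assumption about StopProcessing's signature:

```csharp
// Illustrative sketch: abandon the transaction when the payload is unusable.
// The status argument and its type name are assumed.
if (order.AccountId == null)
{
    sdk.StopProcessing(TransactionStatus.Error); // Error: no automatic requeue
    return;
}
// Ending in Processing instead would let the transaction be picked back
// up on the next poll for remote pending transactions.
```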
The redelivery mechanism halts the current thread and redelivers the item when you ask. It will block a running thread, so use this intentionally and only when necessary. It's most useful when you know a short delay will allow an item to succeed.
Redelivery does not increment the executions count.
The current process count is how many times the current item has been retried "this queued transaction". Every time a header gets dequeued, this count goes back to 0. It allows you to check how many retries have occurred since the item was queued, whether for the first time or again.
If you want to see how many times an item has been tried in total, check the ExecutionCount property instead.
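Putting redelivery and the process count together, a backoff might be sketched like this. The member names (CurrentProcessCount, RequestRedelivery) are assumptions; only the concepts come from these docs:

```csharp
// Illustrative sketch: back off briefly when a downstream service is busy,
// escalating the delay with each in-queue retry of this item.
if (lastStatusCode == HttpStatusCode.TooManyRequests) // 429 from the last call
{
    var delay = TimeSpan.FromSeconds(5 * (sdk.CurrentProcessCount + 1));
    sdk.RequestRedelivery(delay); // blocks this thread; ExecutionCount unchanged
}
```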
If you want to take full control of the transaction coordinator, you can bypass the MultiStep process and execute calls manually. You have full control over every aspect, and you are responsible for marking the item and header statuses, retrying items or processes yourself, creating items (steps), and calling Update().
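Manual control might be sketched as follows. Every member name below is an assumption; the docs only state that you must manage statuses, retries, item creation, and Update() yourself:

```csharp
// Illustrative sketch of bypassing MultiStep entirely (assumed API names).
var item = sdk.CreateItem("SendToBroker");
try
{
    await SendToBroker(sdk.GetInitialDataObject<OrderPayload>());
    item.Status = TransactionStatus.Complete;
}
catch (Exception ex)
{
    item.Status = TransactionStatus.Error; // or leave Processing to retry later
    item.Exception = ex.ToString();
}
sdk.Update(); // push state to disk and to the remote source yourself
```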
This is the interface tied to a coordinator that allows it to communicate with a remote synchronization source. It defines interfaced methods to retrieve header data and replay information, and it defines the period of time that must pass before re-queueing a transaction.
There are two examples of this handler and their code listed below.
The "Memory" version of this handler is for testing and debugging only. It would not be suitable for a production system.
For MSSQL: