
Hello Integration

This is the last of the hello series. We're going to cover the final few topics you'll need to really get started. Let's jump in!

Watermarking

Watermarking is simply storing a key/value pair from the last execution and recalling it the next time it's needed. We use it when interfacing with another system we need to synchronize data to or from. A few examples of this are:
  • A mail sync client needs to know what the UIDNEXT or MODSEQ integers are.
  • If you're synchronizing data changes to SharePoint, you'll want to store the delta link as a string.
  • If you're pulling data by a last modified date, you'll want to make sure you store a DateTime(Offset).
These are just a few examples of how you would use watermarking in a real-world production application.
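To make the last-modified-date pattern concrete, here's a minimal, library-agnostic sketch (the Record type, sample data, and dates are made up for illustration; this is not the Perigee API): only records newer than the stored watermark are pulled, and the watermark advances to the newest record processed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    // Hypothetical record shape from a source system
    record Record(int Id, DateTimeOffset Modified);

    static void Main()
    {
        var source = new List<Record>
        {
            new(1, DateTimeOffset.Parse("2024-01-01T00:00:00Z")),
            new(2, DateTimeOffset.Parse("2024-01-02T00:00:00Z")),
            new(3, DateTimeOffset.Parse("2024-01-03T00:00:00Z")),
        };

        // The watermark: normally restored from persistent storage
        DateTimeOffset watermark = DateTimeOffset.Parse("2024-01-01T12:00:00Z");

        // Pull only records modified after the watermark
        var changed = source.Where(r => r.Modified > watermark).ToList();
        foreach (var r in changed)
            Console.WriteLine($"Syncing record {r.Id}");

        // Advance the watermark so the next run skips what was just processed
        if (changed.Count > 0)
            watermark = changed.Max(r => r.Modified);

        Console.WriteLine($"New watermark: {watermark:O}");
    }
}
```

Record 1 predates the watermark and is skipped; only records 2 and 3 are synchronized.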

The SDK

The SDK makes working with watermarks easy, and they're quite powerful.
  • Watermarks are automatically persisted locally to disk and recalled on application startup.
  • They are debounced (covered in the Updating section below).
  • They are thread safe.
  • You can register as many subscribers to the data feed as you need.
  • You may push the data externally on update, and recall it on initialization for further backup.
//Register a watermark
Watermarking.Register("IntegrationOffset", () => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), (nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Register additional handlers anywhere else, they receive updates to that watermark
Watermarking.RegisterAdditionalEventHandler("IntegrationOffset", (s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Push updates to this watermark anywhere else
Watermarking.UpdateWatermark("IntegrationOffset", Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));

//And when you need the value, refer to it again later
Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset();
The Register call should be performed at application initialization if possible, to avoid a race condition of pulling its value before it is registered. Requesting a watermark before it is registered will throw an exception.
PerigeeApplication.ApplicationNoInit("HelloConfig",
    (taskConfig) =>
    {
        //Register me first!
        //Then .Add other methods
    });

Registering

Registering a watermark is an easy one-liner. Let's look at the parameters:
1   Watermarking.Register(
2
3       //Name
4       "IntegrationOffset",
5
6       //Initialization Callback
7       () => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow),
8
9       //OnUpdate (optional)
10      (nVal) => {
11          taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
12      });
  • Line 4 - Name - The name of the watermark
  • Line 7 - Func<Watermark> Initialization callback - If the value does not exist, the initialization callback is called and an initial value is set.
  • Line 10 - Action<Watermark> onUpdate? - An optional callback for when the value is updated.
To create the other base data types, simply provide that type in the initialization callback.
Watermark.FromString("");
Watermark.FromInt(1);
//etc
That's it. The underlying system does the heavy lifting of persisting this locally under the /watermarks folder beside the running application.

Registering Handlers

You're able to register as many "subscribers" to the data changes as you desire in other parts of the code. These will be called on a background thread with the updated value.
The RegisterAdditionalEventHandler call takes two parameters:
1   Watermarking.RegisterAdditionalEventHandler(
2
3       //Name of the watermark to register
4       "IntegrationOffset",
5
6       //Callback function
7       (s, nVal) => {
8           taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
9       });
  • Line 4 - Name - The name of the watermark to subscribe to.
  • Line 7 - EventHandler<Watermark> - The callback receives (sender, watermark), and you're able to read the value from there.

Updating

The UpdateWatermark call takes two parameters:
1   Watermarking.UpdateWatermark(
2
3       //Name
4       "IntegrationOffset",
5
6       //New value
7       Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));
  • Line 4 - Name - The name of the watermark you want to update.
  • Line 7 - Watermark - The new watermark and value.
A lot happens behind the scenes that you should be aware of:
  • Updates are debounced, which means rapid updates don't all push their values at once to subscribers of the data updates.
    1. Say you processed 10 records within 50 milliseconds and updated the watermark on every record, taking it from the initial value of 0 to 10.
    2. After about a second, all of the subscribers would get a single updated watermark event with the value of 10.
    3. Debouncing reduces the number of calls both to the filesystem and to your subscribers.
  • Updates are thread safe, as they take internal item locks before writing to disk and notifying subscribers.
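To make the debounce behavior concrete, here's a minimal, self-contained sketch (this is not Perigee's implementation; the DebouncedValue class and its timings are illustrative): rapid updates reset a timer, and subscribers receive a single notification carrying only the final value once the updates go quiet.

```csharp
using System;
using System.Threading;

// Illustrative debouncer: each Update() pushes the notification timer out,
// so a burst of updates produces one event with the last value.
class DebouncedValue<T>
{
    private readonly object _lock = new object();
    private readonly Timer _timer;
    private readonly int _quietMs;
    private T _value;

    public event Action<T> Updated;

    public DebouncedValue(T initial, int quietMs)
    {
        _value = initial;
        _quietMs = quietMs;
        _timer = new Timer(_ => Updated?.Invoke(Get()), null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Update(T newValue)
    {
        lock (_lock)
        {
            _value = newValue;
            // Restart the quiet period; only fires once updates stop arriving
            _timer.Change(_quietMs, Timeout.Infinite);
        }
    }

    public T Get() { lock (_lock) return _value; }
}

class Program
{
    static void Main()
    {
        int notifications = 0;
        var dv = new DebouncedValue<int>(0, 200);
        dv.Updated += v => { notifications++; Console.WriteLine($"Notified once with {v}"); };

        // Ten rapid updates, like the 10-record example above...
        for (int i = 1; i <= 10; i++) dv.Update(i);

        Thread.Sleep(500);
        // ...yield a single notification with the final value.
        Console.WriteLine($"Value={dv.Get()}, Notifications={notifications}");
    }
}
```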

Get

Getting the watermark only requires the Name of the watermark to retrieve.
Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset();
If there are inflight updates to the watermark, this call may block the thread for a second or two while those debounces get processed. If there are no inflight updates, it returns immediately.

Credential Store

Another primary concept of writing integrations is using credentials to get access tokens and authorization for other systems. The most common example of this would be using OAuth 2.0 to retrieve an access or bearer token that expires in the future.
Credentials are all registered on startup and are usually given an expiration date. This allows for any process later in the application to ask the manager to retrieve a credential. The below example shows how to register SuperCredential, and two possible return values:
CredentialStore.RegisterRefresh("SuperCredential", (o) => {

    //Call third party API
    //Reach out to database
    //Request keys from AWS
    //Do anything you need to get authorization details

    //Then return either a good credential:
    return new CredentialStoreItem() {
        Expiration = DateTimeOffset.Now.AddMinutes(60),
        Authorization = "ABCDEFG",
        Scope = "offline.access",
        StoreA = "Custom ValueStore"
    };

    //Or a faulted one:
    return new FaultedCredentialStoreItem("External ERROR", new Exception("Exception thrown while trying to get credential!!!"), retry: true);
});
When the application loads up or a new credential is created, the local drive is checked and all previously non-expired credentials are restored immediately. This allows applications to be safely shut down and restarted without interrupting or over-retrieving from an external API.
During the retrieval process, the CredentialStore checks for the existence of the credential and its expiration, then checks whether the expiration is within a user-defined number of minutes before returning to the caller.
//Use default settings
CredentialStore.GetCredential("SuperCredential");

//Set all settings on retrieval
CredentialStore.GetCredential("SuperCredential", maxRetries: 3, retryMS: 1000, expireTimeBufferSeconds: 600);
In summary, the CredentialStore does several important things:
  • It automatically caches the authorization details to disk and retrieves them again on application reload.
  • When a credential is given an expiration date, it's able to renew itself when required instead of sending back stale authorization parameters.
  • The retrieval call is automatically retried to prevent a blip in connectivity from stopping a process.
  • It will automatically renew the token a defined number of minutes before expiration, allowing long-running processes not to fail in the middle of execution.
  • It integrates seamlessly with RestSharp, allowing all HTTP(S) traffic to automatically and internally pull credentials.

Using Credentials - The SDK

RestSharp for HTTP(S)

To register a specific RestSharp credential, just create that type and assign the appropriate Authenticator.
CredentialStore.RegisterRefresh("ClientAPI", (o) => {
    return new RestSharpCredentialStoreItem(
        new RestSharp.Authenticators.JwtAuthenticator("MyToken"),
        DateTimeOffset.UtcNow.AddMinutes(58));
});
Any time you create a client, simply assign this credential authenticator:
//Create a new client
using var client = new RestClient("https://localhost.com");

//Assign the CredentialAuthenticator with the name tied to the refresh
client.UseAuthenticator(new CredentialAuthenticator("ClientAPI"));

//Anytime you execute any requests, it's automatically maintained.
There's a whole page covering this content in more detail.

Limiting

Another integration pattern is limiting the number of calls that can be made during a given period of time. This is important when rate limiting or billing may be an issue, and it's a perfect stopgap for a critical failure causing an erroneous number of API calls or running up an unwanted bill!
The Limiter automatically resets its count every calendar day and will not allow additional executions past the defined limit.
using Perigee;
using Perigee.Extensions;

PerigeeApplication.ApplicationNoInit("Limiter", (c) =>
{
    c.AddRecurring("Limiter", (ct, l) => {
        using var APILimit = new Limiter("APILimiter.json", 2, ct);

        if (APILimit.CanExecute())
        {
            //Perform action
            l.LogInformation("Calling API {n}/{x} executions", APILimit.GetCount(), APILimit.GetLimit());
        }
        else
        {
            l.LogInformation("Out of executions...");
        }
    });
});

/*
[16:35:07 INF] Calling API 1/2 executions
[16:35:13 INF] Calling API 2/2 executions
[16:35:18 INF] Out of executions...
*/
This is built off of the FileSyncWrite module listed below, and code for it is linked if you'd like to customize it to match your needs.
​
LimiterCode.cs

Fault Tolerance

A common integration issue is that systems outside of the immediate domain will have faults, disconnects, exceptions, availability issues, or downtime. All of these are usually easily resolved by taking a fault-tolerance approach to building your application.
Perigee has several classes built into it to make this a common practice within your application.

The Cancellation Token

The cancellation token passed alongside every ManagedThread is an important fault-tolerance primitive to be aware of. It allows us to gracefully shut down or stop processing data, and gives us an opportunity to save state before the application closes.
If you pay attention to cancellation tokens, they give you the opportunity you need to end your process and save state.
PerigeeApplication.ApplicationNoInit("CancelTokens", (c) => {

    c.AddRecurring("Recurring", (ct, l) => {

        //Since we haven't started yet, can we stop and resume later?
        if (ct.IsCancellationRequested) {
            //Save state or info first?
            return;
        }

        //Process stuff
        // ...

        //Are we at a safe place to stop and resume later? Check the cancellation
        if (ct.IsCancellationRequested) return;
    });

});

Exit Event

Use the Exit Event alongside the cancellation tokens to store and finalize state on shutdown. This allows for an easy application reload, update, or server restart. Remember that Exit is called immediately before the application closes, and in some cases only allows a few seconds to pass before ending execution. This is a last-step process where data should be persisted.
PerigeeApplication.OnExit(() => {
Console.WriteLine("App is closing in a few seconds");
});

Retry and RetryAsync

The utility class provides Retry and RetryAsync methods. They provide an easy way to retry, with a delay, when an exception is thrown.
The response from these methods is a Tuple where:
  • Item1 is a boolean indicating whether it succeeded (true) or failed the maximum number of times (false).
  • Item2 is the exception thrown if it did not succeed.
PerigeeApplication.ApplicationNoInit("CancelTokens", (c) => {

    c.AddRecurring("Recurring", (ct, l) => {

        //This will be tried 5 times, with 10 seconds between retries.
        Tuple<bool, Exception> Retry = PerigeeUtil.Retry(5, (i) => {

            bool Success = false;
            if (Success == false)
            {
                throw new Exception("Do or do not, there is a retry");
            }

        }, 10000);

        if (!Retry.Item1)
        {
            l.LogError(Retry.Item2, "Retries exceeded");
        }

    });
});

REST Retry

In a lot of cases, network connectivity issues can easily be dealt with by simply retrying a moment later. These handy extensions are built right alongside RestSharp for use in place. If they don't suit your needs, you can always wrap the call chain in the utility retry methods instead!
In the example below: ExecuteRetry(request, count) will execute the request at least once.
  • If a success response code is sent back, it will return and not execute the retry.
  • If a failure code is returned it will retry as many times as supplied with a 5 second delay between requests.
    • If using the CredentialAuthenticator and a 401 (unauthorized) status is returned, it will detect and invalidate the credential. This will refresh the authorization token/mechanism and retry again with the new token.
PerigeeApplication.ApplicationNoInit("CancelTokens", (c) => {

    CredentialStore.RegisterRefresh("TokenAuth", (o) => {
        // Usually this would include making a request or generating a new token;
        // for this simple demo, it's creating a basic authentication header
        return new RestSharpCredentialStoreItem(new HttpBasicAuthenticator("user", "pass"), DateTimeOffset.Now.AddMinutes(45));
    });

    c.AddRecurring("Recurring", (ct, l) => {

        using RestClient rc = new RestClient("https://postman-echo.com");
        RestRequest rq = new RestRequest("/status/401", Method.Get);
        rc.UseAuthenticator(new CredentialAuthenticator("TokenAuth"));

        //Execute retry will retry 2(n) times, and if the response code is a 401 (unauthorized)
        // it will automatically invalidate the credential and refresh the token
        rc.ExecuteRetry(rq, 2);

    });
});

FileSyncWrite

FSW for short is the primary class suited for synchronizing data to the local drive, and it's explained in detail on its own page. In short, you supply the CancellationToken from the thread and it safely handles the file locking and writing of newly updated data on a regularly scheduled basis and before the application closes.
This provides a fantastic way to keep state or progress on any tasks ongoing.
If you run the demo below multiple times, you'll see the count being restored, incremented, and saved on shutdown. The data lives in a new file located at bin\debug\sync\local.json.
//Declare a cancellation token source to control the FSW.
//If using FSW within a ManagedThread, use the passed cancellation token
CancellationTokenSource CTS = new CancellationTokenSource();

//Local persist data
LocalPersistData localData = new LocalPersistData();

//Declare a new FileSyncWrite, its generic T type can be any class that supports new()
FileSyncWrite<LocalPersistData> _FSW = new FileSyncWrite<LocalPersistData>($"sync{Path.DirectorySeparatorChar}local.json",
    CTS.Token, //The token mentioned above

    rollingDelay: 1000, //Rolling delay is bumped every time an Update() is pushed in.
    maximumDelay: 10000, //Maximum delay applies if the rolling delay never expires.
    //When either delay is satisfied, the data is written to disk

    (s, data) => { } //This is an updated event callback. Any time the data is written, this is called
);

//If initialized, then a value was loaded back in from the disk.
//If this is FALSE, no value was loaded in
if (_FSW.InitializedValue)
{
    localData = _FSW.Get();
    Console.WriteLine($"FSW Initialized: {localData.Count}, {localData.Offset}");
}

//Register an additional callback
_FSW.UpdatedEvent += (object s, LocalPersistData e) =>
{
    if (e != null)
        Console.WriteLine($"FSW Updated: {e.Count}, {e.Offset}");
};

//Push debounced updates to it:
localData.Count++;
_FSW.Update(localData);

//Tell the FSW to end all pending update queues and sync data back now
CTS.Cancel();

Task.Delay(8000).Wait();
return;
The class used above is simply two properties with getter/setters:
public class LocalPersistData
{
    public int Offset { get; set; }
    public int Count { get; set; }
}

Thread Conditions

Thread Conditions provide a way to cancel processes if a condition is not met. They're a great way to turn off a service if the network is down or the hard drive is too full. They're fully customizable and allow you to set your own logic for how they should be run.
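As a rough illustration of the idea (this is not Perigee's Thread Condition API; the condition delegates and the GetFreeDiskPercent helper are hypothetical stand-ins), a set of user-defined checks can gate a worker through a cancellation token:

```csharp
using System;
using System.Threading;

class Program
{
    static void Main()
    {
        // Hypothetical conditions: in practice these might be network
        // reachability or free-disk-space checks with real logic.
        Func<bool>[] conditions =
        {
            () => true,                    // e.g. "network is up"
            () => GetFreeDiskPercent() > 5 // e.g. "disk is not too full"
        };

        using var cts = new CancellationTokenSource();

        // Evaluate each condition; a single failure cancels the service
        foreach (var condition in conditions)
        {
            if (!condition())
            {
                cts.Cancel();
                break;
            }
        }

        Console.WriteLine(cts.IsCancellationRequested
            ? "Service stopped: a condition failed"
            : "All conditions met, service continues");
    }

    // Stand-in for a real disk-space check
    static int GetFreeDiskPercent() => 42;
}
```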

Transaction Coordination

The Transaction Coordinator is the big boss of all of the above methods. It implements cancellation tokens, exit events, retries, the FSW model, and local+remote two-way synchronization.
Think of the coordinator as a way of defining the steps of a process in logical tiny blocks, always verifying you move through those blocks in the correct order while never missing a step, or data, in the process.
It's so fault tolerant you can terminate the running application and restart it at any point, and it will pick up where it left off. Impressive, eh?
To see this in action, check out its page!
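The pick-up-where-it-left-off behavior can be sketched conceptually (this is not the Transaction Coordinator API; the progress.txt checkpoint file and the step names are made up for illustration): persist progress after each block, so a restarted process resumes at the first incomplete step instead of starting over.

```csharp
using System;
using System.IO;

class Program
{
    static void Main()
    {
        string checkpointFile = "progress.txt"; // hypothetical local state file
        string[] steps = { "extract", "transform", "load" };

        // Resume from the last checkpoint if one exists
        int completed = File.Exists(checkpointFile)
            ? int.Parse(File.ReadAllText(checkpointFile))
            : 0;

        for (int i = completed; i < steps.Length; i++)
        {
            Console.WriteLine($"Running step: {steps[i]}");
            // ... do the work for this step ...

            // Checkpoint: a crash or restart after this line resumes at step i+1
            File.WriteAllText(checkpointFile, (i + 1).ToString());
        }

        File.Delete(checkpointFile); // all steps completed, clear the state
    }
}
```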