Perigee Applications are created around two main concepts: Threads and Events.
Events are the source for starting a process:
An API call
An email received
A scheduled task at a specific date/time
A file dropped on disk
A new message from a queue
A task recurring every minute
Events live and execute their code within Managed Threads.
We use the Perigee Application Callbacks to tell our application what events we want to subscribe to, and Perigee automatically manages the relevant threads for you.
Threads are managed independent code blocks, completely separate from other threads. This is a key part of the application design process, as one thread can be shut down without affecting another thread. One thread can crash without taking down the application or other running threads.
Threads are managed by Perigee internally. This allows the application to track each thread's running state, restart threads that go down, and expose programmatic access to manage them.
Threads provide a clear separation of running code that does not allow one running block to cancel or crash another. By designing applications that use this separation of logic, we can create highly customizable, scalable, and fault-tolerant applications where a fault in one process doesn't take down other processes.
Threads are configured during the Perigee Application Callbacks phase. We have some options during this startup, and we aren't forced to start threads immediately. They may be configured to start later or from a different event.
Imagine for a moment you have a directory watcher to read all the account PDFs from a folder and securely upload them to a third party for processing. Let's take a look at the power of managed threads and how they can help you and your business.
Inherently, you've got several points of failure and things to consider in this scenario:
The drive the documents live on may fail
The third party may be offline, or down
We may need to perform maintenance on the server
There may be an emergency call to stop processing documents
All of the above scenarios are easily handled within Perigee. Let's create a watcher to notify us of a new PDF on our drive.
Much of the difficult code is handled within the Directory Watcher. As we progress through the "Hello" series, we will cover all of the fault-tolerant design patterns to follow. Let's take a look at a few of the issues mentioned above:
Maybe you need to take that processor down and perform maintenance. You can just shut down that thread, and leave the remainder of the application running and untouched!
The way you implement this logic is up to you. Maybe you decide to expose an internal, authorized endpoint to shut down or restart that process:
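Here's a minimal sketch of what that might look like when hosting a minimal API; QueueStop comes from the Thread Registry section later in this guide, while StartOrRestart is a hypothetical stand-in for the start-or-restart-by-name call described there:

```csharp
// Sketch only: "app" is an ASP.NET Core minimal-API host and "taskConfig" is the ThreadRegistry.
app.MapPost("/admin/pdf-processor/{action}", (string action) =>
{
    if (action == "stop")
        taskConfig.QueueStop("PDFProcessor");        // gracefully stop just this thread
    else if (action == "restart")
        taskConfig.StartOrRestart("PDFProcessor");   // bring it back up when maintenance is done
    return Results.Ok(new { thread = "PDFProcessor", action });
}).RequireAuthorization();
```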
Maybe the drive had a fault and is now offline, which caused the thread itself to go offline. Perigee implements several powerful ways to assist in the restart of this process. Another powerful feature of threads is that we're able to set up and configure how threads should react when they crash. Below is an example of adding the optional callback for restarting a down thread.
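A rough sketch of such a configuration follows. The option names (restartTimerDelay, shutdownWaitMS, started) come from this page, while the method name, callback signatures, and helpers are assumptions for illustration:

```csharp
// Sketch only: AddDirectoryWatch, UploadSecurely, and DriveIsOnline are hypothetical names.
taskConfig.AddDirectoryWatch("PDFProcessor", @"C:\Accounts\Inbox", "*.pdf",
    (cancelToken, log, file) => UploadSecurely(file, cancelToken),
    () =>
    {
        // Called while the thread is down; return true to let Perigee restart it.
        return DriveIsOnline();
    },
    restartTimerDelay: 30000,  // how long to wait before checking the callback again if it returned false
    shutdownWaitMS: 10000,     // how long to wait before dropping the thread when cancelling
    started: true);            // set to false to configure the thread without starting it
```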
Line 5 - The first callback, () => {}, allows you to perform logic before telling Perigee to restart that thread. If you return true, the thread will be started again.
Line 13 - restartTimerDelay tells Perigee how long to wait before checking the callback again if it returns false.
Line 14 - shutdownWaitMS is how long Perigee will wait before dropping the thread when attempting to cancel it.
Line 17 - started is an optional way to tell Perigee to only configure and store the thread without starting it yet. You have control over how to start it later.
Simply press ctrl-c if you're within a console application and wait while Perigee gracefully cancels all running tasks. No data will be lost, and everything will be persisted to disk. You'll see the logs indicate what is currently running, and what is awaiting shutdown.
You may want to write custom logic to shut the app down gracefully as well. Check out the Thread Registry section to see how to cancel the parent token.
Whether you are writing an integration, data pipeline, worker bot, synchronization agent, data process, or even an order transaction coordinator, they all have an event source. There are only two types of event sources. Let's look at each of them:
Triggered event sources are used in most types of applications. They can be new accounts created in Salesforce, an email arriving in an inbox, or a new message in a queue. They act as a starting point for additional logic and are events created outside of our control. Here are a few more examples:
A directory watcher receiving a new file.
An email client getting an email.
A Kafka topic receiving a new message.
Salesforce pushtopic getting data.
An API with JSON data posted to it.
Scheduled event sources are everything not triggered by an external or internal source. Typically these are CRON jobs, although Perigee has a few additional scheduling types to use. A few examples of using these:
We want a data process to kick off at 4 AM.
We need to check every hour for new data on the network drive.
We poll an external API every 5 minutes for the status of our request.
Now that we have a basic understanding of how event sources work, let's look at what that might look like inside Perigee.
These are just a few of the ways you can integrate different kinds of events inside Perigee. We'll cover all of them in examples and blueprints in more detail.
Every single one of the .Add...() methods contains a callback action describing when to process an event.
For the CRON, this callback is when we're within the time and timezone to execute.
For the IMAP Watch, it's when we receive a new email in the inbox.
For the Directory Watch, it's when a new file is present and is no longer being written to.
All of the callbacks within Perigee contain a Cancellation Token and an ILogger. See the example below:
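A minimal sketch of that callback shape (AddRecurring appears elsewhere in this guide; the exact signature is an assumption):

```csharp
taskConfig.AddRecurring("Worker", (CancellationToken cancelToken, ILogger log) =>
{
    log.LogInformation("Doing work...");

    // Respect the token: stop cleanly when a shutdown has been requested.
    if (cancelToken.IsCancellationRequested)
        return;
});
```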
The Cancellation Token is flagged as cancelled when a shutdown event has occurred and the process is expected to exit soon. Pay attention to it if you want to gracefully shut down or cancel tasks in process: you can store your data or finish your work, and once you're able to safely stop processing, respect its request to shut down and end execution to return back to Perigee.
The ILogger is a Serilog-enabled logging source, enabling you to write your logs to the console, files, external data sources, and many other sinks. Serilog is awesome; go check them out!
Line 2 - The initialization callback is how we inject and define our properties. Maybe our application requires a connection string, a custom property loader, or API credentials. This initialization block lets us inject additional configuration before the primary configuration block is called.
The init parameter is simply an IConfiguration. This allows full access to your appsettings.json file for reading any configuration values you need to set up additional components.
Line 5 - Perigee has a primary configuration callback defined every time you create an application. This block is where we define how and what the application does. More specifically, we define what event sources we use and any managed threads we want to create.
For configuring a Perigee application, we've created an easy-to-use SDK. This is why you get the taskConfig parameter, which is the ThreadRegistry.
When building an application, it is very important to clearly communicate what is going on inside it, whether that is a success message or a critical failure that needs someone's attention.
We use Serilog behind the scenes. So any configuration you can perform with Serilog, also applies here.
Serilog can output its data to all kinds of sources: the console, Azure, Amazon, Elastic, Log4Net, Loggly, NLog, SQL Server, files... just to name a few. These sources are called Log Sinks.
Any time a sink is added to the configuration, all logging within Perigee automatically sends its log data to the configured sources. This simple configuration strategy makes it incredibly easy to send log data wherever you need it.
We recommend doing most of this configuration within the appsettings.json file, so let's take a look at configuring a simple logger.
In the Hello Configuration section we configured the console sink to have a different logging template, using the following section of code:
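That configuration looks roughly like this (a sketch using standard Serilog appsettings syntax; values are illustrative):

```json
"Serilog": {
  "MinimumLevel": "Debug",
  "WriteTo": [
    {
      "Name": "Console",
      "Args": {
        "outputTemplate": "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}"
      }
    }
  ]
}
```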
The first tag, MinimumLevel - This describes what events get emitted to the sinks. At Debug, anything logged at the Verbose level is ignored. It's great for controlling what types of messages are visible.
The second tag, WriteTo, is the array of configured logging sinks. You can control these sinks with the various settings they accept, which can be found on the Serilog Sink Configuration page for the given sink.
If you configure another sink like the Microsoft SQL Server sink, don't forget to also add the relevant NuGet package to your project or it will be ignored!
1) Add the appropriate object to the AppSettings.Serilog.WriteTo array:
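For example (a sketch; the argument names depend on the sink version, so check the sink's configuration page):

```json
{
  "Name": "MSSqlServer",
  "Args": {
    "connectionString": "Server=localhost;Database=Logs;Trusted_Connection=True;",
    "tableName": "Logs",
    "autoCreateSqlTable": true
  }
}
```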
2) Install the package with NuGet: Install-Package Serilog.Sinks.MSSqlServer
3) You're now ready to log away as usual!
The hosting configuration also includes a few additional keys; let's take a look here:
This time, the MinimumLevel is an object, not a string. This allows us to set the default logging level, as well as override the logging levels for several other namespaces.
The overrides supplied here will squash the incredible amount of information logged by ASP.NET Core hosting by default.
Logging is built throughout Perigee. Any thread that is managed and created is automatically sent a logger with the appropriate configurations.
We can easily get a logger directly from the ThreadRegistry any time we want outside of a ManagedThread. This logger is tied into the Serilog system and will automatically inherit the configuration defined.
As shown above, you can override the logging template with additional parameters. Here we've added the ThreadName to the logging template. The reason this works out of the box is because Perigee's internal thread management system injects this log scope before sending an ILogger back.
You can add as many custom log scopes as you need. They will appear differently for different log sinks. This is very helpful when adding a BatchID, CorrelationID, SystemID, NetworkID, or other identifying information to the log request.
If logging to a database with an additional column added to the logging table, any matching log scope will fill that column value.
If logging to something like Splunk, you'll see those log scopes present in the additional log data.
If an overridden template in the console logger includes the custom scope, it will be written when present.
Make sure to set up your appsettings.json with the modified logging template:
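For example, the console sink's outputTemplate might become (illustrative; adjust to your own template):

```json
"outputTemplate": "[{Timestamp:HH:mm:ss} {Level:u3}] ({ThreadName}) {Message:lj}{NewLine}{Exception}"
```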
Then add a simple recurring method and override the ThreadName supplied by Perigee:
The output will look like this:
Over the past few years of development we have worked closely with other companies to test, refine, and apply Perigee to their business needs. Here are a few testimonials from successful Perigee implementations in current production applications.
With the assistance of Perigee we utilized the powerful set of parallel processing libraries to speed up our customer data transformation process by a little over 500,000% - from hours to microseconds. Perigee made this possible for us.
Perigee allowed us to schedule our complicated daily refresh processes with ease. The “Sync Agents” allowed us to build up a tiered and structured refresh system that follows our requirements. We get reports sent to our team with the status of every job after they complete so we can easily monitor the process. It’s been running for almost a year now and through the logging has helped us identify multiple outages on our third party system.
Perigee serves as the foundation for our daily integration and data mapping between source systems. It enables us to schedule synchronizations daily, with the flexibility to initiate ad-hoc syncs as needed. The powerful logging allows us to query our database for each step of the refresh process on a per-transaction basis. In case of any issues, we receive instant notifications on our devices, making us aware of system outages within minutes.
We used Perigee to work directly with Microsoft Graph and Azure Service Bus to enable a custom Sharepoint solution for our analysts. Perigee handles the daily Authentication requirements, and provides us with an easy way to temporarily turn off the service bus when applying patches to our code base. We saved several hundred development hours by using Perigee.
Our email bot has processed thousands of files over the past 8 months, all thanks to Perigee's IMAP connector. It features a robust SDK that allows us to receive files, respond with dynamic HTML, and process the results as we get new requests. Perigee enabled our team to finish this task with minimal effort and saved weeks of development time.
Perigee has empowered our company to efficiently process and prepare large hierarchical documents, owing to its parallel processing and NestedSet helpers. We can effortlessly query the document hierarchy and pose complex questions from the UI, which would be impossible without the data preparation carried out by Perigee.
We built a powerful transformation engine powered by the incredible CSV reader provided by Perigee. We're able to load, process, and transform customer data at lightning speed - far better than anything we tried previously.
Go ahead and create a new .NET 5/6+ console application. Open up Program.cs, and head to the first step to get started!
If you haven't gone through the installation step, please do so first!
Let's start with the basics. First, let's create a new application in Program.cs. Depending on the version of .NET you started with, you'll see two different things when you open Program.cs.
The .NET 6+ version looks like this:
If this is the case, delete Line 2 - Console.WriteLine("Hello, World!"); - and start there.
The .NET 5 and below version looks like this:
If this is the case, start coding on Line 6.
Now that we know where to start, here's a simple Perigee application.
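A minimal sketch of that application might look like this, assuming the simplified ApplicationNoInit entry point described later in this guide (the exact signature is an assumption):

```csharp
PerigeeApplication.ApplicationNoInit(
    "FirstApp",
    (taskConfig) =>
    {
        // Thread-managed tasks get added here.
    });
```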
Let's look at what we have here in the constructor:
Line 2 - "FirstApp"
- The name of the application, this can be whatever you like!
Line 3 - taskConfig block - This block is where we add any and all thread managed tasks.
This wouldn't be a hello tutorial without writing "Hello, World!", so let's add a CRON method to log Hello Perigee every 15 seconds!
The .AddCron method adds a new thread to the system. Each thread is managed by Perigee and is independent of every other thread. This is an important aspect of Perigee Application Design, as it allows for individual thread management. Threads do not affect each other, and Perigee's internal thread management system has mechanisms in place to automatically restart downed threads.
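Here's a sketch of the same application with the CRON method added; the AddCRON call follows the description below, but the exact overload is an assumption:

```csharp
PerigeeApplication.ApplicationNoInit(
    "FirstApp",
    (taskConfig) => {
        taskConfig.AddCRON("HelloCRON", "*/15 * * * * *", (cancelToken, log) =>
            log.LogInformation("Hello Perigee from {Application}!", "FirstApp"));
    });
```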
Line 4 - We use fluent syntax to call .AddCRON() - This method takes a name, a CRON string, and a callback(cancelToken, ILogger).
Line 5 - We use the built in logger passed to us to log information in a templated format, passing in the name of the application to the logger.
Running the application produces a new log line every 15 seconds!
To close this application using graceful shutdown, press CTRL-C. This starts the safe shutdown procedure, allowing the application to properly stop all running tasks before exiting.
Simply replace .AddCron() with the directory watcher instead; the full code is below:
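A rough sketch of that swap follows; the directory-watch method name, its callback signature, and the CSV helper shown here are assumptions for illustration only:

```csharp
// Sketch only: AddDirectoryWatch and ReadCSV are hypothetical names.
PerigeeApplication.ApplicationNoInit(
    "FirstApp",
    (taskConfig) => {
        taskConfig.AddDirectoryWatch("CSVWatcher", @"C:\Watch", "*.csv", (cancelToken, log, file) =>
        {
            var table = ReadCSV(file); // e.g. Perigee's CSV reader, or your own parser
            log.LogInformation("{file} has {rows} rows and {cols} columns",
                file, table.Rows.Count, table.Columns.Count);
        });
    });
```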
Here's a sample.csv:
Voila! You're now watching for CSV files, reading them in, and reporting on the number of rows and columns.
For an intro to Perigee we accomplished quite a lot.
We learned about threads - how they're added and independent of each other.
We learned about logging information to the available sinks.
CRON strings and how to use a CRON thread.
How to configure Perigee.
What "Graceful Shutdown" looks like.
We even got to see the CSV reader in action.
Let's hop over to the next section and continue!
Find the package manager and type install-package Perigee. It's available on NuGet.
To install your .license file, simply add it to the project's root folder and set it to "Copy Always".
Installation on a deployed application is exactly the same! Make sure the .license is in the executable directory.
To actually use the logging, we need a basic appsettings.json file in the project.
A) Either drop the included sample configuration into the project
B) OR - Create the file (appsettings.json) in the root of the project with the code below:
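A minimal version might look like this (the included sample file contains more sections; this is just enough to get console logging going):

```json
{
  "Serilog": {
    "MinimumLevel": "Debug",
    "WriteTo": [
      { "Name": "Console" }
    ]
  }
}
```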
We will cover additional logging and configuration in /HelloLogs later!
Since we use Serilog, you may be inclined to use those namespaces at the top of your code. Typically speaking, we import the Microsoft.Extensions.Logging package instead. Since Serilog extends that generic functionality, we can do everything we need to with its inclusion. To get access to the logging functions, put this include at the top of the project:
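```csharp
using Microsoft.Extensions.Logging;
```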
That's it! Perigee is installed and ready to go. Head to the next section and let's write our first "Hello Perigee!" application.
Let's take a closer look at how the configuration works within Perigee.
To understand the configuration process we need to understand how the sections work and why we separate them. To keep it concise, sections create a logical separation of values for the different processes in our application.
As an example we have a section called Serilog. This section is entirely dedicated to setting up our logging, where we send logs to and in what format. It wouldn't make any sense to put a message queue URL or an API key inside of this section.
We create a logical separation between our logging needs and a specific threads' requirements by keeping those configuration values separate.
Configuration sections can be registered in our application in a number of ways including but not limited to:
Environment variables
main(string[] args)
appsettings.json file
Custom providers
Back in the Installation and Project Setup we used a configuration file. A section is a key off of the root JSON node, and as you can see there are 4 different sections in our startup file.
Our goal is to write an application that logs out our custom configuration sections. We will do so by reading the configuration directly at runtime as well as binding the configuration section to a custom class.
You will also see some helpful ways you can plug the configuration system into managed threads
To start, we need to add another section to our appsettings.json file called HelloConfig and give it some values to read.
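For example (the values here are illustrative; use whatever you like):

```json
"HelloConfig": {
  "Name": "Perigee",
  "Enabled": true,
  "RunCount": 3
},
```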
This creates a section for us to read in our code. We will read it directly by calling taskConfig.GetValue<>().
This handy method is a quick way to reference ALL of the incoming configuration providers and it works by supplying two things:
The C# Type - This could be a string, or int, or whatever other value type you're reading.
A special format: Section:Key.
To read the Name from our configuration section, the Type would be string and our Section:Key would be: HelloConfig:Name
Here's a fully working Perigee application:
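Here's a sketch of what that application might look like, assuming the simplified start method and an AddRecurring overload matching the description below:

```csharp
PerigeeApplication.ApplicationNoInit("HelloConfig", (taskConfig) =>
{
    taskConfig.AddRecurring("ConfigReader", (cancelToken, log) =>
    {
        log.LogInformation("Name from configuration: {Name}",
            taskConfig.GetValue<string>("HelloConfig:Name"));
    });
});
```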
Line 1 - This is a simplified Perigee start method that skips IPC tokens and initialization actions.
Line 3 - AddRecurring simply adds a managed thread that is automatically called every n milliseconds (supplied by an optional parameter, default 5000 (5 seconds)).
Line 5 - We log out the configuration value reading it live at runtime.
You'll see below the results of running this application. We get a log every 5 seconds with the value captured from our configuration file.
Next let's look at binding the section we added to a custom class. Here's the file if you prefer to import it.
The class simply has 3 properties that are an exact, case-sensitive name match to the configuration section we created.
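A sketch of that class, matching the illustrative section above (swap in whatever values you actually added):

```csharp
public class HelloConfig
{
    public string Name { get; set; }
    public bool Enabled { get; set; }
    public int RunCount { get; set; }
}
```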
To bind a class, simply use the taskConfig and call GetConfigurationAs<>():
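For example (a sketch using the class above):

```csharp
var helloConfig = taskConfig.GetConfigurationAs<HelloConfig>("HelloConfig");
```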
The Generic T Parameter, HelloConfig, is the class we're creating and assigning. It's a type reference.
The string, "HelloConfig", is what section to get from our configuration so that we can assign its properties to the referenced type.
This is all that is needed to bind our configuration section to a class. Here's the full application up to this point!
Line 10 - We got the configuration as a custom class by binding to it.
Line 11 - We use the custom class's properties to write to our log.
Perigee will, by default, hot-reload the appsettings.json files. This is a fantastic way to give control to a configuration file to enable or disable a thread without shutting down the entire application.
Perigee has a great helper method built into it which enables the thread-to-configuration linking for enabling or disabling managed threads through a configuration boolean.
Our example config section (HelloConfig) has a key called Enabled, and we're going to use that Boolean to tell Perigee whether to start or stop our thread.
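Here's a sketch of the relevant calls, assuming AddRecurring returns a chainable thread reference (started: false and LinkToConfig come from this page):

```csharp
taskConfig.AddRecurring("ConfigReader", (cancelToken, log) =>
{
    log.LogInformation("Enabled and running...");
}, started: false)                       // don't start until the configuration says so
.LinkToConfig("HelloConfig:Enabled");    // start or stop the thread whenever this value changes
```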
Line 12 - We added started: false to the method's optional parameters so that, with the addition of the linking command, it will be in charge of starting or stopping this thread.
LinkToConfig was added and we supplied our Section:Key to it.
Because configurations are loaded, hot-reloaded, and maintained internally by the configuration system, please also use the started: false flag on the thread itself. This will prevent the thread from starting while the configuration value is "false".
The "debugging" version of the app config should be at: {projectRoot}\bin\Debug\net{version}\appsettings.json
If you change the appsettings.json file in your project you aren't actually modifying the version of the file the debugged application is reading.
If you'd prefer more control over how to start or stop your threads, or even make different decisions based on the configuration changes, you can always subscribe to the change notification directly. .LinkToConfig() is essentially the same code as below, but managed by Perigee internally and automatically, saving you a few lines of code.
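Here's a sketch of subscribing directly; ConfigUpdatedEvent appears in the Thread Registry section, while the start/stop calls shown are hypothetical stand-ins for the ThreadRegistry methods described there:

```csharp
taskConfig.ConfigUpdatedEvent((newConfig) =>
{
    // Hypothetical thread-control calls; use the ThreadRegistry methods of your choice.
    if (newConfig.GetValue<bool>("HelloConfig:Enabled"))
        taskConfig.StartIfNotRunning("ConfigReader");
    else
        taskConfig.QueueStop("ConfigReader");
});
```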
Perigee will automatically add an additional config property block to each of its loggers that it passes to the managed threads. This property block is called ThreadName. As a fun aside, let's update our logging template to include this property block.
Open the appsettings.json file and, under the Serilog section, update the console sink to include the updated outputTemplate.
Now our application's logs have the name of the managed thread they are coming from right in the log!
You can add your own properties to the loggers, we'll show you how on the Hello Logs page.
Perigee has several ways built in that allow you to encrypt your configuration values, which may provide an extra layer of security/obscurity. It uses AES-256 encryption for securely encrypting and decrypting values and has built-in methods for reading them.
To secure a value you must provide the encryption key, encryption IV, and the source of where to find these values.
There are 3 sources for the encryption keys:
Environment Variable source
Arguments Source
Variable Source
Here's an example of setting this up all within code using the variable source. The methods are exactly the same though if you decide to pass in the encryption key from an argument, or read it from an environment variable.
With that done you should see something like this printed to the console:
These are your two encryption keys, and we will use them to decrypt values in the next step. Remember you can easily put these in the environment variables, command line arguments, or even set the variables in code. For the ease of this demonstration, we'll assign them directly:
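For example (placeholder values shown; SecureValueKey and SecureValueIV appear as ThreadRegistry properties later in this guide):

```csharp
// Placeholder values - use the key/IV pair printed to your console.
c.SecureValueKey = "<your-base64-key>";
c.SecureValueIV  = "<your-base64-iv>";
```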
If you're reading encrypted configuration values, there's a handy little helper that does the read and decrypt in one pass:
There's always an option of rolling your own or using custom methods of reading/decrypting values. If you want to see how to implement a custom property loader then check out the link!
Hopefully you understand a bit better about how configuration, sections, and providers work as well as how to use them in code.
PRO TIP: If you plan to support hot-reloading, never cache the configuration values in such a way that a hot-reload would be ignored. If you're binding a configuration section to a class, do so at the time you need it and dispose of the class when you're finished. That way, if the configuration section changes between iterations of your code, you get the updated values by rebinding.
Make sure you have everything included in your using statements by clicking the icon or pressing ctrl+. to find missing namespaces.
My oh my, we logged something! Exciting right? Let's take our first demo application a step further and watch for CSV files, read them in, and report the row and data counts.
Notice the comma after the section? Make sure when you add new sections you keep the file valid JSON
Now if you run the application and navigate over to the debug folder to open up the appsettings.json file, you can change that HelloConfig:Enabled value from true to false or false to true and watch Perigee start and gracefully stop that thread.
If you're having issues getting a Perigee application to start or load properly, check the below items:
The license check runs immediately on application startup, and if a valid license is not detected, it will immediately shut down the application.
Make sure you have a valid license and your trial period has not expired.
Make sure you're copying that license file to the output directory, or it is installed on the server.
If a Serilog sink is configured to start up against a remote source (like a database), please verify that it has an open connection and there are no firewalls preventing Serilog from initializing.
It's very easy to add custom providers. Remember that .NET creates a ConfigurationSource that implements and returns a ConfigurationProvider.
You can see this pattern here: Implement a custom configuration provider.
To implement a custom source and provider only requires a single line of code.
Call the ConfigureConfiguration method.
This callback sends you two parameters:
IConfigurationBuilder - This builder allows you to add a custom source.
EnvironmentString - The string variable that is the current environment.
Simply add a new instance of the source. (See line 5)
You can see how easy it is in the future to reload the providers. You can reload all of them or specific types by supplying that type information.
If you're trying to load values from MSSQL then Perigee ships with a loader ready to go in only a single line of code. Simply call the configuration method and supply the connection string, query, and prefix.
SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
To obtain a valid license after the trial period, visit https://Perigee.software to purchase and view license terms and conditions.
The packages listed are licensed under Apache 2.0: http://www.apache.org/licenses/LICENSE-2.0
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
The contents of this file are subject to the Mozilla Public License Version 1.1 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License.
The MIT License (MIT)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Let's take a look at how you might extend Perigee to fit your company's very specific needs. Although we ship with many different connectors, watchers, agents, and synchronization features, you may want to add something to the Perigee core system. It is very easy to implement custom threads. This is one of the reasons Perigee is so powerful: we don't lock you out of adding to and customizing the application.
There are two types of custom threads you can add. Each has different execution rules, and both allow you to create any additional functionality needed for your specific requirements.
The first type is a managed thread. The ManagedThread calls the callback method when it is started, and doesn't expect that method to return, throw an exception, or exit until the cancellation token is canceled.
When the callback does exit before token cancellation, it will restart the thread a second later, expecting the process to start again. If an exception is thrown and is uncaught within the callback, it will be caught in the ManagedThread, and the ExceptionRestartTime property of the manager will be awaited before restarting.
Below is a fully working extension that adds a new method to call from TaskConfig.
Then, to use this custom method on startup:
This is nearly identical to the managed thread; however, its execution is built on a CRON expression. This changes the way the callback functions slightly.
The callback method is ONLY called when the expression is at its execution time. This means the process is expected to kick off and end by returning out of the method.
If an exception is thrown, it will be caught in ManagedThread, but no additional delay is added as the next execution period of the CRON expression will simply call the callback again.
The takeaway about the two types:
Expression Threads: Get called, expect to exit.
Managed Threads: Get called, don't expect to exit, and will trigger the restart functionality on uncaught exceptions.
To purchase a license, please visit https://perigee.software.
Contact us at Sales@perigee.software. We are happy to hear from you!
This is the last of the hello series. We're going to cover the final few topics you'll need to really get started. Let's jump in!
Watermarking is simply storing a key/value pair from last execution and recalling it again next time it is needed. We use this when interfacing with another system we need to synchronize data to or from. A few examples of this are:
A Mail sync client needs to know what the UIDNEXT, or MODSEQ integers are.
If you're synchronizing data changes to SharePoint, you'll want to store the delta link as a string.
If you're pulling data by a last modified date, you'll want to make sure you store a DateTime(Offset).
These are just a few examples of how you would use watermarking in a real world production application.
We make working with watermarks quite easy and they are very powerful.
Our watermarks are persisted locally to disk automatically and recalled on application startup.
They are debounced (We cover this in the updating section).
They are thread safe.
You can register as many subscribers as you need to the data feed.
You may push that data externally on update, and recall it on initialization for further backup.
The register call should be performed at application initialization if possible, to avoid a race condition of pulling for its value before it is registered. Requesting a watermark before it is registered will throw an exception.
Registering a watermark is an easy one liner. Let's look at the parameters:
Line 4 - Name - The name of the watermark
Line 7 - Func<Watermark> Initialization callback - If the value does not exist, the initialization callback is called and an initial value is set.
Line 10 - Action<Watermark> onUpdate? - An optional callback for when the value is updated.
To create the other base data types, simply provide that type in the initialization callback.
That's it. The underlying system does the heavy lifting of persisting this locally under the /watermarks folder beside its running application.
You're able to register as many "subscribers" to the data changes as you desire in other parts of the code. These will be called on a background thread with the updated value.
The RegisterAdditionalEventHandler simply takes two parameters:
Line 4 - Name - The name of the watermark you want to update.
Line 7 - EventHandler<Watermark> - This sends you the (sender, watermark) back in the callback, and you're able to read the value from there.
The UpdateWatermark call takes two parameters:
Line 4 - Name - The name of the watermark you want to update.
Line 7 - Watermark - The new watermark and value.
A lot happens behind the scenes that you should be aware of:
Updates are debounced which means that rapid updates don't all push their values at once to any subscribers of the data updates.
Let's say you processed 10 records in a matter of 50 milliseconds, and on every record you updated the watermark, so it went from the initial value of 0 to 10. After about a second, all of the subscribers would get a single updated watermark event with the value of 10.
Debouncing reduces the number of calls both to the filesystem and to your subscribers
Updates are thread safe as they perform internal item locks before writing to disk and notifying subscribers.
Getting the watermark only requires the Name of the watermark to retrieve.
If there are inflight updates to the watermark, this call may block the thread for a second or two while those debounces get processed. If there are no inflight updates, it returns immediately.
Another primary concept of writing integrations is using credentials to get access tokens and authorization for other systems. The most common example of this would be using OAuth 2.0 to retrieve an access or bearer token that expires in the future.
Credentials are all registered on startup and are usually given an expiration date. This allows for any process later in the application to ask the manager to retrieve a credential. The below example shows how to register SuperCredential, and two possible return values:
When the application loads up or a new credential is created, the local drive is checked and all previously non expired credentials are restored immediately. This allows for applications to be safely shut down and restarted without interrupting or over-retrieving from an external API.
During the retrieval process, the CredentialStore checks for the existence of the credential and its expiration, and then checks whether the expiration is within N minutes (user defined) before returning to the caller.
In Summary: the CredentialStore does several important things:
It automatically caches the authorization details to disk and retrieves them again on application reload.
When supplying expiration dates to a credential, it's able to renew itself when required, before sending back the old authorization parameters.
The retrieval call is automatically retried to prevent a blip in connectivity from stopping a process.
It will automatically renew the token a defined number of minutes before expiration, allowing long running processes not to fail while in the middle of execution.
It integrates seamlessly with RestSharp, allowing all HTTP(S) traffic to automatically and internally pull credentials
To register a specific RestSharp credential, just create that type and assign the appropriate Authenticator.
Any time you create a client, simply assign this credential authenticator:
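For example, with RestSharp's RestClientOptions (the authenticator variable here is whatever credential authenticator you registered above):

```csharp
var options = new RestClientOptions("https://api.example.com")
{
    // Assign the Perigee credential authenticator registered earlier.
    Authenticator = credentialAuthenticator
};
var client = new RestClient(options);
```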
There's a whole page on this content, read more here:
Another integration pattern is limiting the number of calls that can be made during a given period of time. This is important when rate limiting or billing may be an issue, and it's a perfect stopgap for a critical failure causing an erroneous number of API calls or running up an unwanted bill!
The Limiter auto-resets its count every calendar day and will not allow additional executions past the defined limit.
A common integration issue is that systems outside of the immediate domain will have faults, issues, disconnects, exceptions, availability gaps, or downtime. All of these issues are usually easily resolved with a fault-tolerant approach to building your application.
Perigee has several classes built into it to make this a common practice within your application.
The Cancellation Token passed alongside every ManagedThread is an important fault-tolerance mechanism to be aware of. It allows us to gracefully shut down or stop processing data, and gives us an opportunity to save state before the application closes.
If you pay attention to it, it gives you the opportunity you need to end your process and save state.
Use the Exit Event alongside the cancellation tokens to store and finalize state on shutdown; this allows for an easy application reload, update, or server restart. Remember, Exit is called immediately before the application closes, and in some cases it only allows a few seconds to pass before ending execution. This is a last-step process where data should be persisted.
Among the utility class methods are Retry and RetryAsync. They provide an easy way to retry on a thrown exception, with a delay.
The response from these methods is a Tuple, where Item1 is a boolean indicating whether it succeeded (true) or failed the maximum number of times (false), and Item2 is the exception thrown if it did not succeed.
In a lot of cases, network connectivity issues can easily be dealt with by simply retrying a moment later. These handy extensions are built right alongside RestSharp to use in place. If they don't suit your needs, you can always wrap the call chain in the utility retry methods instead!
In the example below, ExecuteRetry(request, count) will execute the request at least once.
If a success response code is sent back, it will return and not execute the retry.
If a failure code is returned it will retry as many times as supplied with a 5 second delay between requests.
If using the CredentialAuthenticator and a 401 (unauthorized) status is returned, it will detect this and invalidate the credential. This refreshes the authorization token/mechanism and retries again with the new token.
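For example (a sketch; ExecuteRetry is named above, and the response is a standard RestSharp response):

```csharp
// Executes at least once; on failure it retries up to 3 times with the built-in delay.
var response = client.ExecuteRetry(request, 3);
if (response.IsSuccessful)
    log.LogInformation("Call succeeded: {StatusCode}", response.StatusCode);
```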
FSW, for short, is the primary class suited for synchronizing data to the local drive. It's explained in detail here. In short, you supply the CancellationToken from the thread, and it safely handles the file locking and writing of newly updated data on a regularly scheduled basis and before the application closes.
This provides a fantastic way to keep state or progress on any tasks ongoing.
If you run the below demo multiple times, you'll see the count being restored, incremented, and shut down. There's a new file with the data located at bin\debug\sync\local.json.
The class used above is simply two properties with getter/setters:
Thread Conditions provide a way to cancel processes if a condition is not met. It's a great way to turn off a service if the network is down or the hard drive is too full. They're fully customizable and allow you to set your own logic for how they should be run.
Transaction Coordinator is the big boss of all of the above methods. It implements cancellation tokens, exit events, retries, the FSW model, and local+remote two-way synchronization.
Think of the coordinator as a way of defining the steps of a process in logical tiny blocks, and always verifying you move through those blocks in the correct order while never missing a step, or data in the process.
It's so fault tolerant that you can terminate the running application and restart it at any point, and it will pick up where it left off. Impressive, eh?
To see this better in action, check out its page!
This static class enables users to create an API specification using both custom specifications as well as specifications derived from Postman integration. Most notably, it provides a way to fully generate a complete Web API given an APISpecification.
This method allows users to manually build an API Specification using a fluent syntax.
You can easily add endpoints after declaring the specification like so:
This method allows users to build an API Specification from Postman. This is useful for integrating existing Postman collections into new or existing projects.
If your collection contains {{environment variables}}, please supply the environment.json as well as the collection.json.
This method generates a full web API given an APISpecification. It requires the specification object, the project path, as well as optional parameters for IIS, HTTP and HTTPS Ports, additional installs, additional App Settings, a GPT Token and Model for generative AI, and boolean flags for generating Controller and Method summaries.
API Generation empowers developers to accelerate the development process of setting up a new API significantly. Whether you're creating an API for the first time or you're a seasoned veteran, crafting an API is more straightforward with the tools provided by Perigee.
API Generation refers to the creation of an API from a source document, such as Postman, or directly in code using the fluent syntax, without the in-depth knowledge required to configure a new API from scratch. It might seem too good to be true...
Did you know you could save hundreds of hours by generating an API from a JSON source?
When contemplating your next API project, here are some points to consider:
Mocking is the procedure of emulating a production API and creating a "shadow copy" for testing purposes. This ensures tests are conducted without the repercussions of calling the actual API. Such capability grants developers complete control over integrations. They can alter API responses, introduce delays, and disable parts of the API, facilitating better integration development from the outset.
Embarking on a new API for testing or production? Feed the generator a set of JSON documents and gain a substantial advantage in your new venture. Sidestep the tedious hours spent on configuring authorization, settings, or Swagger examples—let the code handle it!
Eliminate concerns over model typos or overlooked code. We generate everything based on a model registry that ensures the output is pristine C# code.
In our demo, we replicated the DropBox API, resulting in approximately 35,000 lines of code, encompassing:
Class Models
Documentation
Controllers and Actions
Swagger Samples
Comments
Authorization
Ponder over this: How long would it take your developers to rewrite 35,000 lines of code—error-free, typo-free, and meticulously documented? A lot of caffeine might be needed to answer that!
Curious about how OpenAI's ChatGPT API hinting operates? Here's an insight:
Consider an API method you've set up named EchoMessage. The model defined for this is a class named Echo that comprises a message string.
Your AI hint suggests: "Take my echo message method and return a JSON object that sends the user's message back to them."
In response, the API Generator composes a specific message for GPT, containing your input/output models and the organized method, providing GPT with the complete context to fulfill your request. It appears like:
GPT could suggest the following improved code (this is an actual response from GPT!):
Impressive, isn't it?
Both application starting points are nearly identical. The full callback has an IPC token and the initialize callback. Both are described in Hello Perigee.
You may also run ApplicationNoInit as an async method:
Sometimes you need to initialize outside of the scope of a PerigeeApplication. We do this for API applications, Blazor applications, and other scenarios where we need to call something before Perigee.
To get information about the host it's running on, there is a single method:
The response class, HostStatistics, contains properties for the machine it's running on, including machine name, drive info, CPU time, and running directory.
To get an exit callback, use the helper method supplied.
After return is executed and the domain/service is about to unload, OnExit has a few seconds to perform any last tasks, like persisting data.
Let's take a look at what sources we may use and create the configuration, credentials, and refreshes we need to use.
Do we want to put connection strings in the appsettings.json and read them?
Create the json sections and keys
Optionally, encrypt them
Do we want to implement a property loader?
See Property Loaders
Do we communicate with anything on the web?
Read all about the HTTP(S) communication and extensions here.
Do we need to define additional services to dependency inject?
See the Core Module: Application.
Do you need to have things run on a timer?
Check out the CRON thread.
Have a scheduler pick up items from a remote source, like a database.
Have a network of Sync Agents communicating and performing refresh tasks.
Do you need to coordinate a list of steps and NEVER have a failure cause the process to break?
Create a Transaction Coordinator.
Talk with third parties:
Do you have any instances of:
Testing Network Connectivity?
Sending notifications?
Run logic through a behavior tree?
Sort and query hierarchical data?
For this section, let's walk through, start to finish, how you might design and build an application for your company's specific needs.
Here's the quick bullet list of every step of this process:
Starting template
Define Sources
Requirements
Does our system need to respond to API requests?
Is data pushed to your service with POST requests?
Does it need to respond to a ping request?
Does it provide on-demand data through restful calls?
Do you plan on implementing administrative requests for your system?
Do you want to be able to shut down threads with an administrative login?
Do you want to check the status, runtime, or other system metrics?
If you answered "yes" to any of the above: lets start with this blueprint for built in hosting:
Otherwise we can create a standard .NET 6 project.
Not corn, but close...
CRON strings are a concise way of telling the application when to schedule or run something. Perigee uses CRON string data throughout for advanced process scheduling and events.
Most CRON strings contain five bits. Perigee's CRON can contain 6 if you use the optional seconds bit.
The ASTERISK character means "All Available Values"
For hour, this is 0-23
For Minute, this is 0-59
For Day this is 1-31
For Month this is 1-12
For Day-Of-Week this is 0-6 (Sunday being 0)
The COMMA character is a list of values
If the Month bit is 0,1,2 - then the CRON is only run in Jan, Feb, and March.
The DASH character means a range from a-z.
If the day bit is 0-6 - then the CRON is only run the first 7 days of the month.
The SLASH character is a divider of all available values.
If the second bit is */5 - then the CRON is executed on every fifth second.
There are several special characters that can only be used in certain situations.
The F character is for FIRST OF:
This can be used in the Day or Day-Of-Week bits
In the day bit, F - the CRON would run the first day of the month.
The L character is for LAST OF:
This can be used in the Day or Day-Of-Week bits
In the day bit, L-2 - the CRON would run the second from last day of the month.
The # character is the day-of-week selector.
This is only used in the Day-Of-Week bit:
0#2 would be the second Sunday (0 Sunday, #2).
1#3 would be the third Monday (1 Monday, #3).
Let's leave out the seconds since we don't need that here.
If we don't specify 0 in the minute bit, the CRON will run every minute of 5 AM.
5AM is 5 in the hour bit.
The key request is "every day", so let's set the day bit to an asterisk.
Every other
This is where that special L character comes into play.
Day-Of-Week starts as 0 Sunday, so Friday would be 5.
Asking for the last Friday of the month would be L5 in the day-of-week bit.
If we don't specify 0 in the minute bit, the CRON will run every minute of 1 AM.
1AM in the hour bit is 1
Asterisks for the other fields
This is where that special L character comes into play again.
Asking for the second to last day of the month would be L-2 in the day bit.
If we don't specify 0 in the minute bit, the CRON will run every minute of 1 AM.
1 AM in the hour bit is 1.
Asterisks for the other bits
Call Parse on any CRON string. It will detect if seconds are present.
Get the next occurrence from a given date or supply a different time zone to find the next occurrence in that time zone.
You can use that parsed CRON to display info about it.
The only other method you may care about is the InversePeriod. InversePeriod is interesting as it takes the CRON string and determines how consecutively it runs.
Notice how this CRON would run from 1AM consistently at every interval (minute) inclusively to 2:59AM? This method is an interesting way to use CRON data to specify ranges of time.
There are a bunch of ways to use CRON strings in Perigee. Let's take a look at a few:
SyncAgents use CRON strings to determine when to run, and even when to create blackout periods. You can see more in the linked section. Here's a quick preview!
This is a handy way of creating an awaitable task, or thread blocking mechanism using a CRON string.
As part of the core set of events in Perigee, you can easily add CRON methods.
A handy little method to print the definition of a CRON string, as well as print the next occurrence:
This is an amazing site to visually and instantly see the results of a CRON.
Every application design has requirements, and before starting a new project or converting an old one, you're going to want to define what those requirements are.
This is different for every organization and every process so in this walkthrough we'll take a few common examples and show how we might architect those scenarios.
Whether JSON, plaintext, XML, or file content, the built-in .NET hosting with Perigee has you covered. Check out the blueprint here:
For this type of process we have several options.
The first option is our Sync Agent:
This handy agent is highly configurable and can use everything from recurring timers, CRON strings, blackout periods, and behavior trees to inter-agent communication and more.
You, the designer, have complete control over the system and how/when it executes the processes.
They can be easily turned on or off with the SDK.
They can communicate with other defined agents and use them as dependencies to execute.
They know how to execute behavior trees to determine run conditions
The second option is a simple CRON method:
CRON methods are very lightweight, and their only job is to fire on time.
All of the implementation details, retry logic, and execution is up to you.
The third option is the Transaction Coordinator:
The coordinator is highly fault tolerant and is typically used with a queue of requests, however they can be used to perform a singular daily task with very high tolerance.
The coordinator tracks EVERY step of the process from initialization data to final completion. It stores a highly compressed file at every step of the process and can resume locally and remotely when the application or server is stopped and restarted.
If you need to replay the same transaction, you're able to inherit the initial request and replay it again later.
If you need direct API access or to use push topics and have data streamed to your application, we have that covered here:
Use the DirectoryWatcher:
It checks file locks automatically
Verifies file size over time to prevent callbacks on actively writing content
Asynchronous file operations
Options for failures
Use the built in Scheduler:
It allows individual rows in a database to control when a function in code is executed.
It tracks changes to the rows in real time and will adjust scheduled tasks accordingly when the rules change.
Accepts a CRON string, a timezone, and a runtime type with arguments.
This is where the Transaction Coordinator comes in.
The coordinator is highly fault tolerant and has a built in multi-threaded processor to handle a defined number of queued transactions using the FIFO ruleset.
The coordinator tracks EVERY step of the process from the initial JSON payload to every single operation that is performed afterwards.
Immediately upon queuing the transaction, the system can persist that item to disk and it is tracked and maintained in a highly compressed format that is updated as the transaction progresses.
The Coordinator tracks the responses, requests, timing and codes as part of this transactional process automatically. No more pawing through logs trying to figure out what happened.
Retry logic, automatic transaction requeuing and a rich SDK to fine tune the steps are all available to you.
If the downstream processor loses a day's worth of data to a faulty server, you're able to simply replay an entire transaction process automatically by inheriting the initial JSON payload and walking through the same steps.
NEVER lose an order again.
Maybe start with the API template if you want to accept a file through an API call:
Then let's use the ExcelReader class after we get the file posted to us:
It's very fault tolerant, especially with customer created data.
It uses very advanced data detection algorithms to scan for the actual table within a tab, which can bypass headers and notes left by a user.
We want to use three different methods to make this process easy:
Let's create a new Watermark that stores our DeltaKey.
This will automatically sync this delta key to the local disk, and restore it upon application restart.
It has hooks to initialize the source, and provides event handlers when the data is updated
It's very easy to synchronize the watermark to a remote source, like a database or the cloud.
Let's register our RestSharp credentials so we don't have to worry about authentication issues:
This takes care of re-authorizing the token and making sure it's ready to authorize when needed.
Anytime we create a new RestClient, we supply this to the Authenticator of the RestClientOptions.
Let's use a CRON scheduled thread to perform this operation on our schedule.
We now have the DeltaKey stored, maintained and refreshed on application load and shutdown.
We have the authentication token and parameters taken care of by the RestSharp Authenticators.
Run the logic:
Simply call the third party with the token and auth ->
Process any data if it exists ->
If you processed data, update the watermark ->
Let the CRON fire again and repeat.
An Expression Managed Thread is called every N seconds and is expected to exit the method call. It will repeat until the application is shut down or the thread is turned off.
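A rough sketch of that delta-sync thread is shown below. The AddCRON registration, the watermark accessor, and "myCredentialAuthenticator" are illustrative names based on the steps above, not exact Perigee signatures:

```csharp
// Illustrative only: AddCRON, watermark.Value, and myCredentialAuthenticator
// are assumed names standing in for the real registrations described above.
c.AddCRON("DeltaSync", "*/5 * * * *", (ct, log) =>
{
    // 1. Call the third party; the registered Authenticator supplies a valid token.
    var client = new RestClient(new RestClientOptions("https://api.example.com")
    {
        Authenticator = myCredentialAuthenticator
    });
    var response = client.Execute(new RestRequest($"/changes?since={watermark.Value}"));

    // 2. Process any data if it exists.
    if (response.IsSuccessful && !string.IsNullOrEmpty(response.Content))
    {
        // ... process the returned records ...

        // 3. Update the watermark so the next run resumes from here.
        watermark.Value = DateTimeOffset.UtcNow.ToString("o");
    }
    // 4. Exit and let the CRON fire the method again on schedule.
});
```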
Most of the Demos have the property named c when registering a new PerigeeApplication. The c simply stands for config. Let's take a look at how you can access and control the managed threads within an application.
There is a single "global" token that all other tokens are joined to. This is a great place to listen for application shutdown as it's cancelled the moment a SIG_ABORT or CONTROL-C event is received.
To get the globally registered cancellation token, it's available as such:
If you were to cancel this token you will shut off the entire application and trigger a graceful shutdown event. Only do this if you want the whole application to quit.
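As a sketch, listening on that token might look like the following. The accessor name is assumed here; use however the token is exposed on your config object:

```csharp
// Illustrative: "GetCancellationToken" stands in for however the global
// token is exposed on the config object "c".
var appToken = c.GetCancellationToken();

// Run cleanup logic the moment a graceful shutdown is requested.
appToken.Register(() => Console.WriteLine("Shutdown requested, cleaning up..."));
```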
AppName - is available for the registered application name, if used for logging or custom logic.
Args[] - is available if passed into the constructor, and is the application start parameters.
Configuration - is the IConfiguration that was loaded and built after its Configuration phase.
ServiceProvider - the IServiceProvider after service build configuration.
Event_ConfigurationUpdated - is the event handler for subscribing to hot-reloads. You may also use the helper method c.ConfigUpdatedEvent((nconfig) => {}).
RegisteredTaskCount - the number of registered threads.
HostTask - when hosting an API application, supply the host task and it is stored here. The hosted application task is respected and shut down alongside Perigee. The demo for this functionality can be seen here:
SecureValueKey - the secure key.
SecureValueIV - the secure IV.
SecureValueSource - the secure source, either args, environment, or direct.
To see the demo for this, check out:
To override the IConfiguration, call ConfigureConfiguration(). This builds the IConfiguration with the default options and whatever you've added to the builder.
If you need to configure the service provider, do so with the ConfigureServices() callback.
If you want to register an autostart for the windows platform, it does so by adding values to the registry. The app MUST be run with administrative privileges the first time this key is set.
Get all threads from the management system
Restarts all threads in the system. This will start stopped threads; it's intended to perform a full reboot of the application and all of its threads, regardless of their current state.
You could use the RunTime property to only restart threads that haven't been restarted in a certain period of time:
Starts or restarts a thread with the specified name.
Starts a thread with the specified name.
NOTE: We highly recommend using QueueStop() over Stop(). The reason is that Stop is not thread safe and may cause issues. It exists only for niche, same-thread operations. Use with caution!
Queues the stop of a thread, waiting for the task to complete.
Checks whether a thread with the specified name is running.
Starts a thread with the specified name if it is not already running.
Gets a thread with the specified name, if it exists. Use with caution.
Checks if the collection contains a thread with the specified name.
Executes a CRON thread immediately by its name. Optionally, you can provide a callback to be invoked when the CRON operation is completed, as well as the number of seconds to wait for a start operation.
The best example of this is shown here:
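Separately, here is a rough sketch of the thread-management calls described in this section. The method names mirror the descriptions above but the exact signatures may differ:

```csharp
// Illustrative sketch of managing threads from the config object "c".
if (!c.IsRunning("PDFWatcher"))
    c.StartIfNotRunning("PDFWatcher");

// Prefer QueueStop over Stop: it queues the stop and waits for the task to complete.
c.QueueStop("PDFWatcher");

// Restart only threads that have been running for longer than an hour,
// using the RunTime property mentioned above.
foreach (var thread in c.GetThreads().Where(t => t.RunTime > TimeSpan.FromHours(1)))
    c.StartOrRestart(thread.Name);
```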
You can link managed threads to configuration values in one of two ways:
Use the handy .LinkToConfig() method immediately following the thread
Use the .LinkConfigToThread() method somewhere later
Because configurations are loaded, hot-reloaded, and maintained internally by the configuration system, please also use the started: false flag on the thread itself. This will prevent the thread from starting while the configuration value is "false".
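As a minimal sketch of that pattern (the registration method and configuration key are illustrative; started: false and .LinkToConfig() come from the description above):

```csharp
// Hypothetical thread registration linked to a hot-reloadable config value.
c.AddRecurring("Uploader", (ct, log) => UploadPendingFiles(ct, log), started: false)
 .LinkToConfig("AppSettings:UploaderEnabled");
```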
Agents are explained fully over here:
To get an agent after it's been added to the system, call GetAgent().
To reconfigure an agent after it's been added to the system:
Queues an agent for remote refresh. The parameters are:
The agent name.
A boolean to say whether or not to queue if the current agent is executing.
If TRUE, it will wait for the current execution to finish and then trigger another execution.
If FALSE, it will not trigger another execution, but return.
An optional wait time before giving up. If the wait time is not supplied, it will wait indefinitely.
Sync agents are a more powerful and configurable way of defining tasks that must run under a given sequence, dependency tree, or schedule. They can be configured in a number of ways that allow very complex and fine-grained control over how and when they execute.
A few examples of why we would use a sync agent:
A task or job needs to be fired at a given time
We need the ability to schedule something in a different time zone
We want to request that an agent performs its refresh from a remote source (like a database)
The task is dependent on other tasks completing
We have data that may expire if not kept current
We need to supply multiple CRON strings to specify which times the task runs
We need to supply a blackout range where the task should not be executed
Agents have several main blocks; they are as follows:
The first callback is the configuration section. We can set all kinds of settings including:
Maximum executions per day.
A timespan of how often to execute
An array of CRON strings to specify when to execute
Blackout periods in the form of Timespan and CRON strings.
Setting "* 5 * * *" as a blackout CRON would disallow the agent to run during the entire fifth hour of the day (from 5:00AM to 5:59AM inclusive)
The execution callback
This callback is only ever called when the agent is in an active refresh/sync state.
You can perform whatever logic you need here, and simply return exec.Complete() or exec.Failure() depending on the results of your process.
Tree check callback
This callback is only for late-binding trees; in the below example you can see how it's used to set up a behavior tree for checking previous agent runs.
In the below example we configure 3 agents.
The first is run every 5 seconds, once a day.
The second is run on the minute 0 mark, once a day.
The level 2 agent has a late-binding dependency tree that checks whether the first two succeeded and the data is not expired. If this is true, then it runs.
The code for the sync agent MSSQL source can be found here. If you're writing a connector to another database engine, this is a template and a guide on how to do so:
The code for the memory source is linked here. A great example of how to implement the source interface
The Scheduler is a way of declaring tasks to execute on a given schedule and time zone. It provides an easy-to-use SDK for scheduling, automatic de-scheduling, and re-scheduling of event tasks.
Line 5 - Declare a new event source. This demo uses the memory scheduler.
Line 9-10 - Add the two scheduled items, A, and B, to schedule and execute
Line 13 - Add a scheduler using the source we defined, and declare the callback.
You're given a CancellationToken for respecting the graceful shutdown event.
An ILogger for logging to the system and defined sink sources.
And the GenericScheduledItem<ushort>, which is the interfaced item that allows you to access its definition values.
Line 14 - You can execute or perform any tasks you need. Like generating a report with parameters.
You can see the call to GetRunType(). There's also GetRunArgs(), GetLastRunDate(), GetName(), etc.
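As a sketch, the body of that callback might look like the following. GenerateReport is a hypothetical helper; the getters come from the scheduled item interface described above:

```csharp
// Illustrative scheduler callback body (Line 13 in the demo above).
void OnScheduledItem(CancellationToken ct, ILogger log, GenericScheduledItem<ushort> item)
{
    log.LogInformation("Running {Name} as {Type}", item.GetName(), item.GetRunType());

    if (item.GetRunType() == "Report")
        GenerateReport(item.GetRunArgs(), ct); // hypothetical report helper
}
```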
The Memory source stores all records, run times, and event descriptions on the path specified. It's a great way to test, debug, or play with the scheduler without having to stand up a database. Typically speaking for production scenarios, you would likely use a remote source, like a database or file to maintain the scheduled items.
MSSQL Source is the example remote source included with the Scheduler. This source pulls records from a table in MSSQL which allows for 1 database record to create and maintain 1 scheduled task.
Using the MSSQL source is very easy; it only requires a registered connection string credential.
This is the included scheduled item interfaced class. It's suitable for most scenarios, but as the whole callback process is interfaced, you're able to implement your own class if needed.
Below are the interfaced methods for a scheduled item. These methods provide an easy way to retrieve the relevant properties on a scheduled task regardless of where that event description came from (in a database, a file, remotely, etc).
This memory source shows the most simplified way possible of implementing the interface and can be used to learn how it works.
If you're writing a custom connector to another database engine, this is a great template and guide to do so:
FYI, they don't support seconds or the special character options.
To learn about behavior trees, check out the page written on them within Perigee.
The demo code uses the memory source as it does not require a remote source. Typically speaking, you would tie the event task descriptions back to a database record (like the MSSQL Scheduler Source).
The code can actually be .
The Directory Notifier is able to monitor a folder and report back when a new file is present. It has several key checks in place to make sure the file isn't actively being written to, or locked by another application, before executing the file ready callback.
Unlike the Directory Watch, which expects the file to be processed out of the folder, the notifier is intended only to signal that a file has been changed, added, or removed. It does not contain a failure policy for this reason.
Directory Notifier is great for supporting hot-reload at runtime.
Directory Watch is great for file processing where the file is expected to be removed.
This demo will:
Watch the C:\Watch folder.
It will report on any JSON files (.*\.json$) - this uses and supports full Regex patterns.
It will search TopDirectoryOnly (no subdirectories).
NotifyInitial is true, which tells the system to send notifications on all files currently in the path.
If this was false, it would not call the callback with the pre-existing files.
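A minimal sketch of that setup is below. The registration method name and callback signature are assumed; the options mirror the bullet points above:

```csharp
// Hypothetical notifier registration matching the demo description above.
c.AddDirectoryNotifier("ConfigWatcher", @"C:\Watch", @".*\.json$",
    SearchOption.TopDirectoryOnly, notifyInitial: true,
    (ct, log, file) =>
    {
        log.LogInformation("JSON file changed: {File}", file);
        // e.g. hot-reload settings from the changed file here
    });
```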
The SharePoint watcher is easy enough to set up and only requires the Graph authorization details.
The Watcher automatically polls for folder changes within a SharePoint library and gives you an SDK as well as a callback any time a new file is added or changed. You will also get the item in the callback if a field has been updated.
This allows for automated processes to be kicked off any time something within the watched SharePoint directory is modified.
Here's the fully configured and ready to use snippet of setting this up within Perigee.
All of the above reference a node in the appsettings.json file; you would need to fill this out with your own authorization details.
tenant/appID/appSecret - These will be pulled from the Azure portal app registration.
The drivePath supplied may be blank if you're pulling files from the root of the drive. Otherwise supply the name of the folder to watch.
The site key is the full address of the SharePoint site. It's used to perform a graph lookup of the ID records required.
The listName is the name of the list behind the drive. If using the default document library, this will be "Documents". If you've created a custom library, something like "Input", then you will need to set the listName to that instead.
The enabled flag is to turn the process on or off and is tied to the .LinkToConfig() line above.
The sync property sent back by the SharePoint watcher contains all of the relevant IDs needed to perform requests within SharePoint. Most importantly:
ListID
DriveID
SiteID
SiteName
Items callback
When new items arrive to be processed, you'll get a list of those items in that callback. Each item in the list contains all of the properties you'd need to check the item that was changed.
To see the full list and response, please check out the Microsoft documentation.
Want to check if the changed notification is a file?
This call retrieves the item from a drive, and expands the response to include the list detail and fields as well
The list fields are important as they allow you to get the custom field values. In this example, we have a custom field called "Status" that we can update along the process:
To get an item if you already have the path.
If expand is true, you'll also receive the list item details along with the response.
To get an item if you already have the path for a given DriveID.
If expand is true, you'll also receive the list item details along with the response.
You can generate a direct download link to a SharePoint item by giving the Site, library, and path to the item.
The resulting string is generated and uses the SharePoint /Download.aspx page.
To remove an item by ID:
After retrieving the List Details, you can patch any of the fields with the PatchField command:
If you need to get a different drive, say a Processed drive, you can do so by querying for the drive:
To upload an item to a drive you must supply the DriveID, Name, Content, and MIME Type.
We'll upload a document to the Processed drive as shown here.
To download an item from SharePoint, we highly recommend first getting the List Details as well, as it can provide a backup download link. Then simply call download:
To call any other method not supplied here, use the internal graph call client and supply the right path; the rest will be taken care of automatically.
IMAP watcher is able to watch a single inbox and give a callback any time a new message is available to process.
There's a rich SDK that enables the IMAP client to respond to messages, attach images or HTML, get the body, create reply messages, etc.
There are two basic ways to authenticate with your IMAP server:
This is the "direct" method. If the server supports it, you can supply a username and password.
Simple Authentication Security Layer: This is for the authentication methods that require OAUTH2 or other methods.
An example of the google SASL callback:
All of the SDK references below are available when a callback occurs for new mail messages to be processed.
To get the various addresses:
The BodyBuilder callback is how you configure the outgoing message. You can see an example here of adding an image with CIDs.
The Reply method automatically configures the reply parameters, including the correct message headers, subject, and response addresses, and if includeReplyText is true, it will also quote the original message back as any normal client would do.
To see if the message is <FLAG>:
To delete a message, issue the delete command.
The parameter is for expunging the message from the server as well as issuing the deleted flag.
You can get a list of attachment parts, and iterate over them to get the actual content, mime type, name, etc.
To query the inbox:
If you need to access the sent box, we provide an easy way to retrieve and open the sent mailbox.
IMAP only allows a single mailbox to be open at once. Don't forget to call mail.OpenInbox(). This verifies the open mailbox and prevents future issues with any subsequent calls to the mail client.
Sometimes you just need whatever the person said, excluding their signature and reply content. This method takes several passes at retrieving just the top level user input and skipping the rest:
There are a bunch of prebuilt methods to help with a mail client. If you want to do something specific, you can get the client and folder access as so, and use any of the available methods from MailKit:
We highly recommend:
Putting a stop gap on the mail receive handler, something like IsAnswered, as a way of preventing unwanted reprocessing of messages.
Catching the exception and attempting to mark it answered and label it error if the client supports labelling.
The Directory Watch is able to monitor a folder and report back when a new file is present. It has several key checks in place to make sure the file isn't actively being written to, or locked by another application, before executing the file ready callback.
Directory Watch expects that the file will be removed from the directory after it has been processed. For this reason, there are several options on the DirectoryWatchFailurePolicy for handling the case where a file hasn't been removed.
Directory Watch is great for file processing where the file is expected to be removed.
Directory Notifier is great for supporting hot-reload at runtime.
This demo will:
Watch the C:\Watch folder
It will report on any CSV files (*.csv)
It will search AllDirectories (subdirectories) as well as its root
The DirectoryWatchFailurePolicy is set to move the file to a _Failed folder if the file has not been moved or deleted after the callback has completed.
As a bonus, we'll read that CSV data in and report on its load (a sketch of this setup follows the failure policy list below).
When the above example runs against this file, it will log:
At the moment, Perigee contains 3 different failure policies:
Redelivery after a certain period of time
Move to Failed Folder (_Failed)
Delete removes the file from the system.
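As promised above, here is a rough sketch of that CSV watch. The registration method, failure policy member, and CSV helper are assumed names; the options mirror the demo description:

```csharp
// Illustrative directory watch: CSV files, all directories, failed files moved to _Failed.
c.AddDirectoryWatch("CSVImport", @"C:\Watch", "*.csv",
    SearchOption.AllDirectories, DirectoryWatchFailurePolicy.MoveToFailedFolder, // assumed member name
    (ct, log, file) =>
    {
        var table = ReadCsv(file); // hypothetical CSV load helper
        log.LogInformation("Loaded {Rows} rows from {File}", table.Rows.Count, file);
        File.Delete(file); // the watch expects the file to be removed when done
    });
```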
You can easily register hot-reloadable connection strings by registering them with the helper methods:
The AppSettings.json file:
If you are having issues with the credential store, please verify a couple of things. More than likely, it's one of the following issues:
Verify that you are not setting the current working directory to something outside of the executable's directory while Perigee is initializing.
A typical load cycle for a Perigee Application looks like this:
This verifies that the current working directory and pathing is correct. This will load the credential file under $WorkingDirectory/credentialStore/credentials.[pcb|pce] depending on the encryption settings.
Unless you've got a VERY specific reason to do so, please don't name the credential. The reason is that you want the RefreshName to match the CredentialName.
In the below BAD example, we're naming it "DiffName", which will cause the credential to be persisted under that name rather than "customCred", the refresh name. If the application were to request a new credential, it would forcibly refresh every time, as the stored credential name is different.
To register a specific RestSharp credential, just create that type and assign the appropriate Authenticator.
Any time you create a client, simply assign this credential authenticator:
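A minimal sketch of that assignment is below. The CredentialAuthenticator constructor arguments are assumed; RestSharp itself accepts any authenticator via RestClientOptions:

```csharp
// Illustrative: the client's requests are authorized through the credential store.
var options = new RestClientOptions("https://api.example.com")
{
    Authenticator = new CredentialAuthenticator("MyApiCredential") // assumed constructor shape
};
var client = new RestClient(options);
```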
Below are the SDK methods tied to a credential and how they are called.
This is the primary method for registering a refresh action.
The (o) => action callback receives the refreshParam optionally supplied by calling GetCredential(). It is not required, but can be optionally used for custom logic.
To register a connection string as a credential, which is a great way to support hot-reloading:
The AppSettings.json file:
This is the way you register a SalesForce JWT Authentication method.
Configure is how we set up credential encryption and the credential backup and restore policies.
Supplying AES Keys will tell Perigee to use an encrypted credential store file. Even if you're already running without encryption, Perigee will detect the request to encrypt and convert the credential file before starting up.
The initializationCallback and writeCallback can be used to effectively back up and restore your credential file remotely.
Every time a new credential file is written, the writeCallback is called. This is your opportunity to store that file elsewhere, even in a database blob object or on the cloud.
Every time the application starts up and NO credential file is present, the initializationCallback is called. This allows you to restore a credential file written elsewhere.
Here's an example of setting up your Perigee Application with credential encryption, backup and restore.
Configure should be called FIRST after instantiating a Perigee application.
This converts all the current credentials to a byte array.
If AES keys are provided, the bytes are encrypted.
If they are not provided, the bytes are compressed only.
This converts a dictionary of credentials to an optionally encrypted byte array.
If AES keys are provided, the bytes are encrypted.
If they are not provided, the bytes are compressed only.
The inverse of this operation works exactly the same way:
If you're attempting to decrypt the encrypted credential file and revert to using a non-encrypted file, please call this once before application start, then remove it. It will revert the encryption in place.
The AES Key and IV will need to be supplied.
The altPath is a pointer to revert a different encrypted credential file than the default path. It's optional to supply this value.
To get a credential again at a later time, call GetCredential(). It will go through the process of synchronously locking and retrieving that credential and re-authorizing if needed.
The optional parameters are as follows:
maxRetries - if a refresh action returns a faulted credential, this defines the number of times a credential can attempt to "re-retrieve" it.
retryMS - how many milliseconds between retry attempts.
expireTimeBufferSeconds - how many seconds to buffer the expiration so an early refresh is called. Very useful to prevent a lapse in operation.
disableRefresh - if true, it will ONLY return the credential if it exists and it will not refresh or renew it. Useful for pulling expired credentials.
refreshParam - an object that can be passed to the registered refresh. Useful for supplying a one-time-use authorization token, or additional details to customize the way the refresh happens.
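A sketch of a later retrieval might look like this. GetCredential and the parameter names come from the list above, but the exact overload and the IsExpired check are illustrative:

```csharp
// Illustrative retrieval of a previously registered credential.
var cred = CredentialStore.GetCredential("SuperCredential",
    maxRetries: 3,
    retryMS: 1000,
    expireTimeBufferSeconds: 120);

if (cred != null && !cred.IsExpired()) // assumed expiration check
    SendAuthorizedRequest(cred);       // hypothetical use of the token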
To get a credential of type ConnectionStringCredentialStoreItem:
If you want to know what credentials have/share an authorization property:
Get a refresh token from a credential, even if that credential is expired.
Will retrieve a credential without refreshing it, if it is invalid.
Passes an authorization code to the named refresh.
You can await valid credentials, which is useful when waiting on a refresh or awaiting user interaction to authenticate a client. This method is async and can be awaited.
It pulls the credential once, attempting to refresh it; if that fails, it then pulls it again on a timer with disableRefresh:true, meaning it won't try to refresh the credential and will wait on a valid credential.
Returns true if a refresh action is registered for this credential name.
Returns true if the credentials exists, OR a refresh action is tied to that name.
You can query all credentials without calling the refresh actions by using the ByPredicate method.
Invalidation simply causes the credential's expiration to be set to DateTimeOffset.MinValue and then forces an immediate persist. Any future call to GetCredential() will force a refresh since it is now force-expired.
The Item has several properties you may use:
You can check if a credential is expired, or is about to expire, by supplying the bufferSeconds parameter.
This will decode a JWT token into the JSON string of the body.
Credentials are all registered on startup and are usually given an expiration date. This allows for any process later in the application to ask the manager to retrieve a credential. The below example shows how to register SuperCredential, and two possible return values:
When the application loads up or a new credential is created, the local drive is checked and all previously non expired credentials are restored immediately. This allows for applications to be safely shut down and restarted without interrupting or over-retrieving from an external API.
During the retrieval process, the CredentialStore checks for the existence of the credential and its expiration, and then checks if the expiration is within N minutes (user defined) before returning it to the caller.
In Summary: the CredentialStore does several important things:
It automatically caches the authorization details to disk and retrieves them again on application reload.
When supplying expiration dates to a credential, it's able to renew itself when required rather than sending back old authorization parameters.
The retrieval call is automatically retried to prevent a blip in connectivity from stopping a process.
It will automatically renew the token a defined number of minutes before expiration, allowing long running processes not to fail while in the middle of execution.
It integrates seamlessly with RestSharp, allowing all HTTP(S) traffic to automatically and internally pull credentials
The PerigeeSalesForceClient is an easy-to-use implementation on top of NetCoreForce that allows you to connect and authorize easily. It manages token refreshes and local disk persistence, and provides several additional methods for easy communication with SalesForce.
There are two ways to authorize and keep the connection automatically active.
The Consumer Key and Consumer Secret are from the connected application
Username (or password if using UserPass flow) are the login user details
If using JWT, you'll supply the Certificate and optional password that was uploaded to the connected application
Once connected with either of the two methods mentioned above, you'll have full access to the available commands!
To get to the connected applications in SalesForce, go to "Setup => Build => Create => Apps"
Under the "Connected Apps" Create or Edit one
Enter a Name and Contact Email
Click the box: Enable OAuth Settings
Enter a Callback Url, something like: https://login.salesforce.com/services/oauth2/callback
Click Use digital signatures
Upload a certificate (.cer file) (it can be self-signed)
One option for creating a certificate is the Java Key Tool
.
Perigee ships with a CertGen
as well, code is linked below
Add Selected OAuth Scopes.
An example of these scopes might be:
full
api
refresh_token,offline_access
Click Save
There's a button called: "Manage Consumer Details" - This will show your consumer key and consumer secret. Store them.
Click the "Manage" button at the top of the connected app, and then "Edit policies"
Set up the policies as you need.
An example of this would be:
Permitted Users: "Admin Approved users are pre-authorized"
IP Relaxation: "Relax IP restrictions"
Our little demo model class is available here. As you can see, we've used Newtonsoft to remap "Name" to "AccountName".
To perform a single describe call:
If you're going to be calling the auto-mapped version frequently, please cache the map results and supply them to the GetObjectsAsync call as shown:
We provide an asynchronous block that automatically does all the date time handling, offsets, delays, authentication parameters, etc. You can easily start or stop this process and it will pick up from the last set of records that were sent through.
To add a watch directly from Perigee Startup as a Managed Thread:
Make sure to configure your own:
Certificate
Consumer key
Consumer secret
User
Domain
There are two easy ways to update a single record:
To execute a query, simply supply the query along with a class to map back to:
See the documentation in .
This is automatically created as part of the SalesForce client included in Perigee. See the for more info.
To remove encryption after encrypting, see the call below
We have used and implemented custom logic into NetCoreForce. To get more information about its functionality, the client documentation can be
There are a bunch of ways to get objects, including querying for them. You may also supply direct IDs in a few ways as well:
For the full list of methods and other ways of working with SalesForce, please visit the !
Another integration pattern is limiting the number of calls that can be made during a given period of time. This is important when rate limiting or billing may be an issue, and is a perfect stopgap for a critical failure causing an erroneous number of API calls or running up an unwanted bill!
The Limiter auto-resets its count on the defined TimeSpan, and will not allow additional executions past the defined limit.
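To illustrate the concept only (this hand-rolled class is not Perigee's Limiter API), a window-based limit looks roughly like this:

```csharp
// A hand-rolled illustration of the limiting idea: no more than `limit`
// executions per reset window. Not thread-safe; for illustration only.
class SimpleLimiter
{
    private readonly int _limit;
    private readonly TimeSpan _window;
    private int _count;
    private DateTimeOffset _windowStart = DateTimeOffset.UtcNow;

    public SimpleLimiter(int limit, TimeSpan window) { _limit = limit; _window = window; }

    public bool TryExecute(Action action)
    {
        if (DateTimeOffset.UtcNow - _windowStart >= _window)
        {
            _windowStart = DateTimeOffset.UtcNow; // auto reset on the defined TimeSpan
            _count = 0;
        }
        if (_count >= _limit) return false;       // no executions past the defined limit
        _count++;
        action();
        return true;
    }
}
```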
We love and use RestSharp throughout the project. It's the best possible way of interacting with any modern API. It provides an easy-to-use method to call, execute, and parse data back into structured classes.
With Perigee, we've integrated the RestSharp package directly as a dependency, and we've written a few Extension methods to further integrate it into our system.
Here's a demo of using RestSharp to call out to Postman-Echo, and retrieve your IP address.
The steps are as follows:
Create a RestClient, which usually contains the baseAddress of the endpoint you're going to be connecting to (https://postman-echo.com)
Create a RestRequest; this contains information about the specific request being made.
The Uri (/ip)
The verb (POST/GET/PATCH)
If you are adding body data, or query parameters (see demo usage for adding body, or using the other helper methods)
Call Execute().
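A minimal version of those steps, using RestSharp directly (v107+ style API):

```csharp
using RestSharp;

// 1. Client with the base address.
var client = new RestClient("https://postman-echo.com");

// 2. Request with the Uri and verb.
var request = new RestRequest("/ip", Method.Get);

// 3. Execute and inspect the response.
var response = client.Execute(request);
Console.WriteLine(response.Content);
```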
Log:
This is an identical pattern to above, only this time we've added an authenticator, and changed the request uri.
Log:
This performs the same HTTPS request, only this time, we're using Perigee's Credential Store to manage, refresh, and obtain tokens. This gives a lot of flexibility on how to retrieve and maintain an active token state.
Part of the "secret sauce" here is that any time a credential is about to expire (within a few minutes), a preemptive request is sent to the registered refresh action to renew the token before it's expired.
This prevents an expired token from being sent to the API downstream
This prevents the need for complicated patterns when an error occurs due to expired tokens
These credentials are automatically persisted to disk, and reloaded on application start. This is explained in more detail in the Custom Refresh Logic section.
Some authentication sources will rate limit your authentication requests; this is another method you can use to prevent that issue from occurring.
Log:
There are a few notable extensions added by Perigee:
This method has quite a few overloads you can call, but all of them do the same thing:
Retry a request if the method failed to connect, or if it timed out.
If the third party API suddenly revokes your current API credentials AND you are using a CredentialAuthenticator, it will fire the renewal for a new token before attempting another retry.
If the request timed out, this is an easy Boolean to check so you can proceed with other options.
Teams requires a third party library and card content.
DEMO coming soon:
Perigee provides several easy methods to read Excel files into DataTables or DataSets.
Perigee currently supports two different SMS clients right out of the box. It's easy to register them and use them from anywhere within your application.
At the start of your Perigee application, register the related alert managers.
Anywhere from the code later on, call AlertManager.SMS() and supply the:
Name (the name of the account you registered)
ToNumber (who it is going to; if within the US, start with the digit 1)
Message Body (the SMS body content)
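As a sketch (the account name is hypothetical, and the exact parameter order may differ from the real overload):

```csharp
// Illustrative SMS alert using the registered account.
AlertManager.SMS("TwilioMain",                                  // Name of the registered account (hypothetical)
                 "15551234567",                                 // ToNumber; US numbers start with 1
                 "Nightly import completed successfully.");     // Message body
```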
The Coordinator is like having a group of small robots that you design to each perform a specific task. These robots communicate with each other and make sure their tasks are completed before passing data to the next one.
They are designed to be as reliable as possible and can restart or shut down without losing any data.
The coordinator works like a message queue: you add a message and it gets processed a short time later. The message is immediately saved to disk in a compressed format and all the robots use this data packet to keep track of their progress. This is what makes the system so fault-tolerant.
The coordinator uses a first-in-first-out method to process messages and can handle many messages at once because it is multi-threaded.
If the system crashes, the coordinator is smart enough to resume from where it left off.
The coordinator also handles two-way data synchronization with a remote source, making sure that each step is replicated and stored on the remote server.
To see demos of the coordinator, check out the demo page!
Every bot's state is replicated to the remote source (usually a database). This allows the Coordinator to resume transactions that have yet to be completed. It also provides your company the ability to view, query and report on business critical tasks.
There are many fields logged and tracked during this process. Everything from the initial payload that was queued to the status and responses from Rest API's, exceptions thrown, modification times, and timestamps.
Due to the nature of a two way synchronized transaction log you're able to queue items both from the SDK and by creating new header records in the remote source.
The Coordinator is smart enough to pull its remote source for unprocessed transactions and will pick them up if they are not in an "Error" state.
In a healthy system, a transaction only needs to be queued once. All steps will succeed and the header will be marked "Complete". When a system inevitably goes down and we've exhausted all of an item's retries, the header will still be in a "Processing" state and will be dequeued.
If an item gets dequeued and is not yet completed, it will get re-queued again shortly on the next call to get the remote source's pending transactions. This process is explained in more detail below in the ITransactionCoordinatorDataHandler section.
The Header is what uniquely identifies a transaction. Without a header, there is nothing to process.
The header contains the InitialDataObject, which is the "message" or "payload" of the queued transaction. This can be anything you want: JSON, a string, a custom class, or an array of bytes. It's serialized and compressed during the process.
Headers are identified by a mesh key:
ID property, which is a string.
ReplayID property, which is an integer (default 1).
When creating a new transaction, you need to create a new ID. We highly recommend meshing together data from where the process starts to create your ID. If you create a transaction with an already processed ID, the coordinator is smart enough to pull the remote system and use that header, which will likely be completed and nothing will occur.
A few examples of this would be:
From a message queue: $"{QueueHost}{QueueName}{ID}".
From a database row: $"{ServerHost}{Schema}{TableName}{RowID}".
From an auto incrementing ID source.
Implement your own ID system.
The items exist as children to the header. You can think of them as logical steps in the process.
Items are executed in order and contain a status field. Only when one item is a success should it move onto the next.
Items contain many properties to help identify and report on their progress. You can view when each item started, how many times it has been retried, and information about request and response times if it represents an HTTP request.
Items themselves contain DataObjects. You can assign these data objects to be whatever you like, but they represent the completed result for the item's step.
When using the built in helper methods for executing rest calls, all of the properties and fields for an item are filled out with the results of that request.
The SDK contains methods for accessing the DataObjects at runtime. You can retrieve the current DataObject, a previous item's DataObject, and the InitialDataObject from the header.
As mentioned above, all transactions are replicated to a remote source, like a database. This provides the unique ability to "Replay" transactions at a later date by simply supplying the transaction ID.
The coordinator will automatically go pull that transaction from the remote source, create a new transaction with the given ID, a new ReplayID, and assign the InitialDataObject from the first time it was run.
This allows you to completely run through all of the logic for a given transaction again at a later date.
Imagine for a moment that one of your third party brokers informs you they had a system error and lost all your order data from today. This would create a major issue for any other linear based system.
Since you have transaction replay at your fingertips, it would only take a few lines of code to replay all of today's transactions and all of your orders would be back where they should be.
You would save countless hours!
By default all coordinators are configured to be multi-threaded, meaning that incoming transactions can be put on different threads and executed at the same time. You can configure this during the coordinators allocation.
The multistep helper is the easiest way to use a coordinator. It simplifies the logic and handles the communication with the assigned data handler. Unless you know you need to do something very specific, we recommend using this as the primary way of writing a coordinator function.
You may manually go through the process yourself; that is shown below in the SDK section.
The MultiStep processor is part of the process callback when creating a new coordinator.
To define a MultiStep transaction, you need to supply a list of steps that will execute, and match that list of steps with callback methods.
Each section will be called in the order of the array of steps. In the above example, GetAccount should be the first callback as it is the first step, and POSTDetails should be the second callback as it is second in the list.
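As a rough illustration of that shape only (the MultiStep registration, the transaction context type, and the GetPreviousDataObject accessor are assumed stand-ins; Execute<T>() is covered in the SDK section below):

```csharp
// Hypothetical process callback body with two ordered steps.
void Process(ITransactionContext transaction) // assumed context type
{
    transaction.MultiStep(new[] { "GetAccount", "POSTDetails" },
        // Step 1: GetAccount
        exec => exec.Execute<Account>(new RestRequest("/account/123", Method.Get)),
        // Step 2: POSTDetails - reuses the previous step's DataObject
        exec => exec.Execute(new RestRequest("/details", Method.Post)
                                 .AddJsonBody(exec.GetPreviousDataObject<Account>())));
}
```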
To see the callback SDK, jump here!
The options that can be supplied are related to how a transaction is processed, what conditions are required, and the number of times a header or item can be retried.
There is a .Default and a .RequireInternet option that should suit most needs. If you need to customize it further, allocate a class, use the default, and override any of the following properties.
To understand the retry counts, see the below section: Retry Limits.
There are configurable limits to how many times a given header or item can be retried before falling out of process scope.
The header has its own retry count. When the header has been queued and there is an attempt to process it, the header's execution count is incremented by 1.
When the header can't successfully complete all of the child items, or there is a call to stop processing, the header might exhaust all of its allowable retries.
When the header is out of allowable retries, the header status is set to Error. This means no more automatic requeuing will occur.
When the header is allowed to retry again later, the header status is left alone, leaving it as Processing. The remote synchronization data source defines a period of time that must pass before a transaction that is still in Processing status gets reintroduced to the process queue.
Items' retry limits act a bit differently than headers'. They are a retry limit per queued process. This means that for every time a header is enqueued, the item has N number of retries before it automatically gets dequeued.
This important distinction allows for many retries of a header, and even more retries of each individual item.
Both headers and items have a status property, and they mean slightly different things during the process. The three available statuses are:
Processing
Complete
Error
By default, all new headers and items are in a Processing status.
If a header status is Processing, it allows the coordinator to enqueue the transaction and work on the steps as long as it has not exhausted the Retry Limits.
If the header status is set to Complete, there will be no more enqueuing or automatic retry, as the header is now completed.
If a header status is set to Error, it will also no longer enqueue, and is now in an Error state. You may later flip the status back to Processing if you want to retry or re-enqueue the transaction.
If an item's status is Processing OR Error, it allows the MultiStep to continue retrying the item.
If an item's status is set to Complete, it will no longer retry or attempt to process this item. The MultiStep processor will move onto the next logical item in the steps defined, or complete the header if all child items are now also Complete.
Every callback in a multi-step has an SDK of calls you're able to use. They provide a way to interact with the ongoing transaction, perform lookups, request redelivery, or end processing.
The DataHandler is the interfaced class to synchronize remote transactions. See more below!
You can access any of the underlying methods by accessing the handler.
Update is how you push data back to the file system as well as the remote source. You can force an update at any point during the process if you want to synchronize data before something happens.
Update is called automatically throughout the process, and calling isn't necessary unless you're implementing something custom.
You can read the initial data object, previous data objects, the current data object, or even use the DataHandler to request other headers.
The easiest way to execute a request is the Execute<>() call. It will perform two update processes, one before sending the request and one after, so that both the request and response times get synchronized even in the event of a full system crash.
It automatically has a built in retry and you can specify how many times it will retry the same request.
It will log the request and response values if available (and the option is not turned off)
It will log all of the request properties (uri, timestamp, verb, body), then call Update().
It will then log all of the response properties (uri, timestamp, status code, body).
If the response contains a deserialized body, it will be set as the current step's DataObject.
If the Execute call is used without the generic parameter, as .Execute() (instead of .Execute<T>()), then the response will only be available in the ResponseBody property if logging is enabled.
If the request status code was a success, the Item is set to Complete; if it was not, the Item is set to Error.
Update() is then called one more time as the response properties are now filled out.
You can specify other options, such as not deserializing any response data, turning off request and response logging, and how many built-in retries it performs, by setting those parameters in the .Execute() call.
Because the .Execute() call handles everything necessary, you can set it as the only call in a given MultiStep step. Everything is then in place to either move onto the next step, or retry this step if it failed.
StopProcessing() will stop all current item cycling and end the transaction in the state you specify. If it was left in a Processing state, it will get picked back up again on the next call to get remote pending transactions.
The redelivery mechanism halts the current thread and redelivers the item when you ask. It will block a running thread, so use this intentionally and only when necessary. It's most useful when you know a short delay will allow an item to succeed.
Redelivery does not increment the executions count.
The current process count is how many times the current item has been retried "this queued transaction". Every time a header gets dequeued this count goes back to 0. It allows you to check how many retries have occurred since getting queued for the first time, or again.
If you want to see how many times an item has been tried in total, check the ExecutionCount property instead.
If you want to take full control of the transaction coordinator, you can bypass the MultiStep process and execute calls manually. You have full control over every aspect of this, and you are responsible for marking the item and header status, retrying items or processes yourself, creating items (steps), and calling Update().
This is the interface that is tied to a coordinator to allow that coordinator to communicate with a remote synchronization source. It defines interfaced methods to retrieve header data and get replay information, and it defines the period of time that must pass before re-queueing a transaction.
There are two examples of this handler and their code listed below.
The "Memory" version of this handler is for testing and debugging only. It would not be suitable for a production system.
For MSSQL:
The ConcurrentFileStore class enables the use of a concurrent dictionary as a means of storing data that is compressed, locally stored, debounce-updated, and subject to revision checks. It handles all of the low-level code necessary to implement a concurrent store, offering benefits such as file verification, backup and restore functionality, and a rich SDK for programmatic management and interaction.
The "revision checked" feature serves as a safety mechanism in the ConcurrentFileStore
, ensuring the integrity of the files written to disk. Before overwriting, each file is assigned a revision, and the bytes are transactionally verified to prevent partial or corrupt file writes. This meticulous verification process safeguards the data by confirming that only complete and accurate revisions are written, thus minimizing the risk of data corruption or loss due to incomplete or erroneous file writes.
Here's an example that shows allocating and using a concurrent file store.
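A hedged sketch of that allocation is below. The generic parameters, the cancellation-token accessor, and the Get method are illustrative; the named arguments mirror the parameter list that follows:

```csharp
// Illustrative allocation of a ConcurrentFileStore; exact signature may vary.
var store = new ConcurrentFileStore<string, int>(
    fileStore: @"cache\counts.bin",
    cancelToken: c.GetCancellationToken(),   // assumed accessor for the app token
    debounceTimeMS: 1000,
    initialization: () => new Dictionary<string, int> { ["processed"] = 0 });

// Reads and writes are debounced to disk automatically.
store.AddOrUpdate("processed", store.Get("processed") + 1); // Get is an assumed accessor name
```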
This method initializes a new instance of the ConcurrentFileStore class. It takes the following parameters:
fileStore: Path where the file will be stored.
cancelToken: Token used for cancelling the operation.
debounceTimeMS: Specifies the time between updates/deletes until the write occurs. Default is 1000 milliseconds.
maximumDebounceTimeMS: If repeated updates stretch debounces beyond this limit, it will trigger an immediate write. Default is 10000 milliseconds.
initialization: Optional. If no value is read or initialized from the disk, and this callback is set, it will be called to initialize the concurrent store.
writeCallback: Optional. If set, this function is called every time a debounced write is verified and the written bytes are also checked.
processFunction: Optional. If assigned, the Process function is called once on startup when values are loaded back from disk or from the initialization callback. This is the final step to modify the data before regular use starts.
AES32Key: Optional. If assigned, contents will be encrypted to disk using this AES 32 bit key, in non hex format.
AES16IV: Optional. If assigned, contents will be encrypted to disk using this AES 16 bit IV, in non hex format.
A helper method that will read and verify a file path using the built-in compression and validation logic. Useful for initializations outside the scope of the process. This method needs a path parameter for the file path, a FailedVerification parameter to check if the verification failed, and optional AES32Key and AES16IV parameters for encryption.
This function decompresses bytes, optionally with AES keys. The parameters are the bytes of the data to be decompressed and optional AES32Key and AES16IV for encryption.
This function compresses an object to bytes, optionally with AES keys. It requires a compObject, which is the object to compress, and optional AES32Key and AES16IV for encryption.
This function writes the file using the same logic the ConcurrentFileStore uses internally. Takes parameters Dict, which is the dictionary to write, FilePath, which is the file to write to, Bytes, which are the converted bytes, and optional AES32Key and AES16IV for encryption.
This function returns the underlying cache. Any changes made here will be reflected and may cause issues if modified.
The function initializes a concurrent dictionary using built-in compression/serialization methods. Requires a bytes parameter to initialize from and optional AES32Key and AES16IV for encryption.
This function can be used to await any outstanding items. It does not await items being present.
This function allows you to await any source causing an initialization of values. It may wait indefinitely if no value is sent to the concurrent writer.
The function gets an item by key. Requires a key as a parameter.
The function removes an item by key. Requires a key as a parameter.
The function checks that an item exists by key. Requires a key as a parameter.
This function adds or updates a value. Takes parameters key and value.
This function returns all keys.
Signal that the underlying data was modified outside of an AddOrUpdate or Remove. This is not the recommended way of changing or modifying data, but it does exist if you need to trigger a re-save check.
This allows you to modify the cache while maintaining proper locking. It's intended only to be used when you need to freeze the entire cache and operate on it, instead of on individual Key objects.
This function searches the values using a predicate. The predicate is a function specifying the condition to be met for the values.
The FileRevisionStore class handles file revision storage. It provides concurrent-safe read/write operations across the entire application using a globally unique thread-safe lock. This class also offers the following features:
File verification processing
Data file versioning with a system-level transactional replace to prevent partial file writes or data corruption
Ability to rewind corrupt files back to a previous version when verification fails
An UpdateFile method that performs a thread-locked read/write on the same transaction
The "revision checked" feature serves as a safety mechanism in the FileRevisionStore
, ensuring the integrity of the files written to disk. Before overwriting, each file is assigned a revision, and the bytes are transactionally verified to prevent partial or corrupt file writes. This meticulous verification process safeguards the data by confirming that only complete and accurate revisions are written, thus minimizing the risk of data corruption or loss due to incomplete or erroneous file writes.
This method saves a file and returns a boolean value indicating if the operation was successful.
The Path is supplied to where the file is stored.
A byte array of the data to store is passed in.
A Verification function is optionally supplied to verify the data written to disk.
In the example below, we're writing data using JsonCompress. The verification function is simply checking that we can decompress the bytes again from disk.
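A hedged reconstruction of that idea is shown here. The SaveFile call and JsonCompress come from the description above, but the exact parameter names and JsonCompress method signatures are assumptions:

```csharp
// Illustrative: save compressed JSON and verify it can be decompressed again.
var data = JsonCompress.Compress(myState); // assumed helper signature
bool ok = FileRevisionStore.SaveFile(@"data\state.bin", data,
    bytes =>
    {
        try { JsonCompress.Decompress<MyState>(bytes); return true; } // verification passes if decompression works
        catch { return false; }
    });
```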
The ReadFile method reads a file and performs verification.
The Path is supplied to where the file is stored.
An out boolean is sent back saying whether or not the validation was a failure.
A Verification function is optionally supplied to verify the data read back from the disk.
In the below example, we are using JsonCompress to verify the data can be decompressed from disk without an exception thrown.
The UpdateFile method performs a thread-locked read/write operation on the same transaction.
It's very similar to the above two methods:
The Path is supplied to where the file is stored.
An out boolean is sent back saying whether or not the validation was a failure.
A callback with the bytes being read is supplied; this is how you update the file.
A Verification function is optionally supplied to verify the data read back from the disk.
It is actually called twice in the update cycle: once on file read, and a second time on file write.
There is an included SmartSheets downloader for pulling data from SmartSheets.
The AccessToken is found in your API credentials.
There is an included Graph client with two authorization models built right in.
Most of the prebuilt functionality is around the drive, site, lists and teams workloads. However, you can easily extend the functionality if you need something specific!
This authorization type is used when supplying direct application ID/Secrets from a registered application. The registered permissions are usually administratively granted and there is no delegation ("as a user") required.
That's it! Once you have the client, call other methods:
This authorization type is best used when you're authorizing a user and your application permissions are assigned as delegated.
A good example of this is a service account authorized to pull a DataVerse table, or read teams messages.
And there you have it: once the initial code is supplied, the client is automatically maintained from there on out. If a new token is required, the refresh token is automatically supplied and you don't have to think about it again!
To see a full demo including receiving a token from the response, and awaiting the credentials on load:
If a method is missing or you need to override functionality, feel free to use the built in call to submit your own call with authorization, credential management, and retries built in.
Use RestGraphCall, as shown below:
Perigee ships with several powerful CSV tools that make reading even complicated CSVs very easy and fast, with memory-efficient span-based data reads.
To write CSV data, supply a DataTable and any additional handlers to override specifics on a data type.
We'll read in the Products.CSV as our source.
Let's then declare a writer, and set it to BAR delimited.
Register a decimal handler that writes any decimals with 3 digits after the period.
The handler sends you back the object (in this case a decimal)
The Row Index.
The Column Name.
It expects a return value of the string converted version, as well as a boolean indicating if the transform was successful.
Any NON-SUCCESSFUL transformed handlers are added to the writer's LoadLog property and can be viewed after conversion.
Then simply call .Write().
This transforms the very badly formatted CSV Sample Data into this:
Included is the sample data and classes used above.
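As a rough sketch of the writer setup described above (the writer type, property names, and handler signature are illustrative, not the exact Perigee API):

```csharp
// Illustrative: bar-delimited output with a 3-decimal handler.
var writer = new CSVWriter(productsTable) { Delimiter = '|' }; // assumed type and property

// The handler receives the value, row index, and column name, and returns
// the converted string plus a success flag, as described above.
writer.AddHandler<decimal>((value, rowIndex, columnName) =>
    (value.ToString("0.000"), true));

writer.Write(@"C:\out\products_clean.csv");
```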
FSW for short is a class suited for synchronizing data to the local drive. In short, you supply the CancellationToken from the thread and it safely handles the file locking and writing of newly updated data on a regularly scheduled basis and before the application closes.
If you run the below demo multiple times, you'll see the count being restored, incremented and shut down. There's a new file with the data located at bin\debug\sync\local.json.
The class used above is simply two properties with getter/setters:
This implements . Check it out!
To use either, go to the Azure portal and create an app registration.
The SDK methods closely match what's defined in the .
Maybe the only thing you need to do is take the absolutely horrendous CSV data in the and just create a CSV that can be cleanly loaded into another system.
It's much lighter than the ConcurrentFileStore and FileRevisionStore; as such, there's no transactional lock and write verification fallbacks. The plus side is that there are fewer reads/writes, and if those features aren't of interest to you, this would be the next best choice!
The SingleProcessor class is a generic class that provides parallel processing functionality for collections of single elements. It allows users to easily perform parallel operations on a collection of items and return the results as different types of collections, such as ConcurrentBag, ConcurrentDictionary, or a single item.
This method processes the collection in parallel and invokes the provided callback for each item.
This method processes the collection in parallel and adds the results to a ConcurrentBag by invoking the provided callback for each item.
This method processes the collection in parallel and adds the results to a ConcurrentDictionary by invoking the provided callback for each item.
This method processes the keys of the collection in parallel and invokes the provided callback for each key.
This method processes the keys of the collection in parallel and adds the results to a ConcurrentBag by invoking the provided callback for each key.
This method processes the keys of the collection in parallel and adds the results to a ConcurrentDictionary by invoking the provided callback for each key.
This method processes the keys of the collection in parallel and returns a new SingleProcessor instance containing the results.
This method processes the keys of the collection in parallel and returns a new GroupProcessor instance containing the results, grouped by the provided expression.
Returns all keys in the processor as an enumerable.
This extension method processes a DataTable in parallel and returns a ConcurrentDictionary containing the results, with the keys generated by the provided callback.
This extension method processes an IEnumerable in parallel and returns a ConcurrentDictionary containing the results, with the keys generated by the provided callback.
This extension method processes an IEnumerable in parallel to a new grouped processor.
Converts an IEnumerable to a SingleProcessor using a provided callback function.
This extension method processes an IEnumerable in parallel and returns a ConcurrentBag containing the transformed items, with the transformation function provided by the callback.
This extension method processes a DataTable in parallel and returns a ConcurrentBag containing the transformed items, with the transformation function provided by the callback.
This extension method processes an IEnumerable in parallel, returns a ConcurrentBag containing the transformed items, and handles exceptions using the provided callback.
This extension method processes a ConcurrentBag in parallel, invoking the provided callback for each item.
This extension method processes a ConcurrentDictionary in parallel, invoking the provided callback for each key-value pair.
Let's take a look at a few things we need to know before jumping into parallel processing libraries.
The rules for parallel processing are as follows: (Psst, don't get too lost in them, but reading through will help you understand how to design your parallel processing architecture)
Don't write to anything that isn't a concurrent safe type. (Only write to concurrent safe types!)
Don't call instantiated class methods in parallel (unless you KNOW you can). Prefer to write static methods when calling in parallel loops to avoid concurrency issues.
Use synchronization constructs, like locks or semaphores, to protect shared resources and avoid race conditions (see the sketch just after this list).
Ensure that the workload is balanced across all parallel tasks to maximize efficiency and minimize the chance of bottlenecks.
Be mindful of the trade-offs between parallelism and performance, as not all tasks will benefit from parallelization. In some cases, the overhead of creating and managing parallel tasks may outweigh any potential speedup.
Test your parallel code thoroughly to ensure it is working correctly and efficiently, as debugging parallel code can be more difficult than debugging sequential code.
Keep in mind potential issues with exception handling, as exceptions thrown in parallel tasks may be propagated and require proper handling in the calling code.
Consider the target hardware and environment when designing parallel code, as factors like the number of available cores and memory limitations can impact performance and scalability.
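Here's a small standalone illustration of the synchronization rule above (plain .NET, no Perigee types), showing why a shared counter needs protection inside a parallel loop:

```csharp
using System;
using System.Threading.Tasks;

int unsafeTotal = 0, safeTotal = 0;
var gate = new object();

Parallel.For(0, 100_000, i =>
{
    unsafeTotal++;               // race condition: concurrent increments can be lost
    lock (gate) { safeTotal++; } // the lock serializes access, so the count stays exact
});

Console.WriteLine($"{unsafeTotal} vs {safeTotal}"); // safeTotal is always 100000
```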
Hashing is a process of converting data into a fixed-size string of bytes, typically using a mathematical algorithm like SHA, MD5, etc. The output, called a hash, is a unique representation of the input data. A small change in the input data will result in a completely different hash, making it useful for verifying data integrity, securely storing passwords, and creating unique identifiers for data.
Hash based data structures, such as hash tables or hash sets, provide a fast and efficient way to look up or verify existing data. These data structures use hashing to determine the location of an item in memory, allowing for quick access and retrieval even in large datasets. This is particularly useful in parallel processing, where multiple threads or processes may be accessing the same data simultaneously.
Perigee fully embraces the hash structures as a way of providing better parallel processing for data sets.
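As a quick, framework-only illustration of why hashed lookups matter: a HashSet gives O(1) existence checks, and reading it from parallel code is safe as long as nothing writes to it.

```csharp
using System.Collections.Generic;
using System.Linq;

// Existing account IDs loaded once into a hash-based set for O(1) lookups.
var knownAccounts = new HashSet<string> { "A-100", "A-200", "A-300" };

var incoming = new[] { "A-100", "A-999", "A-300" };

// Read-only lookups against the set are safe to run in parallel.
var missing = incoming.AsParallel()
                      .Where(id => !knownAccounts.Contains(id))
                      .ToList(); // ["A-999"]
```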
C# only has a few concurrent write-safe structures. You can find them under System.Collections.Concurrent.
The two primary concurrent classes we are interested in for this practice are:
ConcurrentBag
ConcurrentDictionary<K,V>
Both of these structures provide a thread safe, concurrent way of writing data back during a parallel loop. We'll use both of these while processing data in parallel.
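Here's a minimal, framework-only example of writing to both from inside a parallel loop; this is the same pattern the processors below build on:

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

var words = new[] { "alpha", "bravo", "charlie", "delta" };

var bag  = new ConcurrentBag<int>();
var dict = new ConcurrentDictionary<string, int>();

// Both collections are safe to write to from many threads at once.
Parallel.ForEach(words, word =>
{
    bag.Add(word.Length);
    dict.TryAdd(word, word.Length);
});
```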
There are 3 main parallel concepts within Perigee:
Parallel processing of lookup data for rapid existence and retrieval checks
Parallel transformation of records into a new type
Parallel iteration without a defined output.
We have two classes for easy parallel lookups:
GroupProcessor
SingleProcessor
The GroupProcessor
allows for parallel execution over a list, DataTable, or other enumerable input. The "Group" is because it will perform a "GroupBy" on the input data, so that one input key may point to multiple input items. Original input items are retained, and the "key" is used as a hashed lookup. Let's look at some data and an example:
If you were to use a GroupProcessor
on the data above, it would look like this:
Once the data has been processed, you can perform existence checks and data retrieval all using hashed values. This means it's parallel existence check safe and parallel data retrieval safe.
Using our parallel execution loops to produce hashed data lookups has made certain production processes almost 5,000 times faster.
The SingleProcessor
allows for parallel execution over a list, DataTable
, or other enumerable input. The "Single" part expects only a single input key to exist within the input data, so that one input key points to a single input item. Original input items are retained and the "key" is used as a hashed lookup. Let's look at some data and an example:
If you were to use a SingleProcessor
on the data above, it would look like this:
Once the data has been processed, you can perform existence checks and data retrieval all using hashed values. This means it's parallel existence check safe and parallel data retrieval safe.
Because of the nature of hashed values, you can use multiple inputs as a mesh key.
Option 1:
Supply the key as a delimited list. An example might be:
This would make the hashed lookup a mesh key of FirstName, LastName. And the retrieval of those records would require both values to match.
Option 2:
Supply the key as a hash itself. The hash can be calculated from a database, or local depending on your needs. An example using a local hash function for SHA512:
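Here's a minimal sketch of that, assuming a simple Person record; the keys are computed sequentially because a single HashAlgorithm instance isn't thread-safe:

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

var people = new List<Person> { new("Bob", "Jones"), new("Jane", "Doe") };

var keyed = new List<(string HashKey, Person Record)>();
using (var sha = SHA512.Create())
{
    // Hash sequentially; only the later lookups/processing should run in parallel.
    foreach (var p in people)
    {
        var bytes = sha.ComputeHash(Encoding.UTF8.GetBytes($"{p.FirstName}|{p.LastName}"));
        keyed.Add((Convert.ToHexString(bytes), p));
    }
}
// "keyed" can now be handed to a processor and used for parallel existence checks.

// Illustrative input shape; swap in your own type.
record Person(string FirstName, string LastName);
```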
Notice how we computed the hash outside of a parallel loop? The one caveat to local hashing is that values CANNOT be hashed in parallel on the same instance of a HashAlgorithm. For this reason I would recommend, when possible, going with Option 1.
Hashing the mesh-key is really most useful in two conditions:
The hash is pre-calculated on input data, especially when coming from a database or a different system
You need to compute and add together multiple hashed keys into a single lookup, so that you can perform rapid existence checks on multiple types of mesh keys.
This type of parallel processing is a bit different. Instead of purely processing data for rapid lookup and retrieval, we're taking the input data and transforming it to a new type, class, structure, or even multiple new items to store again later.
For this, we want to use a parallel loop that returns a concurrent safe class. For our example, let's use a concurrent bag:
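A minimal sketch of the idea follows; the Order type and column names are assumptions chosen to line up with the result described next:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var orders = new List<Order>
{
    new("Ribeye",  "Rare"),
    new("Sirloin", "Medium"),
    new("Ribeye",  "Well Done"),
};

// Each "column" gets transformed into a (name, distinct value count) pair.
var columns = new Dictionary<string, Func<Order, string>>
{
    ["SteakType"]   = o => o.SteakType,
    ["SteakRarity"] = o => o.SteakRarity,
};

var bag = new ConcurrentBag<(string Column, int DistinctValues)>();

// All writes go to the ConcurrentBag, which is safe to add to from multiple threads.
Parallel.ForEach(columns, col =>
    bag.Add((col.Key, orders.Select(col.Value).Distinct().Count())));

// Assumed order data; swap in your own type.
record Order(string SteakType, string SteakRarity);
```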
The bag now contains 2 items: (SteakType, 2) and (SteakRarity, 3). There's a lot of performance to gain when performing parallel operations on larger datasets.
P.S. Ready for dinner yet? I am...
This one is very similar to what's above, but this time we aren't capturing a direct output; instead, we're sending items to a concurrent dictionary. We'll also introduce the ExceptionRows callback.
After running the code, you'll see one item logged to the console, as expected:
Item {"rCount":0,"TypeKey":null,"Name":null} had an exception thrown: TypeKey is null or empty
We've also created 3 items in the ReturnedDictionary
, with the .value
property being the number of items from the grouped lookup.
Finally, I've added the JsonCompress
class to serialize and deserialize the ParallelProcessException
items, so that they can be stored, sent, or retrieved later.
We've looked at three different ways we can use parallel processing to boost the speed of our data conversion lookups and loops. We've taken a look at what hashing is and how to effectively use it. We've seen several examples of using the Single
and Group
processors. We also got to see how the ExceptionRows
callback allows us to iterate over failed parallel execution items.
The GroupProcessor
class is a generic class that provides parallel processing functionality for collections of elements. It allows users to easily perform parallel operations on a group of items and return the results as different types of collections, such as ConcurrentBag
, ConcurrentDictionary
, or a List.
The "Group" Processor is for a selection where more than one item can be matched for the given selector, also known as a GroupBy. If you're looking for a single item in and out without grouping, use the SingleProcessor.
This method processes the collection in parallel and invokes the provided callback for each group of items with the same key.
This method processes the collection in parallel and adds the results to a ConcurrentBag
by invoking the provided callback for each group of items with the same key.
This method processes the collection in parallel and adds the results to a ConcurrentDictionary
by invoking the provided callback for each group of items with the same key.
This method processes the keys of the collection in parallel and invokes the provided callback for each key.
This method processes the keys of the collection in parallel and adds the results to a ConcurrentBag
by invoking the provided callback for each key.
This method processes the keys of the collection in parallel and adds the results to a ConcurrentDictionary
by invoking the provided callback for each key.
This method processes the keys of the collection in parallel and returns a new SingleProcessor
instance containing the results.
This method processes the keys of the collection in parallel and returns a new GroupProcessor
instance containing the results, grouped by the provided expression.
Returns all keys in the processor as an enumerable.
The Debounce class is a utility class in C# used to prevent the rapid firing of events. It is particularly useful in scenarios where an action should only be performed after an event has ceased to occur for a specified duration, like key-up events on textboxes that trigger searches.
The Bounce
method is called whenever an event occurs. It resets the debounce timeout. If this method is not called again within that timeout, the action specified in the Debounce constructor will fire. The Bounce
method can optionally take a parameter, which will be passed to the action when it fires.
In the first example, the Bounce
method is called without a parameter, so when the action fires it won't receive any parameters. In the second example, the Bounce
method is called with an integer parameter, so the action will receive the last integer passed when it fires.
When disposing the Debounce class that doesn't take any generic parameters, no additional callbacks will be fired.
When disposing the Debounce<T> class, there is an immediate one time fire of the bounce callback that has the latest value supplied. This is particularly useful when you need to dispose of a resource quickly, but still need to access the latest value in the pipeline without waiting on the bounce to fire.
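As a quick sketch of the generic variant in the text-box search scenario (the constructor signature shown here is an assumption; adjust it to the actual Debounce API):

```csharp
using System;

// Fires only after the input has been quiet for ~400ms.
using var search = new Debounce<string>(
    term => Console.WriteLine($"Searching for: {term}"),
    TimeSpan.FromMilliseconds(400));

// Simulated rapid key-up events; only the last value ("perigee") triggers the search.
search.Bounce("p");
search.Bounce("per");
search.Bounce("perigee");
```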
The XML Converter is a useful class designed to transform a simple XML "List" into a structured list of classes. In the example provided, there are three test_scores
elements nested within the data
element. Using the converter simplifies reading and working with the relevant data effectively.
Multi-Threaded Processor allows you to efficiently process data in parallel across multiple threads, using both simple and advanced approaches for scatter-gather operations.
Unlike a traditional "scatter-gather", MTP allows you to separate the loading and processing across threads and then await the results. This is incredibly powerful when the loading is done from a remote source where latency is involved.
Processes a collection of items in parallel using a specified processing function.
Parameters:
items
(IEnumerable<TIn>
): The collection of items to process.
Process
(Func<TIn, TOut>
): The function to process each item.
cancelToken
(CancellationToken?
, optional): An optional cancellation token to cancel the processing.
concurrency
(int
, optional): The number of concurrent threads to use. Default is 3.
Logger
(ILogger
, optional): An optional logger for logging.
Returns:
List<MultiProcessResult<TIn, TOut>>
: A list of processing results.
Enqueue an item to be processed.
Parameters:
item
(TIn
): The item to be processed.
Await until all pending executions have been processed. Optionally accepts a cancellation token or a time span to limit the wait.
Example:
A class that represents the result of processing an individual item.
The input data that was processed.
The output data produced by the process.
An optional exception that occurred during processing.
An optional timespan representing the processing time.
Dynamic Data Tables work similarly to DataTables. The key difference is that they are very loosely defined until the data is solved.
This allows rows to be added anywhere without having to track whether that row is currently present.
Headers are automatically detected within the data set
Column types are solved, and support solving mesh types within a column.
Metrics are stored and calculated about each row and column.
They can easily be converted to fully structured DataTables at any point.
Let's take a look at a basic table with 3 columns and 3 rows.
1  John   25
2  Smith  30
3  Jane   29
To recreate this table the standard way, using a DataTable, in the fewest lines possible:
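Something like the following (the column names Id/Name/Age are assumed, since the sample only shows the values):

```csharp
using System.Data;

var table = new DataTable("People");
table.Columns.Add("Id",   typeof(int));
table.Columns.Add("Name", typeof(string));
table.Columns.Add("Age",  typeof(int));

table.Rows.Add(1, "John",  25);
table.Rows.Add(2, "Smith", 30);
table.Rows.Add(3, "Jane",  29);
```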
Here is how you would do the same thing with a dynamic data table:
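A hedged sketch of the equivalent; AddRow here is illustrative (check the actual row-adding method), while FinishDataLoad is the method referenced later on this page:

```csharp
// No column types declared up front; they are solved from the data itself.
var dyn = new DynamicDataTable();
dyn.AddRow(1, "John",  25);   // AddRow is a stand-in for the real row-add call
dyn.AddRow(2, "Smith", 30);
dyn.AddRow(3, "Jane",  29);
dyn.FinishDataLoad();          // headers and column types are solved here
```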
Notice what's different between the two examples above? The data types are not present! This is one of the key differences and the reason for the "Dynamic" in the class name.
We also do not have to worry about what rows we've added, or not added. They keep up with themselves.
DynamicDataTable
supports several additional "smart" features that allow it to process and solve the data while processing it. This is a huge advantage when loading data that is not a pristine export.
If you were to change row 3 in the first example, you would generate an ArgumentException:
Changing row 3 in a DynamicDataTable
will convert that column into a string:
Data types will be auto-solved after N (default 1000) rows are added, and will then be readjusted as more rows are added.
This is the secret sauce behind the Excel Reader: it uses a DynamicDataTable to solve and strip header content before load.
What happens when your data is not on the first row? This is another big problem when dealing with a client file. There may be rows above the actual table and we don't need that information.
You can see it automatically solved the header row after calling .FinishDataLoad()
. The header, column data types, fill rates, data lengths and data type counts will all be solved and available.
There are two primary methods for converting the dynamic table into a standard DataTable.
The .ToDataTable_ColumnsOnly()
version does not convert any data, only the schema of the table (the columns).
The .ToDataTable()
version has two optional parameters for splicing the table. If left at default values, the whole table is converted and returned.
You can easily create a CSV out of the data by calling .ToCSV()
. This is a handy method for quickly exporting data or debugging.
You may add static values to the data table; they will be appended to the end of the column load set.
The dynamic table comes with a few included "Special" columns that can be automatically added during the load.
To retrieve data statistics after the call to .FinishDataLoad()
, you can easily use .GetStatistics()
to retrieve a data statistics class with information like:
Column names
Types
Fill rates
Unpopulated row counts
Jagged counts
Row counts
This is a helpful method for post load data analysis.
The GPTClient
class provides functionality to interact with the OpenAI GPT API. It offers methods for building message histories, submitting both synchronous and asynchronous chat and vision requests (including streaming responses), determining model versions and their context window sizes, and retrieving available models. Additionally, it defines several public constants representing chat roles and supported model identifiers.
Here we'll create a new client with our API key, then create and submit a request to o3-mini. We can set the reasoning effort, the message, and even force json_schema mode with a class.
You can see how easy it is to operate and integrate with OpenAI!
This sample is using the absolute minimum amount of code possible to achieve a full conversation with context window limits, model selection, token encoding, and a full chat history. We're using the TikToken
package to feed the client with a method to get token counts. The GPT client does the rest when using the Conversation modes. It keeps track of message history, client messages, context token sizes, and more!
Check out the gif to see how this looks running in an app!
Gets the API key used for authenticating requests with the OpenAI API.
Represents the role for a user in a chat conversation.
Represents the role for the assistant in a chat conversation.
Represents the system role in a chat conversation.
Represents the developer role in a chat conversation.
Conversation methods allow every message of the "conversation" to be managed and maintained via the client. This simplifies worrying about context token sizes, managing message state and history, etc. Simply feed a new message into the conversation any time and history will be maintained.
string newUserMessage
: A new message to feed into the conversation.
Action<string> res
: The response stream feed, use this to append to a logger, feed to the console, etc.
CancellationToken? cancelToken
: An optional cancellation token.
string EndOfStream
: If provided, will append this content to the end of the current response. Useful if you need to statically append html, newlines, or other EOF content.
To use Conversation methods, please allocate the GPTClient with the APIKey, the RequestModel, and a method to return token counts (or (s) ⇒ 0 if you want to disable context window management).
We're using the TikToken
package to feed the client with a method to get token counts. If running this sample, install this package as well.
If using the conversation model, you can easily clear all history and reset state by using this method.
Builds a list of messages by prioritizing the user message and then appending historical messages as long as they do not exceed the specified context window token size. This method uses a provided token count function to ensure that the combined messages fit within the allowed token budget. It returns a tuple containing the list of messages, the number of tokens used (calculated as the difference between the initial context window size and the remaining tokens), and the total tokens counted from all messages.
Parameters:
Func<string, int> Count
: A function that returns the token count for a given string.
string userMessage
: The current user message to include.
string systemMessage
: An optional system message to include.
int ContextWindowTokenSize
(default: 128000): The maximum allowed token count.
IEnumerable<Message> MessageHistory
: An optional collection of historical messages (ordered from oldest to newest).
Example:
Determines the version of a GPT model based on its identifier string and sets the corresponding context window token size. The method compares the input gptModel
against known model names and patterns and returns the model version as a decimal value while outputting the appropriate context window size.
Parameters:
string gptModel
: The GPT model identifier (e.g., "gpt-3.5-turbo", "gpt-4-turbo").
out int ContextWindowSize
: An output parameter that will be set to the context window token size associated with the model.
Example:
Submits a chat request asynchronously to the OpenAI API. This method sends the provided Request
object as a POST request and awaits a response. On success, it sets the model on the returned ChatCompletion
object and associates the original request with the response.
Parameters:
Request request
: The request object containing the chat parameters.
CancellationToken? cancelToken
(optional): A cancellation token to cancel the request if needed.
Example:
Submits a chat request synchronously to the OpenAI API. It sends the provided Request
object as a POST request and returns a RestResponse
containing the chat completion result. The response’s model is set based on the request.
Parameters:
Request request
: The request object containing the chat parameters.
CancellationToken? cancelToken
(optional): A cancellation token to cancel the request if needed.
Example:
Submits a vision request asynchronously to the OpenAI API. This method accepts a GPTVisionPayload.Request
object containing vision-specific parameters and sends it as a POST request. If successful, the returned ChatCompletion
object will have its model property set from the request.
Parameters:
GPTVisionPayload.Request request
: The vision request object.
CancellationToken? cancelToken
(optional): A cancellation token to cancel the request if needed.
Example:
Submits a vision request synchronously to the OpenAI API. It accepts a GPTVisionPayload.Request
object and returns a RestResponse
containing the chat completion result, with the model property set based on the request.
Parameters:
GPTVisionPayload.Request request
: The vision request object.
Example:
Submits a chat request that streams results back from the OpenAI API. This method enables streaming by setting req.Stream
to true, and it processes the incoming stream by invoking a provided callback (res
) with each new token received. The method aggregates the response into a complete ChatCompletion
object that is returned once streaming is finished.
Parameters:
Request req
: The chat request object with streaming enabled.
Action<string> res
: A callback action invoked with each new token as it is received.
CancellationToken? cancelToken
(optional): A cancellation token to cancel the streaming operation if needed.
Example:
Retrieves the list of available models from the OpenAI API. This method sends a GET request to the models endpoint and returns a RestResponse
containing a ModelResponse
object with the models information.
Example:
There are quite a few "quick" request methods available; these methods create the request object to be sent to GPT. They can be used to rapidly create new messages from users, override context window sizes, pre-append developer (system) prompts, and create GPT clients ready for Conversation methods. They come pre-configured with the maximum completion tokens, model names, and context window size information.
F(x) is a powerful expression evaluator integrated within Perigee, designed to enhance data manipulation and computation capabilities. Much like the "Formula" bar in Excel, F(x) allows users to input complex formulas incorporating functions, mathematical operations, column references, and parentheses. However, F(x) goes beyond basic evaluation by offering advanced features:
Custom Functionality: Users can add custom classes with functions, expanding the evaluator's capabilities to suit specific needs.
Performance Optimization: Instead of repeatedly tokenizing, parsing, and evaluating expressions, F(x) compiles expressions into Intermediate Language (IL) code. This compilation results in a direct expression tree that can be executed swiftly, making it ideal for processing large datasets efficiently.
Possibilities are endless, but let's examine a few cool ideas.
Enable an input box to accept math expressions allowing the user to easily double a value, or figure out what a third of their original input was.
Execute an expression across an entire DataTable, modifying the values of each row. This allows for easy data manipulation and cross references to other columns.
Enabling users to create dynamic data output for a report.
Allow the user to write their own filter on data, tables, report output, or data ingestion.
In this example, we'll see how to create an Fx
class, register functions, compile a formula, and get results. We can see what the identifier callback is and how to use it.
Notice we return FxValue
? This special class handles all of the data types and conversions for the evaluator. If you're passing data in or out of Fx
, it will be in the form of a FxValue
.
Just like last time we'll be compiling an expression, but this time we'll add the method override. This special override allows you to inject your own method handling. In this case, we'll inject a "custom" method that divides the value by 100. If the requested method isn't "custom", then we allow fx to call the methods as it usually would with fx.CallMethod(fn, arg).
Sometimes you need input context when processing methods. CompileWithInputParameter allows you to do exactly this: pass in anything that conforms to the object type (including a class) and it will be available to you in the callback. This is especially useful when processing lists or tables, or when modifying behavior based on the index, row, user, etc.
This time we'll be returning a string value, and prepending "name: " to it. The passed in input parameter allows us to reference our lookup dictionary from within the method override call.
This time we'll add a CustomFunctions class to our project, and allow Fx to call it. We added a square root method called "root". Running this example will call the method and return the value (2.449 ...)
In this example, we'll run over a data table with 1,000 rows. We can supply each column's expression, allowing us to perform various operations on each column, sequentially.
We'll also use a partitioned sum to set the Calc
column (identical to a SUM(Amount) / group by Type
).
Then finally, override the Type column with a concatenation of the Org and Type.
Boolean values are supplied as: true, false, yes, no
When coercing a String or Number value into a boolean, a 0 or 1 will also be accepted
Numbers are supplied as whole or with a decimal point: 5, 2.5
Strings are within double quotes or single quotes: "name", 'day'
Date values are supplied in a string: '01-01-1900'
Timespan values are supplied variably:
01:10
is 1 hour, 10 minutes
01:10:22
is 1 hour, 10 minutes, 22 seconds
01:10:22:55
is 1 day, 10 hours, 22 minutes, 55 seconds
01:10:22:55.110
is 1 day, 10 hours, 22 minutes, 55 seconds, 110 milliseconds
+ for addition
- for subtraction
* for multiplication
/ for division
^ for power
& for concatenation
% for modulus
== or = for equals
!= for not equals
All < > >= <= comparison symbols for greater/lesser than.
All of the functions listed below are included in the FxDefaultFunctions
class.
Format: In the example below, this would format a string value containing a decimal as French currency with 2 decimal places.
abs: Ensures the output value is always positive.
max: Returns the larger of two numbers.
min: Returns the smaller of two numbers.
sin: Calculates the sine of a given angle (in radians).
cos: Calculates the cosine of a given angle (in radians).
tan: Calculates the tangent of a given angle (in radians).
log10: Calculates the base-10 logarithm of a number.
sqrt: Calculates the square root of a number.
round: Rounds a number to a specified number of decimal places.
places: Rounds a number to a specified number of decimal places without exceeding the original value.
ceil: Rounds a number up to the nearest integer.
floor: Rounds a number down to the nearest integer.
NullIf: Returns null if the first value equals the fallback value; otherwise, returns the first value.
IsNull: Returns the fallback value if the first value is null; otherwise, returns the first value.
If: Returns ifTrue if the condition is true; otherwise, returns ifFalse.
Coalesce: Returns the first non-null value from the provided conditions.
and: Returns true if all conditions are true; otherwise, returns false.
or: Returns true if any condition is true; otherwise, returns false.
not: Inverts the boolean value of the condition.
iferror: Returns the fallback value if the first value is an error; otherwise, returns the first value.
iserror: Checks if the value is an error.
isnumber: Checks if the value is a number.
isstr: Checks if the value is a string.
isblank: Checks if the value is null or an empty string.
isbool: Checks if the value is a boolean.
pmt: Calculates the payment for a loan based on constant payments and a constant interest rate.
ppmt: Calculates the principal part of a payment for a specific period.
ipmt: Calculates the interest part of a payment for a specific period.
today: Returns the current date.
now: Returns the current date and time.
date: Creates a date from year, month, and day.
DateAdd: Adds a specified number of intervals to a date.
DateDiff: Calculates the difference between two dates based on the specified date part.
contains: Returns a boolean indicating whether or not the string contains a value.
name: Parses a name and returns a name part. Useful when a column contains a full name and you need to split it.
Valid name parts:
title
first
last
middle
nickname (nickname)
suffix
fullname (first middle last)
all (all name parts, just cleaned up and optionally title cased)
nameformat: Just like the name function, but it allows you the freedom to format the name result in any way you like. Example: {first}/{last} -> Bob/Jones
Valid name replacement strings:
{title}
{first}
{last}
{middle}
{nickname} (nickname)
{suffix}
split_take: Works a lot like split, but you can specify a range of results to rejoin.
An example of this is:
This would return a-b-f-g, as it takes the first two elements, then elements 5 through 20 if they exist.
split: Splits a string and returns a split value.
The index is zero based, meaning 0 is the first item. You can use 'first' or 'last' as well to retrieve those values.
defaultValue can be supplied when an index is out of range, or the value is null.
You may supply as many string separators as you need, such as:
repeat: Repeats a sequence of strings a specified number of times.
concat: Concatenates multiple strings into one.
substr: Extracts a substring from a string starting at a specified index with a specified length.
left: Returns the leftmost specified number of characters from a string.
right: Returns the rightmost specified number of characters from a string.
trim: Removes leading and trailing whitespace from a string.
trim: Removes specified characters from the start and end of a string.
replace: Replaces all occurrences of a specified substring with another substring.
upper: Converts a string to uppercase.
lower: Converts a string to lowercase.
len: Returns the length of a string.
sumvals: Calculates the sum of multiple numerical values.
avgvals: Calculates the average of multiple numerical values.
There are times when you want to turn off or disable something running when a condition occurs. An example of this would be to only run a Coordinator when there is a known network connection available.
This is made possible by a ThreadCondition
:
Let's take a look at what's happening:
The method itself is thread blocking, so it won't return out until the parentToken
is cancelled.
The first callback supplies a childToken
to be used in any method that returns a Task.
This could be your own Task or "async" method.
The second callback should return TRUE
if the method should be running, or start for the first time.
If it returns FALSE
, the condition will cancel the childToken
and await the Task
from the first callback.
The final parameter you can supply is how often this check occurs.
In this example, flipping off the Wi-Fi radio of your development box will stop the coordinator from receiving new messages. Turning it back on will start the coordinator again.
A helper class when dealing with a range of data or cells
To create a new range, supply two initial values:
As you can see it corrected the range by fixing the starting point to A1
, and the ending point to C3
.
The zero based indexes of the range are 0 to 2 inclusive
You can also convert the common Excel locators using the built in functions:
This handy method takes a CRON string and is an easy way of creating an awaitable task. It returns True if the await finished without being cancelled by the optional token.
One of the utility class methods is a Retry
and RetryAsync
. They provide an easy ability to retry on exception thrown with a delay.
The response from these methods is a Tuple where Item1 is a boolean indicating if it was a success (true) or failed the maximum number of times (false), and Item2 is the exception thrown if it did not succeed.
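A hedged usage sketch; the hosting class and parameter names here are placeholders, but the returned tuple matches the shape described above:

```csharp
using System;

// Illustrative call shape: retry the work up to 3 times, waiting 500ms between attempts.
var (success, exception) = Utilities.Retry(() =>
{
    CallFlakyEndpoint(); // anything that may transiently fail
}, retryCount: 3, delayMilliseconds: 500);

if (!success)
    Console.WriteLine($"Gave up after retries: {exception?.Message}");
```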
Another retry strategy is the Exponential Backoff method. Unlike a constant delay between retries, this method progressively increases the delay (incorporating jitter) to prevent overwhelming a remote system with requests.
You can define both a base delay (in milliseconds) and a maximum delay (in milliseconds) to ensure the delay between requests scales appropriately.
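For reference, here's the general shape of exponential backoff with jitter as a standalone sketch; this illustrates the concept rather than Perigee's internal implementation:

```csharp
using System;
using System.Threading.Tasks;

static class Backoff
{
    // Doubles the delay each attempt, caps it at maxDelayMs, and adds random jitter.
    public static async Task<bool> RetryAsync(
        Func<Task> action, int maxAttempts = 5, int baseDelayMs = 250, int maxDelayMs = 10_000)
    {
        var rng = new Random();
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try { await action(); return true; }
            catch
            {
                if (attempt == maxAttempts) return false;
                var delay = Math.Min(maxDelayMs, baseDelayMs * (1 << (attempt - 1)));
                delay += rng.Next(0, delay / 2); // jitter keeps clients from retrying in lockstep
                await Task.Delay(delay);
            }
        }
        return false;
    }
}
```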
I won't list the models here as they change and get updated frequently; however, we keep these relatively up to date, and the latest and greatest models should be available to use.
Formats a date, time span, or decimal according to the specified format. Culture is optional and follows the defined by Microsoft.
You may also use in the condition callback. This will process the tree on every run.
This handy method uses several de-duplication methods, as well as the Soundex and Levenshtein algorithms to produce a best-guess match between two different lists.
It will always match A => B, meaning items in A will only attempt matches to items in B, not the other way around.
In the below example, Peech
is matched to PeachJuice
even though it's misspelled and Agave
has no match. Every other item is matched with its equivalent juice.
This version of list matching is almost identical to what is listed above, the main difference is that your input dictionary defines the keys to match, and the value is a comma separated list of additional "hints" to match by.
In this example, we can match the key Apple
with Appljuise
, because of the additional hints supplied by the dictionary.
If you're attempting to debug your list matching and want to understand the match values being returned by this, use the _Quality function. It will return a rich object that is easy to read showing the matched value for each key.
Takes every item in a list and joins it with the delimiter
Works identically to the classic LINQ Distinct, however it works on classes as well.
Sometimes you have a list of classes, and need to convert it to a table. This uses reflection to perform the conversion.
Enables you to filter a list of classes down to those that are subclasses of another class
Recursively flatten a data structure and return the new type
Recursively select down a tree until something is null. Great for returning a list of all inner exceptions in order.
Our first example was a bunch of fruit juices. The remove-duplication method attempts to find any common pattern in the list of items and remove it.
This class provides functionality for downloading data from a database and storing the result in a provided class. It supports multithreaded asynchronous calls, and allows for a queue of threaded download operations. It has automatic retry capability built in.
The type of DB Connection passed determines the database language it uses. You could for example provide a MySql connection instead.
There is an easy way to create the DBDownloader by using the static extension method.
Assigns the downloaded data to a property of the base object. Can be called multiple times using fluent syntax to fill multiple properties. The downloader automatically manages the multithreaded and asynchronous operations for filling each property.
It uses Dapper behind the scenes to perform the mapping and query execution. If you want to supply parameters, you can use the object
Param
parameter.
If you're not assigning a list, but a single object, use AssignSingle()
.
Waits for all download tasks to complete. This can be called inline or later right before the downloaded data is required. It is thread blocking.
Gets the total time taken for all download tasks to complete.
You can get to the list of tasks created by accessing this property.
Every DownloadResult has properties for the success boolean, if an exception was thrown, the name of the property it set, and dates to indicate start and end times.
TopoSort is a static class that provides functionality to perform a topological sort on a collection of items with dependencies.
This method takes a source collection of items and a function that returns the dependencies of each item. It returns a sorted list of items based on their dependencies.
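A hedged usage sketch; the method name Sort and the dependency-selector shape are assumptions based on the description above:

```csharp
using System;
using System.Linq;

var steps = new[]
{
    new BuildStep("package", new[] { "build" }),
    new BuildStep("build",   new[] { "restore" }),
    new BuildStep("restore", Array.Empty<string>()),
};

// Each step's dependencies are resolved by name back to the step objects.
var ordered = TopoSort.Sort(steps,
    step => step.DependsOn.Select(name => steps.First(s => s.Name == name)));

// Expected order: restore, build, package

// Illustrative item type.
record BuildStep(string Name, string[] DependsOn);
```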
A static class for serializing, compressing, and decompressing objects using JSON serialization and Brotli compression.
Serializes and compresses an object using JSON serialization and Brotli compression.
Decompresses the given byte array and deserializes it into type T.
Decompresses the given byte array and deserializes it into a List of type T.
Serializes, encrypts, and compresses an object using JSON serialization and Brotli compression.
Decrypts and decompresses the given byte array and deserializes it into type T.
Type T deserialized from the decompressed byte array.
Decrypts and decompresses the given byte array and deserializes it into a List of type T.
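For intuition, here's the general technique as a standalone sketch using System.Text.Json and BrotliStream; it mirrors what JsonCompress does conceptually (encryption not shown) rather than its exact implementation:

```csharp
using System.IO;
using System.IO.Compression;
using System.Text.Json;

static class JsonBrotli
{
    public static byte[] Compress<T>(T value)
    {
        using var output = new MemoryStream();
        using (var brotli = new BrotliStream(output, CompressionLevel.Optimal))
            JsonSerializer.Serialize(brotli, value); // JSON written straight into the compressor
        return output.ToArray();
    }

    public static T? Decompress<T>(byte[] data)
    {
        using var input  = new MemoryStream(data);
        using var brotli = new BrotliStream(input, CompressionMode.Decompress);
        return JsonSerializer.Deserialize<T>(brotli);
    }
}
```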
Let's break that down to something we can all follow, like programming the AI of a video game.
The AI needs to do two things:
Check for enemies in the area. If there are enemies in the area, then:
Check if they want to attack, or can attack
Only if the above statement is true, move into combat range
Check for non combat activities after checking that there is no immediate danger in the area
Am I hungry?
Am I thirsty?
The two state checks (enemy check, non combat check) are known as a list of fallbacks
. The reason for this is that every subsequent child node in a Fallback
is executed in order until one succeeds.
As almost an opposite to Fallback
nodes - Sequence
nodes only execute their children in order if the previous one succeeds.
Use Fallback
nodes for an ordered list of items to execute, where a failure moves onto the next node
Use Sequence
if the children need to succeed to proceed to the next node
The demo below uses simple bool variables to control state, but in a real scenario, these would be callbacks to check within a radius of the AI, or to pull its current hunger/thirst levels.
With that code in place, let's print and run our tree:
If you run the code as shown above, you'll notice the last node to be executed is: Move in for combat.
The reason for this is because "Check for Enemies"
takes priority over "Non Combat Activities"
.
Let's set bool Enemies = false;
and run again. The last node to run this time is:
Thirsty - drink.
That's because our fallback node didn't have any enemies to check for, and our Thirsty bool is true.
This is what makes behavior trees so powerful. Simple state management changes the outcome of what it decides to do. Perigee has a built in SDK to make working with Behavior Trees quite easy.
The print produces a nice breakdown of the tree itself, showing nodes and their children.
The tick engine runs a tree until success or failure. The run code is effectively processing the tree until it's no longer running:
The three main types are Fallback, Sequence, and Leaf. Leaf nodes are "special" nodes as they are the only nodes that contain a callback for code to be executed to determine the node state.
The three states are:
Success - It's completed.
Processing - It's still working on it, try again...
Failure - It failed, stop.
(Sometimes called a "Selector Node")
Fallback nodes process children nodes in order until a child node is a success, then fall out of the loop.
Processes all children in order; if a child fails, it will return a failure and start back at the first child on the next rerun (tick).
Execute code to determine what status a node is in.
There are several special operations that can be performed on nodes.
You can shuffle a set of nodes by using this extension. The example below randomly shuffles whether the hunger check or the thirst check happens first.
Every node has a default SortOrder
property. You can assign nodes a sort order or re-prioritize them at runtime/tree creation.
Sort orders these in ascending order. In this example, we sort Thirsty above Hungry.
Invert reverses statuses. Success becomes failure, and failure becomes success.
Even though the bool Hungry = false;
- the inversion node inverts the node status to be SUCCESS
.
Retry does as it sounds, it retries the node X number of times if the node returns a FAILURE status:
You'll see "Hungry test"
print 3 times (original + 2 retries).
There is both a ForceFailure
and a ForceSuccess
node. These are helpful when building certain trees that require a forceful response.
There are two handy built in methods for quickly creating the two kinds of trees:
There are multiple pre-built leaf nodes available to use. They return a Leaf Node that is already pre-coded to return the correct Node Status.
To check if the network is available
To check if a ping to 8.8.8.8
is available
To check if the address responds positively to a ping
To check if SQL Server can open a connection
Nested Sets provide a fantastic way of traversing hierarchical data. Once processed, they are able to very rapidly walk large data trees.
Suppose you're given a list of products and are asked:
"What products does the clothing category contain?"
"What products are my siblings?"
"Do I have any products under my category?"
All of these questions are easily answered by processing the list of items into a Nested Set. Once the conversion process is finished, it's very easy to query the set and it's incredibly fast.
To convert a list of items to a nested set, simply create a new NestedSet
by supplying the Root
, ID
, and Parent
fields.
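A hedged sketch of that call; the constructor parameters are assumptions inferred from "Root, ID, and Parent", so check the actual signature:

```csharp
using System.Collections.Generic;

var products = new List<Product>
{
    new(0, 0, "All Products"),
    new(1, 0, "Electronic"),
    new(2, 1, "Phones"),
};

// Root value, then selectors for the ID and Parent fields (illustrative shape).
var set = new NestedSet<Product>(products, 0, p => p.Id, p => p.ParentId);

// Illustrative item type.
record Product(int Id, int ParentId, string Name);
```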
Once finished, this gives you access to all of the other query abilities
Upline is how you traverse a tree upwards. So starting from key 15(vegetables), let's walk all the way back to "All Products".
Downline is how you traverse a tree downwards. So starting from key 1(electronic), let's walk all the way down.
Siblings is how you traverse a tree sideways. So starting from key 14(pizza), let's see what other siblings we have.
Every item is converted by creating a new NestedSet<T>
class, where the generic parameter is the type of objects that were originally processed. This class provides several additional properties available for use, as well as access to the original class used to convert.
Here's an example of retrieving the root node (key 0).
Let's look at some of the properties of this NestedSet<Product> below.
These properties are assigned from the lookup. They are the ID and Parent fields from the supplied class
This property is the horizontal level in the tree, starting from 1(root) to n(lowest level)
These properties are the left and right identifiers for where the node is located in the tree
A numbered index starting from 1, moving down and across the tree
How many nodes this node contains in its downline, including itself.
The original class, in our example, this is a Product
.
Perigee's proprietary serialization tool, Bit, stands out as a high-performance, high-compression module for serializing and deserializing data. Comparable to Avro or Protobuf in function but distinct in its foundation, Bit is developed entirely with a unique algorithm in 100% C# code. It is integral to several internal modules at Perigee, facilitating scenarios that demand exceptional performance and minimal disk usage.
Key Features of Bit include:
Serialization and deserialization of classes, structs, lists, datatables, interfaced types, and properties. All without special "tagging" and pre-work
Support for reserialization and "patching" to update data without full rewrites.
If the type can be serialized, you can easily "deep clone" your object tree
Capability for partial deserialization, useful for adapting to revised data maps or focusing on specific properties.
Built-in compression to minimize data size.
Facilities for map reading and generation, enabling dynamic data structuring.
Tools to generate C# classes and schemas directly from data maps.
Very high performance serialization
Attributes for serialization events
Utilities for reference type patching
Bit optimizes data storage by creating a map for each property, determining the necessity of writing each field based on defaults and nullability. This selective approach, combined with the efficient compression of values, significantly reduces the size of the output byte array. In practice, this can compress lists of objects by over 95% compared to their JSON document equivalents.
Unlike many high-compression algorithms that impose strict data input conditions and support limited data types, Bit offers more flexibility. It's fully recursive, supports a wide array of common data types (including custom classes, structs, lists, dictionaries, etc.), and handles data maps without cumbersome limitations.
It even handles Interfaced types without adding additional code. Just make sure the interfaced type has a parameterless constructor and all will be fine!
Bit's mapping system ensures data resilience over time. For instance, objects serialized with an initial map version (R0) can be seamlessly reserialized to a newer version (R1) as your data schema evolves. This built-in functionality simplifies the process of keeping serialized data up-to-date without the complexity often associated with such transformations.
Bit demonstrates exceptional efficiency in data storage, achieving compression ratios of over 90% in many cases, and even up to 98% when optimizing data types and attribute usage, far surpassing the compression achieved with minimized JSON.
Here's a simple example where we are serializing a single class that has a string
, ushort
, enum
, and recursive list of related "bit people
".
It's extremely easy to use and Bit handles all of the data types and mapping.
If you take this example and compare the byte lengths of JSON to Bit, it's pretty crazy. The compression ratios get even bigger when a larger document is at play, as bit automatically tests for additional compression on larger objects.
The first attribute is Bit2Ignore
. This tells the serializer to ignore this property and not serialize it. It is important to put this attribute on properties that are not necessary to store, or should be re-calculated after deserialization.
There are two serialization attributes as well; they are Bit2PreSerialize
and Bit2PostDeserialize.
These attributes allow you to specify a single method to be called before serialization and after deserialization.
These methods must be instanced (not static) methods and the name does not matter.
In almost all cases, reference types likely don't matter. In the rare cases where they do, like traversing and comparing a directed graph, those reference types are very important. In this example, we fix that by:
Implementing the post deserialize callback.
Checking if our deserialized class held its comparer.
All of the adjacency lists instances are now re-referenced. If you were to call object.ReferenceEquals(a,b)
for two nodes in this graph, it would return true!
For scenarios where you need extreme performance, you can achieve 70-250X faster compression times using the DeSerCache
(Many of our sample test cases were only 2-10 microseconds).
Creating a cache is easy. Simply supply an instantiated object and let the system pre-cache the object and create a bitMap to match. Then supply these objects to any serialization / clone / deserialization calls you want.
This is incredibly helpful if you're serializing many of the same types over and over again (like a realtime communication application or lots of data store requests).
Deserializes a byte array into an instance of a class.
Deserializes a byte array without a header into an instance of a class. Useful for data serialized without initial header bytes.
Serializes an object into a byte array without adding a header.
Serializes an object into a byte array, including a header with a version.
Serializes an object into a byte array and outputs the bytes of the map used for serialization.
Reserializes data from one map version to another. It enables transferring data between different revisions of a map, handling changes like new fields, removed fields, or updated data types.
Similar to Reserialize
, but includes a BeforeReserialization
callback allowing modification of the object just before reserialization.
Deserializes an object, patches it using a provided callback function, and then reserializes it.
Deep clone an object by serializing and deserializing with the same mapping information.
Please note, this only works with properties that Bit2 is able to serialize. Please verify your data can be serialized properly before relying on DeepClone.
This allows you to supply an IEqualityComparer<T>, and either one list, an array of lists, or an initial list + additional lists to replace all reference types. It uses the comparer to identify identical classes and then replaces those unique instances with references to itself.
Converts a map to a C# byte array string. Useful for storing maps internally for later deserialization.
Maps a type to a BitMap
and optionally serializes and returns the bytes of that map.
Reads the BitHeader
from a byte array, using a maximum of 6 bytes. It also outputs the version used during serialization if available.
Converts a byte array to a BitMap
.
Generates a schema from a map, producing full C# classes that can be used to deserialize the data.
A behavior tree allows for a complex set of requirements and tasks to be defined in such a way that a decision can be made based on an infinite number of states. Behavior Trees are used in AI, robotics, mechanical control systems, etc. We use them with for fine-grained execution control.
To check if an exists
To check if an has expired data. Returns SUCCESS if the data is NOT expired.
If #2 is true, calling - which is a Bit2 utility method.
This is likely not necessary on most deserialized data. However, if it is necessary, you have a helper here
Let's say your web app allows users to schedule reports to email off. These items are configurable and the user can add or remove them at any time.
The front-end web app saves a database record back with all the information needed to run the specific report and its parameters.
Then the Scheduler reads the table for those database records every few minutes and updates, removes, or reschedules any changed items.
You get a simple callback when it's time to execute the event task
The demo below hooks up everything but actually generating reports, and emailing them off.
The RunType
argument could easily contain what reports to generate.
The RunArgs
could easily contain additional user information like who it's being sent to, a database ID of the job to lookup, etc.
Line 5 - Declare a new event source. This demo uses the memory scheduler.
Lines 9-10 - Add the two scheduled items, A and B, to schedule and execute
Line 13 - Add a scheduler using the source we defined, and declare the callback.
You're given a CancellationToken for respecting graceful shutdown events.
An ILogger for logging to the system and defined sink sources.
And the GenericScheduledItem<ushort>, which is the interfaced item that allows you to access its definition values.
Line 14 - You can execute or perform any tasks you need. Like generating a report with parameters.
You can see the call to GetRunType()
. There's also GetRunArgs()
, GetLastRunDate()
, GetName()
, etc.
Sync agents are a more powerful and configurable way of defining tasks that must run under a given sequence, dependency tree, or schedule. They can be configured in a number of ways that allow very complex and fine-grained control over how and when they execute.
A few examples of why we would use a sync agent:
A task or job needs to be fired at a given time
We need the ability to schedule something in a different time zone
We want to request that an agent performs its refresh from a remote source (like a database)
The task is dependent on other tasks completing
We have data that may expire if not kept current
We need to supply multiple CRON strings to specify which times the task runs
We need to supply a blackout range where the task should not be executed
Agents have several main blocks to them, they are as follows:
The first callback is the configuration section. We can set all kinds of settings including:
Maximum executions per day.
A timespan of how often to execute
An array of CRON strings to specify when to execute
Blackout periods in the form of Timespan and CRON strings.
Setting "* 5 * * *" as a blackout CRON would disallow the agent to run during the entire fifth hour of the day (from 5:00AM to 5:59AM inclusive)
The execution callback
This callback is only ever called when the agent is in an active refresh/sync state.
You can perform whatever logic you need here, and simply return exec.Complete or exec.Failure depending on the results of your process
Tree check callback
This callback is only for late-binding trees; in the example below you can see how it's used to set up a behavior tree for checking previous agent runs
In the below example we configure 3 agents.
The first is run every 5 seconds, once a day.
The second is run on the minute 0 mark, once a day.
The level 2 agent has a late-binding dependency tree used to determine whether the first two succeeded and the data is not expired. If this is true, then it runs
Grab the code from the blueprint section for API hosting with Perigee
In this example we create a controller that allows us to turn threads on and off.
Simply add a new empty "API Controller" to the Controllers
folder.
Call it AdminController.cs
Paste the code below in to replace the content
In this example we've created a simple endpoint we can call to turn anything on or off by simply checking the thread registry for the requested thread, then turning it on or off.
The port may be different - but you can see how easy it would be to turn something on or off!
[GET] https://localhost:5001/api/admin?task=ProcessAgent&start=true
Typically speaking this would be secured or unavailable outside the firewall. Don't forget to secure your endpoints!
This example uses a file watcher to scan for CSV's and parse them into a DataTable
. It prints the info about it.
The IMAP bot will scan an email box and respond with the text you sent it, as well as info about attachments.
For more information about the imap client, see its page linked below!
To run the demo, you would need to authenticate it properly with your email server. This demo is using SASL-OAUTH for Google GMail. If you want to supply direct username/password authentication you can do that as well.
This demo shows how you would authorize a token call to Microsoft Graph for delegated permissions where a token response is required.
This setup involves an app registration in Azure Portal, as well as registered redirects and the appropriately selected API Permissions.
To run the below demo, grab the Web Host Utilities and drop it into your project.
Our redirect is https://localhost:7201/api/token
- which is why our call to map the incoming HTTP request is /api/token
.
The appsettings.json
:
It's very important to include offline_access
if you want to be able to refresh the token automatically.
If you're trying to communicate with DataVerse, simply change the domain and scope parameters:
The appsettings.json
:
Working with OAuth Flow can be a double-edged sword. While it's crucial for securely accessing APIs, managing tokens, authorization state, and timeouts can be challenging. Get it wrong, and you may find yourself locked out of a resource or stuck with an expired token during an important operation.
QuickBooks Online (QBO) offers a comprehensive guide and an even better playground for developers, especially those just starting with OAuth. But, here comes the tricky part: you have to manage independent sets of authorization and refresh tokens for each authorized company (known as a realm) within your application.
This article breaks down how to use Perigee to effortlessly manage these tokens across multiple realms.
The following C# code snippet is executed immediately after receiving the RealmID
and Code
parameters from the API callback:
Since each realm has its own set of authorization details, it's important to re-register the callback functions for each realm when the application restarts:
Thanks to our prior setup with Perigee, making authorized realm calls is now a walk in the park. The system ensures that you get a freshly refreshed authorization code each time you hit an endpoint. Plus, it authorizes the call pre-emptively, reducing the risk of a failure midway through a process.
In conclusion, while OAuth can often feel like a labyrinth of tokens and authorizations, Perigee simplifies the process, making it far easier and more reliable.
Happy coding from the Perigee Team!
This demo will allow you to connect to Service Bus and retrieve messages. It's configured for only one concurrent message at a time, but that limit is easy to change (a sketch of the processor setup follows the steps below).
Setup and configuration for your service bus can be done on the Azure Portal.
Install the package: Azure.Messaging.ServiceBus
Then install Perigee
Copy the code over and configure the settings
Run and modify!
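To see where the demo's concurrency setting lives, a bare Azure.Messaging.ServiceBus processor limited to one concurrent message looks roughly like this (the connection string and queue name are placeholders). The demo hosts the same kind of processor inside a managed thread:

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Placeholders: use your own namespace connection string and queue name.
var connectionString = "<service-bus-connection-string>";
var queueName = "<queue-name>";

await using var client = new ServiceBusClient(connectionString);
await using var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = 1,        // one message at a time; raise this to process concurrently
    AutoCompleteMessages = false   // complete explicitly after the handler succeeds
});

processor.ProcessMessageAsync += async args =>
{
    Console.WriteLine($"Received: {args.Message.Body}");
    await args.CompleteMessageAsync(args.Message);
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
Console.ReadLine(); // keep the host alive while messages arrive
await processor.StopProcessingAsync();
```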
And the appsettings.json:
To run the below demos:
Create a new web api project (.NET 6+)
Download the Web Host Utilities and include it as part of your project.
Copy, paste, and run!
In this demo, we have a coordinator set up to run two steps.
The first step calls an API to submit order data. If it is a success, it will move on to the next step and log the data.
After calling the above demo with curl: curl https://localhost:7216/queue?number=90
After turning off Wi-Fi access, and turning it back on:
Perigee makes working with watermarks (persisted progress markers, such as the last processed record ID or timestamp) easy, and they are very powerful.
Our watermarks are persisted locally to disk automatically and recalled on application startup.
They are debounced (we cover this in the updating section).
They are thread safe.
You can register as many subscribers as you need to the data feed.
You may push that data externally on update, and recall it on initialization for further backup.
The register call should be performed at application initialization if possible, to avoid a race condition of asking for its value before it is registered. Requesting a watermark before it is registered will throw an exception.
Registering a watermark is an easy one-liner. Let's look at the parameters:
Line 4 - Name - The name of the watermark
Line 7 - Func<Watermark> Initialization callback - If the value does not exist, the initialization callback is called and an initial value is set.
Line 10 - Action<Watermark> onUpdate? - An optional callback for when the value is updated.
To create the other base data types, simply provide that type in the initialization callback.
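To give a feel for the shape of the call, here is a rough sketch. The Watermarking class and Register method names are placeholders for whatever the register call is named in your version, and the Watermark constructor is illustrative only; the three arguments mirror the parameters described above:

```csharp
// Placeholder names: "Watermarking.Register" stands in for the actual register call,
// and the Watermark constructor shown is illustrative only.
Watermarking.Register(
    "LastProcessedOrder",                           // Name of the watermark
    () => new Watermark("LastProcessedOrder", 0),   // Initialization callback: first-run value of 0
    wm => Console.WriteLine($"Updated: {wm}"));     // Optional onUpdate callback
```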
That's it. The underlying system does the heavy lifting of persisting this locally under the /watermarks folder beside the running application.
You're able to register as many "subscribers" to the data changes as you desire in other parts of the code. These will be called on a background thread with the updated value.
The RegisterAdditionalEventHandler call simply takes two parameters:
Line 4 - Name - The name of the watermark you want to update.
Line 7 - EventHandler<Watermark> - This sends you the (sender, watermark) back in the callback, and you're able to read the value from there.
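A subscription elsewhere in the code might look like this (the containing class name is assumed; the handler shape matches the two parameters above):

```csharp
// Called on a background thread whenever the named watermark's value changes.
// "Watermarking" is a placeholder for the actual containing class.
Watermarking.RegisterAdditionalEventHandler("LastProcessedOrder", (sender, wm) =>
{
    Console.WriteLine($"LastProcessedOrder changed: {wm}");
});
```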
The UpdateWatermark call takes two parameters:
Line 4 - Name - The name of the watermark you want to update.
Line 7 - Watermark - The new watermark and value.
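Continuing the example (again, the containing class name and the Watermark constructor are placeholders):

```csharp
// Push a new value for the named watermark; subscribers are notified
// on a background thread after the debounce window.
Watermarking.UpdateWatermark("LastProcessedOrder", new Watermark("LastProcessedOrder", 10));
```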
A lot happens behind the scenes that you should be aware of:
Updates are debounced, which means that rapid updates don't all push their values individually to subscribers of the data updates.
Let's say you processed 10 records in a matter of 50 milliseconds, and with every record you updated the watermark, so it went from the initial value of 0 to 10. After about a second, all of the subscribers would get a single updated watermark event with the value of 10.
Debouncing reduces the number of calls both to the filesystem and to your subscribers.
Updates are thread safe as they perform internal item locks before writing to disk and notifying subscribers.
Getting the watermark only requires the Name of the watermark to retrieve.
If there are inflight updates to the watermark, this call may block the thread for a second or two while those debounces get processed. If there are no inflight updates, it returns immediately.
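Reading the value back might look like the line below (the method name is a placeholder; only the watermark's name is required):

```csharp
// May block briefly if a debounced update is still in flight; otherwise returns immediately.
var current = Watermarking.GetWatermark("LastProcessedOrder");
```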
From Data To Analyst In a Few Clicks
Need to take your Excel data and load it into your database to get the analysts started?
Instead of taking weeks to develop a new data pipeline, create structure, and work with the business to load your new client data in, just drop the file in Perigee. It takes care of everything, from finding and loading the data to creating a table, in only a few seconds. You'll be a hero!
Here's a typical messed-up Excel file where data is shifted and there are unwanted headers.
After running the application, you'll see the log showing how many rows it found, where the header was (since it wasn't on the top row), and the columns detected. It even prints out the fill rates per column!
Taking a peek at our database, we see the data loaded into our new table dbo.clientData:
This came directly from the Excel file linked below.
The appsettings.json file:
The SampleExcel file we loaded: