LogoLogo
HomePricingDocumentation
  • 💿Getting Started
    • Installation and Project Setup
    • Hello Perigee!
    • Perigee Application Design
    • Hello Configuration
    • Hello Logs
    • Hello Integration
    • Troubleshooting
    • Case Studies
  • 📃License + Notice
    • 📂Licensing
    • Notice of Third Party Agreements
  • 🚀Perigee and Beyond
    • Extending - Threads
    • Extending - Loaders
    • ⏳All about CRON
  • 🔮API Generation
    • What is API Generation?
    • API Builder
  • 🗺️Architecting YOUR App
    • Design and Requirements
    • Define Sources
    • Requirements
  • 🧩Core Modules
    • 🌐PerigeeApplication
    • 🪡Thread Registry
    • Event Sources
      • Scheduled/Logic
        • CRON Thread
        • Scheduler
        • Sync Agent
      • Watchers
        • SalesForce
        • Sharepoint
        • Directory Watch
        • Directory Notifier
        • IMAP
    • Credential Management
      • Connection Strings
      • Custom Refresh Logic
      • RestSharp Authenticator
      • Credential Store SDK
      • ⁉️Troubleshooting Credentials
    • Integration Utilities
      • HTTP(S) - RestSharp
      • Transaction Coordinator
      • Limiter
      • Watermarking
    • Alert Managers
      • SMS
      • Email
      • Discord
      • Teams
    • File Formats
      • Excel
      • CSV
    • 📁File System Storage
      • File Revision Store
      • Concurrent File Store
      • FileSync + Cache
    • Third Party
      • SmartSheets
      • Microsoft Graph
    • Perigee In Parallel
      • Parallel Processing Reference
      • Extensions
      • GroupProcessor
      • SingleProcessor
    • 🧱Utility Classes
      • Metrics
      • F(x) Expressions
      • Multi-Threaded Processor (Scatter Gather)
      • OpenAI - GPT
      • XML Converter
      • Dynamic Data Table
      • Debounce
      • Thread Conditions
      • Perigee Utility Class
      • Network Utility
      • Lists
      • FileUtil
      • Inclusive2DRange
      • Strings, Numbers, Dates
      • Nested Sets
      • Behavior Trees
      • JsonCompress
      • Topological Sorting
      • DBDownloader
    • 🈁Bit Serializer
  • 📣Examples and Demos
    • API + Perigee
    • 📰Excel Quick Load
    • SalesForce Watcher
    • Report Scheduler
    • Agent Data Synchronization
    • 📩IMAP Echo bot
    • Watch and load CSVs
    • Graph Delegated Authorization + DataVerse
    • Coordinator Demo
    • Azure Service Bus
    • QuickBooks Online
  • 📘Blueprints
    • Perigee With .NET Hosting
    • Web Host Utilities
    • 🔌Plugin Load Context
  • 🎞️Transforms
    • 🌟What is Transforms?
    • 📘Terminology
    • 🦾The Mapping Document
    • 👾Transformation Process
    • 😎Profile
    • 🎒Automation
      • 🕓Package Options
      • 🔳Configuration
    • 🔧Utilities
      • 🧹Clean
      • 📑Map File
      • 🔎File Identification
      • 🗺️Map Generation
      • 🪅Insert Statement Generation
  • 🗃️Transform SDK
    • 👋Quick Start Guide
    • 🥳MapTo
    • 🔌Authoring Plugins
      • 🔘File IO Process
      • 📢Data Quality
      • 🟢Transform Process
    • SDK Reference
      • 🔘FileIOProcessData
      • 📢DataQualityContext
      • 🎛️TransformDataContext
      • 🏅TransformResult
Powered by GitBook
On this page
  • Registering
  • Registering Handlers
  • Updating
  • Get
Export as PDF
  1. Core Modules
  2. Integration Utilities

Watermarking

PreviousLimiterNextAlert Managers

Last updated 2 years ago

We make working with watermarks quite easy and they are very powerful.

  • Our watermarks are persisted locally to disk automatically and recalled on application startup.

  • They are debounced (We cover this in the ).

  • They are thread safe.

  • You can register as many subscribers as you need to the data feed.

  • You may push that data externally on update, and recall it on initialization for further backup.

//Register a watermark
Watermarking.Register("IntegrationOffset", () => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), (nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Register additional handlers anywhere else, they receive updates to that watermark
Watermarking.RegisterAdditionalEventHandler("IntegrationOffset", (s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Push updates to this watermark anywhere else
Watermarking.UpdateWatermark("IntegrationOffset", Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));

//And when you need the value, refer to it again later
Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset();
PerigeeApplication.ApplicationNoInit("HelloConfig",
(taskConfig) =>
{
    //Register me first!
    
    
    //Then .Add other methods
});

Registering

Registering a watermark is an easy one liner. Let's look at the parameters:

Watermarking.Register(

//Name 
"IntegrationOffset", 

//Initialization Callback
() => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), 

//OnUpdate (optional)
(nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});
  • Line 4 - Name - The name of the watermark

  • Line 7 - Func<Watermark> Initialization callback - If the value does not exist, the initialization callback is called and an initial value is set.

  • Line 10 - Action<Watermark> onUpdate? - An optional callback for when the value is updated.

To create the other base data types, simply provide that type in the initialization callback.

Watermark.FromString("");
Watermark.FromInt(1);
//etc

That's it. The underlying system does the heavy lifting of persisting this locally under the /watermarks folder beside it's running application.

Registering Handlers

You're able to register as many "subscribers" to the data changes you desire in other parts of the code. These will be called on a background thread with the updated value.

The RegisterAdditionalEventHandler simply takes two parameters:

Watermarking.RegisterAdditionalEventHandler(

//Name of the watermark to register
"IntegrationOffset", 

//Callback function
(s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});
  • Line 4 - Name - The name of the watermark you want to update.

  • Line 7 - EventHandler<Watermark> - This sends you the (sender, watermark) back in the callback and you're able to read the value from there

Updating

The UpdateWatermark call takes two parameters:

Watermarking.UpdateWatermark(

//Name
"IntegrationOffset", 

//New value
Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));
  • Line 4 - Name - The name of the watermark you want to update.

  • Line 7 - Watermark - The new watermark and value.

A lot happens behind the scenes you should be aware of:

  • Updates are debounced which means that rapid updates don't all push their values at once to any subscribers of the data updates.

    1. Let's say you processed 10 records in a matter of 50 milliseconds and every record you updated the watermark so it went from the initial value of 0 to 10.

    2. After about a second, all of the subscribers would get a single updated watermark event with the value of 10.

    3. Debouncing reduces the number of calls both to the filesystem and to your subscribers

  • Updates are thread safe as they perform internal item locks before writing to disk and notifying subscribers.

Get

Getting the watermark only requires the Name of the watermark to retrieve.

Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset()

If there are inflight updates to the watermark, this call may block the thread for a second or two while those debounces get processed. If there are no inflight updates, it returns immediately.

The register call should be performed at application initialization if possible to avoid a of pulling for it's value before it is registered. Requesting a watermark before it is registered will throw an exception .

race condition
updating section
🧩
Page cover image