
Watermarking

We make working with watermarks easy, and they come with several useful behaviors built in:

  • Our watermarks are persisted locally to disk automatically and recalled on application startup.

  • They are debounced (covered in the Updating section).

  • They are thread safe.

  • You can register as many subscribers as you need to the data feed.

  • You can push the value to an external store on update and recall it during initialization, giving you an additional backup.
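
As a sketch of that last point, the two callbacks below back a DateTimeOffset watermark up to an external store and recall it on startup. The backup file path, the round-trip "O" format, and the names recallOrDefault and pushToBackup are illustrative assumptions; a local temp file stands in for whatever external system you actually use. The commented-out lines show how the callbacks would plug into the Register call described below.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical backup location; in practice this could be blob storage, a database row, etc.
string backupPath = Path.Combine(Path.GetTempPath(), "IntegrationOffset.backup");

// Initialization callback: recall the backed-up value if one exists,
// otherwise fall back to "now" (the default used elsewhere on this page).
Func<DateTimeOffset> recallOrDefault = () =>
    File.Exists(backupPath)
        ? DateTimeOffset.ParseExact(File.ReadAllText(backupPath), "O", CultureInfo.InvariantCulture)
        : DateTimeOffset.UtcNow;

// Update callback: push every new value to the external backup.
Action<DateTimeOffset> pushToBackup = value =>
    File.WriteAllText(backupPath, value.ToString("O", CultureInfo.InvariantCulture));

// Wiring into the library would look roughly like this (not executed here):
//   Watermarking.Register("IntegrationOffset",
//       () => Watermark.FromDateTimeOffset(recallOrDefault()),
//       nVal => pushToBackup(nVal.GetDateTimeOffset()));

// Standalone demo of the round trip.
var saved = new DateTimeOffset(2024, 1, 2, 3, 4, 5, TimeSpan.Zero);
pushToBackup(saved);
Console.WriteLine(recallOrDefault() == saved); // True
```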

//Register a watermark
Watermarking.Register("IntegrationOffset", () => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), (nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Register additional handlers anywhere else, they receive updates to that watermark
Watermarking.RegisterAdditionalEventHandler("IntegrationOffset", (s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Push updates to this watermark anywhere else
Watermarking.UpdateWatermark("IntegrationOffset", Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));

//And when you need the value, refer to it again later
Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset();

Registering

Registering a watermark takes a single call. Here are its parameters:

Watermarking.Register(

//Name 
"IntegrationOffset", 

//Initialization Callback
() => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), 

//OnUpdate (optional)
(nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});
  • Line 4 - Name - The name of the watermark.

  • Line 7 - Func<Watermark> initialization callback - Called only when no stored value exists yet; its return value becomes the initial watermark.

  • Line 10 - Action<Watermark> onUpdate (optional) - A callback invoked whenever the value is updated.

That's it. The library does the heavy lifting of persisting each watermark locally, under a watermarks folder next to the running application.
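
To make the initialization rule concrete, here is a minimal sketch of load-or-initialize persistence, assuming one text file per watermark in a watermarks folder next to the application. The file naming, the string value, and the LoadOrInitialize helper are hypothetical; this illustrates the behavior described above, not the library's actual storage code.

```csharp
using System;
using System.IO;

// Assumption: one plain-text file per watermark in a "watermarks" folder next to the
// running application. The library's real file layout and format are not documented here.
string folder = Path.Combine(AppContext.BaseDirectory, "watermarks");
Directory.CreateDirectory(folder);

// Returns the stored value if one exists; otherwise calls the initialization
// callback once and persists its result for the next run.
string LoadOrInitialize(string name, Func<string> initialize)
{
    string path = Path.Combine(folder, name + ".txt");
    if (File.Exists(path))
        return File.ReadAllText(path);
    string initial = initialize();
    File.WriteAllText(path, initial);
    return initial;
}

// Hypothetical watermark name; start from a clean slate so the demo is repeatable.
File.Delete(Path.Combine(folder, "SketchOffset.txt"));

int initCalls = 0;
string first = LoadOrInitialize("SketchOffset", () => { initCalls++; return "0"; });
string second = LoadOrInitialize("SketchOffset", () => { initCalls++; return "999"; });
Console.WriteLine($"{first} {second} {initCalls}"); // 0 0 1
```

The second call finds the file written by the first, so its initialization callback never runs.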

Registering Handlers

You can register as many "subscribers" to a watermark's changes as you need, from anywhere in your code. Each one is called on a background thread with the updated value.

RegisterAdditionalEventHandler takes two parameters:

Watermarking.RegisterAdditionalEventHandler(

//Name of the watermark to register
"IntegrationOffset", 

//Callback function
(s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});
  • Line 4 - Name - The name of the watermark you want to subscribe to.

  • Line 7 - EventHandler<Watermark> - Your callback receives (sender, watermark); read the new value from the watermark argument.
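
The fan-out to multiple subscribers follows the standard .NET EventHandler<T> pattern. This sketch uses a plain EventHandler<long> in place of EventHandler<Watermark> and raises it on a thread-pool thread via Task.Run; both substitutions are assumptions made so the example runs without the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for EventHandler<Watermark>: a multicast delegate any code can subscribe to.
EventHandler<long>? onWatermarkUpdated = null;

long lastSeenA = 0, lastSeenB = 0;
bool ranOnPool = false;

// Several independent subscribers, as if registered from different parts of the code.
onWatermarkUpdated += (sender, value) => lastSeenA = value;
onWatermarkUpdated += (sender, value) => lastSeenB = value;
onWatermarkUpdated += (sender, value) => ranOnPool = Thread.CurrentThread.IsThreadPoolThread;

// Raise the event on a background (thread-pool) thread; handlers run in subscription order.
await Task.Run(() => onWatermarkUpdated?.Invoke(null, 42));

Console.WriteLine($"{lastSeenA} {lastSeenB} {ranOnPool}"); // 42 42 True
```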

Updating

The UpdateWatermark call takes two parameters:

Watermarking.UpdateWatermark(

//Name
"IntegrationOffset", 

//New value
Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));
  • Line 4 - Name - The name of the watermark you want to update.

  • Line 7 - Watermark - The new watermark and value.

Two things happen behind the scenes that you should be aware of:

  • Updates are debounced: when updates arrive in rapid succession, subscribers are not notified once per update; instead they receive a single event carrying the latest value.

    1. Suppose you process 10 records in 50 milliseconds and update the watermark after each one, so it goes from its initial value of 0 to 10.

    2. After about a second, each subscriber receives a single update event with the value 10.

    3. Debouncing reduces the number of calls to both the filesystem and your subscribers.

  • Updates are thread safe: each update takes an internal per-item lock before writing to disk and notifying subscribers.
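
One common way to get the debouncing behavior described above is a timer that is restarted on every update and fires only after a quiet period. The sketch below assumes a fixed 1-second delay and uses System.Threading.Timer with a lock around shared state; it models the 10-rapid-updates scenario and records the notifications subscribers would receive. The library's real delay and internals are not documented here, so treat this as an illustration of the technique.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Assumption: a fixed 1-second quiet period ("after about a second").
var delay = TimeSpan.FromSeconds(1);
var gate = new object();
long pending = 0;
var notifications = new List<long>();

// Fires once, `delay` after the most recent update. In a real store this is
// where the value would be written to disk and subscribers notified.
using var timer = new Timer(_ =>
{
    lock (gate) notifications.Add(pending);
}, null, Timeout.Infinite, Timeout.Infinite);

void Update(long value)
{
    lock (gate)
    {
        pending = value;                               // keep only the latest value
        timer.Change(delay, Timeout.InfiniteTimeSpan); // restart the quiet-period countdown
    }
}

// A short burst of 10 updates, each well under a second apart.
for (long i = 1; i <= 10; i++)
{
    Update(i);
    Thread.Sleep(5);
}

Thread.Sleep(TimeSpan.FromSeconds(2)); // wait past the debounce window
lock (gate) Console.WriteLine(string.Join(",", notifications)); // 10
```

Because each update overwrites pending instead of queueing it, the intermediate values 1 through 9 are never delivered, which is what keeps filesystem writes and subscriber calls down.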

Get

Retrieving a watermark requires only its name:

Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset();

If updates to the watermark are still in flight, this call may block the calling thread for a second or two while the debounced updates are processed. If there are no in-flight updates, it returns immediately.
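
The blocking read can be sketched by combining a debounce timer with Monitor.Wait: a reader waits only while a debounced write is pending, and the timer callback wakes it once the value is flushed. The 1-second delay, the inFlight flag, and the Get/Update helpers are assumptions for illustration, not the library's API.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

var gate = new object();
var delay = TimeSpan.FromSeconds(1); // assumed debounce window
long pending = 0, stored = 0;
bool inFlight = false;

// Debounced flush: commit the latest value and wake any blocked readers.
using var timer = new Timer(_ =>
{
    lock (gate)
    {
        stored = pending;
        inFlight = false;
        Monitor.PulseAll(gate);
    }
}, null, Timeout.Infinite, Timeout.Infinite);

void Update(long value)
{
    lock (gate)
    {
        pending = value;
        inFlight = true;
        timer.Change(delay, Timeout.InfiniteTimeSpan);
    }
}

long Get()
{
    lock (gate)
    {
        // Blocks only while a debounced update has not been flushed yet.
        while (inFlight) Monitor.Wait(gate);
        return stored;
    }
}

var sw = Stopwatch.StartNew();
long idle = Get();                 // nothing in flight: returns immediately
long idleMs = sw.ElapsedMilliseconds;

Update(7);
long afterUpdate = Get();          // waits roughly one debounce window
long waitedMs = sw.ElapsedMilliseconds - idleMs;

Console.WriteLine($"{idle} {afterUpdate} {waitedMs >= 900}"); // 0 7 True
```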
