# Watermarking

We make working with watermarks quite easy and they are very powerful.&#x20;

* Our watermarks are persisted locally to disk automatically and recalled on application startup.
* They are debounced (We cover this in the [updating section](#updating)).
* They are thread safe.
* You can register as many subscribers as you need to the data feed.
* You may push that data externally on update, and recall it on initialization for further backup.

```csharp
//Register a watermark
Watermarking.Register("IntegrationOffset", () => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), (nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Register additional handlers anywhere else, they receive updates to that watermark
Watermarking.RegisterAdditionalEventHandler("IntegrationOffset", (s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});

//Push updates to this watermark anywhere else
Watermarking.UpdateWatermark("IntegrationOffset", Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));

//And when you need the value, refer to it again later
Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset();
```

{% hint style="warning" %}
The register call should be performed at application initialization if possible to avoid a [race condition](https://en.wikipedia.org/wiki/Race_condition) of pulling for it's value before it is registered. <mark style="color:red;">**Requesting a watermark before it is registered will throw an exception**</mark> .

```csharp
PerigeeApplication.ApplicationNoInit("HelloConfig",
(taskConfig) =>
{
    //Register me first!
    
    
    //Then .Add other methods
});
```

{% endhint %}

### Registering

Registering a watermark is an easy one liner. Let's look at the parameters:

{% code lineNumbers="true" %}

```csharp
Watermarking.Register(

//Name 
"IntegrationOffset", 

//Initialization Callback
() => Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow), 

//OnUpdate (optional)
(nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});
```

{% endcode %}

* <mark style="color:blue;">**Line 4 - Name**</mark> - The name of the watermark
* <mark style="color:blue;">**Line 7 - Func\<Watermark> Initialization callback**</mark> - If the value does not exist, the initialization callback is called and an initial value is set.
* <mark style="color:blue;">**Line 10 - Action\<Watermark> onUpdate?**</mark> - An optional callback for when the value is updated.

{% hint style="success" %}
To create the other base data types, simply provide that type in the <mark style="color:blue;">**initialization callback**</mark>.&#x20;

```csharp
Watermark.FromString("");
Watermark.FromInt(1);
//etc
```

{% endhint %}

That's it. The underlying system does the heavy lifting of persisting this locally under the **`/watermarks`** folder beside it's running application.

### Registering Handlers

You're able to register as many "subscribers" to the data changes you desire in other parts of the code. These will be called on a background thread with the updated value.

The <mark style="color:green;">**`RegisterAdditionalEventHandler`**</mark> simply takes two parameters:

{% code lineNumbers="true" %}

```csharp
Watermarking.RegisterAdditionalEventHandler(

//Name of the watermark to register
"IntegrationOffset", 

//Callback function
(s, nVal) => {
    taskConfig.GetLogger<Watermark>().LogInformation("AddlHandler: New value for {name} ({value})", nVal.Name, nVal.GetDateTimeOffset());
});
```

{% endcode %}

* <mark style="color:blue;">**Line 4 - Name**</mark> - The name of the watermark you want to update.
* <mark style="color:blue;">**Line 7 - EventHandler\<Watermark>**</mark> - This sends you the <mark style="color:blue;">**`(sender, watermark)`**</mark> back in the callback and you're able to read the value from there

### Updating

The <mark style="color:green;">**`UpdateWatermark`**</mark> call takes two parameters:

{% code lineNumbers="true" %}

```csharp
Watermarking.UpdateWatermark(

//Name
"IntegrationOffset", 

//New value
Watermark.FromDateTimeOffset(DateTimeOffset.UtcNow));
```

{% endcode %}

* <mark style="color:blue;">**Line 4 - Name**</mark> - The name of the watermark you want to update.
* <mark style="color:blue;">**Line 7 - Watermark**</mark> - The new watermark and value.

A lot happens behind the scenes you should be aware of:

* Updates are <mark style="color:purple;">**debounced**</mark> which means that rapid updates don't all push their values at once to any subscribers of the data updates.&#x20;
  1. Let's say you processed 10 records in a matter of 50 milliseconds and every record you updated the watermark so it went from the initial value of <mark style="color:red;">**`0`**</mark> to <mark style="color:red;">**`10`**</mark>.&#x20;
  2. After about a second, all of the subscribers would get **a single updated watermark event** with the value of <mark style="color:red;">**`10`**</mark>.
  3. Debouncing reduces the number of calls both to the filesystem and to your subscribers
* Updates are thread safe as they perform internal item locks before writing to disk and notifying subscribers.&#x20;

### Get

Getting the watermark only requires the <mark style="color:blue;">**Name**</mark> of the watermark to retrieve.&#x20;

```csharp
Watermarking.GetWatermark("IntegrationOffset").GetDateTimeOffset()
```

If there are inflight updates to the watermark, this call may block the thread for a second or two while those debounces get processed. If there are no inflight updates, it returns immediately.&#x20;
