Coordinator Demo
To run the demos below:
Create a new Web API project (.NET 6+)
Download the Web Host Utilities and include it as part of your project.
Copy, paste, and run!
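In this demo, a coordinator is set up to run three steps: "order", "report", and "csv".
The "order" step calls an API to submit the order data. If that call succeeds, the "report" step sends the order response to a record service, and the final "csv" step logs the response and writes it to a CSV file.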
// Visit https://docs.perigee.software to learn more
// Visit https://perigee.software to purchase a license!
//To call order, use postman or curl:
//curl https://localhost:7216/queue?number=66
PerigeeApplication.ApplicationNoInit("Coordinator", (c) =>
{
    #region Handlers
    //Declare a coordinator source - MSSQL
    //var handler = new ITransactionSource_MSSQL(c.GetConnectionString("integration"), "Order_Header", "Order_Item");

    //... Or a memory handler if you prefer local testing
    var handler = new ITransactionSource_Memory("ITSourceMemory.json", c.GetCancellationToken());

    //To clear local directory on load (for ease of testing)
    // if (Directory.Exists($"TC{Path.DirectorySeparatorChar}OrderProcess")) Directory.Delete($"TC{Path.DirectorySeparatorChar}OrderProcess", true);
    // if (File.Exists("ITSourceMemory.json")) File.Delete("ITSourceMemory.json");
    #endregion

    //Add a coordinator named "OrderProcess" to the system
    c.AddTransactionCoordinator("OrderProcess", handler, (ct, l, process) =>
    {
        using var client = new RestClient("https://localhost:7216");

        //MultiStep
        process.MultiStep(new string[] { "order", "report", "csv" }, MultiStepOptions.RequireInternet,
            (order) =>
            {
                //Get the initial data object, a string
                var orderString = $"O-{order.GetInitialDataObjectAs<string>()}";
                l.LogInformation("Executing order {order}", orderString);

                var rsp = order.Execute<OrderResponse>(client,
                    new RestRequest("/order", Method.Get).AddParameter("order", orderString));
            },
            (report) =>
            {
                //Get the data object from the last step, an OrderResponse
                var order = report.GetDataObjectFromLastStepAs<OrderResponse>();
                l.LogInformation("Reporting order {@order} to record service", order);

                var rsp = report.Execute(client,
                    new RestRequest("/report", Method.Get)
                        .AddParameter("order", order.Order)
                        .AddParameter("created", order.CreatedAt.ToString("O")));
            },
            (csv) =>
            {
                //Get the response body, since we're logging it.
                var previousResponseBody = csv.GetPreviousItem()!.ResponseBody;

                //Get the original item, "order".
                var orderItem = csv.GetItemWithName("order")!;
                l.LogInformation("Order has been completed! Response was: {body}", previousResponseBody);

                //Generate a CSV and save it to our filesystem
                var DT = new List<OrderResponse>() { JsonConvert.DeserializeObject<OrderResponse>(previousResponseBody)! }.ToDataTable();
                new CSVWriter(DT).WriteFile($"OrderCSV{Path.DirectorySeparatorChar}Order_{orderItem.TransactionID}.{orderItem.ReplayID}.csv");
                l.LogInformation("Complete! Wrote CSV to path");

                //As this isn't using .Execute, we need to set the status to complete the item, and header.
                csv.SetStatus(TransactionStatus.Completed);
            }
        );
    }, LocalPullTimespan: TimeSpan.FromSeconds(15), RemotePullTimespan: TimeSpan.FromSeconds(60));

    //Add an API to trigger the processes
    c.AddMinimalAPI("DemoAPI", 7216, (r) =>
    {
        //To queue a new order with number
        r.MapGet("/queue", ([FromQuery] string number, TransactionCoordinator tc) =>
        {
            tc.QueueTransaction(TransactionHeader.Transaction(number, number));
            return Results.Ok("Enqueued");
        });

        //Replay an order
        r.MapGet("/replay", ([FromQuery] string number, TransactionCoordinator tc) =>
        {
            try
            {
                tc.ReplayTransaction(number);
                return Results.Ok("Replay started");
            }
            catch (Exception)
            {
                return Results.BadRequest("Couldn't replay that ID");
            }
        });

        //An API method to get an order and respond with it
        r.MapGet("/order", ([FromQuery] string order) =>
            Results.Json(new { Order = order, CreatedAt = DateTimeOffset.Now }, statusCode: 201));

        r.MapGet("/report", ([FromQuery] string order, [FromQuery] DateTimeOffset created) =>
            Results.Json(new { Order = order, CreatedAt = created, Status = "order successfully reported to record system" }, statusCode: 200));
    }, (b, s) => { s.AddSingleton(c.GetTransactionCoordinator("OrderProcess")!); });
});
public class OrderResponse { public string Order { get; set; } public DateTimeOffset CreatedAt { get; set; } }
After calling the demo above with curl (curl https://localhost:7216/queue?number=90), the coordinator logs each step as it runs:
[16:42:18 INF](OrderProcess) Executing order O-90
[16:42:27 INF](OrderProcess) Reporting order {"Order": "O-90", "CreatedAt": "2023-05-10T16:42:18.4905622-04:00", "$type": "OrderResponse"} to record service
[16:42:27 INF](OrderProcess) Order has been completed! Response was: {"order":"O-90","createdAt":"2023-05-10T16:42:18.4905622-04:00","status":"order successfully reported to record system"}
[16:42:27 INF](OrderProcess) Complete! Wrote CSV to path
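The demo API also maps a /replay endpoint that takes the same number query parameter as /queue. The following is a minimal shell sketch for calling either endpoint. The helper names demo_url and demo_call and the BASE_URL and DRY_RUN variables are hypothetical conveniences, not part of Perigee; only the endpoints, the port, and the number parameter come from the demo code above.

```shell
#!/bin/sh
# Base URL of the demo API; port 7216 is the one passed to AddMinimalAPI in the demo.
# BASE_URL is a hypothetical override for running the demo on another host or port.
BASE_URL="${BASE_URL:-https://localhost:7216}"

# demo_url ENDPOINT NUMBER
# Prints the full URL for the queue or replay endpoint with the given order number.
demo_url() {
  printf '%s/%s?number=%s\n' "$BASE_URL" "$1" "$2"
}

# demo_call ENDPOINT NUMBER
# Calls the endpoint with curl. With DRY_RUN=1 it only prints the command,
# which helps when checking the URL before the demo is running.
demo_call() {
  url=$(demo_url "$1" "$2")
  if [ "${DRY_RUN:-0}" = "1" ]; then
    printf 'curl %s\n' "$url"
  else
    # Add -k here if curl does not trust the local development certificate.
    curl --silent --show-error "$url"
  fi
}
```

With the demo running, demo_call queue 90 queues order 90, and demo_call replay 90 asks the coordinator to replay it. Judging by the handler code, a successful replay returns "Replay started", and an ID that cannot be replayed returns a 400 response with "Couldn't replay that ID".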
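After turning off Wi-Fi access and turning it back on, the coordinator stops and then restarts. This is likely because the steps are registered with MultiStepOptions.RequireInternet: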
//Off
[11:43:11 INF](OrderProcess) Coordinator OrderProcess cancellation requested
[11:43:11 INF](OrderProcess) Coordinator OrderProcess cancellation complete
//Back on
[11:43:16 INF](OrderProcess) Coordinator OrderProcess starting