Using HTTP Webhook

When debatching a JSON file with 5600 articles, I found that using a looping construct in Logic Apps (like foreach or until) is very slow. I had to split the file into a set of JSON batches, each holding a configurable number of articles. As an alternative to Logic App looping, I decided to implement the looping in an Azure Function, i.e. in custom C#.

[box type="info"] You can also enable the SplitOn setting on your queue trigger. This setting automates debatching when you have multiple queue messages. In this case SplitOn is not usable, because we only have one queue message that needs to be debatched. [/box]

As expected, looping in C# is very performant. One problem remained, however. When you call a function from a Logic App, you typically use the Azure Functions connector. When doing so, you are confronted with a time-out interval of two minutes. That was not a problem in this case, but I wanted to create a generally usable debatcher. That is why I call an Azure Function named TriggerDebatchEntities via a so-called HTTP Webhook. An HTTP Webhook allows you to call a function synchronously, pass a callback URL, and then wait for an asynchronous response. The asynchronous response can't be returned by the HTTP-triggered function TriggerDebatchEntities itself. That's why I created a second, queue-triggered function named DebatchEntities. DebatchEntities loops through the JSON file, calls a REST service every time the batch size is reached, and finally returns control to the Logic App by calling the callback URL. DebatchEntities is not limited to two minutes but can run for ten minutes, which is a very long time in computing.

HTTP Webhook in Logic App:
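
Roughly sketched in workflow-definition JSON, the action could look like this (the Function App URL, function key, batch size and blob names are illustrative placeholders, not taken from the original setup):

"TriggerDebatchEntities": {
    "type": "HttpWebhook",
    "inputs": {
        "subscribe": {
            "method": "POST",
            "uri": "https://<functionapp>.azurewebsites.net/api/TriggerDebatchEntities?code=<functionkey>&batchSize=100",
            "body": {
                "callbackUrl": "@listCallbackUrl()",
                "blobContainer": "<container>",
                "blob": "<blobname>"
            }
        },
        "unsubscribe": {}
    },
    "runAfter": {}
}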


Note that callbackUrl is set to the @listCallbackUrl() expression. This URL is a deep link to the Webhook action, which means the Logic App will continue after the Webhook action when the asynchronous response is received.

Function TriggerDebatchEntities:

#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Text;
using Microsoft.Azure.WebJobs;
using System.Diagnostics;
using Microsoft.Azure.WebJobs.Host;

public class EventMessage
{
    public int batchSize { get; set; }

    public string callbackUrl { get; set; }

    public string blobContainer { get; set; }

    public string blob { get; set; }
}

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
    int batchSize = 1;
    string callbackUrl;
    string blobContainer;
    string blob;

    string storageConnectionString;

    try
    {
        log.Info("Entered HTTP trigger TriggerDebatchEntities");

        // Parse the batchSize query parameter (defaults to 1 when absent)
        string batchSizeParam = req.GetQueryNameValuePairs()
            .FirstOrDefault(q => string.Compare(q.Key, "batchSize", true) == 0)
            .Value;
        batchSize = batchSizeParam != null ? Int32.Parse(batchSizeParam) : 1;

        log.Info(String.Format("batchsize: {0}", batchSize.ToString()));

        // Get request body
        dynamic data = await req.Content.ReadAsAsync<object>();
        callbackUrl = data?.callbackUrl;
        blobContainer = data?.blobContainer;
        blob = data?.blob;

        log.Info("Queue fields read");

        if (callbackUrl == null)
        {
            return req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a callbackUrl in the request body (required).");
        }

        if (blobContainer == null || blob == null)
        {
            return req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a valid blob(container) reference.");
        }

        // The actual debatching is done in the queue-triggered function; we only pass a blob
        // reference here, because the full message would otherwise be too large for the queue
        storageConnectionString = Environment.GetEnvironmentVariable("QueueStorage");
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);
        CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
        CloudQueue queue = queueClient.GetQueueReference("tempdebatchentities");
        queue.CreateIfNotExists();

        // Create a message and add it to the queue
        string messageContents = PrepareMessage(batchSize, callbackUrl, blobContainer, blob, log);
        CloudQueueMessage message = new CloudQueueMessage(messageContents);
        queue.AddMessage(message);

        log.Info("Queue message sent");

        log.Info("HTTP trigger TriggerDebatchEntities finished");
        return req.CreateResponse(HttpStatusCode.OK, "Request is registered for asynchronous processing");
    }
    catch (Exception ex)
    {
        return req.CreateResponse(HttpStatusCode.InternalServerError, String.Format("Asynchronous processing not succeeded. Error: {0}", ex.Message));
    }
}

private static string PrepareMessage(int batchSize, string callbackUrl, string blobContainer, string blob, TraceWriter log)
{
    EventMessage msg = new EventMessage();
    msg.batchSize = batchSize;
    msg.callbackUrl = callbackUrl;
    msg.blobContainer = blobContainer;
    msg.blob = blob;

    return JsonConvert.SerializeObject(msg, Newtonsoft.Json.Formatting.Indented);
}
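
The bindings aren't shown above, but a typical function.json for an HTTP-triggered C# script function like this would look roughly as follows (a sketch, assuming function-level auth and the return value as HTTP output):

{
    "bindings": [
        {
            "authLevel": "function",
            "type": "httpTrigger",
            "direction": "in",
            "name": "req"
        },
        {
            "type": "http",
            "direction": "out",
            "name": "$return"
        }
    ]
}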

Function DebatchEntities:

#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Text;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;

private static readonly HttpClient tlxClient = new HttpClient(); // reused across invocations to avoid socket exhaustion

public static void Run(string myQueueItem, TraceWriter log)
{
    int batchSize = 1;
    string callbackUrl = "";
    string blobContainer = "";
    string blob = "";

    log.Info("Entered Queue trigger DebatchEntities");

    string doWorkUrl = Environment.GetEnvironmentVariable("ServiceUrl");

    // Read input values from the queue message
    JObject input = JObject.Parse(myQueueItem);
    batchSize = (int?)input["batchSize"] ?? 1;
    callbackUrl = (string)input["callbackUrl"] ?? "";
    blobContainer = (string)input["blobContainer"] ?? "";
    blob = (string)input["blob"] ?? "";

    log.Info(String.Format("callback {0}", callbackUrl));
    log.Info(String.Format("blobContainer {0}", blobContainer));
    log.Info(String.Format("blob {0}", blob));

    // Read blob
    string storageConnectionString = Environment.GetEnvironmentVariable("BlobStorage");
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);

    CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    CloudBlobContainer container = blobClient.GetContainerReference(blobContainer);

    // Get blob data
    CloudBlockBlob cloudBlob = container.GetBlockBlobReference(blob);
    MemoryStream stream = new MemoryStream();
    cloudBlob.DownloadToStream(stream);
    stream.Position = 0;

    StreamReader streamReader = new StreamReader(stream);
    string streamContent = streamReader.ReadToEnd();

    int counter = 0;
    JArray jsonContent = JArray.Parse(streamContent);
    JArray output = new JArray();
    HttpResponseMessage response;

    // Collect items until the batch size is reached, then post the batch to the REST service
    foreach (JObject item in jsonContent)
    {
        if (counter < batchSize)
        {
            counter += 1;
            output.Add(item);
        }
        else
        {
            response = tlxClient.PostAsync(doWorkUrl, new StringContent(output.ToString(), Encoding.UTF8, "application/json")).Result;
            log.Info("DoWork ServiceCall performed");

            output.Clear();
            output.Add(item);
            counter = 1;
        }
    }

    // Post the last (possibly incomplete) batch
    if (counter > 0)
    {
        response = tlxClient.PostAsync(doWorkUrl, new StringContent(output.ToString(), Encoding.UTF8, "application/json")).Result;
        log.Info("DoWork ServiceCall performed");
    }

    // Return control to the Logic App by posting to the callback URL
    // (reuse the static HttpClient instead of newing one up per invocation)
    response = tlxClient.PostAsync(callbackUrl, new StringContent(myQueueItem,
        Encoding.UTF8, "application/json")).Result;

    log.Info(String.Format("Response from callback {0}.", response));

    streamReader.Dispose();
    stream.Dispose();

    log.Info(String.Format("Asynchronous processing performed. Callback to {0}.", callbackUrl));
}
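
Again, the bindings aren't shown above; given the queue name and the QueueStorage setting used in TriggerDebatchEntities, the function.json for DebatchEntities would presumably look like this (a sketch, not confirmed by the original):

{
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "tempdebatchentities",
            "connection": "QueueStorage"
        }
    ]
}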

Note that the doWorkUrl is read from the ServiceUrl application setting. In this case we use a dummy service, the RequestBin service -> http://requestbin.fullcontact.com/1iwzk791.
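
In the portal these are Application Settings; for local development the three settings the functions read could live in local.settings.json along these lines (account names and keys are placeholders):

{
    "IsEncrypted": false,
    "Values": {
        "QueueStorage": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>",
        "BlobStorage": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>",
        "ServiceUrl": "http://requestbin.fullcontact.com/1iwzk791"
    }
}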

Please visit my blog post on Durable Functions as well.

Logic App Debatching using SplitOn

Debatching is a common need in enterprise integration. I found a very informative link on using SplitOn with either JSON or XML as the input message.

Link: Blog Toon Vanhoutte

From an exception-handling perspective this approach is neat, because you get a separate Logic App run for each array item. All these runs can execute and be resubmitted independently. Compare this with a foreach construct, where there's only one parent Logic App that fails when one of the child runs fails. There's one important gotcha: if you do nothing and send an invalid message, the SplitOn action may fail. That's why it's good practice to always validate the message before SplitOn. For JSON messages you can use the Parse JSON action. A sketch of a SplitOn-enabled trigger follows below.
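
A SplitOn-enabled trigger looks roughly like this in the workflow definition (assuming the incoming body carries an articles array; the property name is illustrative):

"triggers": {
    "manual": {
        "type": "Request",
        "kind": "Http",
        "inputs": {
            "schema": {}
        },
        "splitOn": "@triggerBody()?['articles']"
    }
}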

The runs are separate when using the SplitOn setting, but you can relate them using a correlation ID. For querying Logic App runs you can use the Logic Apps Management REST API.
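
A rough sketch of setting such a custom tracking ID on the trigger, assuming the items carry an orderId to correlate on (treat the exact shape as an assumption):

"triggers": {
    "manual": {
        "type": "Request",
        "kind": "Http",
        "correlation": {
            "clientTrackingId": "@{coalesce(triggerBody()?['orderId'], guid())}"
        },
        "splitOn": "@triggerBody()?['articles']"
    }
}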

Do Until in Logic App

In June 2015 the Do Until functionality in Logic Apps was announced. You can add a Do Until around any action in your Logic App. Let's say you want to do an HTTP POST until you receive an HTTP 200. You can specify the do-until expression and one or more limits (for example: call at most 5 times, or run for at most 5 minutes). Below is an example:

Do Until
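
In the workflow definition such a Do Until could look roughly like this (the action name, condition, target URL and limits are illustrative):

"Until_HTTP_200": {
    "type": "Until",
    "expression": "@equals(outputs('HTTP')['statusCode'], 200)",
    "limit": {
        "count": 5,
        "timeout": "PT5M"
    },
    "actions": {
        "HTTP": {
            "type": "Http",
            "inputs": {
                "method": "POST",
                "uri": "https://example.org/endpoint"
            }
        }
    }
}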

Looping in a Logic App

Below is an example where you receive an X12-format EDI message over AS2. The BizTalk X12 API App receives an EDI file and stores the records in an array called GoodMessages. The Transform shape loops through the records via the Repeat block (added by clicking the dots … at the top of the card). The @repeatItem().PayLoad is input for the transform, which is executed multiple times.

Repeat
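
This dates from the old 2015-preview workflow schema, where looping was expressed with a repeat property on the action and the @repeatItem() function (since replaced by foreach). A very rough sketch under those assumptions; the action names, type and URI are illustrative:

"transform": {
    "type": "Http",
    "repeat": "@outputs('x12').GoodMessages",
    "inputs": {
        "method": "POST",
        "uri": "https://<transformservice>/run",
        "body": "@repeatItem().PayLoad"
    }
}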

Remark: you can't use this functionality in the same way when you read and validate an XML document with multiple child nodes, say an XML document with a Persons root node and multiple Person records. I don't know yet how to get that working.