Asynchronous Processing

Asynchronous processing is a complex construct that I don’t use every day. For that reason, I will walk through the steps one by one.

First, let’s look at the signature of an asynchronous method:
private static async Task<bool> StoreMetadataAsync(ILogger logger)
private static async Task DoAsync(Page<BlobItem> blobPage, CloudTable table, int page)

The async modifier marks the method as asynchronous and allows await in its body; the Async suffix is a naming convention. Return values are wrapped in Task<T>, while a plain Task is the asynchronous equivalent of void.

The next step is to call the asynchronous method. This starts the method without blocking: it runs until its first await on an operation that has not completed yet and then hands control back. We don’t wait for an answer; we simply continue with the next statement in the calling method or shift focus back to the user interface. Call:
Task<bool> storeMetadataTask = StoreMetadataAsync(logger);

After the asynchronous call, we can do other work in the calling method or user interface. Only at the await statement does execution of the calling method pause until the awaited asynchronous operation is complete:
bool storeMetaDataFinished = await storeMetadataTask;
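
To make the start-then-await pattern concrete, here is a minimal sketch. It assumes a stand-in StoreMetadataAsync that simulates the storage call with Task.Delay; the class name and the "other work" are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class StartThenAwaitDemo
{
    // Hypothetical stand-in for StoreMetadataAsync: simulates I/O with Task.Delay.
    public static async Task<bool> StoreMetadataAsync()
    {
        await Task.Delay(200); // pretend to write to storage
        return true;
    }

    public static async Task<string> RunAsync()
    {
        // Start the operation; control comes back at its first incomplete await.
        Task<bool> storeMetadataTask = StoreMetadataAsync();

        // Other work happens while the simulated I/O is in flight.
        string otherWork = "other work done";

        // Only here does this method pause until the result is available.
        bool storeMetaDataFinished = await storeMetadataTask;
        return $"{otherWork}, stored: {storeMetaDataFinished}";
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints "other work done, stored: True"
    }
}
```

The point of the sketch is the gap between the call and the await: anything placed there overlaps with the pending operation.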

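If there’s no intermediate work to do, there is little point in separating the call from the await. Yes, we could write the following, but then the calling method simply waits at that line, so it behaves much like a synchronous call of an asynchronous method (although the thread itself is not blocked while waiting):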
bool storeMetaDataFinished = await StoreMetadataAsync(logger);

Let’s make it one step more complicated. What if we want to await multiple tasks together rather than each task separately? I found two posts on await Task.WhenAll.
The first example executes two known tasks. By starting both tasks before awaiting and using await Task.WhenAll instead of an await per task, we can significantly reduce the total run time.
Note how the run time is measured with the Stopwatch class.
Await Known Tasks
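
As a rough sketch of the idea (not the code from the linked post), the following starts two known tasks and awaits them together. FetchAsync is a hypothetical operation that simulates I/O with Task.Delay; the delays are arbitrary.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class WhenAllKnownTasksDemo
{
    // Hypothetical I/O-bound operation that takes roughly delayMs milliseconds.
    public static async Task<int> FetchAsync(int value, int delayMs)
    {
        await Task.Delay(delayMs);
        return value;
    }

    public static async Task<(int Sum, TimeSpan Elapsed)> RunAsync()
    {
        var stopwatch = Stopwatch.StartNew();

        // Start both tasks before awaiting either one.
        Task<int> first = FetchAsync(1, 300);
        Task<int> second = FetchAsync(2, 300);

        // WhenAll completes when both are done; results are in argument order.
        int[] results = await Task.WhenAll(first, second);

        stopwatch.Stop();
        return (results[0] + results[1], stopwatch.Elapsed);
    }

    public static async Task Main()
    {
        var (sum, elapsed) = await RunAsync();
        Console.WriteLine($"sum={sum}, elapsed={elapsed}");
    }
}
```

Because the two delays overlap, the elapsed time should be close to one delay rather than the sum of both.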

In the second example, we use a foreach construct with an unknown number of tasks. The tasks are started and added to a list. As a final statement, we await all tasks in the list.
Await Unknown Tasks
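
A minimal sketch of the same pattern for a collection whose size is not known up front. GetLengthAsync is a made-up operation used only for illustration; it is not taken from the linked post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllUnknownTasksDemo
{
    // Hypothetical asynchronous operation: returns the length of the item.
    public static async Task<int> GetLengthAsync(string item)
    {
        await Task.Delay(50); // simulated I/O
        return item.Length;
    }

    public static async Task<int> TotalLengthAsync(IEnumerable<string> items)
    {
        var tasks = new List<Task<int>>();
        foreach (string item in items)
        {
            // Start the task and remember it; do not await inside the loop.
            tasks.Add(GetLengthAsync(item));
        }

        // Await the whole list in one statement.
        int[] lengths = await Task.WhenAll(tasks);
        return lengths.Sum();
    }

    public static async Task Main()
    {
        int total = await TotalLengthAsync(new[] { "North", "South", "East", "West" });
        Console.WriteLine(total); // prints 18
    }
}
```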

I wrote a simple console application that you can use to test the difference between a synchronous call and an asynchronous call. I performed 100 runs of 100 table inserts each, so 10,000 table inserts in total. The difference in run time is substantial: sync 02:44, async 00:14 (minutes:seconds).

using Microsoft.Azure.Cosmos.Table;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

namespace TestAsync
{
    class MetaDataEntity : TableEntity
    {

        public MetaDataEntity(string session, string modifiedDate)
        {
            this.PartitionKey = session; this.RowKey = modifiedDate;
        }

        public MetaDataEntity() { }

        //public string Email { get; set; }

    }

    class Program
    {
        static async Task Main(string[] args)
        {

            await ParentMethodAsync();

        }

        private static async Task ParentMethodAsync()
        {

            var stopwatch = new Stopwatch();
            string connectionString = "...";

            List<Task> listOfTasks = new List<Task>();

            // Create a CloudTableClient
            CloudStorageAccount account = 
              CloudStorageAccount.Parse(connectionString);
            CloudTableClient tableClient = 
              account.CreateCloudTableClient();
            CloudTable table = tableClient.GetTableReference("MetaData");

            // Synchronous call: each run is awaited before the next one starts
            stopwatch.Start();
            Console.WriteLine("Synchronous Call");

            for (int i = 0; i < 100; i++)
            {
                Task forEachTask = DoAsync(table, i);
                await forEachTask;
            }

            stopwatch.Stop();
            Console.WriteLine(stopwatch.Elapsed);

            // Asynchronous call: all runs are started first and awaited together afterwards
            stopwatch.Reset();
            stopwatch.Start();
            Console.WriteLine("Asynchronous Call");

            for (int i = 0; i < 100; i++)
            {
                Task forEachTask = DoAsync(table, i);
                listOfTasks.Add(forEachTask);
            }

            await Task.WhenAll(listOfTasks);

            stopwatch.Stop();
            Console.WriteLine(stopwatch.Elapsed);

        }

        private static async Task DoAsync(CloudTable table, int run)
        {

            MetaDataEntity metaDataEntity = new MetaDataEntity();

            for (int i = run * 100; i < (run+1)*100; i++)
            {
                metaDataEntity.PartitionKey = $"session{i.ToString()}";
                metaDataEntity.RowKey = $"device{i.ToString()}";
                TableOperation insertOperation = 
                TableOperation.InsertOrMerge(metaDataEntity);
                await table.ExecuteAsync(insertOperation);
            }
                  
            metaDataEntity = null;

        }

    }
}

Related to the concept of asynchronous execution are the concepts of concurrency and parallelism.

Async execution allows a single thread to start an operation and then do something else instead of waiting for that operation to finish.
Concurrency – or thread-based concurrency – actually divides up the work over multiple threads, which can make it faster.
Parallelism – or CPU-based concurrency – divides up the work over multiple threads running on multiple processors or processor cores.
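
To illustrate the parallelism case, here is a small sketch of CPU-bound work spread over multiple cores with Parallel.For. The prime-counting task and all names are my own illustration; the table inserts in this post are I/O-bound and behave differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelCpuDemo
{
    // Simple trial-division primality check (CPU-bound work).
    public static bool IsPrime(int n)
    {
        if (n < 2) return false;
        for (int d = 2; d * d <= n; d++)
        {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Counts primes in [2, limit) on the calling thread.
    public static int CountPrimesSequential(int limit)
    {
        int count = 0;
        for (int n = 2; n < limit; n++)
        {
            if (IsPrime(n)) count++;
        }
        return count;
    }

    // Counts primes in [2, limit) with the range divided over worker threads.
    public static int CountPrimesParallel(int limit)
    {
        int count = 0;
        Parallel.For(2, limit, n =>
        {
            // Interlocked keeps the shared counter correct across threads.
            if (IsPrime(n)) Interlocked.Increment(ref count);
        });
        return count;
    }

    public static void Main()
    {
        Console.WriteLine(CountPrimesSequential(100)); // prints 25
        Console.WriteLine(CountPrimesParallel(100));   // prints 25
    }
}
```

For a range this small the parallel version will not be faster; the sketch only shows how the work is divided and why shared state needs protection.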

For thread-based concurrency, Microsoft provides the Task Parallel Library, notably Parallel.ForEach. The threads can execute on multiple processors or CPU cores, but that’s where it gets a bit vague. Parallel LINQ also uses thread-based concurrency. I have added the following code to the above example. The code now executes in about 00:17. That’s slower than expected, most likely because the inserts are I/O-bound: the worker threads only start the asynchronous calls, so they add overhead rather than throughput.

            // Parallel
            int[] runs = new int[100];
            for (int i = 0; i < 100; i++)
            {
                runs[i] = i;
            }

            stopwatch.Reset();
            stopwatch.Start();
            Console.WriteLine("Parallel");

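            // List<T> is not safe for concurrent Add calls, so the tasks started
            // by the worker threads are collected in a thread-safe bag.
            var parallelTasks = new System.Collections.Concurrent.ConcurrentBag<Task>();

            // No async lambda here: Parallel.ForEach does not await its body,
            // it only needs to start the work and keep the resulting task.
            Parallel.ForEach(runs, i =>
            {

                Task forEachTask = DoAsync(table, i);
                parallelTasks.Add(forEachTask);

            });

            await Task.WhenAll(parallelTasks);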

            stopwatch.Stop();
            Console.WriteLine(stopwatch.Elapsed);

I thought that mixing multi-threading and asynchronous execution might be the problem. However, when I replace the asynchronous call with a synchronous call, the code obviously runs slower: 00:38.

            // Parallel
            int[] runs = new int[100];
            for (int i = 0; i < 100; i++)
            {
                runs[i] = i;
            }

            stopwatch.Reset();
            stopwatch.Start();
            Console.WriteLine("Parallel");

            Parallel.ForEach(runs, i =>
            {
                
                DoSync(table, i);
                
            });

            stopwatch.Stop();
            Console.WriteLine(stopwatch.Elapsed);

        private static void DoSync(CloudTable table, int run)
        {

            MetaDataEntity metaDataEntity = new MetaDataEntity();

            for (int i = run * 100; i < (run + 1) * 100; i++)
            {
                metaDataEntity.PartitionKey = $"session{i.ToString()}";
                metaDataEntity.RowKey = $"device{i.ToString()}";
                TableOperation insertOperation = 
                TableOperation.InsertOrMerge(metaDataEntity);
                table.Execute(insertOperation);
            }

            metaDataEntity = null;

        }
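
On the suspicion about mixing Parallel.ForEach with asynchronous calls: Parallel.ForEach accepts an Action<T>, so an async lambda passed to it becomes async void and the loop does not wait for the awaited work. The sketch below illustrates this with a simulated insert (Task.Delay instead of a table call); all names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelAsyncPitfallDemo
{
    // Hypothetical stand-in for an asynchronous table insert.
    public static async Task<int> SimulatedInsertAsync(int run)
    {
        await Task.Delay(100);
        return run;
    }

    // Pitfall: the loop returns as soon as every lambda reaches its first await.
    public static int CompletedRightAfterParallelForEach(IEnumerable<int> runs)
    {
        int completed = 0;
        Parallel.ForEach(runs, async run =>
        {
            await SimulatedInsertAsync(run);
            Interlocked.Increment(ref completed);
        });
        return completed; // typically still 0: the inserts are not finished yet
    }

    // Alternative: start all runs and await them together, without extra threads.
    public static async Task<int> RunAllAsync(IEnumerable<int> runs)
    {
        Task<int>[] tasks = runs.Select(run => SimulatedInsertAsync(run)).ToArray();
        int[] results = await Task.WhenAll(tasks);
        return results.Length; // every run has completed at this point
    }

    public static async Task Main()
    {
        int[] runs = Enumerable.Range(0, 10).ToArray();
        Console.WriteLine(CompletedRightAfterParallelForEach(runs));
        Console.WriteLine(await RunAllAsync(runs)); // prints 10
    }
}
```

For I/O-bound work such as table inserts, the second form is usually sufficient; Parallel.ForEach earns its keep when the body is CPU-bound.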

Durable Functions

In a previous post I explained the webhook solution that makes it possible to call a long-running function asynchronously. That approach is quite complicated, since you have to create and manage queues yourself. Azure Durable Functions offers an alternative: you simply call a function from your logic app and leave the heavy lifting to that very function. Conceptually it works as follows:

  1. Function A gets the request from Logic Apps
  2. Function A invokes a durable orchestrator function and returns the status (202 Accepted with location header for status updates)
  3. The durable orchestrator calls the function(s) that need to do work which may be > 2 minutes
  4. The Logic App will continue to check the location header (automatically) until a 200 status code is received.

When you look at the implementation, you will see that three functions are created: an HttpTrigger function, an OrchestrationTrigger function and an ActivityTrigger function. A very good explanation can be found at the following link: Jeff Hollan on GitHub.

You will need to add the following NuGet packages to your Azure Function project:

  • Microsoft.Azure.WebJobs.Extensions.DurableTask, v1.5.0
  • Microsoft.Net.Sdk.Functions, v1.0.14

The DurableTask Extension can be added via the NuGet Package Manager (Tools menu). Command:
PM> Install-Package Microsoft.Azure.WebJobs.Extensions.DurableTask -Version 1.5.0

Important note: You need to understand the concept of task hubs (see Microsoft Docs). When multiple function apps with durable functions are linked to the same storage account, each function app needs its own task hub. When no task hub is specified, the default task hub named durablefunctionshub is used. In the storage account, you will see taskhub-leases in blob storage as well as queue entries. The task hub is created automatically in the storage account you specify in the application setting AzureWebJobsStorage and listens for the asynchronous response.

When testing your function in Visual Studio, the task hub is specified in local.settings.json:

{
  "IsEncrypted": false,
  "durableTask": {
    "HubName": "wcstaskhub"
  },
  "Values": {
    "APPINSIGHTS_INSTRUMENTATIONKEY": "1ef50250-49f8-463d-849a-e264336c6797",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=bloasbdevst02;AccountKey=spgiGm…EndpointSuffix=core.windows.net"
  },
  "ConnectionStrings": {
    "OutputDataConnectionString": "Server=tcp:blo-asb-dev.database.windows.net,1433;Initial Catalog=db-blo-asb-dev-sql…"
  }
}

When deploying to Azure, specify the task hub in the host.json file:

{
  "durableTask": {
    "HubName": "wcstaskhub"
  },
  "Values": {
    "APPINSIGHTS_INSTRUMENTATIONKEY": "1ef50250-49f8-463d-849a-e264336c6797",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=bloasbdevst02;AccountKey=spgiGm…EndpointSuffix=core.windows.net"
  }
}

The tradeoff of this approach is potential added latency. Because of the polling pattern, there may be some delay between the moment the work is completed and the moment the response is returned to the logic app. That’s because you specify a polling interval in the parent HttpTrigger function.
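
The polling behaviour can be sketched without any Azure dependency. The snippet below is an illustration only: getStatusCodeAsync is a hypothetical delegate standing in for a request to the status URL from the location header, returning 202 while the work is running and 200 when it is done.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PollingDemo
{
    // Polls until the status is 200 and returns the number of polls needed.
    // The caller only notices completion at the next poll, which is where
    // the added latency comes from.
    public static async Task<int> PollUntilCompleteAsync(
        Func<Task<int>> getStatusCodeAsync, TimeSpan pollInterval)
    {
        int polls = 0;
        while (true)
        {
            polls++;
            int status = await getStatusCodeAsync();
            if (status == 200)
            {
                return polls;
            }
            await Task.Delay(pollInterval); // wait before checking again
        }
    }

    public static async Task Main()
    {
        // Simulated status endpoint: two "still running" answers, then "done".
        var statuses = new Queue<int>(new[] { 202, 202, 200 });
        int polls = await PollUntilCompleteAsync(
            () => Task.FromResult(statuses.Dequeue()),
            TimeSpan.FromMilliseconds(100));
        Console.WriteLine(polls); // prints 3
    }
}
```

In the worst case the work finishes just after a poll, so the extra delay approaches one full polling interval.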

Asynchronous Processing

Asynchronous processing is a feature you might not use that often. The link below explains the concept: Link: Asynchronous Programming.

I needed an asynchronous call in an Azure function. More specifically, I had to trigger a logic app for four different regions. I found, however, that if one of the regions failed, the processing of the other regions was skipped. That’s when I thought of processing the logic apps asynchronously.

First I changed the signature of the Azure function to an asynchronous method:

public static async Task Run(TimerInfo schedule, TraceWriter log)
instead of: public static void Run(TimerInfo schedule, TraceWriter log).

Next I changed the call to the logic app:
await Task.Run(() => client.PostAsync(logicAppUri, new StringContent(content, Encoding.UTF8, "application/json")));
instead of: var response = client.PostAsync(logicAppUri, new StringContent(content, Encoding.UTF8, "application/json")).Result;

Trace an async program

Unfortunately, this option didn’t work. For that reason I turned to another option: Parallel.ForEach instead of the regular foreach. See: Link MSDN. The function is implemented as follows:

using System;
using System.Threading.Tasks;
using System.Net.Http;
using System.Text;

private static string logicAppUri = Environment.GetEnvironmentVariable("ProcessAGAEventsURI");
// Request body; it is not populated in this excerpt
private static string content;

public static void Run(TimerInfo schedule, TraceWriter log)
{
    string[] regions = { "North", "South", "East", "West" };

    // The regions are processed in parallel instead of one after the other
    Parallel.ForEach(regions, (regionCode) =>
    //foreach (string regionCode in regions)
    {
        log.Info($"regionCode: {regionCode}");
        using (var client = new HttpClient())
        {
            // .Result blocks the current worker thread until the logic app responds
            var response = client.PostAsync(logicAppUri, new StringContent(content, Encoding.UTF8, "application/json")).Result;
        }
    });
}
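
For comparison, here is a sketch of how the original goal (a failure in one region must not skip the others) could be approached with plain async/await. This is not the code I deployed: the HTTP call is replaced by a hypothetical postAsync delegate so the example runs on its own, and the failure for "South" is simulated.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class RegionFanOutDemo
{
    // Wraps one region call so that an exception is recorded, not propagated.
    private static async Task<KeyValuePair<string, bool>> TryPostAsync(
        string region, Func<string, Task> postAsync)
    {
        try
        {
            await postAsync(region);
            return new KeyValuePair<string, bool>(region, true);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{region} failed: {ex.Message}");
            return new KeyValuePair<string, bool>(region, false);
        }
    }

    // Starts one call per region and waits for all of them, whatever the outcome.
    public static async Task<Dictionary<string, bool>> TriggerRegionsAsync(
        IEnumerable<string> regions, Func<string, Task> postAsync)
    {
        var outcomes = await Task.WhenAll(
            regions.Select(region => TryPostAsync(region, postAsync)));
        return outcomes.ToDictionary(pair => pair.Key, pair => pair.Value);
    }

    public static async Task Main()
    {
        // Simulated logic app call: "South" fails, the other regions succeed.
        Func<string, Task> fakePost = async region =>
        {
            await Task.Delay(50);
            if (region == "South") throw new InvalidOperationException("simulated failure");
        };

        var result = await TriggerRegionsAsync(
            new[] { "North", "South", "East", "West" }, fakePost);
        foreach (var pair in result)
        {
            Console.WriteLine($"{pair.Key}: {pair.Value}");
        }
    }
}
```

In a real function, postAsync would wrap the client.PostAsync call for that region; the try/catch per region is what keeps one failure from affecting the rest.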