Using Azure WebJobs as an IoT data points ingestor

My GetFitY’all project has evolved again, obviously. Previously I intended to implement the message pump functionality as a RESTful endpoint that could be called automatically from some form of “cron job” in the cloud. But I changed course because a simpler approach is available: Azure WebJobs. It works well in my case because I already have a GetFitY’all Azure website, which provisions users and lets them authorize the GetFitY’all server side to ingest data points from the various activity sources. My data point sources have expanded to include the MapMyFitness API alongside the Strava API and Fitbit API.

Implementing the WebJob was simple: I just pulled the Azure WebJobs sample solution from ASP.NET CodePlex. I chose to implement a manual trigger, which starts the method that ingests activity data points from my data sources. The code looks like the following:

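static void Main(string[] args)
{
    JobEnvInitializer();
    JobHost host = new JobHost();
    // Invoke the manually triggered method once.
    host.Call(typeof(Program).GetMethod("ManualTrigger"));
    // RunAndBlock() keeps the host running until it is stopped.
    host.RunAndBlock();
    Console.WriteLine("\nDone");
}

// Not bound to any trigger; it only runs when called explicitly via host.Call().
[NoAutomaticTrigger]
public static void ManualTrigger([Table("ActivityDatapointsProcessingLog")] CloudTable logTable)
{
    // Block until the asynchronous ingestion completes.
    GetFitYallAsync(logTable).Wait();
}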

The data points ingestor cum message pump is meant to simulate a device gateway. If only I could get my hands on the Azure Intelligent Systems Service (ISS) limited beta, I could go further and embed the ISS software agent in my “device gateway”. This is illustrated in the device topologies below:

Source: BD516 Building an Intelligent Systems Business with Microsoft Azure Services (https://connect.digitalwpc.com/Pages/SessionDetail.aspx?sessionId=9040ae74-c1b6-e311-8491-00155d5066d7)

Using the ISS agent on the gateway would be good because the whole IoT workflow would be managed by ISS, including sending the data points as queue messages. Currently I have implemented some of that workflow on my own as a form of message pump (see below). Ideally I just want to focus on the simple device gateway logic of ingesting data points from multiple device APIs, and of course on the fun part: self-analysis of my IoT data points.

The “device gateway” acts as a message pump that sends each data point as a message via AMQP to the Azure Event Hub. This decouples the processing of these messages from the message pump. The Azure Event Hub is a highly scalable publish-subscribe ingestor that can take in millions of events per second.

That piece of code looks like this:

// use RedDog.ServiceBus helper class to generate SAS (token valid for 120 minutes)
var sas = EventHubSharedAccessSignature.CreateForSender(senderKeyName, senderKey, serviceNamespace, hubName, deviceName, new TimeSpan(0, 120, 0));
var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, ""), new MessagingFactorySettings
{
    TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(sas),
    TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
});
var client = factory.CreateEventHubClient(String.Format("{0}/publishers/{1}", hubName, deviceName));
// iterate through the 4 Lists and compose an ActivityDataPoint object and fire off the message using Event Hub
int cnt = intraDaySteps.DataSet.Count;
var procTasks = new List<Task>();
for (int i = 0; i < cnt; i++)
{
    FitbitDataPoint adt = new FitbitDataPoint();
    adt.Time = intraDaySteps.DataSet[i].Time;
    adt.Steps = intraDaySteps.DataSet[i].Value;
    adt.StepsLevel = intraDaySteps.DataSet[i].Level;
    adt.CaloriesOut = intraDayCaloriesOut.DataSet[i].Value;
    adt.CaloriesOutLevel = intraDayCaloriesOut.DataSet[i].Level;
    // serialize the data point to JSON and wrap it in an EventData message
    var data = new EventData(System.Text.Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(adt)));
    data.PartitionKey = userName;
    data.Properties.Add("Type", "Fitbit");
    // start the send without awaiting it; all sends are awaited together below
    procTasks.Add(client.SendAsync(data));
}
await Task.WhenAll(procTasks);
procResult.Success = true;

A FitbitDataPoint object is constructed for each data point retrieved from the Fitbit API. It is then serialized into JSON and wrapped in an EventData object. The partition key and a simple property identifying the message as a Fitbit data point are set on the EventData object. Finally, SendAsync() sends it to the Event Hub using Microsoft.ServiceBus.Messaging.TransportType.Amqp. I am using RedDog.ServiceBus, a really nifty library that you can install using NuGet. Full credit goes to Sandrino Di Mattia; I learned a lot from his blog post about IoT with Azure Service Bus Event Hubs.
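The loop above reads intraDayCaloriesOut.DataSet with the same index as intraDaySteps.DataSet, which assumes both series have the same length. Below is a minimal sketch of the pairing step that guards against a mismatch. IntradaySample, DataPoint and DataPointBuilder are hypothetical stand-ins invented for illustration, not the real Fitbit client types or FitbitDataPoint:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one entry of an intraday data set.
public class IntradaySample
{
    public string Time { get; set; }
    public double Value { get; set; }
}

// Hypothetical stand-in for the combined data point sent to the Event Hub.
public class DataPoint
{
    public string Time { get; set; }
    public double Steps { get; set; }
    public double CaloriesOut { get; set; }
}

public static class DataPointBuilder
{
    // Pairs the two series by index, stopping at the shorter one so that
    // a missing sample in either series cannot cause an out-of-range access.
    public static List<DataPoint> Combine(IList<IntradaySample> steps, IList<IntradaySample> calories)
    {
        int count = Math.Min(steps.Count, calories.Count);
        var points = new List<DataPoint>(count);
        for (int i = 0; i < count; i++)
        {
            points.Add(new DataPoint
            {
                Time = steps[i].Time,
                Steps = steps[i].Value,
                CaloriesOut = calories[i].Value
            });
        }
        return points;
    }
}
```

Each resulting DataPoint could then be serialized and sent exactly as in the loop above. Whether to truncate, as this sketch does, or to treat a length mismatch as an error is a design choice that depends on how the source API reports gaps.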

Similar code goes for the Strava and MapMyFitness data points. When I’m done, I just have to publish this WebJob to Azure. This can be done from Visual Studio itself, but make sure you have installed Visual Studio 2013 Update 3, because the WebJobs deployment features are included in that update. I won’t describe the steps here because there is already a great article, How to Deploy Azure WebJobs to Azure Websites. Also check out Get Started with the Azure WebJobs SDK.

[Screenshot: publishwebjob]

All is fine and dandy: I can execute my WebJob, and the message pump works like a charm in firing off the AMQP messages to my Event Hub. But there’s a problem I haven’t been able to troubleshoot: my WebJobs always abort.

[Screenshot: webjobs-aborted]

Maybe it is due to what Amit Apple describes: WebJobs only run continuously on an “Always On” Azure website, and this is not available for free and shared websites. But then my WebJob is not meant to run continuously; it just runs and finishes after firing the message pump. The WebJobs dashboard should display a successful run. *scratch head*
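One possible explanation, not verified against this deployment: the Main method shown earlier calls host.RunAndBlock() after host.Call(...). RunAndBlock() keeps the process alive until the host is stopped, so an on-demand WebJob written this way never exits on its own, and Azure Websites can abort a triggered job that sits idle past its idle timeout (the WEBJOBS_IDLE_TIMEOUT app setting). A minimal sketch of a run-once variant follows. It assumes the Microsoft.Azure.WebJobs namespace (older SDK builds used a different one) and that the storage connection strings JobHost expects are configured. The table binding and the JobEnvInitializer() call from the original are left out to keep it short:

```csharp
using System;
using Microsoft.Azure.WebJobs;

public class Program
{
    static void Main(string[] args)
    {
        JobHost host = new JobHost();

        // Call() runs the method synchronously and returns when it finishes.
        host.Call(typeof(Program).GetMethod("ManualTrigger"));

        // No RunAndBlock(): the process exits here, so the run can complete
        // instead of idling until it is aborted.
        Console.WriteLine("Done");
    }

    [NoAutomaticTrigger]
    public static void ManualTrigger()
    {
        // Placeholder for the ingestion logic described in this post.
        Console.WriteLine("Ingesting data points...");
    }
}
```

If the job also needed to listen for queue or blob triggers, RunAndBlock() would be the right call, but then it should be deployed as a continuous WebJob rather than an on-demand one.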

As a final confirmation that there’s indeed some “action” on the part of my Event Hub, here’s my GetFitYall Azure Event Hub dashboard.

[Screenshot: eventhubdashboard]
