Scalable Event Hub Processor in an Azure Worker Role

So what happens after all those activity data point messages had been fired off into an Azure Event Hub? Event Hubs enable me to durably store these messages/events until the required retention period expires, which is 24 hours by default. I need to process those messages fast. That sounds like the starting point of big data where millions of data points have been ingested and these data need to be processed and for someone to step in to try to make some sense out of it. Enter a scalable event hub processor implemented as an Azure Worker Role. Fortunately the good guys at the Azure team already thought about this and someone published a a code project on how to scale out event processing  with Event Hubs. The EventProcessorHost simplifies the workload distribution model and I can focus on the actual data processing rather than handling the checkpoint and fault tolerance, the so-called “real enterprise architecture” stuffs which sometimes people tell me that I don’t know much about, but that’s ok. 🙂

First thing first, I could either implement this as another WebJob that is based on a custom trigger as when events/messages are ingested into my Event Hub.  But no, I wanted to try something else, hence I chose to host the Event Hub Processor in an Azure Worker Role. This also makes scaling sense because I could perform auto-scaling on the worker role instances based on the number of events/messages in the Event Hub. This is like deja vu, my favourite Azure demo which I did years ago was how to do auto-scaling based upon a queue metric. Each Worker Role hosts an instance of the Event Processor Host which processes events from all of the default 16 partitions in the Event Hub. The Worker Role upon OnStart() does the following to start the host. The rest of the code in the project above is used.

WorkerRole.cs code snippet:

public override bool OnStart()
{
consumerGroupName = EventHubConsumerGroup.DefaultGroupName;
eventHubName = CloudConfigurationManager.GetSetting("EventHubName");
string hostPrefix = CloudConfigurationManager.GetSetting("HostName");
string instanceId = RoleEnvironment.CurrentRoleInstance.Id;
int instanceIndex = 0;
if (int.TryParse(instanceId.Substring(instanceId.LastIndexOf(".") + 1), out instanceIndex)) // On cloud.
{
int.TryParse(instanceId.Substring(instanceId.LastIndexOf("_") + 1), out instanceIndex); // On compute emulator.
}
hostName = hostPrefix + "-" + instanceIndex.ToString();
StartHost().Wait();
return base.OnStart();

The processing logic is defined in the GetFitYallActivityProcessor class and specifically in an async method called ProcessEventsAsync(). The code appears below:

GetFitYallActivityProcessor.cs code snippet:

public Task ProcessEventsAsync(PartitionContext context, IEnumerable messages)
{
// here is the place for you to process the received data for futher processing.
// suggest you keep it simple fast and reliable.
try
{
List fitbitEntities = new List();
List stravaEntities = new List();
List getfityallEntities = new List();
foreach (EventData message in messages)
{
byte[] msgBytes = message.GetBytes();
var m = string.Format("{0} > received message: {1} at partition {2}, owner: {3}, offset: {4}", DateTime.Now.ToString(), Encoding.UTF8.GetString(msgBytes), context.Lease.PartitionId, context.Lease.Owner, message.Offset);
Trace.WriteLine(m);
string type = message.Properties["Type"].ToString();
string _id = type + "_" + Guid.NewGuid().ToString();
switch (type)
{
case "Fitbit":
// convert an EventData message into an Azure Table Entity / row
var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.Default.GetString(msgBytes));
FitbitActivityEntity activity = new FitbitActivityEntity(message.PartitionKey, _id);
string[] dtstr = eventBody.Time.GetDateTimeFormats('g', CultureInfo.CreateSpecificCulture("en-AU"));
activity.DateTime = dtstr[7].ToString();
activity.Steps = eventBody.Steps;
activity.StepsLevel = eventBody.StepsLevel;
activity.CaloriesOut = eventBody.CaloriesOut;
activity.CaloriesOutLevel = eventBody.CaloriesOutLevel;
fitbitEntities.Add(activity);
break;
case "Strava":
// convert an EventData message into an Azure Table Entity / row
var seventBody = Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.Default.GetString(msgBytes));
StravaActivityEntity sactivity = new StravaActivityEntity(message.PartitionKey, _id);

It’s pretty straight-forward, just deserialize the “payload” from JSON into the same object which I constructed in the WebJob message pump.  Essentially this simple implementation just retrieves all the data point properties and especially make sure that the datetimestamp value from all my data points are on a consistent format. The reason for this is because I need to match these rows using a self-analytic BI tool, PowerBI is currently my tool of choice. Each data point is stored as a row into the Azure Storage table. Why? Because it’s pretty scalable and I could actually use an Azure Storage Table as a data source to be queries in PowerQuery. Good enough for me for now. A little bit of optimization is available in my code that performs the insert operations in batches. I borrowed this from some sample code I found but I couldn’t find it again to refer to it here. A batch operation may contain up to 100 individual table operations, with the requirement that each operation entity must have same partition key. That’s exactly what I did. Besides reducing the latency, this is also for monetary reason, it saves on the # of transactions to my Azure Storage. In the Internet of Things, I need to think about scale, remember? The end result, Azure storage tables storing all my activity data points, ready to be “mashup y’all”.

stravatable

 

fitbittable

So far so good. In my next post I will talk about the fun stuffs, using Excel beyond tracking my expenses in a spreadsheet, this is self-service analytics, Big Data style!

Leave a comment

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.