Scalable Event Hub Processor in an Azure Worker Role

So what happens after all those activity data point messages have been fired off into an Azure Event Hub? Event Hubs durably store these messages/events until the retention period expires, which is 24 hours by default, so I need to process those messages fast. That sounds like the starting point of big data: millions of data points have been ingested, they need to be processed, and someone has to step in and try to make sense of them. Enter a scalable event hub processor implemented as an Azure Worker Role. Fortunately the good guys at the Azure team already thought about this and someone published a code project on how to scale out event processing with Event Hubs. The EventProcessorHost simplifies the workload distribution model, so I can focus on the actual data processing rather than handling checkpointing and fault tolerance, the so-called “real enterprise architecture” stuff which people sometimes tell me I don’t know much about, but that’s ok. 🙂

First things first: I could have implemented this as another WebJob with a custom trigger that fires when events/messages are ingested into my Event Hub. But no, I wanted to try something else, so I chose to host the Event Hub processor in an Azure Worker Role. This also makes sense for scaling because I could auto-scale the worker role instances based on the number of events/messages in the Event Hub. This is like déjà vu: my favourite Azure demo, which I did years ago, showed how to auto-scale based upon a queue metric. Each Worker Role hosts an instance of the Event Processor Host, which processes events from all of the default 16 partitions in the Event Hub. In OnStart() the Worker Role does the following to start the host; the rest of the code comes from the project mentioned above.

WorkerRole.cs code snippet:

public override bool OnStart()
{
    consumerGroupName = EventHubConsumerGroup.DefaultGroupName;
    eventHubName = CloudConfigurationManager.GetSetting("EventHubName");
    string hostPrefix = CloudConfigurationManager.GetSetting("HostName");
    string instanceId = RoleEnvironment.CurrentRoleInstance.Id;
    int instanceIndex = 0;
    // Try the "." suffix first (on cloud); fall back to the "_" suffix (on compute emulator).
    if (!int.TryParse(instanceId.Substring(instanceId.LastIndexOf(".") + 1), out instanceIndex))
    {
        int.TryParse(instanceId.Substring(instanceId.LastIndexOf("_") + 1), out instanceIndex);
    }
    // Each role instance gets its own host name, e.g. "<prefix>-0", "<prefix>-1".
    hostName = hostPrefix + "-" + instanceIndex.ToString();
    return base.OnStart();
}
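
To make the host naming logic easier to check in isolation, here is a minimal sketch of the suffix parsing as a standalone helper. The names `InstanceNaming`, `ParseInstanceIndex` and `BuildHostName`, as well as the sample instance IDs, are hypothetical and not part of the original project. The only thing carried over from the snippet above is the assumption that an instance ID ends with a `.` or `_` followed by a number.

```csharp
using System;

public static class InstanceNaming
{
    // Returns the number after the last '.' or '_' in the instance ID,
    // or 0 when the ID does not end in a numeric suffix.
    public static int ParseInstanceIndex(string instanceId)
    {
        int cut = instanceId.LastIndexOfAny(new[] { '.', '_' });
        int index;
        return int.TryParse(instanceId.Substring(cut + 1), out index) ? index : 0;
    }

    // Builds a per-instance host name such as "getfityall-3".
    public static string BuildHostName(string hostPrefix, string instanceId)
    {
        return hostPrefix + "-" + ParseInstanceIndex(instanceId);
    }
}

public static class Program
{
    public static void Main()
    {
        // Sample IDs are made up for illustration only.
        Console.WriteLine(InstanceNaming.BuildHostName("getfityall", "WorkerRole_IN_3"));                        // prints getfityall-3
        Console.WriteLine(InstanceNaming.BuildHostName("getfityall", "deployment(12).GetFitYall.WorkerRole.1")); // prints getfityall-1
    }
}
```

A distinct host name per instance matters because each Event Processor Host registers itself under that name when it takes partition leases.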

The processing logic is defined in the GetFitYallActivityProcessor class and specifically in an async method called ProcessEventsAsync(). The code appears below:

GetFitYallActivityProcessor.cs code snippet:

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
// here is the place for you to process the received data.
// suggest you keep it simple, fast and reliable.
List<FitbitActivityEntity> fitbitEntities = new List<FitbitActivityEntity>();
List<StravaActivityEntity> stravaEntities = new List<StravaActivityEntity>();
List getfityallEntities = new List();
foreach (EventData message in messages)
byte[] msgBytes = message.GetBytes();
var m = string.Format("{0} > received message: {1} at partition {2}, owner: {3}, offset: {4}", DateTime.Now.ToString(), Encoding.UTF8.GetString(msgBytes), context.Lease.PartitionId, context.Lease.Owner, message.Offset);
string type = message.Properties["Type"].ToString();
string _id = type + "_" + Guid.NewGuid().ToString();
switch (type)
case "Fitbit":
// convert an EventData message into an Azure Table Entity / row
var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject<FitbitDataPoint>(Encoding.UTF8.GetString(msgBytes));
FitbitActivityEntity activity = new FitbitActivityEntity(message.PartitionKey, _id);
string[] dtstr = eventBody.Time.GetDateTimeFormats('g', CultureInfo.CreateSpecificCulture("en-AU"));
activity.DateTime = dtstr[7].ToString();
activity.Steps = eventBody.Steps;
activity.StepsLevel = eventBody.StepsLevel;
activity.CaloriesOut = eventBody.CaloriesOut;
activity.CaloriesOutLevel = eventBody.CaloriesOutLevel;
case "Strava":
// convert an EventData message into an Azure Table Entity / row
var seventBody = Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.UTF8.GetString(msgBytes));
StravaActivityEntity sactivity = new StravaActivityEntity(message.PartitionKey, _id);

It’s pretty straightforward: just deserialize the “payload” from JSON into the same object which I constructed in the WebJob message pump. Essentially this simple implementation retrieves all the data point properties and, in particular, makes sure that the date-time stamp values from all my data points are in a consistent format. I need this because I match these rows using a self-service BI tool, and Power BI is currently my tool of choice. Each data point is stored as a row in an Azure Storage table. Why? Because it’s pretty scalable and I can use an Azure Storage table as a data source to be queried in PowerQuery. Good enough for me for now. My code includes a little optimization that performs the insert operations in batches. I borrowed this from some sample code I found, but I couldn’t find it again to refer to it here. A batch operation may contain up to 100 individual table operations, with the requirement that every entity in the batch must have the same partition key. That’s exactly what I did. Besides reducing latency, this also saves money because it cuts the number of transactions against my Azure Storage account. In the Internet of Things, I need to think about scale, remember? The end result: Azure Storage tables storing all my activity data points, ready to be “mashup y’all”.
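
The batching rule described above (at most 100 operations per batch, all sharing one partition key) can be sketched roughly as follows. This is not the borrowed sample code, which I can't reproduce here; `ActivityRow` and `TableBatching` are hypothetical stand-ins for the real table entity classes, and the sketch only shows the grouping and chunking step, not the actual storage calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a table entity; only the keys matter here.
public sealed class ActivityRow
{
    public string PartitionKey { get; set; }
    public string RowKey { get; set; }
}

public static class TableBatching
{
    public const int MaxOperationsPerBatch = 100;

    // Groups rows by partition key, then cuts each group into chunks of at most 100 rows.
    public static List<List<ActivityRow>> ToBatches(IEnumerable<ActivityRow> rows)
    {
        var batches = new List<List<ActivityRow>>();
        foreach (var group in rows.GroupBy(r => r.PartitionKey))
        {
            var items = group.ToList();
            for (int i = 0; i < items.Count; i += MaxOperationsPerBatch)
            {
                int size = Math.Min(MaxOperationsPerBatch, items.Count - i);
                batches.Add(items.GetRange(i, size));
            }
        }
        return batches;
    }
}

public static class Program
{
    public static void Main()
    {
        var rows = new List<ActivityRow>();
        for (int i = 0; i < 250; i++) rows.Add(new ActivityRow { PartitionKey = "alice", RowKey = "Fitbit_" + i });
        for (int i = 0; i < 30; i++) rows.Add(new ActivityRow { PartitionKey = "bob", RowKey = "Strava_" + i });

        // 250 rows for one user become 3 batches; 30 rows for another become 1.
        foreach (var batch in TableBatching.ToBatches(rows))
            Console.WriteLine(batch[0].PartitionKey + ": " + batch.Count);
    }
}
```

In the worker role, each inner list would presumably be turned into one TableBatchOperation and executed against the table, so 280 single inserts shrink to 4 storage transactions in this example.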




So far so good. In my next post I will talk about the fun stuff: using Excel beyond tracking my expenses in a spreadsheet. This is self-service analytics, Big Data style!

Using Azure WebJobs as an IoT data points ingestor

My GetFitY’all project has evolved again, obviously. Previously I intended to implement the message pump functionality as a RESTful endpoint that could be called automatically from some form of “cron job” in the cloud. But then I changed course because a simpler approach was available: Azure WebJobs. It works well in my case because I already have a GetFitY’all Azure website which provisions users and lets them authorize GetFitY’all to ingest data points server-side from the various activity sources. My data point sources have expanded to include the MapMyFitness API besides the Strava API and Fitbit API.

Implementing the WebJob is simple as I just pulled out the Azure WebJobs sample solution from ASP.NET CodePlex. I chose to implement a manual trigger which starts the method to ingest activity data points from my data sources. The code looks like the following:

static void Main(string[] args)
JobHost host = new JobHost();

public static void ManualTrigger([Table("ActivityDatapointsProcessingLog")] CloudTable logTable)

The data points ingestor cum message pump is meant to simulate a device gateway. If only I could get my hands on the Azure Intelligent Systems Service (ISS) limited beta, I could go further and embed the ISS software agent in my “device gateway”. This is illustrated in the device topologies below:

Source: BD516 Building an Intelligent Systems Business with Microsoft Azure Services

Leveraging the ISS agent on the gateway is good because all of the IoT workflow would be managed by ISS, including the sending of the data points as queue messages. Currently I have implemented some of the workflow on my own as a form of message pump (see below). Ideally I just want to focus on the simple device gateway logic of ingesting data points from multiple device APIs, and of course the fun part of doing self-service analysis of my IoT data points.

The “device gateway” acts as a message pump that sends each data point as a message via AMQP to the Azure Event Hub. This is meant to decouple the processing of these messages from the message pump. The Azure Event Hub is a highly scalable publish-subscribe ingestor that can take in millions of events per second.

That piece of code looks like this:

// use RedDog.ServiceBus helper class to generate SAS
var sas = EventHubSharedAccessSignature.CreateForSender(senderKeyName, senderKey, serviceNamespace, hubName, deviceName, new TimeSpan(0, 120, 0));
var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, ""), new MessagingFactorySettings
{
    TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(sas),
    TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
});
var client = factory.CreateEventHubClient(String.Format("{0}/publishers/{1}", hubName, deviceName));
// iterate through the 4 Lists and compose an ActivityDataPoint object and fire off the message using Event Hub
int cnt = intraDaySteps.DataSet.Count;
var procTasks = new List<Task>();
for (int i = 0; i < cnt; i++)
{
    FitbitDataPoint adt = new FitbitDataPoint();
    adt.Time = intraDaySteps.DataSet[i].Time;
    adt.Steps = intraDaySteps.DataSet[i].Value;
    adt.StepsLevel = intraDaySteps.DataSet[i].Level;
    adt.CaloriesOut = intraDayCaloriesOut.DataSet[i].Value;
    adt.CaloriesOutLevel = intraDayCaloriesOut.DataSet[i].Level;
    var data = new EventData(System.Text.Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(adt)));
    data.PartitionKey = userName;
    data.Properties.Add("Type", "Fitbit");
    // queue the send; all sends are awaited together after the loop
    procTasks.Add(client.SendAsync(data));
}
await Task.WhenAll(procTasks);
procResult.Success = true;

A FitbitDataPoint object is constructed for each data point retrieved from the Fitbit API. It is then serialized into JSON and wrapped in an EventData object. The partition key and a simple property identifying the message as a Fitbit data point are set on the EventData object. Finally a simple SendAsync() sends this to the Event Hub using Microsoft.ServiceBus.Messaging.TransportType.Amqp. I am using RedDog.ServiceBus, a really nifty library which you can install using NuGet. Full credit goes to Sandrino Di Mattia; I learned a lot from his blog post about IoT with Azure Service Bus Event Hubs.
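
To see the payload contract between the message pump and the processor without needing a Service Bus namespace, here is a minimal in-memory sketch of the round trip. Everything in it is an assumption made for illustration: `Envelope` is a hypothetical stand-in for EventData, the property types on `FitbitDataPoint` are guesses, and System.Text.Json is swapped in for Json.NET so the sketch has no NuGet dependency.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Assumed shape of the data point; the real class may use different types.
public sealed class FitbitDataPoint
{
    public DateTime Time { get; set; }
    public int Steps { get; set; }
    public int CaloriesOut { get; set; }
}

// Hypothetical stand-in for EventData: a body, a partition key and a property bag.
public sealed class Envelope
{
    public byte[] Body { get; set; }
    public string PartitionKey { get; set; }
    public Dictionary<string, object> Properties { get; } = new Dictionary<string, object>();
}

public static class MessagePump
{
    // Sender side: serialize to JSON, encode as UTF-8, tag with the source type.
    public static Envelope Wrap(FitbitDataPoint point, string userName)
    {
        var envelope = new Envelope
        {
            Body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(point)),
            PartitionKey = userName
        };
        envelope.Properties.Add("Type", "Fitbit");
        return envelope;
    }

    // Receiver side: check the type tag, decode with the same encoding, deserialize.
    public static FitbitDataPoint Unwrap(Envelope envelope)
    {
        if ((string)envelope.Properties["Type"] != "Fitbit")
            throw new InvalidOperationException("Not a Fitbit data point.");
        return JsonSerializer.Deserialize<FitbitDataPoint>(Encoding.UTF8.GetString(envelope.Body));
    }
}

public static class Program
{
    public static void Main()
    {
        var point = new FitbitDataPoint { Time = new DateTime(2014, 8, 3, 9, 15, 0), Steps = 42, CaloriesOut = 7 };
        var envelope = MessagePump.Wrap(point, "alice");
        var copy = MessagePump.Unwrap(envelope);
        Console.WriteLine(envelope.PartitionKey + " " + copy.Steps); // prints alice 42
    }
}
```

The point of the sketch is that both sides must agree on three things: the text encoding, the JSON shape, and the "Type" property used to route the message to the right entity class.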

Similar code goes for Strava and MapMyFitness data points. When I’m done, I just have to publish this WebJob to Azure. This can be done in Visual Studio itself, but please make sure that you have installed Visual Studio 2013 Update 3, because the WebJobs deployment features are included in Update 3. I won’t describe the steps here because there is already a great article about How to Deploy Azure WebJobs to Azure Websites. Also check out Get Started with the Azure WebJobs SDK.



All is fine and dandy: I can execute my WebJob and the message pump works like a charm in firing off the AMQP messages to my Event Hub. But there’s a problem which I haven’t been able to troubleshoot: my WebJobs always abort.



Maybe it is due to what Amit Apple describes: WebJobs only run continuously in an “Always On” Azure website, and this is not available in the free and shared website tiers. But then my WebJob is not meant to run continuously; it just runs and finishes upon firing the message pump. The WebJobs dashboard should display a successful run. *scratch head*

As a final confirmation that there’s indeed some “action” on the part of my Event Hub, here’s my GetFitYall Azure Event Hub dashboard.


Moving up the Internet of Things Value Chain

Fragmentation == Opportunities

I reckon that I don’t need to introduce the Internet of Things because there are enough resources out there to explain what it is and the opportunities around it. Thus far the market seems highly fragmented, as there are a number of players operating within the IoT space without directly competing. This condition typically appears in a new business or market during its nascent stage. Competition usually heats up as the market develops and matures. However, the key differentiator for a successful company is to identify the market opportunities early on by being “customer-obsessed” to the point that its solutions encompass many stages within the IoT ecosystem. Such are the opportunities that lie within the IoT value chain.

Internet of Things (IoT) solutions are very fragmented: they are either custom and unrepeatable or incompatible with existing infrastructure, with incomplete and unprotected data access and insights. Fragmentation is both a challenge and an opportunity. This blog post describes how a typical IoT vendor, be it a device manufacturer, solution provider or system integrator (SI), can move up the IoT value chain by taking advantage of these fragmentation opportunities, and proposes a solution in the form of an IoT Services Platform offered to its customers and partners.


The goal is to ensure that your company becomes the device manufacturer or SI of choice among customers and every player within the IoT value chain. More players are anticipated at every stage of the IoT value chain. IoT players would have a healthy relationship of collaboration, competition and dependence on each other along the value chain. Customers are highly dependent on data, information and insights. The success criteria are fast time to market and repeatable IoT vertical solutions that meet customers’ business objectives.

Tremendous value could be realized from the following simple chevron process:


In the following examples, I paint a picture of how an IoT device or module manufacturer can provide value at each stage of the process above.

  • Be the de facto module/device manufacturer by helping new entrants achieve faster time to market, be it a managed services company, a cloud platform IoT provider or a CRM/Big Data analytics company. This promotes dependence on your modules and devices. Successful partners would influence the sales of the modules and devices.
  • Provide a modular approach to helping these new entrants build their customised IoT vertical solutions using your IoT Services Platform. Basically, this avoids reinventing the wheel and allows more time for product and solution innovation. Overall this is all part and parcel of building a healthy IoT ecosystem.
  • Upsell consulting and premium support services to System Integration partners who require help, guidance and support to integrate with the IoT Services Platform.
  • Architect the IoT Services Platform as a telemetry collection gateway that can process the large-scale telemetry data streams generated by the devices and hardware you develop yourself and, beyond that, manage other devices and services too.
  • Data is not the only raw material unlocked by the IoT; so is the ability to perform command and control of devices.

Taking advantage of these opportunities results in:

  • Recurring revenue through a subscription model.

    ◦ E.g., a vertical IoT software platform partner can subscribe to the IoT Services Platform to enable the ingestion of IoT device telemetry streams. This especially makes sense because of the large-scale telemetry data streams and the equally high volume of instructions that need to be communicated to the devices in the field.

  • Win-win Partnership models – solution partners pay as their business grows
  • A vibrant partnership ecosystem whereby more IoT vertical solutions are launched into the market with reliance on your modules/devices or the IoT Services Platform.
  • Be what’s next in “Product Relationship Management”, going beyond buzzwords such as CRM, Big Data or BI.


What might an IoT Services Platform look like?



An IoT Services Platform offers a services & software platform to help customers and partners gain new insights, optimize business processes, make more informed decisions and identify new revenue opportunities. The platform performs the heavy-lifting and plumbing services such as IoT device management, telemetry collection, interactivity, notifications, servicing, etc. This would remove the barrier to entry for many IoT players, allowing them to just focus on IoT vertical solutions.

More importantly, having this platform allows an IoT provider to actively pursue IoT verticals such as smart metering, eHealth, building automation, digital display and light industrial scenarios. It allows the provider to customise solutions to meet individual customer requirements. It also promotes best-of-breed partnerships, as the provider continues to partner with large, established IoT players to deliver solutions in various verticals such as utilities, transportation, healthcare, government and consumer.


Business Model

The business model of this platform relies on the success of the IoT market, which is huge. Monetization is based on the following models:

Consumption model

  • Pay-as-you-go allows broader, wider adoption among smaller players in the IoT ecosystem

Subscription model

  • Allows medium and bigger players to leverage the economies of scale of this platform by committing to subscription contracts.
  • Provides good recurring revenue.

Premium Consulting and Support Services

  • Pay per support incident
  • Support contracts with enterprises and medium to big business partners



Many existing IoT providers have the first-mover advantage in the IoT ecosystem. Such a provider can extend that advantage by taking the lead in moving up the value chain. Such a move does not necessarily compete with its partners because it addresses opportunities which could be leveraged by any player within the IoT market. By providing an IoT Services Platform on which products, solutions and innovation from any player can thrive, the ecosystem becomes larger and healthier. One company getting a bigger slice of the proverbial pie is one thing. However, a bigger pie is everything.

An IoT Services Platform serves as a great framework and infrastructure on which either custom or repeatable IoT vertical solutions can be engineered. This brings a two-fold benefit. First, it promotes dependence on and loyalty to the provider’s solutions, hardware modules and devices, which bolsters its existing hardware business. Second, it opens the opportunity to take advantage of the new business models described in this post and tap new revenue streams which are recurring in nature.

This is all part and parcel of moving up the IoT value chain.

GetFitY’all – More about a project on the Internet of Things using Microsoft Azure

More on my Internet of Things project, GetFitY’all.

  1. I exposed 2 RESTful endpoints and, just to prove that Azure “loves all, serves all”, implemented them as a node.js app. In my previous “life” I was considering a few API proxy platforms, so I deployed an API proxy that hosts these endpoints. But these endpoints could just as easily be deployed onto a simple and free Azure Web Site. The next candidate should be the Azure API Management feature.
  2. These 2 endpoints perform these simple tasks:
    1. Mashup on demand – Lets client apps consume a mashup of fitness activity data points from different target APIs based upon user ID and a time period on a specific date. This has to be real-time and on demand, although the mashup does take up quite a bit of computational time.
    2. Automated message pump – Runs a message pump that pulls fitness activity data points from different target APIs and sends them asynchronously as AMQP messages to the Azure Event Hub. This is meant to decouple the processing of these messages from the message pump. The message pump endpoint could be fired on a set schedule in Azure Scheduler.
  3. Publish these endpoints; they will return valid JSON or XML depending on the HTTP Accept header.
    1. GET /v1/getfityall/mashup – no query strings, to keep things simple; returns an activities mashup based upon known parameters.
    2. POST /v1/getfityall/msgpump

Actual endpoint URLs to be updated later… this is the placeholder.. watch out…

  1. To further demonstrate that PowerQuery and PowerMap can consume data sources directly from API proxy endpoints, I created another spreadsheet. This time it is a mashup of how I got to Mobile Monday Sydney, a meetup I attended on the evening of the first Monday in August. It was fun to get to know the community here. The skyscrapers are thinner now; click the play button to see my routes.
MoMoSy-PowerMap visualisation

Microsoft Azure + Office365 Power BI == Power to the Internet of Things

Here’s a sneak peek of what I’ve got. The end result is the ability to do self-service analytics on my daily activities through a data mashup of Fitbit and Strava activity data points. This is not really enterprise-grade, just a fun personal project, but in later posts I will discuss making it more “enterprise-grade” by offloading all the mashing work to an Event Hub and Azure Worker Roles. Sure, I’ll even defend why I use this later, but don’t beat me up too much over it.

I just want to be able to do some self-service analytics using Excel 2013’s new add-ins called PowerQuery and PowerMap. Why Excel? Well because I have it and I think it’s rather ubiquitous.

Why Fitbit and Strava? The Fitbit One is a device I carry with me all the time, and Strava is an app intended for cycling (I’m an avid mountain biker) which I use a lot; I am glad that it is really power-friendly and doesn’t drain my smartphone battery when I record my activities. So the simple task is to create an IoT gateway proxy with 2 endpoints, but let’s just start with the first endpoint, which is the following:

GET /v1/mashup?ondate=YYYY-MM-DD&aftertime=hh:mm&beforetime=hh:mm
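
As a rough illustration of how the three query parameters could be turned into a time window on the server side, here is a minimal sketch. It is not the endpoint's actual code (the endpoints were implemented as a node.js app); `MashupQuery` and `TryParseWindow` are hypothetical names, and I am assuming that `hh:mm` means 24-hour time and that `aftertime` must come before `beforetime`.

```csharp
using System;
using System.Globalization;

public static class MashupQuery
{
    // Combines ondate (YYYY-MM-DD) with aftertime/beforetime (24-hour hh:mm)
    // into a start/end window. Returns false on malformed input or an empty window.
    public static bool TryParseWindow(string onDate, string afterTime, string beforeTime,
                                      out DateTime start, out DateTime end)
    {
        start = default(DateTime);
        end = default(DateTime);
        var inv = CultureInfo.InvariantCulture;
        DateTime day, after, before;
        if (!DateTime.TryParseExact(onDate, "yyyy-MM-dd", inv, DateTimeStyles.None, out day)) return false;
        if (!DateTime.TryParseExact(afterTime, "HH:mm", inv, DateTimeStyles.None, out after)) return false;
        if (!DateTime.TryParseExact(beforeTime, "HH:mm", inv, DateTimeStyles.None, out before)) return false;

        // Only the time-of-day part of the parsed times is used.
        start = day.Date + after.TimeOfDay;
        end = day.Date + before.TimeOfDay;
        return start < end;
    }
}

public static class Program
{
    public static void Main()
    {
        DateTime start, end;
        if (MashupQuery.TryParseWindow("2014-08-03", "09:00", "17:30", out start, out end))
        {
            // prints 2014-08-03T09:00:00 to 2014-08-03T17:30:00
            Console.WriteLine(start.ToString("s") + " to " + end.ToString("s"));
        }
    }
}
```

Parsing with exact formats and the invariant culture keeps the endpoint from guessing at ambiguous dates such as 03/08 versus 08/03.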

I haven’t turned the endpoint live in the cloud yet, but will do so soon.

The end result: a visualization of where, when and how my activities took place. This is the PowerMap, very powerful indeed. Those skyscrapers in the map below are not new buildings in Sydney but rather a representation of my step counts as recorded on Fitbit, placed at the lat/long coordinates recorded by the Strava app. A quick tour of what I did a few Sundays ago:

  1. Skyscraper #1 – I left my apartment in Waterloo/Redfern area, walked to the bus stop with my family
  2. Skyscraper #2 – Alighted at Central station, walked to the other side of the station to catch another bus
  3. Skyscraper #3 – Got off at Parramatta Road to go to a kid and baby Sunday market. I can’t remember the name of the place, but from the map it’s somewhere near Barnwell golf course. This kind of fits the purpose of why I built this: to remember where I’d been, considering I’m really new in Sydney.
  4. Skyscraper #4 – After much shopping for kid and baby stuff, we were starving, so we went to Westfield Burwood for lunch
  5. Skyscraper #5 – Did some grocery shopping around the commercial area near the Burwood train station
  6. Skyscraper #6 – Walked back to apartment from the bus stop
Family Sunday Funday

Here’s the Excel spreadsheet which packs all these goodies, but the actual data is pulled from Azure Storage tables.

Just download these two add-ins to get started.

PowerQuery –

PowerMap –

The brains behind all of this: it’s all in Microsoft Azure. I’ll just quickly summarize the technologies I used:

  1. Cloud services – virtual machines to be exact
  2. Azure Service Bus Event Hubs
  3. Azure Worker Roles
  4. Azure Storage tables

You can take a look at a rough sketch of the architecture below:

Disclaimer: I’m not an enterprise architect, just know enough to get a quick IoT POC moving.

Hope you like it. My next step is to tighten this up and, hopefully, expose it as a well-designed RESTful API.

More about GetFitY’all

Here is a bit more about my personal “Internet of my Things” project, a proof-of-concept that the Fitbit is a great telemetry device whose data can be analysed in the cloud. The scenario I am trying to build, to create a great experience for my users, is described below:

Goal: To allow Fitbit users to see good visualizations such as a power map of where and when their activities occurred.

Motivation: Let users find out where and when certain activities occurred. This is especially useful for remembering which activities were so pleasurable that they recorded the most steps, so that users can repeat those activities. While the Fitbit device does not record latitude and longitude data, I can do a mashup with data recorded by an app like Strava by accessing its API.

Workflow is something along the lines of:

  • Create a message pump that retrieves a stream of data points from 2 API sources: Fitbit and Strava.
  • Fetch intraday data points at up to 1-minute detail for the activities collection. Currently this is not possible yet because I’m still waiting for access to this part of the Fitbit Partner API to be granted. Meanwhile I am using a workaround based on Andrew Wilkinson’s python-fitbit library. Update: I have since gotten my Fitbit Partner API access and have been ingesting thousands of data points. Small data getting bigger.
  • Fetch a stream of latitude, longitude, time and distance data from Strava API.
  • Message pump sends data points to a cloud-scale telemetry ingestion hub.
  • Use Office 365’s new PowerQuery and PowerMap to merge the data streams from Fitbit activities time series data and Strava activities stream to create a visualization like a map to show where the activities occurred.
  • Publish this on social media to encourage more friends to join in the activities.
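
The merge step in the workflow above happens in PowerQuery, but the idea can be sketched in code as a join on the minute: Fitbit supplies step counts per minute and Strava supplies coordinates, so matching the two on a truncated timestamp gives each step count a location. This is only a sketch under my own assumptions; the type names (`StepSample`, `GpsSample`, `MashupPoint`) are hypothetical, and keeping the first GPS fix per minute is an arbitrary choice.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public sealed class StepSample { public DateTime Time; public int Steps; }
public sealed class GpsSample { public DateTime Time; public double Lat; public double Lng; }
public sealed class MashupPoint { public DateTime Minute; public int Steps; public double Lat; public double Lng; }

public static class Mashup
{
    // Drops seconds and below so both streams share a per-minute key.
    static DateTime TruncateToMinute(DateTime t) =>
        new DateTime(t.Year, t.Month, t.Day, t.Hour, t.Minute, 0, t.Kind);

    // Inner-joins the two streams on the minute, keeping the first GPS fix in each minute.
    public static List<MashupPoint> Merge(IEnumerable<StepSample> steps, IEnumerable<GpsSample> gps)
    {
        var firstFixPerMinute = gps
            .GroupBy(g => TruncateToMinute(g.Time))
            .ToDictionary(g => g.Key, g => g.First());

        var result = new List<MashupPoint>();
        foreach (var s in steps)
        {
            var minute = TruncateToMinute(s.Time);
            GpsSample fix;
            if (firstFixPerMinute.TryGetValue(minute, out fix))
                result.Add(new MashupPoint { Minute = minute, Steps = s.Steps, Lat = fix.Lat, Lng = fix.Lng });
        }
        return result;
    }
}

public static class Program
{
    public static void Main()
    {
        // Sample values are made up for illustration.
        var steps = new[]
        {
            new StepSample { Time = new DateTime(2014, 8, 3, 9, 15, 0), Steps = 42 },
            new StepSample { Time = new DateTime(2014, 8, 3, 9, 16, 0), Steps = 10 }
        };
        var gps = new[]
        {
            new GpsSample { Time = new DateTime(2014, 8, 3, 9, 15, 20), Lat = -33.90, Lng = 151.20 },
            new GpsSample { Time = new DateTime(2014, 8, 3, 9, 15, 50), Lat = -33.91, Lng = 151.21 }
        };

        // Only 09:15 has both steps and a GPS fix, so one point comes out.
        foreach (var p in Mashup.Merge(steps, gps))
            Console.WriteLine(p.Minute.ToString("HH:mm", CultureInfo.InvariantCulture) + " " + p.Steps); // prints 09:15 42
    }
}
```

This is also why a consistent timestamp format across the two sources matters: the join key only lines up if both streams express time the same way.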


GetFitY’all is (I’m making this up as I go)

  • a personal fun Fitbit project to help me learn more about the Fitbit API but, more importantly in the scheme of the Internet of Things (IoT), to really use my Fitbit One as what it is: a special-purpose device, one of the thousands of items which I own, and a smart product that can send the telemetry data it already collects into the cloud.
  • Beyond that, I also want to test out some IoT patterns which allow me to control the device. While there are limited commands I can issue to my Fitbit One, some of the more useful and obvious ones are setting an alarm and changing the timezone as I travel.


  • I will be creating a proxy of a device gateway that is the connection gateway which allows all my special-purpose devices to connect.
  • While the Fitbit One never initiates a connection, I will be creating a client proxy for it as well, so that the telemetry, inquiry and command patterns can be truly tested between my device (the thing) and the device gateway.
  • Device gateway will be implemented simply using the Azure Service Bus and the client proxy will be using a wire-level protocol called AMQP. Sounds cool already huh?

Let’s get started!