Using Azure Stream Analytics to tap into an Event Hub data stream

The prerequisite is to request access to the Stream Analytics preview if you have not already done so.

1. Create a Stream Analytics job. Jobs can only be created in two regions while the service is in preview.

sa-1

2. Add an input to your job. This is the best part, because we get to tap into an Event Hub to analyse the continuous stream of event data and potentially transform it in the same job.

sa-2-addinput

3. Select Event Hub.

sa-3-addeh

4. Configure the event hub settings. You could also tap into an event hub from a different subscription.

sa-4-ehsettings

5. Since my event hub’s event data is serialized in JSON format, that is exactly what I select in the following step.

sa-5-serializationsetting

Under the Query tab, I just insert a simple query like the following.

SELECT * FROM getfityall 

I’m not doing any transformation yet; I just want to make sure that the event data sent by my Raspberry Pi via the Event Hubs REST API is coming through properly.
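
As a side note, here is a minimal sketch of what that REST send looks like. The namespace, event hub path, key name and payload are placeholders, and I’m showing it in C# for consistency with the rest of this post rather than as the exact code running on the Pi:

using System;
using System.Net;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;

class EventHubRestSender
{
    // Build a Service Bus SAS token for the given resource URI (key name and key are placeholders).
    static string CreateSasToken(string resourceUri, string keyName, string key)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
        string expiry = ((int)sinceEpoch.TotalSeconds + 3600).ToString(); // valid for one hour
        string stringToSign = WebUtility.UrlEncode(resourceUri) + "\n" + expiry;
        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
        {
            string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            return string.Format("SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                WebUtility.UrlEncode(resourceUri), WebUtility.UrlEncode(signature), expiry, keyName);
        }
    }

    static async Task SendAsync()
    {
        // POST a JSON event to https://<namespace>.servicebus.windows.net/<eventhub>/messages
        string url = "https://mynamespace.servicebus.windows.net/getfityall/messages"; // placeholder namespace/hub
        using (var client = new HttpClient())
        {
            client.DefaultRequestHeaders.TryAddWithoutValidation(
                "Authorization", CreateSasToken(url, "SendKey", "<shared access key>"));
            var payload = new StringContent("{ \"Temperature\": 22.5 }", Encoding.UTF8, "application/json");
            HttpResponseMessage response = await client.PostAsync(url, payload);
            Console.WriteLine(response.StatusCode); // 201 Created on success
        }
    }
}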

Next on the list of steps is to set up the output for the job.

6. Add an output to your job. I’m using BLOB storage just to keep things simple, so that I can open the CSV file in Excel and take a look at the data stream.

sa-6-output

7. Set up the BLOB storage settings.

sa-7-blobsettings

8. Specify the serialization settings. I’m choosing CSV for the obvious reason stated above.

sa-8-serialization

As I pump telemetry data from my Raspberry Pi, I can see my CSV file being created and updated. Just go to the container view in your BLOB storage and download the CSV file.
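
If you would rather pull the file down programmatically than click through the portal, here is a minimal sketch using the Azure Storage client library; the connection string and container name are placeholders for your own output settings:

using System.IO;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

// Connect to the storage account configured as the Stream Analytics output.
CloudStorageAccount account = CloudStorageAccount.Parse("<storage connection string>");
CloudBlobClient blobClient = account.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference("streamanalytics-output"); // placeholder container

// Download every CSV blob the job has written so far.
foreach (IListBlobItem item in container.ListBlobs(null, true))
{
    var blob = item as CloudBlockBlob;
    if (blob != null && blob.Name.EndsWith(".csv"))
    {
        blob.DownloadToFile(Path.GetFileName(blob.Name), FileMode.Create);
    }
}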

sa-9

Below is what my event data stream looks like. It shows event data points captured from two Raspberry Pis, one using the MPL3115 temperature sensor (part of the Xtrinsic sensor board) and another using the MCP9808 temperature sensor. The fun begins once I can write some funky transformation logic in the query and do some real-time complex event processing.

streamanalytics-temperature

Scalable Event Hub Processor in an Azure Worker Role

So what happens after all those activity data point messages have been fired off into an Azure Event Hub? Event Hubs durably store these messages/events until the retention period expires, which is 24 hours by default. I need to process those messages fast. That sounds like the starting point of big data: millions of data points have been ingested, and someone needs to step in, process them and try to make some sense out of it all. Enter a scalable event hub processor implemented as an Azure Worker Role. Fortunately the good folks on the Azure team have already thought about this, and someone published a code project on how to scale out event processing with Event Hubs. The EventProcessorHost simplifies the workload distribution model, so I can focus on the actual data processing rather than handling checkpointing and fault tolerance, the so-called “real enterprise architecture” stuff which people sometimes tell me I don’t know much about, but that’s ok. :)
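
To give a feel for that division of labour: the host owns lease management and the distribution of partitions across instances, while the processor class only has to fill in the IEventProcessor contract. The sketch below shows just the shape of the class, not the full GetFitYallActivityProcessor:

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class GetFitYallActivityProcessor : IEventProcessor
{
    // Called when this host takes ownership of a partition lease.
    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    // Called with a batch of events from the partition; this is where the real work goes.
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // ... process the batch (the real ProcessEventsAsync appears further below) ...

        // Record progress so a restarted or relocated host resumes from here.
        await context.CheckpointAsync();
    }

    // Called when the lease is lost or the host shuts down.
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}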

First things first: I could have implemented this as another WebJob triggered whenever events/messages are ingested into my Event Hub. But no, I wanted to try something else, so I chose to host the Event Hub processor in an Azure Worker Role. This also makes scaling sense, because I can auto-scale the worker role instances based on the number of events/messages in the Event Hub. This is like deja vu: my favourite Azure demo from years ago was how to auto-scale based on a queue metric. Each Worker Role instance hosts an EventProcessorHost, which processes events from the default 16 partitions in the Event Hub. In OnStart(), the Worker Role does the following to start the host; the rest of the code from the project above is used as-is.

WorkerRole.cs code snippet:

public override bool OnStart()
{
    // consumerGroupName, eventHubName and hostName are class-level fields used by StartHost().
    consumerGroupName = EventHubConsumerGroup.DefaultGroupName;
    eventHubName = CloudConfigurationManager.GetSetting("EventHubName");
    string hostPrefix = CloudConfigurationManager.GetSetting("HostName");

    // Derive a per-instance suffix so each role instance registers its own EventProcessorHost name.
    string instanceId = RoleEnvironment.CurrentRoleInstance.Id;
    int instanceIndex = 0;
    if (!int.TryParse(instanceId.Substring(instanceId.LastIndexOf(".") + 1), out instanceIndex)) // On cloud.
    {
        int.TryParse(instanceId.Substring(instanceId.LastIndexOf("_") + 1), out instanceIndex); // On compute emulator.
    }
    hostName = hostPrefix + "-" + instanceIndex.ToString();

    StartHost().Wait();
    return base.OnStart();
}
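
StartHost() itself is not shown above; it is essentially a thin wrapper around EventProcessorHost registration, something like the sketch below, where the configuration setting names are my assumptions rather than the actual ones in the project:

private EventProcessorHost host;

private async Task StartHost()
{
    // Connection strings come from the service configuration (setting names assumed here).
    string eventHubConnectionString = CloudConfigurationManager.GetSetting("ServiceBusConnectionString");
    string storageConnectionString = CloudConfigurationManager.GetSetting("StorageConnectionString");

    // One EventProcessorHost per role instance; all instances share the same blob-based lease store,
    // so the 16 partitions get divided among however many instances are running.
    host = new EventProcessorHost(hostName, eventHubName, consumerGroupName,
        eventHubConnectionString, storageConnectionString);

    // Register the processor type; the host creates one instance per partition lease it owns.
    await host.RegisterEventProcessorAsync<GetFitYallActivityProcessor>();
}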

The processing logic is defined in the GetFitYallActivityProcessor class and specifically in an async method called ProcessEventsAsync(). The code appears below:

GetFitYallActivityProcessor.cs code snippet:

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    // This is the place to process the received data; keep it simple, fast and reliable.
    try
    {
        List<FitbitActivityEntity> fitbitEntities = new List<FitbitActivityEntity>();
        List<StravaActivityEntity> stravaEntities = new List<StravaActivityEntity>();
        List<GetFitYallActivityEntity> getfityallEntities = new List<GetFitYallActivityEntity>(); // entity type name assumed
        foreach (EventData message in messages)
        {
            byte[] msgBytes = message.GetBytes();
            var m = string.Format("{0} > received message: {1} at partition {2}, owner: {3}, offset: {4}",
                DateTime.Now.ToString(), Encoding.UTF8.GetString(msgBytes),
                context.Lease.PartitionId, context.Lease.Owner, message.Offset);
            Trace.WriteLine(m);

            string type = message.Properties["Type"].ToString();
            string _id = type + "_" + Guid.NewGuid().ToString();
            switch (type)
            {
                case "Fitbit":
                    // Convert an EventData message into an Azure Table entity / row.
                    // FitbitActivity: the DTO produced by the WebJob message pump (type name assumed).
                    var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject<FitbitActivity>(Encoding.UTF8.GetString(msgBytes));
                    FitbitActivityEntity activity = new FitbitActivityEntity(message.PartitionKey, _id);
                    string[] dtstr = eventBody.Time.GetDateTimeFormats('g', CultureInfo.CreateSpecificCulture("en-AU"));
                    activity.DateTime = dtstr[7].ToString();
                    activity.Steps = eventBody.Steps;
                    activity.StepsLevel = eventBody.StepsLevel;
                    activity.CaloriesOut = eventBody.CaloriesOut;
                    activity.CaloriesOutLevel = eventBody.CaloriesOutLevel;
                    fitbitEntities.Add(activity);
                    break;
                case "Strava":
                    // Convert an EventData message into an Azure Table entity / row.
                    // StravaActivity: the DTO produced by the WebJob message pump (type name assumed).
                    var seventBody = Newtonsoft.Json.JsonConvert.DeserializeObject<StravaActivity>(Encoding.UTF8.GetString(msgBytes));
                    StravaActivityEntity sactivity = new StravaActivityEntity(message.PartitionKey, _id);

It’s pretty straightforward: just deserialize the “payload” from JSON into the same object which I constructed in the WebJob message pump. Essentially this simple implementation retrieves all the data point properties and, in particular, makes sure that the date/time stamp values from all my data points are in a consistent format. The reason is that I need to match these rows in a self-service BI tool; Power BI is currently my tool of choice. Each data point is stored as a row in an Azure Storage table. Why? Because it’s pretty scalable, and I can use an Azure Storage table as a data source to be queried from Power Query. Good enough for me for now.

A little bit of optimization in my code: the insert operations are performed in batches. I borrowed this from some sample code I found, but I couldn’t find it again to refer to it here. A batch operation may contain up to 100 individual table operations, with the requirement that every entity in the batch must have the same partition key. That’s exactly what I did. Besides reducing latency, this also has a monetary benefit: it cuts down the number of transactions against my Azure Storage account. In the Internet of Things, I need to think about scale, remember? The end result: Azure Storage tables storing all my activity data points, ready to be “mashup y’all”.
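
For illustration, the batching boils down to something like the sketch below; the table name is a placeholder, and the real code repeats this per activity type:

using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

// storageConnectionString is a placeholder; fitbitEntities is the list built in ProcessEventsAsync above.
CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
CloudTable table = account.CreateCloudTableClient().GetTableReference("FitbitActivity"); // placeholder table name
table.CreateIfNotExists();

// Entities in one batch must share a partition key, and a batch holds at most 100 operations.
foreach (var partition in fitbitEntities.GroupBy(e => e.PartitionKey))
{
    var rows = partition.ToList();
    for (int i = 0; i < rows.Count; i += 100)
    {
        var batch = new TableBatchOperation();
        foreach (var entity in rows.Skip(i).Take(100))
        {
            batch.Insert(entity);
        }
        table.ExecuteBatch(batch); // one storage transaction per batch of up to 100 rows
    }
}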

stravatable


fitbittable

So far so good. In my next post I will talk about the fun stuff: using Excel for more than tracking my expenses in a spreadsheet. This is self-service analytics, Big Data style!