Tag: Azure Event Hubs

Analytics, Aviation, Microsoft Fabric

Bringing Real‑Time Intelligence to Airport Data Streams (Type-B Messages) – Part 2: Setting It Up in Microsoft Fabric

In Part 1, I covered the architecture, the event model, and why airport baggage tracking makes such a compelling real-time analytics scenario. In this post, I want to get hands-on and walk through actually setting this up inside Microsoft Fabric — from creating the workspace artifacts all the way through to running the simulator and querying live events.

Let’s get into it.

Prerequisites

Before you start, you’ll need:

  • A Microsoft Fabric workspace with the Real-Time Intelligence workload enabled (F2 or higher capacity, or a Fabric trial)
  • An Azure Event Hubs namespace with a hub (or you can use Fabric’s built-in custom endpoint in Eventstream — more on that below)
  • The Baggage Handling Simulator cloned locally, and Python 3.10+ installed

If you want to follow along with Azure Event Hubs as the source, a Basic-tier namespace is fine for dev/test. If you’re going direct to Fabric Eventstream, you won’t need Event Hubs at all.

Architecture Overview

Step 1: Create the Fabric Workspace Artifacts

Start by creating a new workspace if you don’t already have one. I created one called “Airport Operations Demo”. Open the workspace; you’ll need to create four main artifacts:

  1. Eventhouse (your KQL database)
  2. Eventstream (ingestion and routing)
  3. KQL Queryset (ad-hoc queries and saved analytics)
  4. Real-Time Dashboard (live visualization)

Create the Eventhouse

From the Fabric workspace, click New → Eventhouse. Give it a name like Airport-Eventhouse. This creates an Eventhouse and a default KQL database inside it.

Once provisioned, open the Eventhouse and navigate to the KQL database. This is where we’ll define the table schemas.

Step 2: Create the Table Schema

We’ll use a Bronze / Silver / Gold medallion structure inside the KQL database. Bronze tables hold raw events exactly as ingested. Silver tables hold cleaned and enriched data. Gold tables hold pre-aggregated views.

NOTE: Due to the number of schema objects, I’m going to be showing a subset across each section below. Please go here to see the rest of the KQL scripts to be applied: https://github.com/calloncampbell/BaggageHandling-TypeB-Simulator/blob/main/kql

Open the Explore your data pane in the KQL database and run the following to create the Bronze tables:

// Bronze: raw airport events
.create table airport_events (
id: string,
source: string,
specversion: string,
type: string,
datacontenttype: string,
dataschema: string,
subject: string,
['time']: datetime,
data: dynamic,
seriesclock: datetime
)

.alter table airport_events policy streamingingestion enable

// Bronze: raw flight operational events
.create table typeb_flight_events (
id: string,
source: string,
specversion: string,
type: string,
datacontenttype: string,
dataschema: string,
subject: string,
['time']: datetime,
data: dynamic,
seriesclock: datetime
)

.alter table typeb_flight_events policy streamingingestion enable

// Bronze: event tables
.create table ['Airport.Passenger.Checkin_v1']
(
___id : string,
___source : string,
___type : string,
___time : datetime,
___subject : string,
flightId : string,
flightNumber : string,
airline : string,
origin : string,
destination : string,
departureUtc : string,
paxId : string,
name : string
)
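Both Bronze tables share the CloudEvents 1.0 envelope columns (id, source, specversion, type, datacontenttype, dataschema, subject, time, data). As a rough picture of what one event looks like on the wire, here is a minimal Python sketch that builds such an envelope. The source URI, subject format, and payload fields are hypothetical placeholders, not necessarily what the simulator emits.

```python
import json
import uuid
from datetime import datetime, timezone


def make_cloudevent(event_type: str, subject: str, data: dict) -> dict:
    """Build a CloudEvents 1.0 envelope whose keys match the Bronze columns."""
    return {
        "id": str(uuid.uuid4()),
        "source": "/airport/simulator",        # hypothetical source URI
        "specversion": "1.0",
        "type": event_type,
        "datacontenttype": "application/json",
        "dataschema": None,                    # left empty in this sketch
        "subject": subject,
        # ISO 8601 UTC timestamp, ingested into the datetime 'time' column
        "time": datetime.now(timezone.utc).isoformat(),
        "data": data,                          # lands in the dynamic 'data' column
    }


event = make_cloudevent(
    "Airport.Passenger.Checkin",
    subject="flights/AC123",                   # hypothetical subject format
    data={"flightId": "AC123-20240601", "flightNumber": "AC123", "paxId": "P001"},
)
print(json.dumps(event, indent=2))
```

Serialized as JSON, this is the shape the ingestion mapping in the next section expects.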

Ingestion Mapping

When events arrive as JSON via Eventstream, you’ll want an ingestion mapping so the raw JSON gets parsed correctly into the table columns:

// Ingestion mapping for airport events
.create-or-alter table airport_events ingestion json mapping "airport_events_mapping"
```
[
{
"column": "id",
"path": "$.id",
"datatype": "string",
"transform": null
},
{
"column": "source",
"path": "$.source",
"datatype": "string",
"transform": null
},
{
"column": "specversion",
"path": "$.specversion",
"datatype": "string",
"transform": null
},
{
"column": "type",
"path": "$.type",
"datatype": "string",
"transform": null
},
{
"column": "datacontenttype",
"path": "$.datacontenttype",
"datatype": "string",
"transform": null
},
{
"column": "dataschema",
"path": "$.dataschema",
"datatype": "string",
"transform": null
},
{
"column": "subject",
"path": "$.subject",
"datatype": "string",
"transform": null
},
{
"column": "time",
"path": "$.time",
"datatype": "datetime",
"transform": null
},
{
"column": "data",
"path": "$.data",
"datatype": "dynamic",
"transform": null
}
]
```

// Ingestion mapping for flight events
.create-or-alter table typeb_flight_events ingestion json mapping "typeb_flight_events_mapping"
```
[
{
"column": "id",
"path": "$.id",
"datatype": "string",
"transform": null
},
{
"column": "source",
"path": "$.source",
"datatype": "string",
"transform": null
},
{
"column": "specversion",
"path": "$.specversion",
"datatype": "string",
"transform": null
},
{
"column": "type",
"path": "$.type",
"datatype": "string",
"transform": null
},
{
"column": "datacontenttype",
"path": "$.datacontenttype",
"datatype": "string",
"transform": null
},
{
"column": "dataschema",
"path": "$.dataschema",
"datatype": "string",
"transform": null
},
{
"column": "subject",
"path": "$.subject",
"datatype": "string",
"transform": null
},
{
"column": "time",
"path": "$.time",
"datatype": "datetime",
"transform": null
},
{
"column": "data",
"path": "$.data",
"datatype": "dynamic",
"transform": null
}
]
```
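Each mapping entry just says "take the value at this JSON path and put it in this column". The plain-Python sketch below applies the airport_events_mapping pairs to one parsed event to show that behaviour; it only handles simple `$.a.b` paths and is an illustration, not how Kusto implements mappings.

```python
from typing import Any

# (column, JSON path) pairs copied from airport_events_mapping above
AIRPORT_EVENTS_MAPPING = [
    ("id", "$.id"),
    ("source", "$.source"),
    ("specversion", "$.specversion"),
    ("type", "$.type"),
    ("datacontenttype", "$.datacontenttype"),
    ("dataschema", "$.dataschema"),
    ("subject", "$.subject"),
    ("time", "$.time"),
    ("data", "$.data"),
]


def resolve_path(doc: dict, path: str) -> Any:
    """Resolve a dotted '$.a.b' path; a missing key yields None (a null column)."""
    node: Any = doc
    for key in path.removeprefix("$.").split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def apply_mapping(doc: dict, mapping=AIRPORT_EVENTS_MAPPING) -> dict:
    """Produce one row: each mapped column receives the value at its JSON path."""
    return {column: resolve_path(doc, path) for column, path in mapping}


raw = {"id": "evt-1", "type": "Airport.Passenger.Checkin", "data": {"paxId": "P001"}, "extra": 42}
print(apply_mapping(raw))
```

Two things fall out of this: properties not listed in the mapping (like `extra` here) are ignored, and `seriesclock` has no mapping entry, so the mapping shown leaves that column empty.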

Silver Layer via Update Policy

Rather than running a scheduled job to promote Bronze to Silver, Update Policies do this automatically at ingest time: you attach a transformation query to the target table as a policy, and it runs against each new batch of data ingested into the source table. In the following example I’ve embedded the KQL query directly in the policy. You could instead wrap the query in a stored KQL function and reference that function in the policy’s Query property.

.alter table ['Airport.Passenger.Checkin_v1'] policy update
```
[
{
"IsEnabled": true,
"Source": "airport_events",
"Query": "let bags = airport_events
| where type == 'Airport.Passenger.Checkin' and isnull(array_length(data))==true;
let arrays = airport_events
| where type == 'Airport.Passenger.Checkin' and isnull(array_length(data))==false
| mv-expand data;
bags
| union arrays
| project
___id = tostring(id),
___source = tostring(source),
___type = tostring(type),
___time = todatetime(['time']),
___subject = tostring(subject),
flightId = tostring(data.flightId),
flightNumber = tostring(data.flightNumber),
airline = tostring(data.airline),
origin = tostring(data.origin),
destination = tostring(data.destination),
departureUtc = tostring(data.departureUtc),
paxId = tostring(data.paxId),
name = tostring(data.name)",
"IsTransactional": false,
"PropagateIngestionProperties": false
}
]
```

// Legacy table name aliases (functions for backward compatibility)
.create-or-alter function airport_passenger_checkin() {
['Airport.Passenger.Checkin_v1']
}

Every Airport.Passenger.Checkin event that lands in airport_events is automatically transformed and inserted into ['Airport.Passenger.Checkin_v1'], with no pipelines and no orchestration.
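To make the policy’s branching easier to follow, here is a plain-Python approximation of what it does to a batch of airport_events rows: keep only Airport.Passenger.Checkin events, pass a non-array `data` payload through as a single row, and expand an array payload into one row per element (the `mv-expand` step). It assumes rows arrive as dicts and approximates KQL `tostring()` as "null becomes an empty string"; it is an illustration, not how Kusto executes the policy.

```python
CHECKIN_TYPE = "Airport.Passenger.Checkin"
CHECKIN_FIELDS = ("flightId", "flightNumber", "airline", "origin",
                  "destination", "departureUtc", "paxId", "name")


def kql_tostring(value) -> str:
    """Rough stand-in for KQL tostring(): null becomes an empty string."""
    return "" if value is None else str(value)


def apply_checkin_policy(bronze_rows: list[dict]) -> list[dict]:
    """Project check-in events into the ['Airport.Passenger.Checkin_v1'] shape."""
    silver = []
    for row in bronze_rows:
        if row.get("type") != CHECKIN_TYPE:
            continue  # the 'where type == ...' filter
        data = row.get("data")
        # Non-array payload -> one row; array payload -> one row per element
        items = data if isinstance(data, list) else [data]
        for item in items:
            payload = item if isinstance(item, dict) else {}
            out = {
                "___id": kql_tostring(row.get("id")),
                "___source": kql_tostring(row.get("source")),
                "___type": kql_tostring(row.get("type")),
                "___time": row.get("time"),
                "___subject": kql_tostring(row.get("subject")),
            }
            out.update({field: kql_tostring(payload.get(field)) for field in CHECKIN_FIELDS})
            silver.append(out)
    return silver


batch = [
    {"id": "1", "type": CHECKIN_TYPE, "data": [{"paxId": "A"}, {"paxId": "B"}]},
    {"id": "2", "type": CHECKIN_TYPE, "data": {"paxId": "C", "flightNumber": "AC123"}},
]
print([(r["___id"], r["paxId"]) for r in apply_checkin_policy(batch)])
```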

Gold Layer via Materialized Views

Materialized Views pre-aggregate data so your dashboard queries stay fast, even against billions of rows. Here is one that’s useful for this use case:

// Gold: latest status per flight
.create materialized-view flights_current on table flights {
flights
| summarize arg_max(updated, *) by flightId
}
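Conceptually, `arg_max(updated, *) by flightId` keeps the single most recent row per flight; the materialized view maintains that result incrementally as data arrives. The sketch below only shows the selection rule on an in-memory list, assuming rows carry `flightId` and `updated` fields; the `status` column is hypothetical.

```python
def latest_per_flight(rows: list[dict]) -> dict[str, dict]:
    """Return the row with the largest 'updated' value for each flightId.

    On ties this sketch keeps the first row it saw.
    """
    latest: dict[str, dict] = {}
    for row in rows:
        current = latest.get(row["flightId"])
        if current is None or row["updated"] > current["updated"]:
            latest[row["flightId"]] = row
    return latest


# 'status' is a hypothetical column used only for illustration
rows = [
    {"flightId": "F1", "updated": 1, "status": "Scheduled"},
    {"flightId": "F1", "updated": 3, "status": "Boarding"},
    {"flightId": "F1", "updated": 2, "status": "Delayed"},
]
print(latest_per_flight(rows)["F1"]["status"])  # Boarding
```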

When we’re done, we should see our database schema of tables, materialized views, and functions:

Step 3: Configure the Eventstream

Now wire up the ingestion pipeline. In your Fabric workspace, click New → Eventstream and name it evs-airport-events.

Option A: Azure Event Hubs as Source (what I’m doing)

If you’re publishing events from the Baggage Handling Simulator to Azure Event Hubs:

  1. In the Eventstream canvas, click Add source → Azure Event Hubs
  2. Enter your Event Hubs namespace, hub name, and connection string (or use a Fabric connection)
  3. Set the consumer group — use $Default for dev/test
  4. Set the data format — use json

Option B: Custom Endpoint (no Event Hubs needed)

Fabric Eventstream also exposes a custom endpoint — an HTTPS or AMQP ingest URL you can publish CloudEvents directly to, without needing an external Event Hubs namespace. This is great for demos and local testing.

  1. Click Add source → Custom endpoint
  2. Copy the connection string — you’ll use this in the simulator config

Add the Eventhouse Destination

  1. Click Add destination → Eventhouse
  2. Select the Airport-Eventhouse Eventhouse and the KQL database
  3. Select the airport_events table and the airport_events_mapping ingestion mapping

Once everything is set up, this is what we should see:

One of the nice features in Fabric Eventhouse that we don’t see in Azure Data Explorer is the Entity Diagram, which is currently in preview. Go back to your KQL database main view and click on the entity diagram button:

Step 4: Run the Baggage Handling Simulator

Clone the simulator and install dependencies:

git clone https://github.com/calloncampbell/BaggageHandling-TypeB-Simulator.git
cd BaggageHandling-TypeB-Simulator
pip install -r requirements.txt

Configure the connection string for your Event Hubs namespace or Fabric custom endpoint in config.json (or as environment variables — check the repo README for the exact format).

Run the simulator:

python -m baggage_simulator.cli --verbose --clock-speed 120 --flight-interval-minutes 5 --max-active-flights 5 --eventhub-conn "**********************" --eventhub-name "airport-events-evh"

The --clock-speed multiplier lets you fast-forward simulation time so you don’t have to wait hours for bags to travel through their lifecycle. With --clock-speed 120, a full flight’s baggage cycle completes in minutes.
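Assuming `--clock-speed` is a plain ratio of simulated time to wall-clock time (check the repo README to confirm how the simulator interprets it), the arithmetic is a single division. The three-hour baggage cycle below is an illustrative figure, not a number taken from the simulator.

```python
from datetime import timedelta


def wall_clock_duration(simulated: timedelta, clock_speed: float) -> timedelta:
    """Real time needed to play back `simulated` time at `clock_speed`x."""
    if clock_speed <= 0:
        raise ValueError("clock_speed must be positive")
    return simulated / clock_speed


# Illustrative: a 3-hour simulated baggage cycle at --clock-speed 120
print(wall_clock_duration(timedelta(hours=3), 120))  # 0:01:30
```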

Within seconds, you should see events flowing into the Eventstream and landing in the Eventhouse Bronze tables.

Step 5: Query the Data

Open a KQL Queryset in your workspace and start exploring. Here are a few queries I find useful:

Track a specific bag end-to-end

BaggageEvents_Silver
| where BagTagNumber == "0014567891234"
| order by Timestamp asc
| project Timestamp, EventCategory, Location, FlightNumber

Find bags that haven’t been delivered (potential mishandles)

BagLatestStatus
| where EventCategory != "Delivered" and EventCategory != "Lost"
| where Timestamp < ago(2h)
| project BagTagNumber, FlightNumber, EventCategory, Location, Timestamp
| order by Timestamp asc
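The query’s logic is "latest status is not terminal, and it is older than a threshold". Here is that filter as a small Python sketch over in-memory latest-status rows, assuming the field names used by BagLatestStatus. Making the threshold a parameter means the same rule also covers the 45-minute Activator condition in Step 7.

```python
from datetime import datetime, timedelta, timezone

# Categories treated as "finished" by the query above
TERMINAL_CATEGORIES = {"Delivered", "Lost"}


def stale_bags(latest_status: list[dict], now: datetime, threshold: timedelta) -> list[dict]:
    """Bags whose latest event is non-terminal and older than `threshold`, oldest first."""
    cutoff = now - threshold  # plays the role of ago(threshold)
    stale = [
        bag for bag in latest_status
        if bag["EventCategory"] not in TERMINAL_CATEGORIES and bag["Timestamp"] < cutoff
    ]
    return sorted(stale, key=lambda bag: bag["Timestamp"])


now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
latest = [
    {"BagTagNumber": "0001", "EventCategory": "CheckedIn", "Timestamp": now - timedelta(hours=3)},
    {"BagTagNumber": "0002", "EventCategory": "Delivered", "Timestamp": now - timedelta(hours=4)},
    {"BagTagNumber": "0003", "EventCategory": "CheckedIn", "Timestamp": now - timedelta(minutes=30)},
]
print([b["BagTagNumber"] for b in stale_bags(latest, now, timedelta(hours=2))])  # ['0001']
```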

Baggage throughput by event type over the last hour

BaggageEventsByHour
| where Timestamp > ago(1h)
| summarize Total = sum(EventCount) by EventType
| order by Total desc
| render barchart

Average bag journey time (check-in to delivery) per flight

let CheckIn = BaggageEvents_Silver | where EventCategory == "CheckedIn" | project BagTagNumber, FlightNumber, CheckInTime = Timestamp;
let Delivered = BaggageEvents_Silver | where EventCategory == "Delivered" | project BagTagNumber, DeliveredTime = Timestamp;
CheckIn
| join kind=inner Delivered on BagTagNumber
| extend JourneyMinutes = datetime_diff('minute', DeliveredTime, CheckInTime)
| summarize AvgJourneyMinutes = avg(JourneyMinutes), BagCount = count() by FlightNumber
| order by AvgJourneyMinutes desc
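The same join-then-average logic, grouped per FlightNumber as the heading describes, looks like this in plain Python. It assumes at most one CheckedIn and one Delivered event per bag, and it computes fractional minutes, so results may differ slightly from `datetime_diff('minute', ...)`, which returns a whole number of minutes.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def avg_journey_minutes(events: list[dict]) -> dict[str, float]:
    """Average check-in-to-delivery time in minutes, per FlightNumber.

    Bags missing either event are dropped, as with `join kind=inner`.
    """
    checked_in: dict[str, dict] = {}
    delivered_at: dict[str, datetime] = {}
    for event in events:
        if event["EventCategory"] == "CheckedIn":
            checked_in[event["BagTagNumber"]] = event
        elif event["EventCategory"] == "Delivered":
            delivered_at[event["BagTagNumber"]] = event["Timestamp"]

    journeys: dict[str, list[float]] = defaultdict(list)
    for tag, checkin in checked_in.items():
        if tag in delivered_at:  # inner join on BagTagNumber
            minutes = (delivered_at[tag] - checkin["Timestamp"]).total_seconds() / 60
            journeys[checkin["FlightNumber"]].append(minutes)
    return {flight: mean(values) for flight, values in journeys.items()}
```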

Step 6: Build the Real-Time Dashboard

Create a Real-Time Dashboard in your workspace. Add tiles by writing KQL queries directly in the dashboard editor — no separate report tool needed.

You can always view the query for each of the tiles by clicking on the … menu and then view query:

Useful tiles for this scenario:

  • Bags in-flight right now — count of bags with a status other than Delivered or Lost
  • Events per minute — a time chart showing ingestion rate
  • Lost or rejected bags — a table filtered to EventCategory in ("Lost", "Rejected")
  • Baggage throughput by hour — bar or area chart of event volume over time

Set the dashboard auto-refresh to 30 seconds for a live operations feel.
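The “Events per minute” tile boils down to counting events per one-minute bin. As a sketch of that bucketing (plain Python over a list of event timestamps, which is an assumption; the dashboard does it in KQL over the table), flooring each timestamp to the minute and counting looks like this:

```python
from collections import Counter
from datetime import datetime


def events_per_minute(timestamps: list[datetime]) -> list[tuple[datetime, int]]:
    """Count events per one-minute bucket, oldest bucket first."""
    # Flooring to the minute matches what bin(Timestamp, 1m) does for datetimes
    buckets = Counter(ts.replace(second=0, microsecond=0) for ts in timestamps)
    return sorted(buckets.items())


stamps = [datetime(2024, 6, 1, 10, 0, 5), datetime(2024, 6, 1, 10, 0, 59), datetime(2024, 6, 1, 10, 1, 0)]
for minute, count in events_per_minute(stamps):
    print(minute.strftime("%H:%M"), count)
```

As with a `summarize ... by bin()`, minutes with no events are simply absent from the result rather than reported as zero.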

Step 7: Set Up an Activator Alert

Activator is Fabric’s alerting engine, and this is where things get genuinely useful. You can define a rule that watches a KQL query result and triggers an action when a condition is met.

From the Real-Time Dashboard, click Set alert on the “Lost or rejected bags” tile. Configure:

  • Condition: row count > 0
  • Action: send an email, Teams message, or trigger a Power Automate flow
  • Check frequency: every 5 minutes

You can also create Activator items directly from the Eventhouse using Data Activator and write your own detection query — useful for more complex conditions like “bag hasn’t progressed in 45 minutes”:

BagLatestStatus
| where EventCategory !in ("Delivered", "Lost")
| where Timestamp < ago(45m)

Putting It All Together

Here’s the full flow end-to-end:

  1. Simulator generates CloudEvents and publishes to Event Hubs / Eventstream endpoint
  2. Eventstream ingests, routes by event type, and writes to Bronze tables in Eventhouse
  3. Update Policies automatically promote Bronze → Silver on ingest
  4. Materialized Views continuously aggregate Silver → Gold
  5. KQL Querysets power ad-hoc investigation
  6. Real-Time Dashboard shows live operations at a glance
  7. Activator fires alerts when something goes wrong

The whole pipeline is serverless from the Eventstream inward — there’s no infrastructure to manage, no Spark jobs to schedule, and no orchestration to babysit.

That’s what I find most impressive about Fabric RTI for this kind of scenario. The time from “event published” to “insight on a dashboard with an alert configured” is measured in minutes, not weeks.

Enjoy!


Azure, Azure Event Hubs

Boost Data Reliability with Geo-Replication for Azure Event Hubs

This week, Microsoft announced the public preview of geo-replication for Azure Event Hubs. Geo-replication enhances Microsoft Azure data availability and geo-disaster recovery capabilities by enabling the replication of Event Hubs data payloads across different Azure regions.

With geo-replication, your client applications continue to interact with the primary namespace. Customers can designate a secondary region, choose replication consistency (synchronous or asynchronous), and set replication lag for the data. The service handles the replication between primary and secondary regions. If a primary change is needed (for maintenance or failover), the secondary can be promoted to primary, seamlessly servicing all client requests without altering any configurations (connection strings, authentication, etc.). The former primary then becomes the secondary, ensuring synchronization between both regions.

In summary, geo-replication is designed to provide you with the following benefits:

  • High availability: You can ensure that your data is always accessible and durable, even in the event of a regional outage or disruption. You can also reduce the impact of planned maintenance events by switching to the secondary region before the primary region undergoes any updates or changes.
  • Disaster recovery: You can recover your data quickly and seamlessly in case of a disaster that affects your primary region. You can initiate a failover to the secondary region and resume your data streaming operations with minimal downtime and data loss.
  • Regional compliance: You can meet the regulatory and compliance requirements of your industry or region by replicating your data to a secondary region that complies with the same or similar standards as your primary region. You can also leverage the geo-redundancy of your data to support your business continuity and resilience plans.

How do you get started with Azure Event Hubs Geo-replication?

If you want to try out Azure Event Hubs Geo-replication, check out the official Azure Event Hubs Geo-replication documentation; there is also a demo available.

I look forward to when this becomes GA and is available in more regions.

Enjoy!

References

https://techcommunity.microsoft.com/t5/messaging-on-azure-blog/announcing-public-preview-for-geo-replication-for-azure-event/ba-p/4164522

Azure Event Hubs Geo-replication documentation

Azure, Azure Event Hubs

Azure Event Hubs Unveils Large Message Support

This week Microsoft announced the public preview of support for large messages (up to 20 MB) in Azure Event Hubs on its self-service scalable dedicated clusters, enhancing its ability to handle a wide range of message sizes without additional costs.

This new feature allows for seamless streaming of large messages without requiring any client code changes, maintaining compatibility with existing Event Hubs SDKs and the Kafka API. This enhancement ensures uninterrupted business operations by accommodating instances where messages cannot be divided into smaller segments. The service continues to offer high throughput and low latency, making it a robust solution for data streaming needs.

What are some use cases for large message support?

Here are some key use cases for the new large message support in Azure Event Hubs:

  • Multimedia Streaming: Handling large video, audio, or image files that cannot be split into smaller segments.
  • Data Aggregation: Transmitting aggregated data sets or logs that exceed typical message size limits.
  • IoT Applications: Streaming large sensor data or firmware updates from IoT devices.
  • Batch Processing: Sending large batches of data for processing without needing to break them down.

These enhancements ensure seamless and uninterrupted business operations across various scenarios.

How do you enable large message support?

To enable large message support in your existing Azure Event Hubs setup, follow these steps:

  1. Use Self-Serve Scalable Dedicated Clusters: Large message support is only available on the latest infrastructure that supports self-serve scalable dedicated clusters. If your event hubs are not already in such a cluster, you will need to create an Event Hubs Dedicated cluster to take advantage of large message support.
  2. No Client Code Changes Needed: You can continue using your existing Event Hubs SDK or Kafka API. The only change required is in the message or event size itself.
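Since the only change is the event size itself, it can help to check payloads against the limit of the tier you are on. The per-tier byte limits below are assumptions based on the Event Hubs tier comparison page at the time of writing (256 KB for Basic, 1 MB for Standard and Premium, 20 MB with this preview on dedicated clusters); confirm them against the current docs. The real limit covers the whole event (body plus properties and overhead), so this body-only check is approximate.

```python
import json

# Assumed maximum event sizes in bytes; verify against the current tier docs.
MAX_EVENT_BYTES = {
    "basic": 256 * 1024,
    "standard": 1024 * 1024,
    "premium": 1024 * 1024,
    "dedicated-large-message": 20 * 1024 * 1024,  # the preview feature above
}


def fits_tier(payload: dict, tier: str) -> bool:
    """Rough check: does the JSON-encoded body fit under the tier's event size limit?"""
    body = json.dumps(payload).encode("utf-8")
    return len(body) <= MAX_EVENT_BYTES[tier]


# A hypothetical ~1.4 MB sensor snapshot that cannot easily be split
snapshot = {"deviceId": "sensor-42", "samples": [0.5] * 300_000}
for tier in MAX_EVENT_BYTES:
    print(tier, fits_tier(snapshot, tier))
```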

For more detailed instructions, visit the documentation at aka.ms/largemessagesupportforeh.

How do Azure Event Hubs differ from Azure Event Hub Clusters?

Azure Event Hubs and Event Hub Clusters serve different purposes within the Azure ecosystem:

  • Azure Event Hubs: This is a fully managed, real-time data ingestion service that can receive and process millions of events per second. It’s designed for high-throughput data streaming and is commonly used for big data and analytics.
  • Azure Event Hub Clusters: These are dedicated clusters that provide isolated resources for Event Hubs. They offer enhanced performance, scalability, and the ability to handle large messages (up to 20 MB). Clusters are ideal for scenarios requiring high throughput and low latency.

Enjoy!

References

https://techcommunity.microsoft.com/t5/messaging-on-azure-blog/announcing-large-message-support-for-azure-event-hubs-public/ba-p/4146455

https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-stream-large-messages

https://learn.microsoft.com/en-us/azure/event-hubs/compare-tiers

Azure Event Hubs Overview

Azure, Cloud, Uncategorized

How to choose Azure services for working with messages in your application | Azure Friday

In this episode of Azure Friday, Azure MVP Barry “Azure Barry” Luijbregts joins Scott Hanselman to outline how you can choose the right services for working with messages and events in your application.

[0:00:48] – Presentation

Source: Channel 9


Azure, Cloud, Cloud Native

Go serverless: Big data processing with Azure Event Hubs for Apache Kafka | Azure Friday

Working with Apache Kafka and want to simplify management of your infrastructure? In this episode of Azure Friday, Lena Hall joins Scott Hanselman to show how you can keep using Apache Kafka libraries for hundreds of projects and run Azure Event Hubs behind the scenes, so you can focus on code instead of maintaining infrastructure.

[0:04:44] – Demo

Source: Channel 9

Want more? Lena wrote a great article titled Apache Kafka Applications Can Work Without Apache Kafka Cluster? which can be read at https://dev.to/azure/apache-kafka-applications-can-work-without-apache-kafka-cluster-3en5.
