Azure Event Hubs で送受信

あるアプリケーションから別のアプリケーションへデータを送信する方法として、
Microsoft Azure のイベント ハブ (Event Hubs) を利用できます。
イベント ハブは、小さいデータを短い間隔で送るというシナリオに向いています。
また、イベント ハブは文字通りハブとして機能し、送信側と受信側を多対多とすることができます。

今回は、基本的には公式のチュートリアルである Event Hubs の使用の方法をもとにして実装していきます。
(そのため、重複する内容は割愛することがあります。)

まず、Azure の管理ポータルでイベント ハブとストレージを作成します。
ストレージは、受信のための構成を保存しておく場所になるようです。

サービス バスの名前空間を sakapon-event-201508-ns、イベント ハブの名前を sakapon-event-201508 としました。

イベント ハブ

共有アクセス ポリシーには、SendRule と ReceiveRule を定義しておきます。

イベント ハブ

 

続いて、Visual Studio でプロジェクトを作成します。
今回は、送信側も受信側も WPF アプリケーションとして作成しました。

プロジェクトを作成したら、NuGet で、送信側には WindowsAzure.ServiceBus を、
受信側には Microsoft.Azure.ServiceBus.EventProcessorHost をインストールします。
このとき、App.config には接続文字列などの設定のためのテンプレートが追加されます。

以下、送信側および受信側の実装について説明します。

 

送信側の実装

まず、App.config の appSettings に、キーが Microsoft.ServiceBus.ConnectionString のエントリが生成されているので、
そこに SendRule に対応する接続文字列を指定します。

App.config

using System;
using System.Text;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
namespace SenderWpf
{
public class AppModel
{
public static void SendMessage()
{
var client = EventHubClient.Create("sakapon-event-201508");
var obj = new { id = 123, text = "Hello" };
var message = JsonConvert.SerializeObject(obj);
var data = new EventData(Encoding.UTF8.GetBytes(message));
client.Send(data);
}
}
}
view raw AppModel.cs hosted with ❤ by GitHub

EventHubClient オブジェクトを初期化するときに、
サービス バスの名前空間ではなく、イベント ハブの名前を指定します。
特殊な指定方法ですが、これで接続文字列が .config ファイルから読み込まれます。

あとは、送りたいメッセージをバイナリ データ (byte[] や Stream) に変換して、
Send または SendAsync メソッドに渡すだけです。
複数の属性値を持つオブジェクトを送信するには、JSON などにシリアライズするとよいでしょう。

 

受信側の実装

送信側のときと同様、App.config に、ReceiveRule に対応する接続文字列を設定します。
なお、イベント ハブのほかに、ストレージの接続文字列も設定します。

EventProcessorHost オブジェクトを初期化するには接続文字列が必要になりますが、
送信側のときとは異なり、.config ファイルから透過的に読み込む機能はないため、明示的に接続文字列を取得しています。

App.config

using System;
using System.Configuration;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
namespace ReceiverWpf
{
public class AppModel
{
public AppModel()
{
StaticEventProcessor.MessageArrived += s =>
{
dynamic obj = JsonConvert.DeserializeObject(s);
int id = obj.id;
string text = obj.text;
};
var hostName = string.Format("Host-{0:yyyyMMdd-HHmmss}", DateTime.Now);
var eventHubName = "sakapon-event-201508";
var eventHubConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var storageConnectionString = ConfigurationManager.AppSettings["StorageConnection"];
// Receives an event once for one consumer group.
// EventHubConsumerGroup.DefaultGroupName is "$Default".
var host = new EventProcessorHost(hostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
host.RegisterEventProcessorAsync<StaticEventProcessor>();
}
}
}
view raw AppModel.cs hosted with ❤ by GitHub
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
namespace ReceiverWpf
{
public class StaticEventProcessor : IEventProcessor
{
public static event Action<string> MessageArrived;
static readonly object messageLock = new object();
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Debug.WriteLine("Partition {0}: Processor closing. Reason: {1}", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
await context.CheckpointAsync();
}
public Task OpenAsync(PartitionContext context)
{
Debug.WriteLine("Partition {0}: Processor opening. Offset: {1}", context.Lease.PartitionId, context.Lease.Offset);
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var data in messages)
{
lock (messageLock)
{
var message = Encoding.UTF8.GetString(data.GetBytes());
Debug.WriteLine("Partition {0}: Message received. {1}", context.Lease.PartitionId, message);
var h = MessageArrived;
if (h != null) h(message);
}
}
await context.CheckpointAsync();
}
}
}

通知されたイベントを実際に処理するクラスとして、IEventProcessor インターフェイスを実装したクラスを作成しなければなりません。
パーティション 1 つにつき、そのインスタンスが 1 つ生成されます。

ここでは StaticEventProcessor クラスを作成しましたが、
内部的に既定のコンストラクターが呼び出されて初期化されるため、外からデリゲートを渡すのが難しい構造です。
実装がややこしくなるため、この SDK には改良の余地があるように思えます。

 

さて、上記の実装をもとに、送信側の WPF アプリケーション (SenderWpf) をマウスなどでドラッグして移動させると
受信側の WPF アプリケーション (ReceiverWpf) が自動的に同じ座標に追従するというサンプルを作成しました。
完全なソースコードは EventHubsSample – GitHub にあります。

Event Hubs - sync apps

なお、その他の注意点は以下の通りです。

  • イベントの到着の順序は保証されません。
    (上のサンプルで非連続な動作があるのはそのためです。)
  • PartitionContext や EventData は一意の SequenceNumber を持ちますが、
    これらはパーティションの中で一意であって、コンシューマー グループ全体で一意になるわけではありません。
    したがって、メッセージに一意性を持たせるには、ID や時刻を含めて送信する必要があります。
  • それぞれのデータは、コンシューマー グループ 1 つにつき 1 回だけ配信されます。
    同じコンシューマー グループに設定したアプリケーションを 2 つ起動すると、
    受信するデータが半分ずつ程度に分散してしまいます。
    コンシューマー グループを追加することにより、送信側と受信側を多対多にすることができます。
  • PartitionContext.CheckpointAsync メソッドにより、イベントを受信したことを記録します。
    これが呼び出されないと、また同じイベントが通知されます。
    また、送信側が稼働しているのに受信側が稼働していない場合もデータは蓄積されているため、
    次に受信側を起動したときに大量に押し寄せてくることがあります。

 

作成したサンプル
EventHubsSample (GitHub)

バージョン情報
.NET Framework 4.5
WindowsAzure.ServiceBus 3.0.1
Microsoft.Azure.ServiceBus.EventProcessorHost 2.0.2

参照
Event Hubs の使用

カテゴリー: クラウド. タグ: , . Leave a Comment »

コメントを残す