Azure Event Hubs で送受信

あるアプリケーションから別のアプリケーションへデータを送信する方法として、
Microsoft Azure のイベント ハブ (Event Hubs) を利用できます。
イベント ハブは、小さいデータを短い間隔で送るというシナリオに向いています。
また、イベント ハブは文字通りハブとして機能し、送信側と受信側を多対多とすることができます。

今回は、基本的には公式のチュートリアルである Event Hubs の使用の方法をもとにして実装していきます。
(そのため、重複する内容は割愛することがあります。)

まず、Azure の管理ポータルでイベント ハブとストレージを作成します。
ストレージは、受信のための構成を保存しておく場所になるようです。

サービス バスの名前空間を sakapon-event-201508-ns、イベント ハブの名前を sakapon-event-201508 としました。

イベント ハブ

共有アクセス ポリシーには、SendRule と ReceiveRule を定義しておきます。

イベント ハブ

 

続いて、Visual Studio でプロジェクトを作成します。
今回は、送信側も受信側も WPF アプリケーションとして作成しました。

プロジェクトを作成したら、NuGet で、送信側には WindowsAzure.ServiceBus を、
受信側には Microsoft.Azure.ServiceBus.EventProcessorHost をインストールします。
このとき、App.config には接続文字列などの設定のためのテンプレートが追加されます。

以下、送信側および受信側の実装について説明します。

 

送信側の実装

まず、App.config の appSettings に、キーが Microsoft.ServiceBus.ConnectionString のエントリが生成されているので、
そこに SendRule に対応する接続文字列を指定します。

App.config

using System;
using System.Text;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
namespace SenderWpf
{
public class AppModel
{
public static void SendMessage()
{
var client = EventHubClient.Create("sakapon-event-201508");
var obj = new { id = 123, text = "Hello" };
var message = JsonConvert.SerializeObject(obj);
var data = new EventData(Encoding.UTF8.GetBytes(message));
client.Send(data);
}
}
}
view raw AppModel.cs hosted with ❤ by GitHub

EventHubClient オブジェクトを初期化するときに、
サービス バスの名前空間ではなく、イベント ハブの名前を指定します。
特殊な指定方法ですが、これで接続文字列が .config ファイルから読み込まれます。

あとは、送りたいメッセージをバイナリ データ (byte[] や Stream) に変換して、
Send または SendAsync メソッドに渡すだけです。
複数の属性値を持つオブジェクトを送信するには、JSON などにシリアライズするとよいでしょう。

 

受信側の実装

送信側のときと同様、App.config に、ReceiveRule に対応する接続文字列を設定します。
なお、イベント ハブのほかに、ストレージの接続文字列も設定します。

EventProcessorHost オブジェクトを初期化するには接続文字列が必要になりますが、
送信側のときとは異なり、.config ファイルから透過的に読み込む機能はないため、明示的に接続文字列を取得しています。

App.config

using System;
using System.Configuration;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
namespace ReceiverWpf
{
public class AppModel
{
public AppModel()
{
StaticEventProcessor.MessageArrived += s =>
{
dynamic obj = JsonConvert.DeserializeObject(s);
int id = obj.id;
string text = obj.text;
};
var hostName = string.Format("Host-{0:yyyyMMdd-HHmmss}", DateTime.Now);
var eventHubName = "sakapon-event-201508";
var eventHubConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var storageConnectionString = ConfigurationManager.AppSettings["StorageConnection"];
// Receives an event once for one consumer group.
// EventHubConsumerGroup.DefaultGroupName is "$Default".
var host = new EventProcessorHost(hostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
host.RegisterEventProcessorAsync<StaticEventProcessor>();
}
}
}
view raw AppModel.cs hosted with ❤ by GitHub
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
namespace ReceiverWpf
{
public class StaticEventProcessor : IEventProcessor
{
public static event Action<string> MessageArrived;
static readonly object messageLock = new object();
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Debug.WriteLine("Partition {0}: Processor closing. Reason: {1}", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
await context.CheckpointAsync();
}
public Task OpenAsync(PartitionContext context)
{
Debug.WriteLine("Partition {0}: Processor opening. Offset: {1}", context.Lease.PartitionId, context.Lease.Offset);
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var data in messages)
{
lock (messageLock)
{
var message = Encoding.UTF8.GetString(data.GetBytes());
Debug.WriteLine("Partition {0}: Message received. {1}", context.Lease.PartitionId, message);
var h = MessageArrived;
if (h != null) h(message);
}
}
await context.CheckpointAsync();
}
}
}

通知されたイベントを実際に処理するクラスとして、IEventProcessor インターフェイスを実装したクラスを作成しなければなりません。
パーティション 1 つにつき、そのインスタンスが 1 つ生成されます。

ここでは StaticEventProcessor クラスを作成しましたが、
内部的に既定のコンストラクターが呼び出されて初期化されるため、外からデリゲートを渡すのが難しい構造です。
実装がややこしくなるため、この SDK には改良の余地があるように思えます。

 

さて、上記の実装をもとに、送信側の WPF アプリケーション (SenderWpf) をマウスなどでドラッグして移動させると
受信側の WPF アプリケーション (ReceiverWpf) が自動的に同じ座標に追従するというサンプルを作成しました。
完全なソースコードは EventHubsSample – GitHub にあります。

Event Hubs - sync apps

なお、その他の注意点は以下の通りです。

  • イベントの到着の順序は保証されません。
    (上のサンプルで非連続な動作があるのはそのためです。)
  • PartitionContext や EventData は一意の SequenceNumber を持ちますが、
    これらはパーティションの中で一意であって、コンシューマー グループ全体で一意になるわけではありません。
    したがって、メッセージに一意性を持たせるには、ID や時刻を含めて送信する必要があります。
  • それぞれのデータは、コンシューマー グループ 1 つにつき 1 回だけ配信されます。
    同じコンシューマー グループに設定したアプリケーションを 2 つ起動すると、
    受信するデータが半分ずつ程度に分散してしまいます。
    コンシューマー グループを追加することにより、送信側と受信側を多対多にすることができます。
  • PartitionContext.CheckpointAsync メソッドにより、イベントを受信したことを記録します。
    これが呼び出されないと、また同じイベントが通知されます。
    また、送信側が稼働しているのに受信側が稼働していない場合もデータは蓄積されているため、
    次に受信側を起動したときに大量に押し寄せてくることがあります。

 

作成したサンプル
EventHubsSample (GitHub)

バージョン情報
.NET Framework 4.5
WindowsAzure.ServiceBus 3.0.1
Microsoft.Azure.ServiceBus.EventProcessorHost 2.0.2

参照
Event Hubs の使用

カテゴリー: クラウド. タグ: , . Leave a Comment »

クラスタリングを実装する (C#)

以前に Azure Machine Learning で色のクラスタリングを投稿しましたが、
このときはクラウド サービスである Azure Machine Learning を利用してクラスタリングを実行させていました。
しかし、この方法ではネットワークが必要であり、少なくとも現在提供されている機能では、
クライアント アプリケーションなどでリアルタイムに再学習させていくのは難しそうです。

そこで今回は、k 平均法 (k-means 法) によるクラスタリングを自前で実装してみます。

クラスタリングの対象となる各データはそれぞれの特徴を表す属性の集合を持ちますが、
k 平均法は、この属性の集合をベクトル (次元数は属性数) と見なし、
それらの距離が近いもの同士を同じクラスターに振り分けるという単純なものです。

k 平均法の詳細の手順は次の通りです。
なお、各クラスターにおけるデータの平均値を表すベクトルを重心 (centroid) と呼びます。

(0) クラスター数および反復回数を与える

(1) データの集合の中からクラスターの数だけデータを選び、各クラスターの初期の重心とする

(2) すべてのデータを、重心との距離が最も小さいクラスターに割り当てる

(3) (2) の結果として得られた各クラスターで、重心を再計算する

(4) (2)(3) を与えられた回数だけ反復する

この k 平均法のアルゴリズムについては、K-means 法を D3.js でビジュアライズしてみたを参照するとよいでしょう。
クラスタリングの処理が収束していく様子が視覚化されており、理解しやすいと思います。

さて、このアルゴリズムを C# で次のように実装してみました。
(ただし、ここに載せているのは一部です。完全なものは ClusteringConsole – GitHub にあります。)

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
namespace ClusteringConsole
{
[DebuggerDisplay(@"\{Clusters: {ClustersNumber}, Iterations: {IterationsNumber}\}")]
public class KMeans<T>
{
public int ClustersNumber { get; private set; }
public int IterationsNumber { get; private set; }
public KMeans(int clustersNumber, int iterationsNumber)
{
ClustersNumber = clustersNumber;
IterationsNumber = iterationsNumber;
}
public Dictionary<int, Record<T>[]> Train(Record<T>[] records)
{
var clusters = InitializeClusters(ClustersNumber, records);
for (var i = 0; i < IterationsNumber; i++)
TrainOnce(clusters, records);
return clusters.ToDictionary(c => c.Id, c => c.Records.ToArray());
}
static Cluster<T>[] InitializeClusters(int clustersNumber, Record<T>[] records)
{
return RandomUtility.ShuffleRange(records.Length)
.Take(clustersNumber)
.Select(i => records[i])
.Select((r, i) => new Cluster<T>(i, r.Features))
.ToArray();
}
static void TrainOnce(Cluster<T>[] clusters, Record<T>[] records)
{
Array.ForEach(clusters, c => c.Records.Clear());
AssignRecords(clusters, records);
Array.ForEach(clusters, c => c.TuneCentroid());
}
static void AssignRecords(Cluster<T>[] clusters, IEnumerable<Record<T>> records)
{
foreach (var record in records)
{
var cluster = clusters.FirstToMin(c => FeaturesHelper.GetDistance(c.Centroid, record.Features));
cluster.Records.Add(record);
}
}
}
[DebuggerDisplay(@"\{{ToDebugString()}\}")]
public struct Record<T>
{
public T Element { get; set; }
public double[] Features { get; set; }
string ToDebugString()
{
return string.Format("{0}: {1}", Element, FeaturesHelper.ToString(Features));
}
}
[DebuggerDisplay(@"\{{ToDebugString()}\}")]
class Cluster<T>
{
public int Id { get; private set; }
public double[] Centroid { get; private set; }
public List<Record<T>> Records { get; private set; }
public Cluster(int id, double[] centroid)
{
Id = id;
Centroid = centroid;
Records = new List<Record<T>>();
}
public void TuneCentroid()
{
if (Records.Count == 0) return;
Centroid = Enumerable.Range(0, Centroid.Length)
.Select(i => Records.Average(r => r.Features[i]))
.ToArray();
}
string ToDebugString()
{
return string.Format("{0}: {1}: {2} records", Id, FeaturesHelper.ToString(Centroid), Records.Count);
}
}
public static class FeaturesHelper
{
public static double GetDistance(double[] p1, double[] p2)
{
return Math.Sqrt(p1.Zip(p2, (x1, x2) => x1 - x2).Sum(x => x * x));
}
public static double GetNorm(double[] p)
{
return Math.Sqrt(p.Sum(x => x * x));
}
public static string ToString(double[] p)
{
return string.Join(", ", p.Select(x => x.ToString("F3")));
}
}
}
view raw KMeans.cs hosted with ❤ by GitHub
using System;
using System.Collections.Generic;
using System.Drawing;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace ClusteringConsole
{
class Program
{
static void Main(string[] args)
{
ClusterColors();
}
static void ClusterColors()
{
var colors = typeof(Color).GetProperties(BindingFlags.Public | BindingFlags.Static)
.Where(p => p.PropertyType == typeof(Color))
.Select(p => (Color)p.GetValue(null))
.Where(c => c.A == 255) // Exclude Transparent.
.ToArray();
var records = colors
.Select(c => new Record<Color> { Element = c, Features = new double[] { c.R, c.G, c.B } })
.ToArray();
var clustering = new KMeans<Color>(20, 50);
var clusters = clustering.Train(records);
foreach (var cluster in clusters)
{
Console.WriteLine(cluster.Key);
Console.WriteLine(string.Join(", ", cluster.Value.Select(r => r.Element.Name)));
}
}
}
}
view raw Program.cs hosted with ❤ by GitHub

対象のデータとして System.Drawing.Color で定義されている色の集合を利用し、属性は R, G, B の 3 つとしました。
このコンソール アプリケーションを実行すると、次の図のように結果が出力されます (結果は毎回異なります)。

ClusteringConsole

 

とりあえず単純な方法で実装してみましたが、この方法では、
(1) の段階で等しい値のデータが別々のクラスターの重心として割り当てられると、
データを持たないクラスターが出てくる可能性があります。
したがって、初期の重心を分散させるなど、改良の余地はありそうです。

ちなみに今回はコンソール アプリケーションで作成しましたが、
クラスタリングの結果を視覚化すれば Azure Machine Learning で色のクラスタリング (2) の結果と同じようになるでしょう:

Azure Machine Learning で色のクラスタリング

 

作成したサンプル
ClusteringConsole (GitHub)

参照
k平均法 – Wikipedia
K-means 法を D3.js でビジュアライズしてみた
Azure Machine Learning で色のクラスタリング (1)
「クラウドではじめる機械学習 Azure ML でらくらく体験」書評