タイトル通り。Azure Functions の Event Hubs の出力バインディングでデータ送信する際に、一貫性を持たせたい(イベントの順序を維持したい)場合、ドキュメントにあるやり方だと無理なのでちょっと調べました。
まずよくある出力バインディングのやり方。( https://learn.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-event-hubs-output?tabs=in-process%2Cfunctionsv2%2Cextensionv5&pivots=programming-language-csharp#example )
[FunctionName("EH2EH")]
public static async Task Run(
[EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
[EventHub("dest", Connection = "EventHubConnectionAppSetting")]IAsyncCollector<string> outputEvents,
ILogger log)
{
foreach (EventData eventData in events)
{
// do some processing:
var myProcessedEvent = DoSomething(eventData);
// then send the message
await outputEvents.AddAsync(JsonConvert.SerializeObject(myProcessedEvent));
}
}
わかりやすいですね(?)。 IAsyncCollector の AddAsync メソッドを呼べば送信できます。その他には EventData を使ったりすることもできますが、いずれにせよ順序を維持するための(一貫性を保つために)Event Hubs のパーティションIDを指定することができません。(パーティションと一貫性についてはこちらやこちらを参考に)
実際上記のコードをもうちょっと弄って、試しに100のユニークなID(クライアントみたいなのを表現)に関するデータをそれぞれ10回を投げた時に使われた Event Hubs のパーティションIDを見るとバラバラになります。

さてパーティションIDを指定するにはどうしたらいいかというと、 EventHubProducerClient の SendAsync メソッドで送信する際に SendEventOptions で PartitionKey か PartitionId を指定すれば良いです。
EventHubProducerClient はSDK使って自前管理しないと?と思ってしまいますが、普通に EventHubProducerClient のインスタンスを渡してくれるので、下記みたいにすれば大丈夫です。
[FunctionName("SendRequest")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[EventHub("%EventHubName%", Connection = "EventHubsConnection")] EventHubProducerClient eventHubClient,
ILogger log)
{
await eventHubClient.SendAsync(
new List<EventData> {
new EventData {
EventBody = new System.BinaryData(id)
}
},
new SendEventOptions
{
PartitionKey = id
});
return new OkResult();
}
PartitionKey を指定した場合はキーのハッシュ値をパーティション数で割った余りで計算してるんじゃないでしょうか(知らないけど)※ Math.Abs(key.GetHashCode() % TotalNumberOfPartition) みたいにするのを見かけた
結果はというと、わざわざパーティションキー(ID)を指定してるので同じキーに対しては同じパーティションが使われるようになりました。

これで仮に同一IDのものに対しては1つのパーティションにデータが配信されるので、順序を維持したまま処理することができるようになります。(他のパーティションにばらけると他のキーのデータやコンシューマー側で処理するタイミングによっては順序が変わってしまう)
※ちなみにコンシューマー側(Event Hubs トリガーなFunctions)のコードはこんな感じで受信データのパーティションIDなどを出力してます。
[FunctionName("Function1")]
public async Task Run([EventHubTrigger("%EventHubName%", Connection = "EventHubsConnection")] EventData eventData,
PartitionContext PartitionContext,
string PartitionKey)
{
var messageBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
Logger.LogWarning("partitionId\t{id}\tpartitionKey\t{key}\tbody\t{body}", PartitionContext.PartitionId, PartitionKey, messageBody);
}
というわけでAzure Functions で Event Hubs に対して出力バインディングを使ってパーティション指定したデータ送信ができました。
ただ、公式ドキュメントにも書いてる通り順序を維持する場合のデメリットもあります。
- パーティションID指定時
- 順序が維持できる。
- アプリから見てEvent Hubs の可用性がパーティションレベルになる(指定したパーティションが使えなくなった場合にエラーになる)※ 一貫性が維持できる(維持したい)範囲内で同一パーティションを指定すれば良いので、そのような場合の代替ロジックを用意すれば対応できるが、自前でそれをしないといけない
- キーによってはパーティション内の利用率に偏りがでる(スループットなどに影響がでる可能性がある、パーティション数を活かせない)
- パーティションID未指定時
- 順序は維持できない。
- Event Hubs の高可用性を最大限に活かせる。
- 負荷の分散を最大限に行える。
Event Hubs は Standard だとパーティション数の変更が構築後はできないので、想定スループットや上記の用途も含めて結構慎重に決定する必要があります。(データが流れてからメトリクスみて調整すればいいや的な手法がかなり面倒くさい。作り直し&再設定=その間のデータどうするんや等)
というわけで、とりあえず手段としてはわかりましたけど使い道は用量用法にあわせてご利用ください、という感じでした。コンシューマーグループやらも併せていろいろ弄ってますけど奥が深いですね。
※ Excel力が無いのと綺麗にプロットできないなぁというので割と絶望する。