Azure FunctionsでEvent Hubsトリガー使ってデータ処理したりするケースはよくあると思いますが、すでに稼働しててバンバンデータが流れるEvent Hubsに新しくFunctionsで処理を追加したい、とかのケースでそのままだと初回起動時にえらいことになります(よね?)。※ パーティション内に保持されてるデータを最初から読み出そうとするので
だいたいそういうケースだとこれまでのデータを処理する必要はないのでパーティション内の最後のデータから処理を開始したりしたいわけですが、トリガー単位ではこの辺りを制御できません。代わりにFunctions全体で挙動を指定できるようにはなってるのでそちらで行います。(トリガーとしては無理だけどイベントプロセッサホストとしては指定可能ということです)
host.json で行う
host.json で行う場合は以下のようにします。(host.jsonでの構成だとFunctionがどの言語でもOKのはず)
{
"version": "2.0",
"extensions": {
"eventHubs": {
"batchCheckpointFrequency": 5,
"eventProcessorOptions": {
"maxBatchSize": 256,
"prefetchCount": 512
},
"initialOffsetOptions": {
"type": "fromEnd",
"enqueuedTimeUtc": ""
}
}
}
}
initialOffsetOptions オプションでいろいろ指定できます。指定できるtypeは fromStart , fromEnd , fromEnqueuedTime の3種類。 fromStartが既定かな。fromEndはFunction実行後にキューに入ったデータから処理します。fromEnqueuedTimeはenqueuedTimeUtcで指定した時刻以降のデータが対象。enqueuedTimeUtcはC#のDateTime.Parse()で解釈できる文字列ならOK。タイムゾーンに気を付けましょう(指定しなかったらUTC)
コードで行う(C#)
コードで行う場合、DependencyIndection を使って指定することもできます。下記みたいに Configure 内で PostConfigure に EventHubOptions を渡すことができるので、InitialOffsetProvider に希望する挙動を渡します。
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Extensions.DependencyInjection;
[assembly: FunctionsStartup(typeof(Test.EventHubStartup))]
namespace Test
{
public class EventHubStartup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.PostConfigure<EventHubOptions>(o => o.EventProcessorOptions.InitialOffsetProvider = GetInitialOffset);
}
private static EventPosition GetInitialOffset(string partitionId)
{
return EventPosition.FromEnd();
}
}
}
EventPosition に FromStart() とかいろいろあるので必要なのを指定しましょう。Offset直指定もまぁできますけど使いにくいですね(パーティション毎に違うし)。
(2021/02/17追記)上記の設定しようとすると Microsoft.Azure.WebJobs.Extensions.EventHubs 4.2.0 が必要です。バージョンには注意しましょう。
初回起動以降のチェックポイント
初回起動時(チェックポイントがない状態)の挙動は上記設定に従います。初回起動後以降はEvent Hubsトリガーがデータを処理するたびにチェックポイントファイルをWebJob用のBlob Storageのazure-webjobs-eventhubコンテナーに作成・更新します(BlobはEvent Hubsのパーティション毎にできます)。中身は下記のような感じ。
{
"Offset": "188978563312",
"SequenceNumber": 2277,
"PartitionId": "0",
"Owner": "xxxxxxx-15d2-4323-8e0b-3fa5b15e384b",
"Token": "xxxxxx-70f9-4e20-b087-fcc8f2863a2d",
"Epoch": 12
}
初回以降は起動時にチェックポイント内のオフセットなどをみてそこから処理を始めます。ちなみに別Functionsに処理を移行したい(たとえば別リージョンに移したいけど処理は継続させたいとか)場合とか、チェックポイントファイルをうまくコピーしてあげれば処理を継続させることができます。なかなかシビレル挙動なのであまりしたくはないですけど。
まとめ
もうこういう時のパターンとして組み込んでしまったほうが楽ですね。ちなみにIoT Hubトリガーは実質Event Hubsトリガーなので(組み込みエンドポイントはEvent Hubsだし)同じ設定で大丈夫だと思います。
ピンバック: Microsoft.Azure.WebJobs.Extensions.EventHubs を 5.x にする | ブチザッキ