uchan note

プログラミングや電子工作の話題を書きます

Kafka のトピック・パーティションの決め方を考察する

Apache Kafka は分散キューまたは pub/sub メッセージ配信システムとして使われるミドルウェアである。Kafka は pub/sub システムで一般的にある「トピック」に加えて、各トピックを分散処理のために分割する「パーティション」という機能を持っている。この記事では、どちらもメッセージを分類するという機能をもつ「トピック」と「パーティション」の使い分けを考察してみる。

トピック

トピックとは日本語で「話題」という意味がある。pub/sub システムでは、関連するメッセージを集める単位として使われる言葉らしい。筆者は pub/sub システムの一般的な概念をそんなによく知らないのではっきりしたことを言えないが、「アクセスログトピック」とか「システムログトピック」とか「気温と湿度のログトピック」というように、メッセージの種類ごとに分けるのがトピックだと考えている。

メッセージのパブリッシャ(送信側)とサブスクライバ(受信側)は、あるトピックを介してつながる。HTTP サーバは「アクセスログトピック」にログを出し、アクセスログを読みたいと思ったサブスクライバが「アクセスログトピック」に接続してログを読む。パブリッシャ、サブスクライバともに複数でも良く、「アクセスログ」というのをキーワードにしてゆるくつながる関係だ。

パーティション

これは一般の pub/sub システムにはない Kafka 独自の概念だ。パーティションはひとつのトピックを分割するもので、各パーティションはトピック内で一意の整数で区別される。例えば、「アクセスログトピック」はパーティション 0, 1, 2 を持つ、というように。パーティションの数は途中でも変えることができる。

Kafka では、サブスクライバを「コンシューマ(消費者)」と呼ぶ。論理的に一人の消費者を表す「コンシューマグループ」があり、あるコンシューマグループはひとつ以上の「コンシューマプロセス」から成り立つ。

Kafka は pub/sub システムであるので、あるトピックを複数のコンシューマグループが購読することができる。そしてここがポイントだが、あるコンシューマグループの中では、あるパーティションはどれかひとつのコンシューマプロセスに割り当てられる。ひとつのコンシューマプロセスが複数パーティションを担当することはあるが、ひとつのパーティションはひとつのコンシューマプロセスにしか割り当てられない。

パーティション割り当て戦略

パーティションは整数でしか区別されないので、アクセスログのホスト名ごとにパーティションを割り振る、というのは難しいし、おそらく Kafka では想定されていない使い方だ。そうではなく、ログの到着ごとにラウンドロビンパーティションに割り振るか、文字列のハッシュ値パーティション数で割った余りをパーティション ID とするか、もう少し頑張って、事前定義した文字列→整数の対応表にしたがって振り分けるか、という感じになるだろう。一応、文字列→整数の対応表を動的に作って共有するシステムを作れば、ホスト名ごとにパーティションを振り分けるのは不可能ではない。

ただ、現実的にはパーティション割り振りはラウンドロビンハッシュ値の余りでやるのがよさそうだ。つまり、あまり意味的な割り振りはせず、ただ負荷を分散させる目的でパーティションを使うということだ。なぜなら、Kafka 向けのサードパーティ製品は高度なパーティション割り振り戦略をサポートしていないことがあるからだ。例えば fluentd の Kafka 用プラグイン fluent-plugin-kafka では、パーティション割り振り戦略がラウンドロビンハッシュ値の余りかしか選べない。

トピックかパーティション

まとめると、意味的、またはログの形式が異なるものをトピックで分け、負荷分散の目的でパーティションを分けるのが良いだろう。負荷分散が目的なら、ログの割り振りはラウンドロビンで十分機能する。

ただ、パーティション分割を単なる負荷分散ではない目的で使わざるを得ない場合もある。

あるコンシューマプロセスが、特定の種類のログ、例えば「ホスト A」に関するアクセスログをすべて取得し、継続的に処理する必要があるとする。これは、ホスト A に関するアクセスのエラーレートを継続的に計算したい場合などに該当する。ホスト A に関するログが他のプロセスに流れてしまっては、正確なエラーレートを計算できないだろう。

この場合、ホスト A に関するアクセスログを別トピックとして切り出すか、ホスト名をハッシュ値の計算に含めるようにしなければならない。ホストごとにトピックを作っていたらトピックの数が増えすぎてしまうので、現実的にはすべてのアクセスログを単一トピックで扱い、ホスト名をハッシュ値の計算に含めることになると思う。

先ほど紹介した fluent-plugin-kafka は、パーティション振り分けのハッシュ値計算に使う文字列を指定できる。ログに partition_key という項目があればその値が使われ、ない場合には default_partition_key に設定した文字列が振り分けのキーとして使われる。もし、ホスト名とリモート IP の組み合わせでパーティションを振り分けたいという場合には、ホスト名とリモート IP を文字列結合した値を partition_key としてログに含めれば良い。

トピックは基本的に静的なものなので、アクセスログのホスト名のように、突然増えたりする場合には使いにくい。したがって、パーティションを意味的な割り振りに使う場面も出てくる。そんなとき、独自の割り振り戦略を考えるのではなく、文字列のハッシュ値の余りによってパーティションを決定するように設計するのが吉だろう。