uchan note

プログラミングや電子工作の話題を書きます

FIFOバッファ

この記事はプログラミング,とりわけ自作OSに興味を持った計算機科学初心者の方に向け,FIFOバッファの紹介,および具体的な実装例を紹介するものです. FIFOバッファとは何か,自作OSでの応用例,ナイーブな実装例と性能を多少考慮した改良例を紹介します.

バッファ

もともと「バッファ」は緩衝器という意味があって,衝撃を和らげる機械を意味します. 頭痛を和らげるバファリンという薬は有名ですね.

コンピュータの分野で「バッファ(buffer)」とは何かデータを貯めておく入れ物です. ここでは「データ」が具体的に何なのかはあまり関係ありません. 整数かもしれないし,テキストかもしれないし,動画像データかもしれません.

何らかのデータを生み出す処理(生産者)と利用する処理(消費者)があるのはプログラミングではとても一般的な構造です. 動画サイトのシステムを思い浮かべてください. ユーザーは動画を好きなときに投稿し,それがサーバー側でエンコードされ,動画として公開されます. 動画を受信してエンコードする,というのを最も素直に実装すると次のような構造のプログラムになるでしょう.

while (true) {
  auto video_data = receive_video_data(); // 生産者
  auto encoded_data = encode(video_data); // 消費者
  publish(encoded_data);
}

receive_video_data により1つの動画データを受信し,その動画データを encode により公開に適する形式へとエンコードします. このプログラムでは生産者と消費者が分離されていないため分かりにくいかもしれませんが,動画データを受信する部分が生産者,エンコードする部分が消費者と考えることができます.

このプログラムの問題点は,1人が動画をアップロード,およびエンコードしている間は他の人は作業ができないということです. 散発的にアップロードされれば問題はありませんが,短時間にアップロードしようとするユーザーが集中すると待ち時間が大きく伸びます.

ここにバッファを導入すると次のように処理を分割できます.

// 生産者側
while (true) {
  auto video_data = receive_video_data();
  buffer.push(video_data);
}
// 消費者側
while (true) {
  auto video_data = buffer.pop();
  auto encoded_data = encode(video_data);
  publish(encoded_data);
}

生産者側では受信した動画データをバッファに追加(push)します.消費者側ではバッファから1つずつ動画データを取得(pop)してエンコード処理したのち,動画を公開します. このようにバッファ(緩衝器)を間に挟むことで,急に沢山のアップロードが行われた場合でもユーザーはそれほど待たずにアップロードを完了させることができます. ユーザーはアップロード完了後,エンコードがそのうち終わることを期待して他の作業に取り掛かれます.

FIFO

FIFOはFirst In, First Outの略で,日本語では「先入れ先出し」となります. 最初に入れたデータが最初に取り出される,という意味です. バッファなどの「複数のデータを入れておくデータ構造」の性質を表します.

同様の言葉にFILO(First In, Last Out)もあります.先入れ後出しという言葉通り,最初に入れたデータが最後に取り出されます.LIFO(Last In, First Out)も同じ意味の言葉です. これはちょっと分かりにくいかもしれません. 机の上に本を1冊ずつ積む様子を考えるといいでしょう. 本を積むときは上に積んでいきますし,取り出すときも普通は上から順に取ります. 一番最初に机に置いた本は,一番最後まで机に置いたままになりますね.

FIFOやFILOには別名があります. FIFOがキュー,FILOがスタックです.

蛇足ですが,FIFOでもFILOでもないデータ構造もあります. 例えば優先度付きキューは,取り出そうとしたときに存在するデータの中で最も優先度の高いものが取り出されるデータ構造です. 動画エンコードシステムの例では,例えば最も高いお金を払っているユーザーの動画を最初に処理したいとか,最もサイズが小さい動画から順にエンコードしたい,などが考えられます.

話を戻します. FIFOやFILOといった,データを1つずつ追加したり取り出す操作には共通の名前が付いています. 追加操作がプッシュ(push),取得操作がポップ(pop)です. プッシュとポップの実際の挙動は,対象のバッファがFIFO型なのかFILO型なのかで異なりますが,データを1つ追加したり,取得したりする,というのは共通しています.

自作OSにおけるFIFO

OSの構成部品としてFIFO型のバッファが活躍します. 「30日でできる!OS自作入門」では,割り込み処理で発生したデータ,例えば押されたキーのキーコードやマウスからの座標データ,タイマーのタイムアウト情報などがFIFOバッファを介してやりとりされます.

FIFOバッファを用いることで割り込み処理を短時間に終わらせ,後でゆっくりと優先度の高い処理から実行するように構成できるため,OSの全体的な応答性が向上します.

FIFOバッファを実装する(配列ずらし版)

データを入れた順に取り出すFIFOバッファを,固定長配列を用いてC++で実装してみます. 2つの実装を紹介しますが,まずは分かりやすさを重視して性能を犠牲にしたバージョンです.

(ここでは実装の読みやすさのためにC++のテンプレート機能を用いませんでした. このような汎用的なデータ構造を書く際はテンプレート機能を使って要素の型や容量を指定可能にすることが多いでしょう)

#include <iostream>
#include <array>

class ShiftArrayQueue {
 public:
  static const int kCapacity = 32;

  void Push(int value) {
    if (size_ == kCapacity) {
      return;
    }
    data_[size_] = value;
    ++size_;
  }

  int Pop() {
    if (size_ == 0) {
      return 0;
    }
    int result = data_[0];
    --size_;
    for (int i = 0; i < size_; ++i) {
      data_[i] = data_[i + 1];
    }
    return result;
  }

 private:
  std::array<int, kCapacity> data_{};
  int size_{0};
};

using namespace std;
int main(int argc, char** argv) {
  ShiftArrayQueue q;
  q.Push(3);
  q.Push(5);
  cout << q.Pop() << endl; // 3
  cout << q.Pop() << endl; // 5
  cout << q.Pop() << endl; // 0
}

ShiftArrayQueue クラスがFIFOバッファの実装です. Push メソッドによりデータを末尾に追加し,Pop メソッドにより先頭のデータを取得します. データの型はとりあえず int としていますが,data_ 配列の要素型を変えることで他のデータ型に対応可能です.

Push によるデータの追加は配列 data_ の末尾にどんどん追加していくだけです. 末尾の位置は size_ により表しています.

一方,Pop によるデータの取得は data_ の先頭から取り出した後,配列をずらす処理を行っています. data_ の先頭要素を削除するために,隣の要素を1つ手前にコピーするという処理を要素数分繰り返します. このずらす操作を忘れると,何回 Pop しても同じ値が表示されるということになります.

FIFOバッファを実装する(リング版)

配列をずらす操作というのは非常に効率が悪い処理です. 格納された要素数size_)が小さいうちはいいですが,大きくなるにつれてずらしの処理に時間がかかるようになります. 要素数nに処理コストが比例するという意味で「O(n)」と書いたりします.

実は,ちょっと工夫した実装をすることで要素が何個であってもほぼ一定の時間で Push および Pop が処理されるように実装できます. それはリングキューという考え方です.

リングキューの図

         -------------
rp ---> |             | 0
        |-------------|
        |             | 1
        |-------------|
wp ---> |             | 2
        |-------------|
        |             | 3
         -------------

リングキューはデータを格納する配列と,書き込み位置を管理するインデックス(wp: write pointer),読み込み位置を管理するインデックス(rp: read pointer)で構成されます. 初期状態ではwp==rpとしておき,プッシュでwpを増やし,ポップでrpを増やします. wpやrpが配列の末尾に来たら0に戻すことで,ぐるぐると際限なく回転します. 概念的には配列の先頭と末尾がくっついたリングに思えることからリングキューと呼ばれます.

class RingArrayQueue {
 public:
  static const int kCapacity = 32;

  void Push(int value) {
    if (size_ == kCapacity) {
      return;
    }
    data_[wp_] = value;
    ++size_;
    ++wp_;
    if (wp_ == kCapacity) {
      wp_ = 0;
    }
  }

  int Pop() {
    if (size_ == 0) {
      return 0;
    }
    int result = data_[rp_];
    --size_;
    ++rp_;
    if (rp_ == kCapacity) {
      rp_ = 0;
    }
    return result;
  }

 private:
  std::array<int, kCapacity> data_{};
  int size_{0}, wp_{0}, rp_{0};
};

メンバ変数として wp_ および rp_ が増えました. そして,Pop からは配列をずらす処理がなくなり,rp_ のインクリメントのみになりました. インクリメント処理にかかる時間はデータの個数に関係ありませんので,定数時間で処理できるという意味で「O(1)」と書いたりします.

まとめ

この記事ではバッファについての一般的な説明や応用例,FIFOやFILOというデータ構造が持つ特性の紹介,FIFO型のバッファの実装方法を説明しました. 記事をコンパクトにするために,網羅的には説明せずつまみ食い的な説明をしました. 興味のある方は「データ構造とアルゴリズム」の教科書などを参照してください.

実装例はとても簡易なものです. もっと工夫するなら,例えば配列が満杯だったり空だったりする際に単に無視したりデフォルト値を返すのではなく,きちんとエラーを返すと良いでしょう. マルチスレッドで用いるキューなのであれば対象のスレッドを停止させるような実装も考えられます.

最後に,いろんな型に対応させ,要素数も外側から指定できるようにしたバージョンを紹介して終わります.

#include <iostream>
#include <array>
#include <string>

template <class T, size_t N>
class ArrayQueue {
 public:
  void Push(const T& value) {
    if (size_ == N) {
      return;
    }
    data_[wp_] = value;
    ++size_;
    ++wp_;
    if (wp_ == N) {
      wp_ = 0;
    }
  }

  T Pop() {
    if (size_ == 0) {
      return T{};
    }
    T result = data_[rp_];
    --size_;
    ++rp_;
    if (rp_ == N) {
      rp_ = 0;
    }
    return result;
  }

 private:
  std::array<T, N> data_{};
  size_t size_{0}, wp_{0}, rp_{0};
};

using namespace std;
int main(int argc, char** argv) {
  ArrayQueue<string, 2> q;
  q.Push("Hello");
  q.Push(", ");
  q.Push("world");
  cout << q.Pop() << endl;
  cout << q.Pop() << endl;
  cout << q.Pop() << endl;
  q.Push("world");
  cout << q.Pop() << endl;
  cout << q.Pop() << endl;
}