(第29回)リアルタイム性について考える(その4:レディーキューの検索)

2007/12/12

あなたは 人目のお客様です.

前回は動的メモリ獲得のリアルタイム化を行ったのだけど,KOZOSの内部には ループを使用しているためにリアルタイム性がいまいち確保できていない部分が 他にもある.

たとえば割り込み処理(thread_intrvec())の終了直前には, スレッドのスケジューリングが行われる(schedule()). ここではレディーキューの検索が行われるのだが,現状で以下のようになっている.

static void schedule()
{
  int i;
  for (i = 0; i < PRI_NUM; i++) {
    if (readyque[i].head) break;
  }
  if (i == PRI_NUM) {
    /* 実行可能なスレッドが存在しないので,終了する */
    exit(0);
  }
  current = readyque[i].head;
}
つまりレディーキューの数だけループして,実行可能なスレッドがあるかどうかを 検索している.これはキューの状態によってループされる回数が変化するために ループ回数が予測できないので,リアルタイム性が無い. いや,実はそうでもなくて PRI_NUM は固定値なので,ループの最悪数をもって 最悪実行時間を見積もることはできる. よってリアルタイム性が全く無いというわけでもないのだが, schedule() は割り込みのたびに必ず呼ばれるため,このような作りになっていて リアルタイム性を唱うというのもちょっと気が引ける.というかこれでリアルタイムと 言い張るにはちょっと弱い.

で,どうするかなのだが,このような場合にリアルタイム性を確保するための 書き方がある.まあ簡単にいうと,レディーキューの状態をビットマップ状に管理し, ビットの立っている位置を高速に検索するというものだ. まあ言葉だけで説明してもわかりにくいので,まずはソースコード.

(2009/04/10 ライセンスに関する文書として,KL-01とLICENSEを追加. 詳しくは第43回を参照)

以下に修正差分を示しながら,リアルタイム性を確保する方法について説明する.

まず thread.c に対して,レディーキューの状態をビットマップ管理するための 変数として readyque_bitmap を追加.

 kz_thread threads[THREAD_NUM];
 static struct {
   kz_thread *head;
   kz_thread *tail;
 } readyque[PRI_NUM];
+static unsigned int readyque_bitmap;
 static kz_timebuf *timers;
 static kz_thread *sigcalls[SIG_NUM];
 static int debug_sockt = 0;
 sigset_t block;
 static int on_os_stack = 0;
現状では優先度の個数(PRI_NUM)が32個のため,unsigned int 型の変数ひとつで readyque_bitmap が表現できている.しかしもしも PRI_NUM が256とかになった 場合には,これではまずい.これについては後述する. まあ本来はこーいうのは,PRI_NUM が増加してビットマップが拡張されたときの ために,先を見越した(拡張のしやすい)書き方をしておくべきなのだ.しかしまあ 今回は,わかりやすさを優先してベタに書いてしまう.

次にKOZOSの起動部分に,ビットマップの初期化を追加.

 static void thread_start(kz_func func, char *name, int pri, int argc, char *argv[])
 {
   struct sigaction sa;
   stack_t sigstack;
 
   memset(threads, 0, sizeof(threads));
   memset(readyque, 0, sizeof(readyque));
   memset(sigcalls, 0, sizeof(sigcalls));
 
+  readyque_bitmap = 0;
   timers = NULL;
 
...
さらに,スレッドのレディーキューへの出し入れを行う getcurrent(), putcurrent() に対して,キューにスレッドが追加されたら readyque_bitmap のビットを立て, キューが空になったらビットを落とす処理を追加.
 static void getcurrent()
 {
   readyque[current->pri].head = current->next;
-  if (readyque[current->pri].head == NULL)
+  if (readyque[current->pri].head == NULL) {
     readyque[current->pri].tail = NULL;
+    readyque_bitmap &= ~(1 << current->pri);
+  }
   current->flags &= ~KZ_THREAD_FLAG_RUNNING;
   current->next = NULL;
 }
 static int putcurrent()
 {
   if (current->flags & KZ_THREAD_FLAG_RUNNING) {
     /* すでに有る場合は無視 */
     return -1;
   }
 
   if (readyque[current->pri].tail) {
     readyque[current->pri].tail->next = current;
   } else {
     readyque[current->pri].head = current;
   }
   readyque[current->pri].tail = current;
+  readyque_bitmap |= (1 << current->pri);
   current->flags |= KZ_THREAD_FLAG_RUNNING;
 
   return 0;
 }
まあここまでは簡単な処理の追加だけだ. ここまでの修正により,レディーキューにスレッドが存在するか否かが readyque_bitmap によってビットマップ管理されることになる. よって readyque_bitmap の中で一番最下位にあるビット(一番右にあるビット) を検索すると,その位置が「最も優先度の高いスレッドが存在するレディーキュー」 となるはずだ.

ただしそれを,for ループでぐるぐる回して

bit = 1;
for (i = 0; i < PRI_NUM; i++) {
  if (readyque_bitmap & bit)
    ...
  bit <<= 1;
}
のようにして検索してはいけない.これではループによってレディーキューを 検索するのとたいして変わらないので,以前と変わらずリアルタイム性を 確保できないからだ.これではわざわざビットマップにした意味が無い. どうにかしてループを使用せずに,固定の命令数で最下位ビットの位置を検索する 必要がある.

で,最近のCPUだと,まさにこのような用途のために「ビットサーチ命令」というのが 用意されていて,そのような命令に readyque_bitmap を渡すことで,「最も最下位に ある,立っているビットの位置」を1命令で検索することができたりする. 当然,普通にCで書いていてコンパイルしたところで,コンパイラがこのような命令を 使ってくれることは無いので,インラインアセンブラを使って, ビットサーチ命令を明示的に使うように自前で書かなければならないのだけど, これを使えば確実にリアルタイム性を確保できる.

しかしまあこれはCPU依存になるので,ここではそーいうような命令が使えなかった 場合の一般的な書き方をしよう.

static void schedule()
{
#if PRI_NUM > 32
#error ビットマップを配列化する必要あり
#endif
  unsigned int bitmap = readyque_bitmap;
  int n = 0;
  static int bitmap2num[16] = {
    -32, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0
  };

  if (!(bitmap & 0xffff)) {
    bitmap >>= 16;
    n += 16;
  }
  if (!(bitmap & 0xff)) {
    bitmap >>= 8;
    n += 8;
  }
  if (!(bitmap & 0xf)) {
    bitmap >>= 4;
    n += 4;
  }
  n += bitmap2num[bitmap & 0xf];
  if (n < 0) {
    /* 実行可能なスレッドが存在しないので,終了する */
    exit(0);
  }

  current = readyque[n].head;
}
これは Toppers のスケジューリング部分をわかりやすく噛み砕いたものなのだけど, ループが利用されていない点に注目してほしい.このため,処理にかかる時間を 固定で見積もることができる.

で,そのアルゴリズムはこうだ.

詰める際には,詰めた数をnに加えていく.こーすることで,ビットマップを log2に比例した回数で調べることができる.つまり,検索数を O(log n) にできる.

で,このまま2ビット,1ビットと詰めていってもいいのだけど, 最後の4ビットは高速化のために,配列で検索している. これが

  n += bitmap2num[bitmap & 0xf];
という部分だ.

bitmap2num[]は,以下のような値になっている.

 n      num
-----------
 0 0000 -32
 1 0001 0
 2 0010 1
 3 0011 0
 4 0100 2
 5 0101 0
 6 0110 1
 7 0111 0
 8 1000 3
 9 1001 0
10 1010 1
11 1011 0
12 1100 2
13 1101 0
14 1110 1
15 1111 0
ちょっとわかりにくいかもしれないが,与えられた4ビットの値に対して, その4ビットをビットマップ化した際の最下位ビットの位置を値としてあらかじめ 持っている.なので,4ビットまで丸め込めば,あとはbitmap2num[]を通すことで 最下位ビットを調べることができるわけだ.これも Toppers の書き方をマネしたもの.

4ビットまで丸め込んだらあとはbitmap2num[]を通すというのは,単に高速化のためだ. まあこう考えると原理的には32ビットビットマップ用のbitmap2num[]を配列として 用意すれば,始めから配列を使って一撃で検索できるわけなのだけど, それには4ギガ個の配列が必要になるのでまったく現実的でない. ということで,最初のうちは半分ずつに丸め込んでいき,適当なところで配列で 一気に検索,というしくみになっている.

ここで注意なのは,現状のKOZOSではレディーキューの個数が32個なので, ビットマップが unsigned int 型の変数ひとつに収まっているということだ. しかし優先度の数が32個というのは実は組み込みOSとしてはちょっと心もとなくて, 256個とかの優先度が必要になることもあるようだ.この場合にはビットマップを unsigned int 型の変数8個に拡張しなければならない. このため,schedule() の先頭では

#if PRI_NUM > 32
#error ビットマップを配列化する必要あり
#endif
のようにして,PRI_NUMが32を超えた場合にはエラーでコンパイルを中断するように してある.こういうときにはそれっぽくコンパイルできて動いてしまう (が,32以上の優先度がうまく動作しない)というのではなく,#error でコンパイル できないようにするのが正しい.そもそも32以上の優先度がうまく動作しないので 改造が必要なのだから,エラーによって「未対応なので改造が必要ですよ」と 教えてくれたほうが,はるかに親切だからだ. (コンパイルが通ってしまい,問題無いもんだと思って使っていたらあるとき突然 バグ発生,ということを想像してほしい.どーせ直さにゃならんのだから,早く 気づけたほうが遥かに良い)

で,まあ本来ならば優先度の個数を拡張したときにもすんなりと改造できるような (もしくは PRI_NUM を増やすだけで,他に修正する必要が無いような), 先を見越した書き方(ビットの上げ下げをマクロ化しておくなど)をしておくべき なのだけど,まあ今回は優先度の個数は32個までと限定して,わかりやすさを優先 して単純に書いてみた. schedule()内部で16,8,4ビットと半分ずつ狭めていく部分も,ほんとはループ化 (このループ化は,ループ回数を固定にできるのでリアルタイム性の問題は無い) してもっとカッコいい書き方ができると思うのだけど,まあわかりやすくするために, ベタに書いてみた.

ちなみに優先度の個数を32よりも増やした場合なのだけど,この場合は ビットマップが unsigned int 型の変数1個では収まらなくなるので, 複数の変数に分ける,つまりビットマップを配列化する必要が出てくる. たとえば優先度を1024個とかにした際には,ビットマップの個数も32個になる. この配列をループで検索すると,やはりリアルタイム性という意味ではちょっと うーむな作りになってしまう. このような場合には,ビットマップがゼロかそうでないかの状態(配列の状態)を 管理するためのビットマップを別途追加すれば,ビットマップの配列の検索も, ループを用いずに行える.ビットマップを階層構造にするわけだ.

あーあと言うの忘れたけど,今回の改良版のソースで,gdbの接続,continue, telnet 接続などは一通り行えることは確認してある.念のため.

ここまでで,徐々にそれっぽくなってきた. ていうか,リアルタイムOSっぽいつくりになってきた. 次回は割り込みの応答性とか,割り込み禁止区間とかについて考えてみたい.

うーむ,咳が止まらん...


メールは kozos(アットマーク)kozos.jp まで