地上の洞窟

どこにも行かず、液晶と「にらめっこ」し続ける人の物語。

Rubyで優先度付きキュー(二分ヒープ)を実装してみる

目次

前置き

配列内のもっとも小さい要素を求めるコード。

array = [0, 1, 2, 3, 4]
array.min

.minメソッドは要素内の並び順に関係なく最小値を求められますが
その代わりに全ての要素を一つ一つ比較する必要があり、
素数に比例して処理時間が長くなっていきます。
いわゆるオーダー記法で書けば O(n) というやつ。

しかし今回の場合の配列は要素が小さい順に並んでるっぽいので
このようなコードでも、最小値が求められそうです

array[0]

この場合は配列の要素数に関係なく
最小値を求める時間は O(1) で済みます。

しかし当然、より小さい値が追加されるとおかしなことになりますね。

array << -1
array[0] # => 0 お前は何を言ってるんだ

では、要素を並び替えればいいじゃないか?という発想になるかもしれませんが
配列のソートというのは O(n\ log\ n) から O(n^2) ほどの計算量が必要、
つまりは線形探索 O(n) 的に最小値を求めるより
余程時間がかかってしまう訳なんですね。

二分ヒープを使ってみよう

要素の追加・削除に最小値・最大値・最善・最悪などの「優先度」があり
それが頻繁に行われる場合は、優先度付きキューの一種、
「ヒープ」*1を使ってみるといいかもしれません。

ヒープの中でも代表的と言えそうなのが
さっきから題に上がっている「二分ヒープ」ですね。

詳しい概要などはWikipediaにあったり、
自分の作った動画で解説してたりします(動画の方は適当解説)。

ja.wikipedia.org

youtu.be

二分ヒープの速度

操作 最悪計算量 平均計算量
最小値の取得 O(1) O(1)
素数の取得 O(1) O(1)
追加 O(log\ n) O(1)
削除 O(log\ n) O(log\ n)
値の更新 O(log\ n) O(1)

追加・削除・更新など、データ構造の操作自体は
若干遅いですが、最小値の取得は O(1) の理論値です。速い。

ここで
「ちょっと待って O(n) とか O(1) とか O(log\ n) とかなんなんだ」
という所で置いてけぼりにされている人に向けて一言で解説すると

  • O() という書き方は計算量について表している
  • n は要素の数
  • log\ n の大体の意味は、要素数 n を2で割れる回数

つまり、もし n = 1,000,000 だとしたら

  • O(1) → 1 (こいつはそもそも定数)
  • O(n) → 1,000,000
  • O(log\ n) → 20

この分の計算量が処理に必要だよ、という目安になります。
(要素へのアクセス回数の目安とも言えるかも)

実装

簡単に、数値の追加・最小値の取り出し(削除)のみを実装したコードです。

class BinaryHeap
  def initialize(n = 0)
    @node = Array.new(n)
    @count = 0
  end

  def push(v)
    @count += 1
    i = @count
    while i > 1
      parent_i = i / 2
      parent = @node[parent_i]
      break if parent <= v
      @node[i] = parent
      i = parent_i
    end
    @node[i] = v
  end

  def shift
    return unless @count > 0
    result = @node[1]
    current = @node[@count]
    @node[1] = current
    @node[@count] = nil
    @count -= 1

    i = 1
    while
      left_i = i * 2
      right_i = left_i + 1
      break if left_i > @count

      left = @node[left_i]
      right = @node[right_i]

      if right && right < left
        min_i = right_i
        min = right
      else
        min_i = left_i
        min = left
      end

      break if min > current

      swap(i, min_i)
      i = min_i
    end

    return result
  end

  def swap(i, j)
    @node[i], @node[j] = @node[j], @node[i]
  end
  
  def min
    @node[1]
  end
end

変数の解説

@node
@node = [nil, 0, 3, 2, 5, 6, 9] # 要素のイメージ

配列が入った変数です。
使う時は0番を空として、1番目に最小値が来ます。

二分ヒープのデータ構造、制約というのは

  1. 一つのデータに対して最大二つの繋がりを持つ
  2. 上のデータは下のデータより比較して小さい(ソート条件による)
  3. 繋がる要素は、全体から見て上の方、かつ左に寄せる

といった感じになってます(わかりづれぇよ)

これを配列に当てはめて、一番上の要素の番号を1とすると

  • その下の左の要素は n * 2
  • その下の右の要素は n * 2 + 1
  • 下の要素から見て、上の要素は n / 2

となります。

二分ヒープのデータと配列インデックスの関係性イメージ

そのため、わざわざ以下のような、
要素を格納するためのクラスを実装する必要はないということなんですね。
余計なインスタンス生成を減らせるので非常に効率が良くなります。

class BinaryHeap
  # いらない
  class Node
    attr_accessor :parent, :left, :right
  end
end
@count

現在の要素の数を表します。
@nodeには配列がありそこから要素数が取れるじゃん!ってなりそうですが
「要素の削除時に nil を代入する」
都合上、@node.sizeを使うことはできません。そのための変数です。

メソッドの解説

push(v)

要素に数値を追加します。
イメージとしては@nodeの配列の末尾に要素を追加した後、
追加した要素が上の要素と比較して小さい場合は
上の要素と追加した要素を入れ替えていく、ということの繰り返しです。

追加した要素が下から上に入れ替わり移動していくことから、
この処理は「up-heap」と呼ばれたりするようです。

※実際には追加する要素が配置されるべき場所を
入れ替えで用意して、最後に要素を追加しています。

要素の追加イメージ
shift

@nodeから最小値を取り出し削除します。
イメージとしては@node[@count]と@node[1]、つまりは
最小値と末尾の要素(最大とは限らない)を入れ替えてから、
その下の左右に繋がる要素の内、小さい方を上の要素と入れ替えていきます。

一番下から一番上まで持ってきた要素が
再び下の方へと入れ替わっていくことから
この処理は「down-heap」と呼ばれたりもするようです。

要素の削除イメージ
swap(i, j)

指定されたインデックスにある要素を入れ替えます。
「down-heap」処理に使います。

min

ヒープの最小値を取得します。仕組み上、@node[1]に必ず最小値が入っています。

検証

  • 乱数の追加と同時に、最小値の取得

続けて、追加した乱数データを維持したまま以下のテスト
(要素数が多い状態で実行するテスト)

  • 乱数の追加と同時に、最小値を取得 そして 取得した最小値をデータから削除

それぞれ10,000回ずつ実行します。

require "benchmark"

array = nil
heap = nil

Benchmark.bm(18) do |r|
  r.report("array.min") {
    array = []
    10_000.times do
      array << rand(1000)
      array.min
    end
  }
  
  r.report("array.delete_at") {
    10_000.times do
      v = array[i = j = 0]
      s = array.size
      while (i += 1) < s
        if (m = array[i]) > v
          v = m
          j = i
        end
      end
      array.delete_at(j)
    end  
  }
  
  r.report("array.sort![0]") {
    array = []
    10_000.times { array << rand(1000); array.sort![0] }
  }
  
  r.report("array.sort!.shift") {
    10_000.times { array << rand(1000); array.sort!.shift }
  }
  
  r.report("BinaryHeap.push") {
    heap = BinaryHeap.new
    10_000.times do
      heap.push(rand(1000))
      heap.min
    end
  }
  
  r.report("BinaryHeap.shift"){
    10_000.times do
      heap.push(rand(1000))
      heap.shift
    end
  }
end

#                          user     system      total        real
# array.min            2.172000   0.000000   2.172000 (  2.179804)
# array.delete_at      1.797000   0.000000   1.797000 (  1.823975)
# array.sort![0]       2.140000   0.000000   2.140000 (  2.152906)
# array.sort!.shift    4.250000   0.000000   4.250000 (  4.254257)
# BinaryHeap.push      0.016000   0.000000   0.016000 (  0.004482)
# BinaryHeap.shift     0.031000   0.000000   0.031000 (  0.040346)

比較対象がろくでもないコード(ぉぃ)ですが
二分ヒープがぶっちぎりで速いことが分かると思います。

機能追加

そのままでも二分ヒープはバチクソ速いですが
「最小値を取り出して、新たに値を追加する」という
一連の流れにはロスがあります。

ここで以下のコードを追加します。

class BinaryHeap
  def shift_change(target_v)
    return unless @count > 0
    result = @node[1]
    @node[1] = target_v
    
    i = 1
    while
      left_i = i * 2
      right_i = left_i + 1
      break if left_i > @count

      left = @node[left_i]
      right = @node[right_i]

      if right && right < left
        min_i = right_i
        min = right
      else
        min_i = left_i
        min = left
      end

      break if min > target_v

      swap(i, min_i)
      i = min_i
    end

    return result
  end
end

このコードは「最小値の削除・値の追加」を
「値の変更」という一つの処理によって実現させています。

これでどれくらい速くなるか見てみましょう→

require "benchmark"

array = nil
heap = nil

Benchmark.bm(22) do |r|
  heap = BinaryHeap.new
  10_000.times { heap.push(rand(1000)) }
  
  r.report("BinaryHeap.shift"){
    10_000.times do
      heap.push(rand(1000))
      heap.shift
    end
  }
  
  heap = BinaryHeap.new
  10_000.times { heap.push(rand(1000)) }
  
  r.report("BinaryHeap.shift_change"){
    10_000.times do
      heap.shift_change(rand(1000))
    end
  }
end

#                              user     system      total        real
# BinaryHeap.shift         0.031000   0.000000   0.031000 (  0.040345)
# BinaryHeap.shift_change  0.015000   0.000000   0.015000 (  0.024096)

1.67倍くらい速くなっています。つよい

「値の変更」(他にも値を指定した削除など)を行うためには、変更する値が
「@node上のどこに格納されているか」を知っている必要があります。

今回は最小値を変更するので@node[1]に格納されていると分かりますが、
その他の値を変更する場合は、Hashなどを用いて
インデックスを作成・更新する必要があります。

その辺は二分ヒープを実際にどのように使うかという所にもよりますし、割愛します。
(その辺を書いた自分のコードがバグっちゃってて直すのが面倒とか言えない)

まとめ

要素が次々に追加・削除される環境での比較は二分ヒープが強い!

二分ヒープの具体的な使い道は、「A*アルゴリズム」なんかの、
探索と評価を繰り返す「最良優先探索」などに用いたりします。

実は二分ヒープの動画の前に
A*の解説動画を上げたりしていますが、そういう流れだったりします。
しかし、A*はまだしも、二分ヒープは内容としては大概の人には謎過ぎますね・・・。

youtu.be

いずれはA*の詳しい実装も解説できたらいいかな。

*1:メモリのヒープ領域とはまた別物だよ~