Using Paxos to Build a Scalable, Consistent, and Highly Available Datastore

このエントリは、読んだことを思い出すための備忘録。

Intro.

既存のデータベースだと、手作業でテーブルを水平分割し、レプリケーションをmaster-slaveで実現している。

マスタスレーブの限界とPaxos

既存のデータベースでは、全ての更新処理をmasterに集めてコミットログに書いて、slaveに情報を渡してコミットログに書いて、slaveからmasterに書き込み終了の連絡が届いたあとで、masterはデータベース内を更新ししてコミットする。もし、masterが死んだら、速やかにslaveが受け継ぐことができる(コミットログがあるので)。だけど、この復旧シーケンスの中で、一台しかサーバが落ちなかったとしても一時的にデータベースへのアクセスが出来なくなるポイントがあることが知られている。
slaveが死んだ時に、masterが方系でトランザクションをコミットしてし、slaveが復旧する前にmasterが死ぬと、slaveとmasterの状態がずれる。master-slaveで状態を同期し続けるためには、master側のコミットを止めないといけない。
クラスタが大きくなると、二つのディスクが同時に障害にあうことがある(これは、ディスクが壊れてそれを復旧する前にもう一つ壊れるという意味。)。そうすると、レプリカが2つというのは弱いので、レプリカ3とするのが基本的なアプローチになる。レプリカが3つあるとプロダクトのオンラインアップデートも容易。レプリカ3つにすると、実装は複雑になるが、分散処理系の研究が30年程度行われてきた。
M. Pease, R. Shostak, and L. Lamport. "Reaching Agreement in the Presence of Faults," In Journal of The ACM, pages 228–234, 1980.(ちょっと、読みきれる自信がないが、読みたい)。障害の組み合わせが単純な場合だけに対応できる分散アルゴリズムは考えることができるが、どのような場合でも対応できるのは、PAXOSしかないと考えられている。PAXOSは、レプリケーション間のプロトコルであり、2F+1個のレプリカが用意されていれば、F個のレプリカで一度に障害が起きても信頼性が担保できることが証明されている。

Strong vs. Eventual Consistency

分散システム界隈でのConsistencyは、別々のレプリカの同期性の観点で用いられる(データ工学のそれとは違うという意味)。Strong Consistencyは、全てのレプリカがいかなる時も同期されているということだとすると、どうしても可用性(Availability)が担保できない。もちろん、ユーザからはセマンティクスが分かりやすいというメリットがある。このへんの話は、CAP定理として有名であり、Consistency, Availability, Patition tolerance、という三つの属性を全てもつ分散システムは構築できず、二つを満たす分散して無であれば、構築できることが示されている。

Dynamoでは、この文脈で言うところの、APを満たすシステムである。Evetual Consistencyという新たな概念を導入した。Strong Consistencyが保証されないため、ユーザがDynamoを利用すると、ある一つのキーに対して、複数のバージョンのvalueが派生する。アプリケーションは、このようなことが生じることを自覚して、対処する必要がある。そのかわり、Availabityは極めて高くなる。このような不便さを受け入れつつ、可用性を担保して欲しいというアプリケーションもあるだろう。でも、実際にはもう少しConsistencyを保証してあげたほうがよりさまざまなアプリケーションに適用しやすい。Stonebraker教授(データ工学の重鎮)は、ひとつのデータセンタを対象とした場合、そもそもネットワーク分断が起こることはまれと言えるので、CAが良いだろうという見解を示している(CA==これまでのRDBMS)。

Spinnaker

ということで、Spinnakerというプロダクトを作ります。Spinnakerは、レプリカ数3、(テーブルを)キー範囲で分割、トランザクションに対応したget/put APIの提供、Timeline Consistencyの提供(これはこのブログの別エントリで記述しました。)という特徴を持っている。PAXOSベースのプロトコルは、コミットログ書き込みとリカバリの時に利用している。PAXOSを用いることで、レプリカの過半数が分断されたネットワークの片側にあれば、可用性を維持できる。CAP定理で言えば、Spinnakerは、CAシステムのひとつと考えることができる(これは、本当かな?私の感触だとAPという感じがするけど、Timeline Consistency != Strong Consistencyだから。)。このプロダクトは、ひとつのデータセンタ向けに作成しているので。複数データセンタ間の場合は、プロトコルを変更する必要がある。

2フェーズコミット

2フェーズコミット(2PC)は、以下のような短所のため使えないと考える。ひとつのサーバが落ちたときでも、システムがアボートする。このことは、可用性の観点から受け入れられない。また、2PCを更新ごとに利用すると遅すぎる。その理由は、2つのディスクへのsyncと2つのメッセージ投げるということをやらないとならないから。three phase commitというのもあるが、これも遅い(初めて聴いた、D. Skeen. Nonblocking Commit Protocols. In SIGMOD,pages 133–142, 1981.、後で読む)。

データ工学でのレプリケーション

データ工学でのレプリケーションは、単体のデータベースのレプリカの話が主たるテーマ。初期実装の一つに、Postgres-Rがある。このシステムでは、グループコミュニケーションシステム(GCS)と呼ばれる、信頼性のあるマルチキャストを利用して、レプリカの複製等を行っている。GCSでも、合意プロトコルが実装されているがSpinnakerとはちょっと違う、なぜなら、CGSではレプリカプロトコルがコミットログやリカバリ機能の中にうもれているから。Tashkentというプロダクトの作者も同じ課題認識を持っていて、トランザクション順番やコミットログの場所を一箇所に集めることでスループットを高めた。ただし、障害耐性がない。Ceccheさんの論文は、分散ミドルウェアのレプリカ制御に関して良いサーベイになっている。Ganymedさんの論文では、シングルマスタ+キューに関するレプリカ例があげられている。ただ、既存の研究では基本的に単発の障害しか検討していない。

Dynamo, Bigtable, and PNUTとの比較

この節では、有名どろろのプロダクトと比較が記述されている。

  • Dynamo:Spinnaker(この論文のプロダクト名)は設計がシンプル(設計というより、Timeline Consistencyのセマンティクス分かりやすいということだと思う。)。特に、バージョンが衝突することがないため、ユーザはその処理を考える必要がない。
  • Bigtable:まず、強い一貫性を保ち、コミットログベースで実装されているので、書き込み時がどうしても遅い。CAP定理でいうところの、CPを選択していることもあり、マシンダウン時に一部のデータは見れなくなる(Availabiltyが他に比べて低い)。Googleのエンジニアもそのへんの弱さが分かっているので、M. K. McKusick and S. Quinlan. "GFS: Evolution onFast-Forward," ACM Queue, 7(7), 2009.で言及されている(後で読むが、Timeline Consistencyを提供できるようにGFSを改造することは不可能ではない気がする)。
  • PNUTS:ディザスタリカバリ対応のクラスタシステムで、そのコアな技術は、Yahoo Message Broker。ボトルネックは、この技術だろうが、残念ながら詳細な情報は公開されていない。

他のシステム

  • Azure:クラウドベースでSQLインターフェースを提供するが、レプリケーションの詳細は公開されていない。
  • FAWN:(私は、初めてこの名を聴いた)このプロダクトは、コモディティ製品の中でもスペックが低いマシン上での動作を検討している。

データモデルとAPI

SpinnakerのAPIやモデルは、bigtablePNUTSと似ている。テーブルと行があり、行は一意なkeyをふくんでる。行は、複数のカラムをふくんでおり、それぞれのカラムにひもづく、バージョン付きの値がある。APIは、get/put/delete/conditionalPut/ConditionalDeleteなどがある。Conditionalの意味合いは、バージョンを指定できること(バージョン付きの値と書いたが、バージョンは内部のkeyに付随しているだろう。bigtableがそうだし。そちらは、timestampと言うなだけど本質は同じ…と思ったが、違いました。Timeline Consistency用のインターフェースですね、PNUTSのそれと同じ。)。バージョンは、put時にSpinnaker内部でインクリメンタルに割り当てられる。Condition月関数は、read-modify-write(compare-and-swap)として用いることができる。get関数の第三引数はconsistentレベルを指定する。trueであれば、必ず最新バージョンを読み込む(Strong Consistency)。falseであれば、古いバージョンも読み込む(Timeline Consistency)。書いてないけど、multi-columnsバージョンのAPIもある。

ARCHITECTURE

アーキテクチャの話はするけど、記述スペースないので、物理・論理配置や、サーバマシンの追加・削除、ロードバランシングといった話は省略する(むーん、こういう所が一番おもしろんだけど、残念。)。PNUTSBigtableと同じように、テーブルはキー範囲で分割・管理される。各キー範囲ごとにレプリカをデフォルトで3つ用意する(Fig.2参照)。このレプリケーション方式は、Chained Declustringと呼ばれる手法に似ている(初めて聴いた、H. Hsiao and D. J. Dewitt. Chained Declustering: A New Availability Strategy for Multiprocessor Database Machines. In ICDE, pages 227–254, 1990.後で読む)。同じレプリカを持つサーバの集合を"cohort"と呼ぶ(直訳で、仲間という意味)。

Node Architecture

Spinnakerを構成する各ノードには、いくつかのコンポーネントがふくまえていて、thread safe。書き込みがあると、LSN(the latest sequence numberの略と思う)を各write adhead log(WAL)に書き込むレコードごとに割り当てて、書きこむ。WALは、ノードに一つのみなので、LSNは、論理番号として各サーバでインクリメンタルに増加する。その後、cohortから一定数以上のackが帰ってきたら、commitされる。コミットされるまでの間、更新情報は、commit queueというメモリ上の構造に保持される。コミットされたレコードは、 メモリ上のmemtableというソート済みマップに格納され、ある程度たまると、ディスク上に吐される。ディスク上の構造をSSTableといい、このファイルは一度書かれたら変更されないファイルです(まあ、必要されなくなったら、MajorCompactionで消えるということです)。

Zookeeper

Zookeeperを、耐障害性をもつ、分散コーディネータとして利用しています(Goolgeのchubbyですね)。Zookeeperを使うことで、アーキテクチャデザインがビックリするほどシンプルになる。分散ロックや、バリア、メンバシップなどのややこしいところを任せることができる。Zookeeperは、ボトルネックにならない。それは、Zookeeperとのメッセージのやりとりが、死活監視のみだから。

THE REPLICATION PROTOCOL

レプリケーションプロトコルについて、記述する。この話は、cohortの中で閉じる。PAXOS知っていると、見通しが良い。 付録についている。各cohortには、leaderとfollowersがいる。leader選択フェーズと、quorumフェーズがある。quorumフェーズでは、leaderが書き込みを提案して、followesが受理する。leader選択フェーズは、障害が起こったときとシステム起動時。通常必要とされるのは、quorumフェーズのみ。書き込み要求は、まずleaderに届く。leaderは、WALに追記してsyncし、commit queueに書き込みデータをpush。と同時に、followersに書き込み要求を受理するかどうか提案する。提案が受理されると、followersは、各々のWALに書いてsyncして、commit queueにpushする。で、leaderにackを返却する。leaderは、一つでもackが返却された時点で、commit queueからpopして、memtableを更新する。その後、クライアントに成功を返却する。伝統的なWALと違って、書き込みデータがログに記録すべきかどうかの区別はしない。Quorumベースのリカバリ手法は、この書き込みに対する永続性を保証する。

定期的にleaderは、非同期のcommit messageをfollowersにLSNとともに投げる。followersは、受け取ったLSNまでのcommit をペンディグしていたレコード全てに対して、memtableへの更新を行う。これは、memtableへの更新を一度にやることで、効率化を目指しているから。リカバリ時には、ここで共有されたLSNをメモリ上に持っているので、そこまでリカバリすることになる(うーんちょっと理解できてない、クライアントに一度OKを返却したら、永続性を保証するわけなので、最後に投げたLSNとleaderがクライアントにOKを投げたLSNがズレている気がするんだが。多分、私が何か勘違いしてるのだろう…多分、リカバリ時にログを見て、本当の最新をチェックするのかなと思う。)。以上のような仕組みなので、Strongly consistentで読み込み要求を行うと、leaderにクエリが飛んで、最新のレコードを得ることができる。Timeline consistentを要求すると、followersにもクエリが飛んで、古いレコードを得ることになる。さて、leaderがコミットしてから、followersがコミットするまでの期間をcommit periodと呼ぶ。この期間を小さくすることで、古いレコードを得る可能性が 小さくなる。トータルで、ひとつの書き込み要求に対して3つのログ書き込みと4つのメッセージのやりとりが必要になる。沢山のオペレーションは同時に発生しているので、Bigtableにあるようなグループコミットは有効です(これはスループット観点)。レスポンスタイムの観点で言うと、ひとつのfollowerがleaderから書き込み要求を受け取り、WALに書き込み、leaderにackを返却すればよいので、1つのログ書き込みと2つのメッセージのやりとりがクリティカルパスとなる(leaderの書き込みは、並列でしているからですね)。

Conditional Put

レプリケーション、ログ書き込み、リカバリの方針は、通常のputと変わらない。leaderは、書きこまれている最新のバージョンと要求されている書き込みのバージョンが一致すれば、書き込み処理実行。でなければ、何もせずにエラー返却。Conditional Putであっても、cohortに含まれる各サーバの状態は同じであることが保証される。これは、コミットする順番がLSNの順番に従うからである。

Follower のリカバリ

Followerのリカバリは、2つのフェーズからなる。ローカルリカバリとキャッチアップである。f.cmtとf.lstは、それぞれfollowerが最後にコミットしたLSNとログに記録されている最大のLSNとする。まずは、f.cmtまでのログを読み込み、memtableを構成します。この処理は、冪等の処理です(冪等==idempotent、何度処理しても同じ結果になることを指す、これをやってる最中に死んでもまた繰り返せば良いですね)。でも、f.cmt < LSN < f.lstを満たすようなレコードは、leaderによってperiodically commitされたものかわからない。ここから、キャッチアップフェーズになります。もし、followerのディスク故障でログが吹き飛んでいたなら、すぐにキャッチアップフェーズになります。followerは、leaderにf.cmtを送ります。すると、leaderは、f.cmt以降のコミットされたデータをfollowerに送ります。この作業の最後のレコードにおいて、leaderは一時的にwriteを止めて、followerがキャッチアップ完了するのを待ちます。followerが 再度立ち上がるまでに時間が開いて、leaderに要求されたf.cmtはSSTable上にしかないかもしれない。このことに対応するために、SSTableには、最小と最大のLSNを保持しておきます。

followerのもつログエントリの論理的な削除

leader死んでしまうと、次に選ばれたleaderは、f.cmt以降の書き込みデータがコミットされていない状態になる。followerのログに書きこまれている、f.cmt以降のWALに書きこまれたデータをtruncateすることで、この不整合に対応する(まじか、それでいいんだっけ?死んでしまったleaderがユーザに対してコミット通知した分はどうなるんだろう。)。これは、followerが誤ったローカルリカバリをしないため。
ただ、これは素直にはできない。なぜなら、ひとつのWALは複数のcohortの情報を一元的に書き込んでいるから。もし、何も考えずにtruncateすると、別のcohortのまで消してしまう。このことを解決するために、論理的なtruncateという概念を導入する。実際にtruncateしたい範囲は、f.cmt 〜 f.lstであるので、その区間をスキップすることを意味する情報をディスクに書いておく。

Leader のフェールオーバ

leaderが障害により落ちると、一時的にそのキー範囲はアクセス不可となる。で、新しいleaderが選択され、leader業務を引き継ぐ。新しいleaderは、以前のleaderがコミットしたデータを全て持っているfollowerから選ぶ。詳細は、7章。新しいleaderがコミットしていて、新しいleaderがコミットしていないデータ(f.lst \ f.cmt)をコミットするアルゴリズムは、fig.6。新しいleaderは、{f.cmt \ l.cmt }をfollowersに送信する。受け取ったfollowersはすでに持っている場合があるので、その場合は送られてきてもスルーする。新しいleaderは、コミット要求(l.cmtまで)をfollowersに出す。leaderは少なくともひとつのfollowerがackを返却するのを待つ。{l.lst \ l.cmt}について、通常のコミットプロトコルで同期する。書き込み要求を受け付ける(今わかったけど、一つでもfollowerがackを返してきたらquorumになっているんですね.)。

Leader選出

これ以降は気が向いたら...