MYDB 4. ログファイルとリカバリ戦略
本章で扱うコードは backend/dm/logger と backend/dm/Recover.java にあります。
はじめに
MYDB はクラッシュ後のデータ復旧機能を提供しています。DM 層は基盤データに対する操作のたびに、ログをディスクに記録します。データベースがクラッシュした後、再起動時にログの内容をもとにデータファイルを復元し、一貫性を保証します。
ログの読み書き
ログのバイナリファイルは以下の形式で配置されています:
[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]
ここで、XChecksum は 4 バイトの整数で、以降のすべてのログに対するチェックサムです。Log1 から LogN は通常のログデータで、BadTail はデータベースがクラッシュした際に書き込み途中で残った不完全なログデータであり、存在しない場合もあります。
各ログのフォーマットは以下の通りです:
[Size][Checksum][Data]
Size は 4 バイトの整数で Data 部分のバイト数を示します。Checksum はそのログのチェックサムです。
単一ログのチェックサムは、指定されたシードを用いて以下のように計算されます:
private int calChecksum(int xCheck, byte[] log) {
for (byte b : log) {
xCheck = xCheck * SEED + b;
}
return xCheck;
}
このようにして、すべてのログのチェックサムを計算し合計することで、ログファイル全体のチェックサムが得られます。
Logger はイテレーターパターンで実装されており、next()
メソッドを通じてファイルから次のログを順次読み込み、Data 部分を解析して返します。next()
の実装は主に internNext()
に依存しており、位置情報 position
は現在のログファイル読み込み位置のオフセットです:
private byte[] internNext() {
if(position + OF_DATA >= fileSize) {
return null;
}
// size を読み込む
ByteBuffer tmp = ByteBuffer.allocate(4);
fc.position(position);
fc.read(tmp);
int size = Parser.parseInt(tmp.array());
if(position + size + OF_DATA > fileSize) {
return null;
}
// checksum + data を読み込む
ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
fc.position(position);
fc.read(buf);
byte[] log = buf.array();
// checksum を検証
int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));
if(checkSum1 != checkSum2) {
return null;
}
private byte[] internNext() {
if(position + OF_DATA >= fileSize) {
return null;
}
// size を読み込む
ByteBuffer tmp = ByteBuffer.allocate(4);
fc.position(position);
fc.read(tmp);
int size = Parser.parseInt(tmp.array());
if(position + size + OF_DATA > fileSize) {
return null;
}
// checksum + data を読み込む
ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
fc.position(position);
fc.read(buf);
byte[] log = buf.array();
// checksum を検証
int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));
if(checkSum1 != checkSum2) {
return null;
}
position += log.length;
return log;
}
ログファイルを開く際は、まずログファイルの XChecksum を検証し、ファイル末尾に存在する可能性のある BadTail を除去します。BadTail は書き込み途中のログであるため、ファイルのチェックサムには含まれていません。BadTail を取り除くことでログファイルの一貫性が保たれます。
private void checkAndRemoveTail() {
rewind();
int xCheck = 0;
while(true) {
byte[] log = internNext();
if(log == null) break;
xCheck = calChecksum(xCheck, log);
}
if(xCheck != xChecksum) {
Panic.panic(Error.BadLogFileException);
}
// 正常なログの末尾までファイルを切り詰める
truncate(position);
rewind();
}
ログファイルへの書き込みも同様に、まずデータをログフォーマットに包み込み、ファイルに書き込んだ後、ファイルのチェックサムを更新します。チェックサム更新時にはバッファをフラッシュし、内容が確実にディスクに書き込まれるようにします。
public void log(byte[] data) {
byte[] log = wrapLog(data);
ByteBuffer buf = ByteBuffer.wrap(log);
lock.lock();
try {
fc.position(fc.size());
fc.write(buf);
} catch(IOException e) {
Panic.panic(e);
} finally {
lock.unlock();
}
updateXChecksum(log);
}
private void updateXChecksum(byte[] log) {
this.xChecksum = calChecksum(this.xChecksum, log);
fc.position(0);
fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
fc.force(false);
}
private byte[] wrapLog(byte[] data) {
byte[] checksum = Parser.int2Byte(calChecksum(0, data));
byte[] size = Parser.int2Byte(data.length);
public void log(byte[] data) {
byte[] log = wrapLog(data);
ByteBuffer buf = ByteBuffer.wrap(log);
lock.lock();
try {
fc.position(fc.size());
fc.write(buf);
} catch(IOException e) {
Panic.panic(e);
} finally {
lock.unlock();
}
updateXChecksum(log);
}
private void updateXChecksum(byte[] log) {
this.xChecksum = calChecksum(this.xChecksum, log);
fc.position(0);
fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
fc.force(false);
}
private byte[] wrapLog(byte[] data) {
byte[] checksum = Parser.int2Byte(calChecksum(0, data));
byte[] size = Parser.int2Byte(data.length);
return Bytes.concat(size, checksum, data);
}
リカバリ戦略
リカバリ戦略は NYADB2 の戦略を参考にしており、やや複雑です(個人的には頭を使います)。
DM は上位モジュールに対し、2 種類の操作を提供します。新規データ挿入(I)と既存データ更新(U)です。なぜ削除操作がないのかは VM セクションで説明します。
DM のログ戦略は非常にシンプルで、一言で言うと:
I と U 操作を行う前に、必ず対応するログ操作を先に行い、ログがディスクに書き込まれたことを保証してからデータ操作を行う。
このログ戦略により、DM はデータ操作のディスク同期を柔軟に行えます。ログがデータ操作前に確実にディスクに到達していれば、たとえデータ操作が最後までディスクに同期されずにクラッシュしても、後からログを使ってデータを復元できます。
2 種類のデータ操作に対して、DM は以下のログを記録します:
- (Ti, I, A, x):トランザクション Ti が位置 A にデータ x を挿入した
- (Ti, U, A, oldx, newx):トランザクション Ti が位置 A のデータを oldx から newx に更新した
まず並行処理を考慮しない場合、ある時点で操作しているトランザクションは一つだけです。ログは以下のように並びます:
(Ti, x, x), ..., (Ti, x, x), (Tj, x, x), ..., (Tj, x, x), (Tk, x, x), ..., (Tk, x, x)
シングルスレッド
シングルスレッドの場合、Ti、Tj、Tk のログは決して交差しません。この場合のログによるリカバリは簡単で、ログの最後のトランザクションが Ti だとすると:
- Ti より前のすべてのトランザクションのログをリドゥ(redo)する
- Ti の状態(XID ファイル)を確認し、完了済み(コミット済みまたはアボート済み)なら Ti をリドゥ、そうでなければアンドゥ(undo)する
トランザクション T のリドゥは以下の通り:
- T のすべてのログを順方向にスキャン
- 挿入操作 (Ti, I, A, x) なら x を位置 A に再挿入
- 更新操作 (Ti, U, A, oldx, newx) なら位置 A の値を newx に設定
アンドゥは以下の通り:
- T のすべてのログを逆方向にスキャン
- 挿入操作 (Ti, I, A, x) なら位置 A のデータを削除
- 更新操作 (Ti, U, A, oldx, newx) なら位置 A の値を oldx に戻す
注意点として、MYDB には実際の削除操作は存在せず、挿入操作のアンドゥは該当データの有効フラグを無効にするだけです。削除に関する議論は VM セクションで行います。
マルチスレッド
上記の操作で MYDB はシングルスレッド環境でのリカバリを保証できます。ではマルチスレッドの場合はどうでしょうか?以下の 2 つのケースを考えます。
1 つ目:
T1 begin
T2 begin
T2 U(x)
T1 R(x)
...
T1 commit
MYDB break down
システムクラッシュ時、T2 はまだアクティブ状態です。データベース再起動時にリカバリ処理を行うと、T2 はアンドゥされ影響が消えます。しかし T1 は T2 の更新値を読み込んでおり、T2 がアンドゥされるなら T1 もアンドゥされるべきです。これがカスケードロールバックです。しかし T1 はすでにコミット済みであり、コミット済みトランザクションの影響は永続化されるべきです。ここに矛盾が生じます。したがって以下を保証する必要があります:
規則 1:進行中のトランザクションは、他の未コミットトランザクションのデータを一切読み取らない。
2 つ目のケース(x の初期値は 0 とする):
T1 begin
T2 begin
T1 set x = x+1 // ログは (T1, U, A, 0, 1)
T2 set x = x+1 // ログは (T2, U, A, 1, 2)
T2 commit
MYDB break down
クラッシュ時、T1 はまだアクティブです。再起動後のリカバリで、T1 はアンドゥされ、T2 はリドゥされます。しかしアンドゥとリドゥの順序に関わらず、x の最終値は 0 か 2 のどちらかになり、これは誤りです。
この問題の根本原因は、ログが単純すぎて「前の状態」と「後の状態」だけを記録し、単純に「前の状態」でアンドゥ、「後の状態」でリドゥしていることにあります。この単純なログとリカバリ方式では、すべてのデータベース操作の意味をカバーできません。
解決策は 2 つあります:
- ログの種類を増やす
- データベース操作を制限する
MYDB は後者を採用し、以下を保証します:
規則 2:進行中のトランザクションは、他の未コミットトランザクションが変更または生成したデータを一切変更しない。
MYDB では VM の存在により、DM 層に渡される実際の操作列は規則 1 と規則 2 を満たすことが保証されます。VM がこれらの規則をどう保証するかは VM 層の節で説明します(VM はかなり難しいです)。これらの規則があれば、並行処理下でのログリカバリは非常にシンプルになります:
- クラッシュ時に完了済み(コミット済みまたはアボート済み)トランザクションをすべてリドゥする
- クラッシュ時に未完了(アクティブ)トランザクションをすべてアンドゥする
リカバリ後、データベースはすべての完了済みトランザクションが終了し、未完了トランザクションが開始前の状態に戻ります。
実装
まず 2 種類のログフォーマットを定義します:
private static final byte LOG_TYPE_INSERT = 0;
private static final byte LOG_TYPE_UPDATE = 1;
// updateLog:
// [LogType] [XID] [UID] [OldRaw] [NewRaw]
// insertLog:
// [LogType] [XID] [Pgno] [Offset] [Raw]
原理で述べた通り、リカバリ処理は主に 2 段階です:完了済みトランザクションのリドゥ、未完了トランザクションのアンドゥ。
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
lg.rewind();
while(true) {
byte[] log = lg.next();
if(log == null) break;
if(isInsertLog(log)) {
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(!tm.isActive(xid)) {
doInsertLog(pc, log, REDO);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
if(!tm.isActive(xid)) {
doUpdateLog(pc, log, REDO);
}
}
}
}
private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
Map<Long, List<byte[]>> logCache = new HashMap<>();
lg.rewind();
while(true) {
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
lg.rewind();
while(true) {
byte[] log = lg.next();
if(log == null) break;
if(isInsertLog(log)) {
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(!tm.isActive(xid)) {
doInsertLog(pc, log, REDO);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
if(!tm.isActive(xid)) {
doUpdateLog(pc, log, REDO);
}
}
}
}
private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
Map<Long, List<byte[]>> logCache = new HashMap<>();
lg.rewind();
while(true) {
byte[] log = lg.next();
if(log == null) break;
if(isInsertLog(log)) {
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(tm.isActive(xid)) {
if(!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
if(tm.isActive(xid)) {
if(!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
}
}
// すべてのアクティブログを逆順にアンドゥ
for(Entry<Long, List<byte[]>> entry : logCache.entrySet()) {
List<byte[]> logs = entry.getValue();
for (int i = logs.size()-1; i >= 0; i --) {
byte[] log = logs.get(i);
if(isInsertLog(log)) {
doInsertLog(pc, log, UNDO);
} else {
doUpdateLog(pc, log, UNDO);
}
}
tm.abort(entry.getKey());
}
}
updateLog と insertLog のリドゥ・アンドゥ処理は、以下のように一つのメソッドにまとめて実装しています:
private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
int pgno;
short offset;
byte[] raw;
if(flag == REDO) {
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.newRaw;
} else {
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.oldRaw;
}
Page pg = null;
try {
pg = pc.getPage(pgno);
} catch (Exception e) {
Panic.panic(e);
}
try {
PageX.recoverUpdate(pg, raw, offset);
} finally {
pg.release();
private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
int pgno;
short offset;
byte[] raw;
if(flag == REDO) {
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.newRaw;
} else {
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.oldRaw;
}
Page pg = null;
try {
pg = pc.getPage(pgno);
} catch (Exception e) {
Panic.panic(e);
}
try {
PageX.recoverUpdate(pg, raw, offset);
} finally {
pg.release();
}
}
private static void doInsertLog(PageCache pc, byte[] log, int flag) {
InsertLogInfo li = parseInsertLog(log);
Page pg = null;
try {
pg = pc.getPage(li.pgno);
} catch(Exception e) {
Panic.panic(e);
}
try {
if(flag == UNDO) {
DataItem.setDataItemRawInvalid(li.raw);
}
PageX.recoverInsert(pg, li.raw, li.offset);
} finally {
pg.release();
}
}
注意すべきは、doInsertLog()
の削除処理で DataItem.setDataItemRawInvalid(li.raw);
を使っている点です。DataItem は次節で説明しますが、これは該当データアイテムの有効ビットを無効に設定し、論理削除を実現しています。