介绍

模型概述

xdb一共包括五个模块, 分别是

  1. Data Manager(DM)
  2. Transaction Manager(TM)
  3. Version Manager(VM)
  4. Index Manager(IM)
  5. Table Manager(TBM)

五个模块的依赖关系如下

其中TM模块为事务管理模块, 主要作用就是标记事务的状态, 并且提供接口以供其他模块进行查询

Transaction Manager

TM模块是比较简单的模块, 主要就是标记每个事务的状态以供其他模块查询

每一个事务都有三种状态

  1. active 正在进行, 尚未结束
  2. committed 已提交
  3. aborted 已撤销(回滚)

XID文件

每一个事务都有一个xid, 这个xid唯一标识了这个事务, xid的标号从1开始, 依次递增, 不重复

规定xid为0的事务为超级事务(Super Transaction), 当一些操作想在没有申请事务的情况下进行, 那么可以将操作的 XID 设置为 0, XID 为 0 的事务的状态永远是 committed

XID文件就是TM模块中记录事务状态的文件, 它为每个事务分配1字节的空间来记录状态, 并且在头部还保存了一个8字节的数字来记录事务的个数

事务xid的状态在XID文件中的位置在(xid-1)+8字节处

对外接口

TM模块中对其他模块提供了接口来查询事务状态

1
2
3
4
5
6
7
8
9
public interface TransactionManager {
long begin(); // 开启一个新事务
void commit(long xid); // 提交一个事务
void abort(long xid); // 取消一个事务
boolean isActive(long xid); // 查询一个事务的状态是否是正在进行的状态
boolean isCommitted(long xid); // 查询一个事务的状态是否是已提交
boolean isAborted(long xid); // 查询一个事务的状态是否是已取消
void close(); // 关闭TM
}

TransactionManagerImpl

定义常量

1
2
3
4
5
6
7
8
9
10
11
12
// 超级事务, 永远为committed状态
public static final long SUPER_XID = 0;
// XID文件头长度
static final int LEN_XID_HEADER_LENGTH = 8;
// XID 文件后缀
static final String XID_SUFFIX = ".xid";
// 每个事务的占用长度
private static final int XID_FIELD_SIZE = 1;
// 事务的三种状态
private static final byte FIELD_TRAN_ACTIVE = 0;
private static final byte FIELD_TRAN_COMMITTED = 1;
private static final byte FIELD_TRAN_ABORTED = 2;

构造方法

所有的文件读写都采用了 NIO 方式的 FileChannel

1
2
3
4
5
6
TransactionManagerImpl(RandomAccessFile file, FileChannel fc) {
this.file = file;
this.fc = fc;
counterLock = new ReentrantLock();
checkXIDCounter();
}

检查XID文件

在构造方法创建了TransactionManager后需要对XID文件进行校验, 确保XID是合法的文件

校验逻辑: 读取前8个字节(事务个数, 每个事务占1个字节)作为xidCounter, 根据xidCounter的值可以知道有多少个事务, 事务的个数+8字节, 判断这个数值与文件的长度是否相同, 如果不同则说明XID文件不合法, 直接终止程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 检查XID文件是否合法
* 读取XID_FILE_HEADER中的xidCounter, 根据它计算文件的理论长度, 对比实际长度
*/
private void checkXIDCounter() {
long fileLen = 0;
try {
fileLen = file.length();
} catch (IOException e1) {
Panic.panic(Error.BadXIDFileException);
}
if (fileLen < LEN_XID_HEADER_LENGTH) {
Panic.panic(Error.BadXIDFileException);
}

ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH);
try {
fc.position(0);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
this.xidCounter = Parser.parseLong(buf.array());
long end = getXidPosition(this.xidCounter + 1);
if (end != fileLen) {
Panic.panic(Error.BadXIDFileException);
}
}

// 根据事务xid取得其在xid文件中对应的位置
private long getXidPosition(long xid) {
return LEN_XID_HEADER_LENGTH + (xid - 1) * XID_FIELD_SIZE;
}

接口方法

begin()

begin()方法会开始一个事务, 首先把xidCounter+1对应的事务的状态设置为active, 然后更新文件头(值加1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
// 开始一个事务, 并返回XID
public long begin() {
counterLock.lock();
try {
long xid = xidCounter + 1;
updateXID(xid, FIELD_TRAN_ACTIVE);
incrXIDCounter();
return xid;
} finally {
counterLock.unlock();
}
}

// 更新xid事务的状态为status
private void updateXID(long xid, byte status) {
long offset = getXidPosition(xid);
byte[] tmp = new byte[XID_FIELD_SIZE];
tmp[0] = status;
ByteBuffer buf = ByteBuffer.wrap(tmp);
try {
fc.position(offset);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}
try {
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

// 将XID加一, 并更新XID Header
private void incrXIDCounter() {
xidCounter++;
ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter));
try {
fc.position(0);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}
try {
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

commit()

1
2
3
4
5
@Override
// 提交XID事务
public void commit(long xid) {
updateXID(xid, FIELD_TRAN_COMMITTED);
}

abort()

1
2
3
4
5
@Override
// 回滚XID事务
public void abort(long xid) {
updateXID(xid, FIELD_TRAN_ABORTED);
}

isActive()

isActive(), isCommitted(), isAborted()都是检查一个xid的状态, 使用一个通用的checkXID()方法解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 检测XID事务是否处于status状态
private boolean checkXID(long xid, byte status) {
long offset = getXidPosition(xid);
ByteBuffer buf = ByteBuffer.wrap(new byte[XID_FIELD_SIZE]);
try {
fc.position(offset);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
return buf.array()[0] == status;
}

@Override
public boolean isActive(long xid) {
if (xid == SUPER_XID) {
return false;
}
return checkXID(xid, FIELD_TRAN_ACTIVE);
}

@Override
public boolean isCommitted(long xid) {
if (xid == SUPER_XID) {
return true;
}
return checkXID(xid, FIELD_TRAN_COMMITTED);
}

@Override
public boolean isAborted(long xid) {
if (xid == SUPER_XID) {
return false;
}
return checkXID(xid, FIELD_TRAN_ABORTED);
}

close()

1
2
3
4
5
6
7
8
9
@Override
public void close() {
try {
fc.close();
file.close();
} catch (IOException e) {
Panic.panic(e);
}
}

创建,打开XID文件

使用两个静态方法create()open()来进行XID文件的创建和打开

create()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static TransactionManagerImpl create(String path) {
File f = new File(path + TransactionManagerImpl.XID_SUFFIX);
try {
if (!f.createNewFile()) {
Panic.panic(Error.FileExistsException);
}
} catch (Exception e) {
Panic.panic(e);
}
if (!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}

FileChannel fc = null;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "rw");
fc = raf.getChannel();
} catch (FileNotFoundException e) {
Panic.panic(e);
}

// 写空XID文件头
ByteBuffer buf = ByteBuffer.wrap(new byte[TransactionManagerImpl.LEN_XID_HEADER_LENGTH]);
try {
fc.position(0);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}

return new TransactionManagerImpl(raf, fc);
}

open()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static TransactionManagerImpl open(String path) {
File f = new File(path + TransactionManagerImpl.XID_SUFFIX);
if (!f.exists()) {
Panic.panic(Error.FileNotExistsException);
}
if (!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}

FileChannel fc = null;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "rw");
fc = raf.getChannel();
} catch (FileNotFoundException e) {
Panic.panic(e);
}

return new TransactionManagerImpl(raf, fc);
}

Test

创建testXid.xid文件后执行了3次begin()(相当于创建了3个事务), 然后把xid为3事务提交后回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TransactionManagerTest {
@Test
public void testCreate() {
// TransactionManagerImpl tmger = TransactionManager.create("testXid");
TransactionManagerImpl tmger = TransactionManager.open("testXid");
// tmger.begin();

System.out.println(tmger.xidCounter);
tmger.commit(3);
tmger.abort(3);
tmger.close();
}
}

此时查看XID文件

可以看出有3个事务, 状态分别是0, 0, 2, 对应了active, active, aborted

Data Manager

介绍

Data Manager主要的职责有

  1. 分页管理DB文件并进行缓存
  2. 管理日志文件, 保证发生错误时能够根据日志进行数据恢复
  3. 抽象DB文件为DataItem供上层模块使用, 并提供缓存

总体来时DM就是上层模块与文件系统之间的一个抽象层, DM从文件系统读写的文件并封装供上层模块使用, 并且提供日志功能

无论是向上还是向下, DM都提供了一个缓存的功能, 用内存操作来保证效率

引用计数缓存框架

为什么使用引用计数缓存而不使用LRU?

是因为LRU缓存策略中资源的驱逐是不可控的, 如果某一时刻LRU中的某一资源被驱逐, 而此时上层模块又需要将该资源写回到数据源中, 此时发现在缓存中找不到该资源, 在这种情况下: 是否还要做写回数据源的操作?

  1. 不回源. 由于没法确定缓存被驱逐的时间, 也没法确定缓存被驱逐后是否被修改, 这样是极其不安全的
  2. 回源. 如果数据线没有被修改, 那么就是一次无效的回源
  3. 放回到缓存中. 由于此时缓存已满, 放回到缓存中势必会驱逐掉其他的资源, 又会引起相同的问题

引用计数策略正好解决了这个问题, 只有上层模块主动释放引用, 缓存在确保没有模块在使用这个资源了, 才会去驱逐资源

而当缓存满的时候, 引用计数策略无法自动释放缓存, 此时应该直接报错(类似JVM的OOM)

AbstractCache<T>

抽象方法

AbstractCache<T>抽象类中有两个抽象方法, 实现类去实现具体操作

1
2
3
4
5
6
7
8
9
10
11
12
13
`	/**
* 当资源不在缓存时的获取行为
* @param key
* @return
* @throws Exception
*/
protected abstract T getForCache(long key) throws Exception;

/**
* 当资源被驱逐时的写回行为
* @param obj
*/
protected abstract void releaseForCache(T obj);

引用计数

为了用对多线程环境, 需要记录一个资源是否正在获取中

1
2
3
private HashMap<Long, T> cache;                     // 实际缓存的数据
private HashMap<Long, Integer> references; // 资源的引用个数
private HashMap<Long, Boolean> getting; // 正在被获取的资源

初始化

1
2
3
4
5
6
7
8
9
10
11
private int maxResource;                            // 缓存的最大缓存资源数
private int count = 0; // 缓存中元素的个数
private Lock lock;

public AbstractCache(int maxResource) {
this.maxResource = maxResource;
cache = new HashMap<>();
references = new HashMap<>();
getting = new HashMap<>();
lock = new ReentrantLock();
}

get()获取资源

使用get()方法来获取资源, 如果缓存中存在该资源, 则直接从缓存中返回, 否则则从数据源中获取, 并将其加入到缓存中, 再返回

在获取资源时首先要检查资源是否被占用, 如果被占用了, 则进入循环等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected T get(long key) throws Exception {
while (true) {
lock.lock();
// 请求的资源正在被其他线程获取, 重复等待
if (getting.containsKey(key)) {
lock.unlock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
continue;
}
continue;
}

...

}
}

当资源不被占用时, 需要判断资源是否存在在缓存中, 如果存在, 则直接返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected T get(long key) throws Exception {
while (true) {
...
// 如果资源在缓存中, 则直接返回
if (cache.containsKey(key)) {
T obj = cache.get(key);
references.put(key, references.get(key) + 1);
lock.unlock();
return obj;
}
...

}
}

如果缓存中不存在, 准备从数据源获取资源, 将资源占用, getting设置为true, 缓存数+1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected T get(long key) throws Exception {
while (true) {
...
// 缓存中没有时, 尝试获取该资源, 如果缓存已满, 则抛出异常
if (maxResource > 0 && count == maxResource) {
lock.unlock();
throw Error.CacheFullException;
}
// 设置getting, 缓存数+1, 结束循环
count++;
getting.put(key, true);
lock.unlock();
break;
}
}

从数据源获取资源并加入到缓存中去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected T get(long key) throws Exception {
...

// 从数据源获取资源obj
T obj = null;
try {
obj = getForCache(key);
} catch (Exception e) {
lock.lock();
count--;
getting.remove(key);
lock.unlock();
throw e;
}

// 将资源obj放入缓存中, 引用数设置为1
lock.lock();
getting.remove(key);
cache.put(key, obj);
references.put(key, 1);
lock.unlock();

return obj;
}

release()释放缓存

强行释放一个缓存, 将缓存引用数references减 1, 如果减到 0, 就把相关的缓存删除并写回数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void release(long key) {
lock.lock();
try {
int ref = references.get(key) - 1;
if (ref == 0) {
T obj = cache.get(key);
releaseForCache(obj);
references.remove(key);
cache.remove(key);
count--;
} else {
references.put(key, ref);
}
} finally {
lock.unlock();
}
}

close()

关闭缓存, 写回所有资源

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void close() {
lock.lock();
try {
Set<Long> keys = cache.keySet();
for (long key : keys) {
release(key);
references.remove(key);
cache.remove(key);
}
} finally {
lock.unlock();
}
}

共享内存数组

SubArray 类, 来(松散地)规定这个数组的可使用范围

1
2
3
4
5
6
7
8
9
10
11
public class SubArray {
public byte[] raw;
public int start;
public int end;

public SubArray(byte[] raw, int start, int end) {
this.raw = raw;
this.start = start;
this.end = end;
}
}

数据页面的缓存与管理

DM模块向下读写文件系统, 并将其抽象为数据页面, 并将页面进行缓存

数据页的默认大小设定为8KB

数据页面结构

定义Page与PageImpl

Page

1
2
3
4
5
6
7
8
9
10
public interface Page {
public static final int PAGE_SIZE = 1 << 13; // 2 ^ 13 8KB
void lock();
void unlock();
void release();
void setDirty(boolean dirty);
boolean isDirty();
int getPageNumber();
byte[] getData();
}

PageImpl

1
2
3
4
5
6
7
public class PageImpl implements Page {
private int pageNumber;
private byte[] data;
private boolean dirty;
private Lock lock;
...
}

pageNumber: 页号, 从第一页开始

data: 实际存储的数据

dirty: 是否是脏页面, 在缓存被驱逐时, 脏页面需要被写回文件系统

页面缓存

接口

1
2
3
4
5
6
7
8
9
10
public interface PageCache {
public static final int PAGE_SIZE = 1 << 13; // 2 ^ 13 8KB
int newPage(byte[] initData);
Page getPage(int pgno) throws Exception;
void close();
void release(Page page);
void truncateByBgno(int maxPgno);
int getPageNumber();
void flushPage(Page pg);
}

实现类

页面缓存的实现类需要继承缓存框架, 并实现上面的接口

继承缓存框架, 实现getForCache()releaseForCache()方法

初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class PageCacheImpl extends AbstractCache<Page> implements PageCache {
public static final String DB_SUFFIX = ".db";
private static final int MEM_MIN_LIM = 10;
private RandomAccessFile file;
private FileChannel fc;
private Lock fileLock;

private AtomicInteger pageNumbers;

PageCacheImpl(RandomAccessFile file, FileChannel fileChannel, int maxResource) {
super(maxResource);
if (maxResource < MEM_MIN_LIM) {
Panic.panic(Error.MemTooSmallException);
}
long length = 0;
try {
length = file.length();
} catch (IOException e) {
Panic.panic(e);
}
this.file = file;
this.fc = fileChannel;
this.fileLock = new ReentrantLock();
this.pageNumbers = new AtomicInteger((int) length / PAGE_SIZE);
}
...
}
getForCache()

getForCache直接从文件中读取, 封装为Page即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class PageCacheImpl extends AbstractCache<Page> implements PageCache {
...
/**
* 根据pageNumber从数据库文件中读取页数据, 并包裹成Page
* @param key
* @return
* @throws Exception
*/
@Override
protected Page getForCache(long key) throws Exception {
int pgno = (int) key;
long offset = PageCacheImpl.pageOffset(pgno);

ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE);
fileLock.lock();
try {
fc.position(offset);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
fileLock.unlock();
return new PageImpl(pgno, buf.array(), this);
}

private static long pageOffset(int pgno) {
// 页号从1开始
return (pgno - 1) * PAGE_SIZE;
}
...
}
releaseForCache

releaseForCache()驱逐页面时需要判断是否是脏页面来决定是否直接驱逐

flush()方法为写入文件系统, 如果是脏页面, 则执行flush()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class PageCacheImpl extends AbstractCache<Page> implements PageCache {
...
@Override
protected void releaseForCache(Page pg) {
if (pg.isDirty()) {
flush(pg);
pg.setDirty(false);
}
}

private void flush(Page pg) {
int pgno = pg.getPageNumber();
long offset = pageOffset(pgno);

fileLock.lock();
try {
ByteBuffer buf = ByteBuffer.wrap(pg.getData());
fc.position(offset);
fc.write(buf);
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
} finally {
fileLock.unlock();
}
}
...
}
其他方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class PageCacheImpl extends AbstractCache<Page> implements PageCache {
...
@Override
public int newPage(byte[] initData) {
int pgno = pageNumbers.incrementAndGet();
Page pg = new PageImpl(pgno, initData, null);
flush(pg);
return pgno;
}

@Override
public Page getPage(int pgno) throws Exception {
return get((long) pgno);
}

@Override
public void release(Page page) {
release((long) page.getPageNumber());
}

@Override
public void flushPage(Page pg) {
flush(pg);
}

@Override
public void truncateByBgno(int maxPgno) {
long size = pageOffset(maxPgno + 1);
try {
file.setLength(size);
} catch (IOException e) {
Panic.panic(e);
}
pageNumbers.set(maxPgno);
}

@Override
public void close() {
super.close();
try {
fc.close();
file.close();
} catch (IOException e) {
Panic.panic(e);
}
}

@Override
public int getPageNumber() {
return pageNumbers.intValue();
}
}

数据页面管理

第一页

数据库页面的第一页不用来存放主要数据, xdb会在第一页记录一个随机数作校验功能

xdb在启动时会在100-107位置写入一个随机数, 关闭时将该数copy到108-115位置, 等到下次启动时比较这两处的数字即可得知数据库是否是正常关闭的

OF: offset

VC: Valid Check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class PageOne {
private static final int OF_VC = 100; // offset_valid_check
private static final int LEN_VC = 8;

public static byte[] InitRaw() {
byte[] raw = new byte[Page.PAGE_SIZE];
setVcOpen(raw);
return raw;
}

public static void setVcOpen(Page pg) {
pg.setDirty(true);
setVcOpen(pg.getData());
}
// 启动时设置随机数
private static void setVcOpen(byte[] raw) {
System.arraycopy(RandomUtil.randomBytes(LEN_VC), 0, raw, OF_VC, LEN_VC);
}

public static void setVcClose(Page pg) {
pg.setDirty(true);
setVcClose(pg.getData());
}

// 关闭时拷贝随机数
private static void setVcClose(byte[] raw) {
System.arraycopy(raw, OF_VC, raw, OF_VC + LEN_VC, LEN_VC);
}

public static boolean checkVc(Page pg) {
return checkVc(pg.getData());
}

// 启动时检查合法性
private static boolean checkVc(byte[] raw) {
return Arrays.equals(Arrays.copyOfRange(raw, OF_VC, OF_VC + LEN_VC), Arrays.copyOfRange(raw, OF_VC + LEN_VC, OF_VC + 2 * LEN_VC));
}
}

其他页面

一个普通页面大小为8KB, 使用最前面的2B用来记录空闲位置的偏移量, 剩下的存储数据

[FSO][Data]
[0-1] [2-8k]

FSO: Free Space Offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class PageX {

private static final short OF_FREE = 0;
private static final short OF_DATA = 2;
public static final int MAX_FREE_SPACE = Page.PAGE_SIZE - OF_DATA;

public static byte[] initRaw() {
byte[] raw = new byte[Page.PAGE_SIZE];
setFSO(raw, OF_DATA);
return raw;
}

private static void setFSO(byte[] raw, short ofData) {
System.arraycopy(Parser.short2Byte(ofData), 0, raw, OF_FREE, OF_DATA);
}

// 获取pg的FSO
public static short getFSO(Page pg) {
return getFSO(pg.getData());
}

private static short getFSO(byte[] raw) {
return Parser.parseShort(Arrays.copyOfRange(raw, 0, 2));
}

// 将raw插入pg中, 返回插入位置
public static short insert(Page pg, byte[] raw) {
pg.setDirty(true);
short offset = getFSO(pg.getData());
System.arraycopy(raw, 0, pg.getData(), offset, raw.length);
setFSO(pg.getData(), (short) (offset + raw.length));
return offset;
}

// 获取页面的空闲空间大小
public static int getFreeSpace(Page pg) {
return Page.PAGE_SIZE - (int) getFSO(pg.getData());
}

// 将raw插入pg中的offset位置, 并将pg的offset设置为较大的offset
public static void recoverInsert(Page pg, byte[] raw, short offset) {
pg.setDirty(true);
System.arraycopy(raw, 0, pg.getData(), offset, raw.length);

short rawFSO = getFSO(pg.getData());
if (rawFSO < offset + raw.length) {
setFSO(pg.getData(), (short) (offset + raw.length));
}
}

// 将raw插入pg中的offset位置, 不更新update
public static void recoverUpdate(Page pg, byte[] raw, short offset) {
pg.setDirty(true);
System.arraycopy(raw, 0, pg.getData(), offset, raw.length);
}
}

其中的两个方法recoverInsert()recoverUpdate()用于在数据库崩溃后重新打开时, 恢复例程直接插入数据以及修改数据使用

日志管理

日志的主要作用是当数据库发生错误崩溃时恢复数据

每次插入或更新时都会进行日志的记录, 当恢复数据时只需要读取日志并redo和undo即可

日志结构

日志文件

[XCheckSum][Log1][Log2]…[LogN][BadTail]

XCheckSum用来校验整个文件, 占4个字节大小, 后面为一条条的日志, BadTail为意外没有写入成功的日志, 在恢复的时候首先要把BadTail给去掉

每条日志的结构如下

[Size][CheckSum][Data]

Size和CheckSum各占4个字节, Size标识的时Data的长度, CheckSum是该条日志的校验和

日志校验和的计算(SEED为自己设置的值, 类似于hash的盐)

1
2
3
4
5
6
7
// 根据log计算新的xCheck
private int calChecksum(int xCheck, byte[] log) {
for (byte b : log) {
xCheck = xCheck * SEED + b;
}
return xCheck;
}

所有每条日志的校验和的总和即为日志文件文件头XCheckSum

校验

每次打开日志时, 都需要对其进行校验, 并且还要移除BadTail(未写入完成的日志)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class LoggerImpl implements Logger {
...
// 检查并移除bad tail
private void checkAndRemoveTail() {
// 重新定位指针到校验和之后第一条日志的位置 void rewind(){ position = 4 };
rewind();

int xCheck = 0;
while (true) {
byte[] log = internNext();
if (log == null) {
break;
}
xCheck = calChecksum(xCheck, log);
}
if (xCheck != xChecksum) {
Panic.panic(Error.BadLogFileException);
}

try {
truncate(position);
} catch (Exception e) {
Panic.panic(e);
}
try {
// 设置文件指针(写入或读取位置)
file.seek(position);
} catch (IOException e) {
Panic.panic(e);
}
rewind();
}

// 截断文件在position, 用于截掉badtail
@Override
public void truncate(long x) throws Exception {
lock.lock();
try {
fc.truncate(x);
} finally {
lock.unlock();
}
}
...
}

日志写入

写入日志时, 先计算出该条日志的校验和以及size, 并将这些数据封装成日志格式, 再写入到日志文件里, 最后还要把日志文件头的校验和更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class LoggerImpl implements Logger {
...
// 写入log并更新checkSum
@Override
public void log(byte[] data) {
byte[] log = wrapLog(data);
ByteBuffer buf = ByteBuffer.wrap(log);
lock.lock();
try {
fc.position(fc.size());
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
} finally {
lock.unlock();
}
updateXChecksum(log);
}
// 更新校验和
private void updateXChecksum(byte[] log) {
this.xChecksum = calChecksum(this.xChecksum, log);
try {
fc.position(0);
fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

// 封装为日志格式
private byte[] wrapLog(byte[] data) {
byte[] checksum = Parser.int2Byte(calChecksum(0, data));
byte[] size = Parser.int2Byte(data.length);
return Bytes.concat(size, checksum, data);
}
...
}

日志读取

日志的读取依靠next方法, 每执行一次next方法就会对读取到一条日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class LoggerImpl implements Logger {
...
@Override
public byte[] next() {
lock.lock();
try {
byte[] log = internNext();
if (log == null) {
return null;
}
return Arrays.copyOfRange(log, OF_DATA, log.length);
} finally {
lock.unlock();
}
}

private byte[] internNext() {
if (position + OF_DATA >= fileSize) {
return null;
}
// 读取size
ByteBuffer tmp = ByteBuffer.allocate(4);
try {
fc.position(position);
fc.read(tmp);
} catch (IOException e) {
Panic.panic(e);
}
int size = Parser.parseInt(tmp.array());
// size代表的知识data大小, OF_DATA+size得到的才是一条日志的长度
if (position + size + OF_DATA > fileSize) {
return null;
}

// 读取size+checksum+data(整条日志)
ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
try {
fc.position(position);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}

// 校验 checksum
byte[] log = buf.array();
int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));
if (checkSum1 != checkSum2) {
return null;
}
position += log.length;
return log;
}
...
}

日志的恢复

读取日志, 检查每一条日志的事务状态, 如果已经完成就将该条日志记录的操作重做(redo), 否则进行撤销(undo)

redo

正序扫描日志

如果是插入操作 (Ti, I, A, x), 就将x重新插入A位置

如果是更新操作 (Ti, U, A, oldx, newx), 就将A位置的值设置为newx

undo

倒序扫描日志

如果是插入操作 (Ti, I, A, x), 就A位置的数据删除

如果是更新操作 (Ti, U, A, oldx, newx), 就将A位置的值设置为oldx

日志格式

Insert

[LogType] [XID] [Pgno] [Offset] [Raw]

1 8 4 2 字节

Update

[LogType] [XID] [UID] [OldRaw] [NewRaw]

1 8 8 字节

多线程

规定未提交事务产生的数据不会被其他事务读取和修改

恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Recover {
private static final byte LOG_TYPE_INSERT = 0;
private static final byte LOG_TYPE_UPDATE = 1;
private static final int REDO = 0;
private static final int UNDO = 1;

static class InsertLogInfo {
long xid;
int pgno;
short offset;
byte[] raw;
}

static class UpdateLogInfo {
long xid;
int pgno;
short offset;
byte[] oldRaw;
byte[] newRaw;
}

public static void recover(TransactionManager tm, Logger lg, PageCache pc) {
System.out.println("Recovering...");

lg.rewind();
int maxPgno = 0;
while(true) {
byte[] log = lg.next();
if(log == null) {
break;
}
int pgno;
if(isInsertLog(log)) {
InsertLogInfo li = parseInsertLog(log);
pgno = li.pgno;
} else {
UpdateLogInfo li = parseUpdateLog(log);
pgno = li.pgno;
}
if(pgno > maxPgno) {
maxPgno = pgno;
}
}
if(maxPgno == 0) {
maxPgno = 1;
}
pc.truncateByBgno(maxPgno);
System.out.println("Truncate to " + maxPgno + " pages.");

redoTranscations(tm, lg, pc);
System.out.println("Redo Transactions Over.");

undoTranscations(tm, lg, pc);
System.out.println("Undo Transactions Over.");

System.out.println("Recovery Over.");
}
...
}

redoTranscations()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Recover {
...
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
lg.rewind();
while(true) {
byte[] log = lg.next();
if(log == null) {
break;
}
if(isInsertLog(log)) {
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(!tm.isActive(xid)) {
doInsertLog(pc, log, REDO);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
if(!tm.isActive(xid)) {
doUpdateLog(pc, log, REDO);
}
}
}
}
...
}

undoTranscations()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Recover {
...
private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
Map<Long, List<byte[]>> logCache = new HashMap<>();
lg.rewind();
while(true) {
byte[] log = lg.next();
if(log == null) {
break;
}
if(isInsertLog(log)) {
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(tm.isActive(xid)) {
if(!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
if(tm.isActive(xid)) {
if(!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
}
}

// 对所有active log进行倒序undo
for(Map.Entry<Long, List<byte[]>> entry : logCache.entrySet()) {
List<byte[]> logs = entry.getValue();
for (int i = logs.size()-1; i >= 0; i--) {
byte[] log = logs.get(i);
if(isInsertLog(log)) {
doInsertLog(pc, log, UNDO);
} else {
doUpdateLog(pc, log, UNDO);
}
}
tm.abort(entry.getKey());
}
}
...
}

doUpdateLog()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Recover {
...
// [LogType] [XID] [UID] [OldRaw] [NewRaw]
private static final int OF_TYPE = 0;
private static final int OF_XID = OF_TYPE + 1;
private static final int OF_UPDATE_UID = OF_XID + 8;
private static final int OF_UPDATE_RAW = OF_UPDATE_UID + 8;

public static byte[] updateLog(long xid, DataItem di) {
byte[] logType = {LOG_TYPE_UPDATE};
byte[] xidRaw = Parser.long2Byte(xid);
byte[] uidRaw = Parser.long2Byte(di.getUid());
byte[] oldRaw = di.getOldRaw();
SubArray raw = di.getRaw();
byte[] newRaw = Arrays.copyOfRange(raw.raw, raw.start, raw.end);
return Bytes.concat(logType, xidRaw, uidRaw, oldRaw, newRaw);
}

private static UpdateLogInfo parseUpdateLog(byte[] log) {
UpdateLogInfo li = new UpdateLogInfo();
li.xid = Parser.parseLong(Arrays.copyOfRange(log, OF_XID, OF_UPDATE_UID));
long uid = Parser.parseLong(Arrays.copyOfRange(log, OF_UPDATE_UID, OF_UPDATE_RAW));
li.offset = (short)(uid & ((1L << 16) - 1));
uid >>>= 32;
li.pgno = (int)(uid & ((1L << 32) - 1));
int length = (log.length - OF_UPDATE_RAW) / 2;
li.oldRaw = Arrays.copyOfRange(log, OF_UPDATE_RAW, OF_UPDATE_RAW+length);
li.newRaw = Arrays.copyOfRange(log, OF_UPDATE_RAW+length, OF_UPDATE_RAW+length*2);
return li;
}

private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
int pgno;
short offset;
byte[] raw;
if(flag == REDO) {
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.newRaw;
} else {
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.oldRaw;
}
Page pg = null;
try {
pg = pc.getPage(pgno);
} catch (Exception e) {
Panic.panic(e);
}
try {
PageX.recoverUpdate(pg, raw, offset);
} finally {
pg.release();
}
}
...
}

doInsertLog()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Recover {
...
// [LogType] [XID] [Pgno] [Offset] [Raw]
private static final int OF_INSERT_PGNO = OF_XID+8;
private static final int OF_INSERT_OFFSET = OF_INSERT_PGNO+4;
private static final int OF_INSERT_RAW = OF_INSERT_OFFSET+2;

public static byte[] insertLog(long xid, Page pg, byte[] raw) {
byte[] logTypeRaw = {LOG_TYPE_INSERT};
byte[] xidRaw = Parser.long2Byte(xid);
byte[] pgnoRaw = Parser.int2Byte(pg.getPageNumber());
byte[] offsetRaw = Parser.short2Byte(PageX.getFSO(pg));
return Bytes.concat(logTypeRaw, xidRaw, pgnoRaw, offsetRaw, raw);
}

private static InsertLogInfo parseInsertLog(byte[] log) {
InsertLogInfo li = new InsertLogInfo();
li.xid = Parser.parseLong(Arrays.copyOfRange(log, OF_XID, OF_INSERT_PGNO));
li.pgno = Parser.parseInt(Arrays.copyOfRange(log, OF_INSERT_PGNO, OF_INSERT_OFFSET));
li.offset = Parser.parseShort(Arrays.copyOfRange(log, OF_INSERT_OFFSET, OF_INSERT_RAW));
li.raw = Arrays.copyOfRange(log, OF_INSERT_RAW, log.length);
return li;
}

private static void doInsertLog(PageCache pc, byte[] log, int flag) {
InsertLogInfo li = parseInsertLog(log);
Page pg = null;
try {
pg = pc.getPage(li.pgno);
} catch(Exception e) {
Panic.panic(e);
}
try {
if(flag == UNDO) {
DataItem.setDataItemRawInvalid(li.raw);
}
PageX.recoverInsert(pg, li.raw, li.offset);
} finally {
pg.release();
}
}
...
}

页面索引

页面索引的作用是缓存每一页的可用空间, 在插入数据的时候能够根据页面索引快速查找出适合插入的空闲空间

索引结构

页面索引的实现是通过一个list数组实现, 每个list里存放的时PageInfo, 包含页号和空闲空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class PageIndex {
// 将一页划分成40个区间
private static final int INTERVALS_NO = 40;
// 单个区间大小, 使用freeSpace/THRESHOLD即可得出在第几个区间
private static final int THRESHOLD = PageCache.PAGE_SIZE / INTERVALS_NO;

private Lock lock;
// lists[i]即表示在第i个区间的页号集合
private List<PageInfo>[] lists;

public PageIndex() {
lock = new ReentrantLock();
lists = new List[INTERVALS_NO + 1];
for (int i = 0; i < INTERVALS_NO + 1; i++) {
lists[i] = new ArrayList<>();
}
}
}

PageInfo

1
2
3
4
5
6
7
8
9
public class PageInfo {
public int pgno;
public int freeSpace;

public PageInfo(int pgno, int freeSpace) {
this.pgno = pgno;
this.freeSpace = freeSpace;
}
}

添加索引

1
2
3
4
5
6
7
8
9
10
11
12
13
public class PageIndex {
...
public void add(int pgno, int freeSpace) {
lock.lock();
try {
int number = freeSpace / THRESHOLD;
lists[number].add(new PageInfo(pgno, freeSpace));
} finally {
lock.unlock();
}
}
...
}

通过索引选择页面

注意选到页面后要将该页面从当前索引中移除, 同一个页面不允许并发写, 当上层模块使用完这个页面之后需要将这个页面重新插入索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class PageIndex {
...
public PageInfo select(int spaceSize) {
lock.lock();
try {
// number相当于index, 标记一个区间的下标
int number = spaceSize / THRESHOLD;
// number下标从1开始
if(number < INTERVALS_NO) {
number++;
}

// 必须在大于等于number标记的区间里才会有合适的空间页面进行存储
while(number<= INTERVALS_NO) {
if(lists[number].size() == 0) {
number++;
continue;
}
// 选到的页面会移出, 同一个页面不允许并发写
return lists[number].remove(0);
}
return null;
} finally {
lock.unlock();
}
}
...
}

DataItem

DataItem就是DM对数据的抽象, 以供上层模块使用

DataItem结构: [ValidFlag(1B)][DataSize(2B)][Data]

ValidFlag标记了数据是否有效, 类似于逻辑删除, 将其值设为0即代表着删除该条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DataItemImpl implements DataItem {
static final int OF_VALID = 0;
static final int OF_SIZE = 1;
static final int OF_DATA = 3;

private SubArray raw;
private byte[] oldRaw;
private Lock rLock;
private Lock wLock;
private DataManagerImpl dm;
private long uid; // pgno + offset各4字节组成uid, 用于解析页号和偏移量
private Page pg;
}

上层模块在获取到DataItem后, 可以通过data()方法, 该方法返回的数组是数据共享的, 而不是拷贝实现的, 所以使用了SubArray

1
2
3
4
5
6
7
8
public class DataItemImpl implements DataItem {
...
@Override
public SubArray data() {
return new SubArray(raw.raw, raw.start+OF_DATA, raw.end);
}
...
}

在上层模块对DataItem进行修改时, 要先执行before()方法, 想要撤销修改就执行unBefore()方法, 修改完成之后调用after()方法, after方法中会操作日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DataItemImpl implements DataItem {
...
@Override
public void before() {
wLock.lock();
pg.setDirty(true);
System.arraycopy(raw.raw, raw.start, oldRaw, 0, oldRaw.length);
}

@Override
public void unBefore() {
System.arraycopy(oldRaw, 0, raw.raw, raw.start, oldRaw.length);
wLock.unlock();
}

@Override
public void after(long xid) {
dm.logDataItem(xid, this);
wLock.unlock();
}
...
}

DM实现

DataManager是对上层模块提供方法的类, 同时也集成了DataItem的缓存

DataItem的缓存

DataItem中存储着一个uid, uid是由页号和页内偏移组成的8字节整数, 页号和偏移各占4字节

通过解析uid的前32位得到页号, 后32位得到页内偏移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
@Override
protected DataItem getForCache(long uid) throws Exception {
// 1L << 16 = 1 0000 0000 0000 0000
// 1L << 16 - 1 = 1111 1111 1111 1111
// 与uid做与操作, 相当于提取出uid的后16位, 即后两个字节
// 虽然offset占4字节, 但是2字节以足够使用
// pagesize = 8KB = 8 * 8Kbit = 2 ^ 16, 最大偏移量两个字节刚好够用
short offset = (short)(uid & ((1L << 16) - 1));
uid >>>= 32;
int pgno = (int)(uid & ((1L << 32) - 1));
Page pg = pc.getPage(pgno);
return DataItem.parseDataItem(pg, offset, this);
}
...
}

缓存释放

调用page的release即可写回数据源并释放缓存

1
2
3
4
5
6
7
8
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
@Override
protected void releaseForCache(DataItem di) {
di.page().release();
}
...
}

创建与打开

创建

从空文件创建首先需要对第一页进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
public interface DataManager {
...
public static DataManager create(String path, long mem, TransactionManager tm) {
PageCache pc = PageCache.create(path, mem);
Logger lg = Logger.create(path);

DataManagerImpl dm = new DataManagerImpl(pc, lg, tm);
dm.initPageOne();
return dm;
}
...
}

初始化第一页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
// 在创建文件时初始化PageOne
void initPageOne() {
int pgno = pc.newPage(PageOne.InitRaw());
assert pgno == 1;
try {
pageOne = pc.getPage(pgno);
} catch (Exception e) {
Panic.panic(e);
}
pc.flushPage(pageOne);
}
...
}

打开

从已有文件创建, 则是需要对第一页进行校验, 来判断是否需要执行恢复流程, 并重新对第一页生成随机字节, 还要生成页面索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface DataManager {
...
public static DataManager open(String path, long mem, TransactionManager tm) {
PageCache pc = PageCache.open(path, mem);
Logger lg = Logger.open(path);
DataManagerImpl dm = new DataManagerImpl(pc, lg, tm);
if (!dm.loadCheckPageOne()) {
Recover.recover(tm, lg, pc);
}
dm.fillPageIndex();
PageOne.setVcOpen(dm.pageOne);
dm.pc.flushPage(dm.pageOne);

return dm;
}
...
}

校验第一页

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
// 在打开已有文件时时读入PageOne, 并验证正确性
boolean loadCheckPageOne() {
try {
pageOne = pc.getPage(1);
} catch (Exception e) {
Panic.panic(e);
}
return PageOne.checkVc(pageOne);
}
...
}

生成页面索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
// 初始化pageIndex
void fillPageIndex() {
int pageNumber = pc.getPageNumber();
for(int i = 2; i <= pageNumber; i ++) {
Page pg = null;
try {
pg = pc.getPage(i);
} catch (Exception e) {
Panic.panic(e);
}
pIndex.add(pg.getPageNumber(), PageX.getFreeSpace(pg));
pg.release();
}
}
...
}

对外接口

DM 层提供了三个功能供上层使用, 分别是读, 插入和修改, 修改是通过读出的DataItem实现的, 于是DataManager只需要提供read()insert()方法

read()通过uid从缓存中获取DataItem

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
@Override
public DataItem read(long uid) throws Exception {
DataItemImpl di = (DataItemImpl)super.get(uid);
if(!di.isValid()) {
di.release();
return null;
}
return di;
}
...
}

insert()先在页面索引中获取到可以插入的页号, 首先插入日志, 在通过PageX来插入数据, 最后把页面信息重新添加到页面索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
@Override
public long insert(long xid, byte[] data) throws Exception {
byte[] raw = DataItem.wrapDataItemRaw(data);
if(raw.length > PageX.MAX_FREE_SPACE) {
throw Error.DataTooLargeException;
}

PageInfo pi = null;
for(int i = 0; i < 5; i ++) {
pi = pIndex.select(raw.length);
if (pi != null) {
break;
} else {
int newPgno = pc.newPage(PageX.initRaw());
pIndex.add(newPgno, PageX.MAX_FREE_SPACE);
}
}
if(pi == null) {
throw Error.DatabaseBusyException;
}

Page pg = null;
int freeSpace = 0;
try {
pg = pc.getPage(pi.pgno);
byte[] log = Recover.insertLog(xid, pg, raw);
logger.log(log);

short offset = PageX.insert(pg, raw);

pg.release();
return Types.addressToUid(pi.pgno, offset);

} finally {
// 将取出的pg重新插入pIndex
if(pg != null) {
pIndex.add(pi.pgno, PageX.getFreeSpace(pg));
} else {
pIndex.add(pi.pgno, freeSpace);
}
}
}
...
}

DataManager正常关闭时, 需要执行缓存和日志的关闭流程, 关闭时要鸡蛋设置第一页的字节校验即拷贝随机数

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DataManagerImpl extends AbstractCache<DataItem> implements DataManager {
...
@Override
public void close() {
super.close();
logger.close();

PageOne.setVcClose(pageOne);
pageOne.release();
pc.close();
}
...
}

总结

DM层各个子类的功能

  1. AbstractCache: 引用计数法的缓存框架, 留了两个从数据源获取数据和释放缓存的抽象方法给具体实现类去实现
  2. PageImpl: 数据页的数据结构, 包含页号、是否脏数据页、数据内容、所在的PageCache缓存
  3. PageOne: 校验页面, 用于启动DM的时候进行文件校验
  4. PageX: 每个数据页的管理器。initRaw()新建一个数据页并设置FSO值, FSO后面存的其实就是一个个DataItem数据包
  5. PageCacheImpl: 数据页的缓存具体实现类, 除了重写获取 和释放两个方法外, 还完成了所有数据页的统一管理:
    1. 获取数据库中的数据页总数; getPageNumber()
    2. 新建一个数据页并写入数据库文件; newPage(byte[] initData)
    3. 从缓存中获取指定的数据页; getPage(int pgno)
    4. 删除指定位置后面的数据页; truncateByBgno(int maxPgno)
  6. PageIndex: 方便DataItem的快速定位插入, 其实现原理可以理解为HashMap那种数组+链表结构(实际实现是 List+ArrayList), 先是一个大小为41的数组 存的是区间号(区间号从1>开始), 然后每个区间号数组后面跟一个数组存满足空闲大小的所有数据页信息(PageInfo)
  7. Recover: 日志恢复策略, 主要维护两个日志: updateLog和insertLog, 重做所有已完成事务 redo, 撤销所有未完成事务undo
  8. DataManager: 统揽全局的类, 主要方法也就是读写和修改, 全部通过DataItem进行。

DM层执行流程

DataManager的所有功能(主要功能就是CRUD), 进行数据的读写修改都是靠DataItem进行操作的 , 所以PageX管理页面的时候FSO后面的DATA其实就是一个个的DataItem包

  1. 首先从DataManager进去创建DM(打开DM就不谈了, 只是多了个检验PageOne 和更新PageIndex), 需要执行的操作是:

    1. 新建PageCache, DM里面有页面缓存DataItem缓存两个实现; DataItem缓存也是在PageCache中获取的, DataItem缓存不存在的时候就去PageCache缓存获取, PageCache缓存没有才去数据库文件中获取
    2. 新建日志
    3. 构建DM管理器
    4. 初始化校验页面 initPageOne() 和 启动时候进行校验: loadCheckPageOne()
  2. 读取数据 read(long uid): 从DataItem缓存中读取一个DataItem数据包并进行校验, 如果DataItem缓存中没有就会调用 DataManager下的getForCache(long uid)从PageCache缓存中读取DataItem数据包并加入DataItem缓存(其实PageCache缓存和DataItem缓存都是共用的一个cache Map存的, 只是key不一样, page的key是页号, DataItem的key是uid, 页号+偏移量), 如果PgeCache也没有就去数据库文件读取。

  3. 插入数据 insert(long xid, byte[] data): 先把数据打包成DataItem格式, 然后在 pageIndex 中获取一个足以存储插入内容的页面的页号; 获取页面后, 需要先写入插入日志Recover.insertLog(xid, pg, raw), 接着才可以通过 pageX 在目标数据页插入数据PageX.insert(pg, raw), 并返回插入位置的偏移。如果在pageIndex中没有空闲空间足够插入数据了, 就需要新建一个数据页pc.newPage(PageX.initRaw()), 最后需要将页面信息重新插入 pageIndex

  4. 修改数据就是先读取数据, 然后修改DataItem内容, 再插入DataItem数据。但是在修改数据操作的前后需要调用DataItemImp.after()进行解写锁并记录更新日志, 这里需要依赖DataManager里面的logDataItem(long xid, DataItem di)方法

  5. 释放缓存: 释放DataItem的缓存, 实质上就是释放DataItem所在页的PageCache缓存

Version Manager

VM基于两段锁协议实现了调度序列的可串行化, 并实现了 MVCC 以消除读写阻塞, 同时实现了两种隔离级别

记录和版本

使用Entry来保存一条条的记录, 每个Entry中保存着一个DataItem

结构

[XMIN] [XMAX] [data]

XMIN表示创建该条记录的事务编号, XMAX是删除或者更新该条记录的事务编号, data存储数据

1
2
3
4
5
6
7
8
9
public class Entry {
private static final int OF_XMIN = 0;
private static final int OF_XMAX = OF_XMIN + 8;
private static final int OF_DATA = OF_XMAX + 8;

private long uid;
private DataItem dataItem;
private VersionManager vm;
}

封装

把事务号和数据包装成Entry

1
2
3
4
5
6
7
public class Entry {
public static byte[] wrapEntryRaw(long xid, byte[] data) {
byte[] xmin = Parser.long2Byte(xid);
byte[] xmax = new byte[8];
return Bytes.concat(xmin, xmax, data);
}
}

获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Entry {
// 以拷贝的形式返回内容
public byte[] data() {
dataItem.rLock();
try {
SubArray sa = dataItem.data();
byte[] data = new byte[sa.end - sa.start - OF_DATA];
System.arraycopy(sa.raw, sa.start + OF_DATA, data, 0, data.length);
return data;
} finally {
dataItem.rUnLock();
}
}
}

修改

如果要修改的话, 需要对DataItem执行before()after()方法

1
2
3
4
5
6
7
8
9
10
11
public class Entry {
public void setXmax(long xid) {
dataItem.before();
try {
SubArray sa = dataItem.data();
System.arraycopy(Parser.long2Byte(xid), 0, sa.raw, sa.start+OF_XMAX, 8);
} finally {
dataItem.after(xid);
}
}
}

事务的隔离级别

xdb支持两种事务隔离: 读提交, 可重复读

版本可见性: 如果一个记录的最新版本被加锁, 当另一个事务想要修改或读取这条记录时, MYDB 就会返回一个较旧的版本的数据, 这时就可以认为, 最新的被加锁的版本, 对于另一个事务来说,是不可见的

读提交

“读提交”就是保证事务在读取数据时只能读取已经提交事务产生的数据

为每个版本, 维护两个特殊的变量, XMIN, XMAX, 他们含义如下:

  • XMIN: 创建该版本的事务XID;
  • XMAX: 删除该版本的事务XID;

XMIN自然是在版本被建立的时候填写. XMAX是在该版本被删除, 或是有新版本出现时, 被填写.

1
2
3
4
5
6
7
8
(XMIN == Ti and                          // created by Ti itself and
XMAX == NULL // not deleted now
)
or // or
(XMIN is commited and // created by a commited transaction and
(XMAX == NULL or // not deleted now or
(XMAX != Ti and XMAX is not commited) // deleted by a uncommited transaction
))

如果上述逻辑为true, 则该版本对Ti可见

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Visibility {
private static boolean readCommitted(TransactionManager tm, Transaction t, Entry e) {
long xid = t.xid;
long xmin = e.getXmin();
long xmax = e.getXmax();
if (xmin == xid && xmax == 0) {
return true;
}

if (tm.isCommitted(xmin)) {
if (xmax == 0) {
return true;
}
if (xmax != xid) {
if (!tm.isCommitted(xmax)) {
return true;
}
}
}
return false;
}
}

可重复读

“读提交”可能会使一个事务在执行期间对同一个数据项的读取得到不同结果, 例如

1
2
3
4
5
6
7
(假如X一开始为0)
T1 begin
R1(X) // T1读得0
T2 begin
U2(X) // 将X修改为1
T2 commit
R1(X) // T1读的1

在上面的两次R1(X)中, 读出的X是不一样的. 有些事务是不希望出现这种情况的, 为此, 我们提供一个更加严格的隔离度”可重复读”, “可重复读”在”读提交”的基础上, 还保证事务执行期间, 多次对同一记录的读取, 将会是一致

规定: 事务只能读取它开始时, 就已经结束的那些事务产生的数据版本, 也就是忽略下面两种情况的数据:

  1. 比Ti后开始的事务的数据;
  2. Ti开始时还为active状态的事务的数据;

对于第一条, 我们直接通过比较版本的XID, 即可完成, 既事务只能读取XID比他小的事务产生的数据版本

对于第二条, 引入”快照”(snapshot)技术, 在Ti开始时, 我们记录下当前所有活跃的事务, 并存在SP(Ti)中. 那么, 如果某个版本的XMIN在SP(Ti)中的话, 那么该版本应该对Ti不可见

1
2
3
4
5
6
7
8
9
10
11
12
13
(XMIN == Ti and                 // 由Ti创建且
(XMAX == NULL or // 尚未被删除
))
or // 或
(XMIN is commited and // 由一个已提交的事务创建且
XMIN < XID and // 这个事务小于Ti且
XMIN is not in SP(Ti) and // 这个事务在Ti开始前提交且
(XMAX == NULL or // 尚未被删除或
(XMAX != Ti and // 由其他事务删除但是
(XMAX is not commited or // 这个事务尚未提交或
XMAX > Ti or // 这个事务在Ti开始之后才开始或
XMAX is in SP(Ti) // 这个事务在Ti开始前还未提交
))))

需要提供一个结构以保存快照数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// vm对一个事务的抽象
public class Transaction {
public long xid;
public int level;
public Map<Long, Boolean> snapshot;
public Exception err;
public boolean autoAborted;

public static Transaction newTransaction(long xid, int level, Map<Long, Transaction> active) {
Transaction t = new Transaction();
t.xid = xid;
t.level = level;
if(level != 0) {
t.snapshot = new HashMap<>();
for(Long x : active.keySet()) {
t.snapshot.put(x, true);
}
}
return t;
}

public boolean isInSnapshot(long xid) {
if(xid == TransactionManagerImpl.SUPER_XID) {
return false;
}
return snapshot.containsKey(xid);
}
}

一个版本是否对事务可见的逻辑判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Visibility {
private static boolean repeatableRead(TransactionManager tm, Transaction t, Entry e) {
long xid = t.xid;
long xmin = e.getXmin();
long xmax = e.getXmax();
if (xmin == xid && xmax == 0) {
return true;
}

if (tm.isCommitted(xmin) && xmin < xid && !t.isInSnapshot(xmin)) {
if (xmax == 0) {
return true;
}
if (xmax != xid) {
if (!tm.isCommitted(xmax) || xmax > xid || t.isInSnapshot(xmax)) {
return true;
}
}
}
return false;
}
}

版本跳跃问题

“多版本”使得VM的”删除”和”撤销”操作变得意外的简单, 只需要将这个事务标记为aborted即可

但是”多版本”却会导致”版本跳跃”, 为了解决它, VM会限制出现版本跳跃的事务回滚

考虑如下: 假设两个”可重复读”的事务, 有如下的执行序列:

1
2
3
4
5
6
7
8
9
(假设X一开始只有x0版本)
T1 begin
T2 begin
R1(X) // T1读取x0
R2(X) // T2读取x0
U1(X) // T1将X更新到x1
T1 commit
U2(X) // T2将X更新到x2
T2 commit

上述操作本身不会有任何问题, 但是逻辑上却有不妥. T1是将X从x0->x1, 这没问题. 但T2实际上是想将X从x0->x2, 跳过了x1这个版本, 因为T2读取时, X还只有x0. 这是有问题的

于是, 我们对两个隔离度的事务, 分别做出如下的规定:

  1. “读提交”的事务允许出现版本跳跃;
  2. “可重复读”的事务不允许出现版本跳跃;

解决”可重复读”隔离度下的版本跳跃问题的思路: 如果Ti想要修改X, 但X已经被某个Ti不可见的事务Tj修改过了, 那么要求Ti回滚

即: 如果Ti想要修改X, 但X的最新版本是Tj创建的, 且XID(Tj) > XID(Ti) 或者 Tj在SP(Ti)中, 则令Ti回滚, 以防止版本跳跃

实现的时候也非常简单, 直接取出X的最新版本, 如果该版本对Ti不可见, 那么Ti要求修改X则一定会发生版本跳跃, 于是要求Ti回滚

1
2
3
4
5
6
7
8
9
10
11
public class Visibility {
public static boolean isVersionSkip(TransactionManager tm, Transaction t, Entry e) {
long xmax = e.getXmax();
// 读提交是允许版本跳跃的, 而可重复读则是不允许版本跳跃的
if (t.level == 0) {
return false;
} else {
return tm.isCommitted(xmax) && (xmax > t.xid || t.isInSnapshot(xmax));
}
}
}

死锁问题

VM利用2PL来实现”可串行化”调度, 2PL处理除了会阻塞事务外, 更严重的, 会造成死锁问题

如果Ti锁定了X, 现在Tj准备更新X, 那么Tj会被阻塞, 等待Ti释放X的锁. 这种等待关系可以用有向图来表示, 比如上述关系就可以表示为”Tj -> Ti”. 于是, 将所有这样的关系, 转化为图后, 死锁判断就简单了. 如果图中有环, 则有死锁; 否则无死锁

VM使用一个LockTable对象, 在内存中维护这张图, 每当出现等待时, 则向图中加入一条边. 每向图中加入一条边, 便进行一次死锁检测. 如果加入某条边后检测到了死锁, 则撤销加入这条边的事务

1
2
3
4
5
6
7
8
9
10
public class LockTable {
private Map<Long, List<Long>> x2u; // 某个XID已经获得的资源的UID列表
private Map<Long, Long> u2x; // UID被某个XID持有
private Map<Long, List<Long>> wait; // 正在等待UID的XID列表
private Map<Long, Lock> waitLock; // 正在等待资源的XID的锁
private Map<Long, Long> waitU; // XID正在等待的UID
private Lock lock;

...
}

当出现等待的情况时, 就向图中添加一条边

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class LockTable {
// 不需要等待则返回null,否则返回锁对象
// 会造成死锁则抛出异常
public Lock add(long xid, long uid) throws Exception {
lock.lock();
try {
// 图中有这条边, 相当于xid已经获取uid资源, 无需再等待
if (isInList(x2u, xid, uid)) {
return null;
}
// 资源uid未被使用
if (!u2x.containsKey(uid)) {
u2x.put(uid, xid);
putIntoList(x2u, xid, uid);
return null;
}
// xid没有占用uid, 且uid被使用了, 进入等待, xid等待uid
waitU.put(xid, uid);
putIntoList(wait, xid, uid);
// 检测到死锁就撤销这条边, 同时撤销事务(VM中捕获到异常会自动撤销事务)
if (hasDeadLock()) {
waitU.remove(xid);
removeFromList(wait, uid, xid);
throw Error.DeadlockException;
}
// 无死锁, 等待状态, 返回一个上锁的Lock对象
Lock l = new ReentrantLock();
l.lock();
waitLock.put(xid, l);
return l;

} finally {
lock.unlock();
}
}
}

调用 add, 如果需要等待的话, 会返回一个上了锁的Lock对象, 调用方在获取到该对象时, 需要尝试获取该对象的锁, 由此实现阻塞线程的目的, 例如:

1
2
3
4
5
6
7
Lock l = lt.add(xid, uid);
// l是一个返回的上锁的对象
if (l != null) {
// lock(): 获取锁, 如果锁被其他线程持有, 则阻塞该线程
l.lock(); // 阻塞线程
l.unlock();
}

检测图中是否有环: dfs

查找图中是否有环的算法也非常简单, 就是一个深搜, 只是需要注意这个图不一定是连通图, 思路就是为每个节点设置一个访问戳, 都初始化为 -1, 随后遍历所有节点, 以每个非 -1 的节点作为根进行深搜, 并将深搜该连通图中遇到的所有节点都设置为同一个数字, 不同的连通图数字不同, 这样, 如果在遍历某个图时, 遇到了之前遍历过的节点, 说明出现了环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class LockTable {
private boolean hasDeadLock() {
xidStamp = new HashMap<>();
stamp = 1;
for (long xid : x2u.keySet()) {
Integer s = xidStamp.get(xid);
if (s != null && s > 0) {
continue;
}
stamp++;
if (dfs(xid)) {
return true;
}
}
return false;
}

private boolean dfs(long xid) {
Integer stp = xidStamp.get(xid);
if (stp != null && stp == stamp) {
return true;
}
if (stp != null && stp < stamp) {
return false;
}
xidStamp.put(xid, stamp);

Long uid = waitU.get(xid);
if (uid == null) {
return false;
}
Long x = u2x.get(uid);
assert x != null;
return dfs(x);
}
}

在一个事务commit或者abort时, 就可以释放所有它持有的锁, 并将自身从等待图中删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LockTable {
public void remove(long xid) {
lock.lock();
try {
List<Long> l = x2u.get(xid);
if (l != null) {
while (l.size() > 0) {
Long uid = l.remove(0);
selectNewXID(uid);
}
}
waitU.remove(xid);
x2u.remove(xid);
waitLock.remove(xid);

} finally {
lock.unlock();
}
}
}

VM实现

VM接口

1
2
3
4
5
6
7
8
9
public interface VersionManager {
byte[] read(long xid, long uid) throws Exception;
long insert(long xid, byte[] data) throws Exception;
boolean delete(long xid, long uid) throws Exception;

long begin(int level);
void commit(long xid) throws Exception;
void abort(long xid);
}

实现

缓存

VM实现类还实现了Entry的缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
protected Entry getForCache(long uid) throws Exception {
Entry entry = Entry.loadEntry(this, uid);
if (entry == null) {
throw Error.NullEntryException;
}
return entry;
}

@Override
protected void releaseForCache(Entry entry) {
entry.remove();
}
}

begin()

开启一个事务, 并初始化事务的结构, 将其存放在activeTransaction中, 用于检查和快照使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
public long begin(int level) {
lock.lock();
try {
long xid = tm.begin();
Transaction t = Transaction.newTransaction(xid, level, activeTransaction);
activeTransaction.put(xid, t);
return xid;
} finally {
lock.unlock();
}
}
}

commit()

提交一个事务, 主要就是free掉相关的结构, 并且释放持有的锁, 并修改 TM 状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
public void commit(long xid) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();

try {
if (t.err != null) {
throw t.err;
}
} catch (NullPointerException n) {
System.out.println(xid);
System.out.println(activeTransaction.keySet());
Panic.panic(n);
}

lock.lock();
activeTransaction.remove(xid);
lock.unlock();

lt.remove(xid);
tm.commit(xid);
}
}

abort

abort事务的方法则有两种, 手动和自动, 手动指的是调用abort()方法, 而自动, 则是在事务被检测出出现死锁时, 会自动撤销回滚事务, 或者出现版本跳跃时, 也会自动回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
public void abort(long xid) {
internAbort(xid, false);
}

private void internAbort(long xid, boolean autoAborted) {
lock.lock();
Transaction t = activeTransaction.get(xid);
if (!autoAborted) {
activeTransaction.remove(xid);
}
lock.unlock();

if (t.autoAborted) {
return;
}
lt.remove(xid);
tm.abort(xid);
}
}

read()

读取一个Entry, 注意判断可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
public byte[] read(long xid, long uid) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();

if (t.err != null) {
throw t.err;
}

Entry entry = null;
try {
entry = super.get(uid);
} catch (Exception e) {
if (e == Error.NullEntryException) {
return null;
} else {
throw e;
}
}
try {
if (Visibility.isVisible(tm, t, entry)) {
return entry.data();
} else {
return null;
}
} finally {
entry.release();
}
}
}

insert()

将数据包装为Entry, 交给DM插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
public long insert(long xid, byte[] data) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();

if (t.err != null) {
throw t.err;
}

byte[] raw = Entry.wrapEntryRaw(xid, data);
return dm.insert(xid, raw);
}
}

delete()

删除的操作只是设置 XMAX即可

但是有三个需要判断

  1. 可见性判断
  2. 获取资源的锁
  3. 版本跳跃判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class VersionManagerImpl extends AbstractCache<Entry> implements VersionManager {
@Override
public boolean delete(long xid, long uid) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();

if (t.err != null) {
throw t.err;
}
Entry entry = null;
try {
entry = super.get(uid);
} catch (Exception e) {
if (e == Error.NullEntryException) {
return false;
} else {
throw e;
}
}
try {
if (!Visibility.isVisible(tm, t, entry)) {
return false;
}
Lock l = null;
try {
l = lt.add(xid, uid);
} catch (Exception e) {
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}
// l是一个返回的上锁的对象
if (l != null) {
// lock(): 获取锁, 如果锁被其他线程持有, 则阻塞该线程
l.lock(); // 阻塞线程
l.unlock();
}

if (entry.getXmax() == xid) {
return false;
}

if (Visibility.isVersionSkip(tm, t, entry)) {
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}

entry.setXmax(xid);
return true;

} finally {
entry.release();
}
}
}

总结

  1. 为了实现”可串行化”调度, VM使用了2PL;
  2. 为了减少2PL的阻塞率, VM实现了MVCC;
  3. 为了实现MVCC, VM抽象出了”记录”和”版本”;
  4. 为了对应”版本”和事务, VM使用了可见性判断;
  5. 为了实现更严格”可重复读”隔离度的可见性, VM引入了”快照”技术;
  6. “多版本”使得VM的”删除”和”撤销”操作变得意外的简单;
  7. 但是”多版本”却会导致”版本跳跃”, 为了解决它, VM会限制出现版本跳跃的事务回滚;
  8. VM还解决了2PL带来的”死锁”问题.

Index Manager

IM基于DM, 维护了索引的结构, xdb实现了基于B+树索引结构

IM 对上层模块主要提供两种能力:插入索引和搜索节点

结点结构

二叉树由一个个 Node 组成,每个 Node 都存储在一条 DataItem 中。结构如下:

1
2
[LeafFlag][KeyNumber][SiblingUid]
[Son0][Key0][Son1][Key1]...[SonN][KeyN]

其中 LeafFlag 标记了该节点是否是个叶子节点, KeyNumber 为该节点中 key 的个数, SiblingUid 是其兄弟节点存储在 DM 中的 UID, 后续是穿插的子节点(SonN)和KeyN, 最后的一个 KeyN 始终为 MAX_VALUE,以此方便查找

Node 类持有了其 B+ 树结构的引用, DataItem 的引用和 SubArray 的引用, 用于方便快速修改数据和释放数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Node {
static final int IS_LEAF_OFFSET = 0;
static final int NO_KEYS_OFFSET = IS_LEAF_OFFSET + 1;
static final int SIBLING_OFFSET = NO_KEYS_OFFSET + 2;
static final int NODE_HEADER_SIZE = SIBLING_OFFSET + 8;

static final int BALANCE_NUMBER = 32;
static final int NODE_SIZE = NODE_HEADER_SIZE + (2*8)*(BALANCE_NUMBER*2+2);

BPlusTree tree;
DataItem dataItem;
SubArray raw;
long uid;

...
}

根节点生成

根节点的初始两个子节点为 left 和 right, 初始键值为 key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Node {
...
// 生成根节点
static byte[] newRootRaw(long left, long right, long key) {
SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);

setRawIsLeaf(raw, false);
setRawNoKeys(raw, 2);
setRawSibling(raw, 0);
setRawKthSon(raw, left, 0);
setRawKthKey(raw, key, 0);
setRawKthSon(raw, right, 1);
setRawKthKey(raw, Long.MAX_VALUE, 1);

return raw.raw;
}
...
}

空根节点生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Node {
...
// 生成空根节点
static byte[] newNilRootRaw() {
SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);

setRawIsLeaf(raw, true);
setRawNoKeys(raw, 0);
setRawSibling(raw, 0);

return raw.raw;
}
...
}

插入操作

searchNext 寻找对应 key 的 UID, 如果找不到, 则返回兄弟节点的 UID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Node {
...
class SearchNextRes {
long uid;
long siblingUid;
}

public SearchNextRes searchNext(long key) {
dataItem.rLock();
try {
SearchNextRes res = new SearchNextRes();
int noKeys = getRawNoKeys(raw);
for(int i = 0; i < noKeys; i ++) {
long ik = getRawKthKey(raw, i);
if(key < ik) {
res.uid = getRawKthSon(raw, i);
res.siblingUid = 0;
return res;
}
}
res.uid = 0;
res.siblingUid = getRawSibling(raw);
return res;

} finally {
dataItem.rUnLock();
}
}
...
}

搜索操作

leafSearchRange 方法在当前节点进行范围查找,范围是 [leftKey, rightKey],这里约定如果 rightKey 大于等于该节点的最大的 key, 则还同时返回兄弟节点的 UID,方便继续搜索下一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Node {
...
class LeafSearchRangeRes {
List<Long> uids;
long siblingUid;
}

public LeafSearchRangeRes leafSearchRange(long leftKey, long rightKey) {
dataItem.rLock();
try {
int noKeys = getRawNoKeys(raw);
int kth = 0;
while(kth < noKeys) {
long ik = getRawKthKey(raw, kth);
if(ik >= leftKey) {
break;
}
kth ++;
}
List<Long> uids = new ArrayList<>();
while(kth < noKeys) {
long ik = getRawKthKey(raw, kth);
if(ik <= rightKey) {
uids.add(getRawKthSon(raw, kth));
kth ++;
} else {
break;
}
}
long siblingUid = 0;
if(kth == noKeys) {
siblingUid = getRawSibling(raw);
}
LeafSearchRangeRes res = new LeafSearchRangeRes();
res.uids = uids;
res.siblingUid = siblingUid;
return res;
} finally {
dataItem.rUnLock();
}
}
...
}

BPlusTree

由于 B+ 树在插入删除时, 会动态调整, 根节点不是固定节点, 于是设置一个 bootDataItem, 该 DataItem 中存储了根节点的 UID, 可以注意到, IM在操作DM时, 使用的事务都是SUPER_XID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class BPlusTree {
DataItem bootDataItem;

private long rootUid() {
bootLock.lock();
try {
SubArray sa = bootDataItem.data();
return Parser.parseLong(Arrays.copyOfRange(sa.raw, sa.start, sa.start+8));
} finally {
bootLock.unlock();
}
}

private void updateRootUid(long left, long right, long rightKey) throws Exception {
bootLock.lock();
try {
byte[] rootRaw = Node.newRootRaw(left, right, rightKey);
long newRootUid = dm.insert(TransactionManagerImpl.SUPER_XID, rootRaw);
bootDataItem.before();
SubArray diRaw = bootDataItem.data();
System.arraycopy(Parser.long2Byte(newRootUid), 0, diRaw.raw, diRaw.start, 8);
bootDataItem.after(TransactionManagerImpl.SUPER_XID);
} finally {
bootLock.unlock();
}
}
}

删除操作

IM没有Delete的接口, 下面解释原因, 和VM有关.

如果用户想要删除某条记录X, 那么实际的执行过程大致是:

  1. TBM解析语句;
  2. TBM利用IM, 查询到X的地址;
  3. TBM调用VM, 将X删除;
  4. VM将X对该事务可见的那个版本的XMAX设置为该事务的XID;
  5. 删除结束.

上述过程并没有要求从IM中删除X对应的索引, 假如T1删除X之后, 又有T5事务, 尝试读取X, 那会怎样呢? 过程大致如下:

  1. TBM解析语句;
  2. TBM利用IM, 查询到X的地址;
  3. TBM利用VM读取X;
  4. 由于X已经被删除, VM将找不到合适的版本, 于是返回nil;
  5. TBM接受到nil, 当做该条记录不存在, 返回给用户这个结果.

可见, 由于VM的存在, 使得IM不用删除这些索引也没关系.

但是如果IM一直不删除这些索引的话, 索引树又会变得极其的庞大, 占用资源, 也降低NYADB效率. 所以, 在必要的时候, 可以让IM对索引树进行整理.

B+树并发控制协议

本小节描述B+树上的并发控制协议, 并证明该协议是无死锁的.

我们用u来表示B+树上的某个节点, 用s(u)来表示它右边的那个兄弟节点. 每个节点都有一个读写锁, 并规定, 在对u做任意读取之前, 都必须要调用u.RLock(), 在对u做任意修改之前, 都必须要调用u.WLock(). 且u.Leaf()能够返回该节点是否为叶子节点.

另外, 我们规定, 任意的事务Ti, 在某一时刻, 最多只能取得一个节点的锁. 也就是说, 现在Ti取得u的锁, 如果它想访问下一个节点v, 那么它必须先释放掉u的锁. 该协议能够保证在并发访问的情况下, 不会出现死锁.

(下面的过程叙述需要你熟知B+树的算法, 请自行查阅资料.)

下面先描述I(k, v)操作的过程, 假设初始u为B+树根节点:

  • 1)u.RLock();
  • 2)如果u.Leaf() == false, 则在u中查找下一个需要迭代的子节点v;
  • 2.1)如果查找失败(失败原因见后文), u.RUnlock(), u:=s(u), 重复2.
  • 2.2)如果查找成功, u.RUnlock(), 则另u:=v, 重复1.
  • 3)如果u.Leaf() == true, u.RUnlock(), u.WLock(), 并尝试向u中插入(k, v);
  • 3.1)如果插入失败(失败原因见后文), u.WUnlock(), 则另u:=s(u), 重复3.
  • 3.2)如果插入成功, 检测u是否需要分裂;
  • 3.2.1)如果不需要, u.WUnlock(), 插入成功, 直接返回.
  • 3.2.2)如果需要, 则依照B+树算法, 创建新节点v, 另s(u):=v, u.WUnlock().
  • 4)递归的向父亲节点插入新增加的节点(如果需要的话), 直至根节点.

现在说明插入失败, 和查找失败的原因. 假设T1准备向B+树中插入(10, 10), 并有如下的执行序列:

  1. T1当前在u节点, 并查询到它下一个需要访问的节点是v; // 注意此时T1并没有v的锁
  2. T2向v中插入了(8, 10), 使得v从原来的[(1, 1), (2, 2), (10, 10)], 变成了[(1, 1), (2, 2), (8, 8), (10, 10)], 并被分裂成为v[(1, 1), (2, 2)], s(v)[(8, 8), (10, 10)]
  3. T2执行v.WUnlock()
  4. T1取得v, 并执行v.RLock(), 接着进行查询

可见, 在T1对v进行查询时, 便会发生查询失败, 原因也很显而易见: 在T1得知要查询v, 到T1对v进行查询期间, 有其他事务对v操作并让其产生了分裂.

v被分裂过后, 它原本的一部分数据就会被移动到s(v)中, 于是则需要在查询失败后, 继续对s(v)进行查询. 插入操作也是同理.

对于S(key)的操作就不用赘述了, 就是I(key, value)中的1), 2), 3)步, 只不过把对应的插入操作改为查询操作.

B+树的事务无关和错误处理

B+树是事务无关的, 既B+树直接以超级事务在执行. 试想如果某个事务, 在B+树中插入了很多索引, 然后又被回滚了, 会怎么样? 结果就是该事务的索引依然留在B+树种, 但是VM却”读不出来”, 和B+树没有Delete操作类似, 因此不会对其他事务造成影响, 这些废弃的索引是安全的.

现在再来看看如果B+树在执行过程中, 发生了崩溃会怎么样? 如果Ti在对u进行修改时, 发生了崩溃, 那么节点u的内部结构就被破坏了, 但是由于B+树是建立在DM上的, 在下一次数据库重启时, u就会被恢复成修改之前的状态. 也就是说, 由于DM的保护, 对B+树节点的操作, 是原子性的!

于是我们现在就不用考虑节点内部错误的情况了, 只需要考虑节点间错误的情况, 而这样的错误情况只有一种: 某次对u的插入操作创建了新节点v, 此时s(u)=v, 但是v却并没有被插入到他的父节点中. 于是成了大致如下的状态:

1
2
3
4
[parent]
|
v
[u] -> [v]

而正确的状态应该如下:

1
2
3
4
[ parent ]
| |
v v
[u] -> [v]

和正确状态相比, 错误状态下, 少了一支从父亲节点到v的指针. 但是这样是没问题的! 因为在插入和查询操作中, 如果失败, 就会不断的向右兄弟节点迭代. 因此, 在错误的状态下, 如果想找v中的内容, 那么情况是: 1)找到parent, 2)通过parent找到u, 3)在u中查找失败, 4)通过u找到v, 5)查找成功.

于是, 在DM的原子性保护下, 结合B+树本身的算法过程, 能够证明B+树是完全能够应对数据库崩坏的.

总结

  1. B+树基于DM.
  2. B+树只提供了Insert和Search两种操作.
  3. 和TBM依赖VM不同, B+树自己管理锁来进行并发控制.
  4. B+树的并发协议是不会产生死锁的.
  5. B+树本身的算法性质和DM原子性保证,使得它能够应对错误情况.