介绍 模型概述 xdb一共包括五个模块, 分别是
Data Manager(DM)
Transaction Manager(TM)
Version Manager(VM)
Index Manager(IM)
Table Manager(TBM)
五个模块的依赖关系如下
其中TM模块为事务管理模块, 主要作用就是标记事务的状态, 并且提供接口以供其他模块进行查询
Transaction Manager TM模块是比较简单的模块, 主要就是标记每个事务的状态以供其他模块查询
每一个事务都有三种状态
active 正在进行, 尚未结束
committed 已提交
aborted 已撤销(回滚)
XID文件 每一个事务都有一个xid, 这个xid唯一标识了这个事务, xid的标号从1开始, 依次递增, 不重复
规定xid为0的事务为超级事务(Super Transaction), 当一些操作想在没有申请事务的情况下进行, 那么可以将操作的 XID 设置为 0, XID 为 0 的事务的状态永远是 committed
XID文件就是TM模块中记录事务状态的文件, 它为每个事务分配1字节的空间来记录状态, 并且在头部还保存了一个8字节的数字来记录事务的个数
事务xid的状态在XID文件中的位置在(xid-1)+8字节处
对外接口 TM模块中对其他模块提供了接口来查询事务状态
1 2 3 4 5 6 7 8 9 public interface TransactionManager { long begin () ; void commit (long xid) ; void abort (long xid) ; boolean isActive (long xid) ; boolean isCommitted (long xid) ; boolean isAborted (long xid) ; void close () ; }
TransactionManagerImpl 定义常量 1 2 3 4 5 6 7 8 9 10 11 12 public static final long SUPER_XID = 0 ;static final int LEN_XID_HEADER_LENGTH = 8 ;static final String XID_SUFFIX = ".xid" ;private static final int XID_FIELD_SIZE = 1 ;private static final byte FIELD_TRAN_ACTIVE = 0 ;private static final byte FIELD_TRAN_COMMITTED = 1 ;private static final byte FIELD_TRAN_ABORTED = 2 ;
构造方法 所有的文件读写都采用了 NIO 方式的 FileChannel
1 2 3 4 5 6 TransactionManagerImpl(RandomAccessFile file, FileChannel fc) { this .file = file; this .fc = fc; counterLock = new ReentrantLock (); checkXIDCounter(); }
检查XID文件 在构造方法创建了TransactionManager后需要对XID文件进行校验, 确保XID是合法的文件
校验逻辑: 读取前8个字节(事务个数, 每个事务占1个字节)作为xidCounter, 根据xidCounter的值可以知道有多少个事务, 事务的个数+8字节, 判断这个数值与文件的长度是否相同, 如果不同则说明XID文件不合法, 直接终止程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void checkXIDCounter () { long fileLen = 0 ; try { fileLen = file.length(); } catch (IOException e1) { Panic.panic(Error.BadXIDFileException); } if (fileLen < LEN_XID_HEADER_LENGTH) { Panic.panic(Error.BadXIDFileException); } ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH); try { fc.position(0 ); fc.read(buf); } catch (IOException e) { Panic.panic(e); } this .xidCounter = Parser.parseLong(buf.array()); long end = getXidPosition(this .xidCounter + 1 ); if (end != fileLen) { Panic.panic(Error.BadXIDFileException); } } private long getXidPosition (long xid) { return LEN_XID_HEADER_LENGTH + (xid - 1 ) * XID_FIELD_SIZE; }
接口方法 begin() begin()方法会开始一个事务, 首先把xidCounter+1对应的事务的状态设置为active, 然后更新文件头(值加1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public long begin () { counterLock.lock(); try { long xid = xidCounter + 1 ; updateXID(xid, FIELD_TRAN_ACTIVE); incrXIDCounter(); return xid; } finally { counterLock.unlock(); } } private void updateXID (long xid, byte status) { long offset = getXidPosition(xid); byte [] tmp = new byte [XID_FIELD_SIZE]; tmp[0 ] = status; ByteBuffer buf = ByteBuffer.wrap(tmp); try { fc.position(offset); fc.write(buf); } catch (IOException e) { Panic.panic(e); } try { fc.force(false ); } catch (IOException e) { Panic.panic(e); } } private void incrXIDCounter () { xidCounter++; ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter)); try { fc.position(0 ); fc.write(buf); } catch (IOException e) { Panic.panic(e); } try { fc.force(false ); } catch (IOException e) { Panic.panic(e); } }
commit() 1 2 3 4 5 @Override public void commit (long xid) { updateXID(xid, FIELD_TRAN_COMMITTED); }
abort() 1 2 3 4 5 @Override public void abort (long xid) { updateXID(xid, FIELD_TRAN_ABORTED); }
isActive() isActive(), isCommitted(), isAborted()都是检查一个xid的状态, 使用一个通用的checkXID()方法解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private boolean checkXID (long xid, byte status) { long offset = getXidPosition(xid); ByteBuffer buf = ByteBuffer.wrap(new byte [XID_FIELD_SIZE]); try { fc.position(offset); fc.read(buf); } catch (IOException e) { Panic.panic(e); } return buf.array()[0 ] == status; } @Override public boolean isActive (long xid) { if (xid == SUPER_XID) { return false ; } return checkXID(xid, FIELD_TRAN_ACTIVE); } @Override public boolean isCommitted (long xid) { if (xid == SUPER_XID) { return true ; } return checkXID(xid, FIELD_TRAN_COMMITTED); } @Override public boolean isAborted (long xid) { if (xid == SUPER_XID) { return false ; } return checkXID(xid, FIELD_TRAN_ABORTED); }
close() 1 2 3 4 5 6 7 8 9 @Override public void close () { try { fc.close(); file.close(); } catch (IOException e) { Panic.panic(e); } }
创建,打开XID文件 使用两个静态方法create()和open()来进行XID文件的创建和打开
create() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static TransactionManagerImpl create (String path) { File f = new File (path + TransactionManagerImpl.XID_SUFFIX); try { if (!f.createNewFile()) { Panic.panic(Error.FileExistsException); } } catch (Exception e) { Panic.panic(e); } if (!f.canRead() || !f.canWrite()) { Panic.panic(Error.FileCannotRWException); } FileChannel fc = null ; RandomAccessFile raf = null ; try { raf = new RandomAccessFile (f, "rw" ); fc = raf.getChannel(); } catch (FileNotFoundException e) { Panic.panic(e); } ByteBuffer buf = ByteBuffer.wrap(new byte [TransactionManagerImpl.LEN_XID_HEADER_LENGTH]); try { fc.position(0 ); fc.write(buf); } catch (IOException e) { Panic.panic(e); } return new TransactionManagerImpl (raf, fc); }
open() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static TransactionManagerImpl open (String path) { File f = new File (path + TransactionManagerImpl.XID_SUFFIX); if (!f.exists()) { Panic.panic(Error.FileNotExistsException); } if (!f.canRead() || !f.canWrite()) { Panic.panic(Error.FileCannotRWException); } FileChannel fc = null ; RandomAccessFile raf = null ; try { raf = new RandomAccessFile (f, "rw" ); fc = raf.getChannel(); } catch (FileNotFoundException e) { Panic.panic(e); } return new TransactionManagerImpl (raf, fc); }
Test 创建testXid.xid文件后执行了3次begin()(相当于创建了3个事务), 然后把xid为3事务提交后回滚
1 2 3 4 5 6 7 8 9 10 11 12 13 public class TransactionManagerTest { @Test public void testCreate () { TransactionManagerImpl tmger = TransactionManager.open("testXid" ); System.out.println(tmger.xidCounter); tmger.commit(3 ); tmger.abort(3 ); tmger.close(); } }
此时查看XID文件
可以看出有3个事务, 状态分别是0, 0, 2, 对应了active, active, aborted
Data Manager 介绍 Data Manager主要的职责有
分页管理DB文件并进行缓存
管理日志文件, 保证发生错误时能够根据日志进行数据恢复
抽象DB文件为DataItem供上层模块使用, 并提供缓存
总体来时DM就是上层模块与文件系统之间的一个抽象层, DM从文件系统读写的文件并封装供上层模块使用, 并且提供日志功能
无论是向上还是向下, DM都提供了一个缓存的功能, 用内存操作来保证效率
引用计数缓存框架 为什么使用引用计数缓存而不使用LRU?
是因为LRU缓存策略中资源的驱逐是不可控的, 如果某一时刻LRU中的某一资源被驱逐, 而此时上层模块又需要将该资源写回到数据源中, 此时发现在缓存中找不到该资源, 在这种情况下: 是否还要做写回数据源的操作?
不回源. 由于没法确定缓存被驱逐的时间, 也没法确定缓存被驱逐后是否被修改, 这样是极其不安全的
回源. 如果数据线没有被修改, 那么就是一次无效的回源
放回到缓存中. 由于此时缓存已满, 放回到缓存中势必会驱逐掉其他的资源, 又会引起相同的问题
引用计数策略正好解决了这个问题, 只有上层模块主动释放引用, 缓存在确保没有模块在使用这个资源了, 才会去驱逐资源
而当缓存满的时候, 引用计数策略无法自动释放缓存, 此时应该直接报错(类似JVM的OOM)
AbstractCache<T> 抽象方法 AbstractCache<T>抽象类中有两个抽象方法, 实现类去实现具体操作
1 2 3 4 5 6 7 8 9 10 11 12 13 ` protected abstract T getForCache (long key) throws Exception; protected abstract void releaseForCache (T obj) ;
引用计数 为了用对多线程环境, 需要记录一个资源是否正在获取中
1 2 3 private HashMap<Long, T> cache; private HashMap<Long, Integer> references; private HashMap<Long, Boolean> getting;
初始化 1 2 3 4 5 6 7 8 9 10 11 private int maxResource; private int count = 0 ; private Lock lock;public AbstractCache (int maxResource) { this .maxResource = maxResource; cache = new HashMap <>(); references = new HashMap <>(); getting = new HashMap <>(); lock = new ReentrantLock (); }
get()获取资源 使用get()方法来获取资源, 如果缓存中存在该资源, 则直接从缓存中返回, 否则则从数据源中获取, 并将其加入到缓存中, 再返回
在获取资源时首先要检查资源是否被占用, 如果被占用了, 则进入循环等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected T get (long key) throws Exception { while (true ) { lock.lock(); if (getting.containsKey(key)) { lock.unlock(); try { Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); continue ; } continue ; } ... } }
当资源不被占用时, 需要判断资源是否存在在缓存中, 如果存在, 则直接返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected T get (long key) throws Exception { while (true ) { ... if (cache.containsKey(key)) { T obj = cache.get(key); references.put(key, references.get(key) + 1 ); lock.unlock(); return obj; } ... } }
如果缓存中不存在, 准备 从数据源获取资源, 将资源占用, getting设置为true, 缓存数+1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected T get (long key) throws Exception { while (true ) { ... if (maxResource > 0 && count == maxResource) { lock.unlock(); throw Error.CacheFullException; } count++; getting.put(key, true ); lock.unlock(); break ; } }
从数据源获取资源并加入到缓存中去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected T get (long key) throws Exception { ... T obj = null ; try { obj = getForCache(key); } catch (Exception e) { lock.lock(); count--; getting.remove(key); lock.unlock(); throw e; } lock.lock(); getting.remove(key); cache.put(key, obj); references.put(key, 1 ); lock.unlock(); return obj; }
release()释放缓存 强行释放一个缓存, 将缓存引用数references减 1, 如果减到 0, 就把相关的缓存删除并写回数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 protected void release (long key) { lock.lock(); try { int ref = references.get(key) - 1 ; if (ref == 0 ) { T obj = cache.get(key); releaseForCache(obj); references.remove(key); cache.remove(key); count--; } else { references.put(key, ref); } } finally { lock.unlock(); } }
close() 关闭缓存, 写回所有资源
1 2 3 4 5 6 7 8 9 10 11 12 13 protected void close () { lock.lock(); try { Set<Long> keys = cache.keySet(); for (long key : keys) { release(key); references.remove(key); cache.remove(key); } } finally { lock.unlock(); } }
共享内存数组 SubArray 类, 来(松散地)规定这个数组的可使用范围
1 2 3 4 5 6 7 8 9 10 11 public class SubArray { public byte [] raw; public int start; public int end; public SubArray (byte [] raw, int start, int end) { this .raw = raw; this .start = start; this .end = end; } }
数据页面的缓存与管理 DM模块向下读写文件系统, 并将其抽象为数据页面, 并将页面进行缓存
数据页的默认大小设定为8KB
数据页面结构 定义Page与PageImpl
Page 1 2 3 4 5 6 7 8 9 10 public interface Page { public static final int PAGE_SIZE = 1 << 13 ; void lock () ; void unlock () ; void release () ; void setDirty (boolean dirty) ; boolean isDirty () ; int getPageNumber () ; byte [] getData(); }
PageImpl 1 2 3 4 5 6 7 public class PageImpl implements Page { private int pageNumber; private byte [] data; private boolean dirty; private Lock lock; ... }
pageNumber: 页号, 从第一页开始
data: 实际存储的数据
dirty: 是否是脏页面, 在缓存被驱逐时, 脏页面需要被写回文件系统
页面缓存 接口 1 2 3 4 5 6 7 8 9 10 public interface PageCache { public static final int PAGE_SIZE = 1 << 13 ; int newPage (byte [] initData) ; Page getPage (int pgno) throws Exception; void close () ; void release (Page page) ; void truncateByBgno (int maxPgno) ; int getPageNumber () ; void flushPage (Page pg) ; }
实现类 页面缓存的实现类需要继承缓存框架, 并实现上面的接口
继承缓存框架, 实现getForCache()与releaseForCache()方法
初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class PageCacheImpl extends AbstractCache <Page> implements PageCache { public static final String DB_SUFFIX = ".db" ; private static final int MEM_MIN_LIM = 10 ; private RandomAccessFile file; private FileChannel fc; private Lock fileLock; private AtomicInteger pageNumbers; PageCacheImpl(RandomAccessFile file, FileChannel fileChannel, int maxResource) { super (maxResource); if (maxResource < MEM_MIN_LIM) { Panic.panic(Error.MemTooSmallException); } long length = 0 ; try { length = file.length(); } catch (IOException e) { Panic.panic(e); } this .file = file; this .fc = fileChannel; this .fileLock = new ReentrantLock (); this .pageNumbers = new AtomicInteger ((int ) length / PAGE_SIZE); } ... }
getForCache() getForCache直接从文件中读取, 封装为Page即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class PageCacheImpl extends AbstractCache <Page> implements PageCache { ... @Override protected Page getForCache (long key) throws Exception { int pgno = (int ) key; long offset = PageCacheImpl.pageOffset(pgno); ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE); fileLock.lock(); try { fc.position(offset); fc.read(buf); } catch (IOException e) { Panic.panic(e); } fileLock.unlock(); return new PageImpl (pgno, buf.array(), this ); } private static long pageOffset (int pgno) { return (pgno - 1 ) * PAGE_SIZE; } ... }
releaseForCache releaseForCache()驱逐页面时需要判断是否是脏页面来决定是否直接驱逐
flush()方法为写入文件系统, 如果是脏页面, 则执行flush()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class PageCacheImpl extends AbstractCache <Page> implements PageCache { ... @Override protected void releaseForCache (Page pg) { if (pg.isDirty()) { flush(pg); pg.setDirty(false ); } } private void flush (Page pg) { int pgno = pg.getPageNumber(); long offset = pageOffset(pgno); fileLock.lock(); try { ByteBuffer buf = ByteBuffer.wrap(pg.getData()); fc.position(offset); fc.write(buf); fc.force(false ); } catch (IOException e) { Panic.panic(e); } finally { fileLock.unlock(); } } ... }
其他方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class PageCacheImpl extends AbstractCache <Page> implements PageCache { ... @Override public int newPage (byte [] initData) { int pgno = pageNumbers.incrementAndGet(); Page pg = new PageImpl (pgno, initData, null ); flush(pg); return pgno; } @Override public Page getPage (int pgno) throws Exception { return get((long ) pgno); } @Override public void release (Page page) { release((long ) page.getPageNumber()); } @Override public void flushPage (Page pg) { flush(pg); } @Override public void truncateByBgno (int maxPgno) { long size = pageOffset(maxPgno + 1 ); try { file.setLength(size); } catch (IOException e) { Panic.panic(e); } pageNumbers.set(maxPgno); } @Override public void close () { super .close(); try { fc.close(); file.close(); } catch (IOException e) { Panic.panic(e); } } @Override public int getPageNumber () { return pageNumbers.intValue(); } }
数据页面管理 第一页 数据库页面的第一页不用来存放主要数据, xdb会在第一页记录一个随机数作校验功能
xdb在启动时会在100-107位置写入一个随机数, 关闭时将该数copy到108-115位置, 等到下次启动时比较这两处的数字即可得知数据库是否是正常关闭的
OF: offset
VC: Valid Check
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class PageOne { private static final int OF_VC = 100 ; private static final int LEN_VC = 8 ; public static byte [] InitRaw() { byte [] raw = new byte [Page.PAGE_SIZE]; setVcOpen(raw); return raw; } public static void setVcOpen (Page pg) { pg.setDirty(true ); setVcOpen(pg.getData()); } private static void setVcOpen (byte [] raw) { System.arraycopy(RandomUtil.randomBytes(LEN_VC), 0 , raw, OF_VC, LEN_VC); } public static void setVcClose (Page pg) { pg.setDirty(true ); setVcClose(pg.getData()); } private static void setVcClose (byte [] raw) { System.arraycopy(raw, OF_VC, raw, OF_VC + LEN_VC, LEN_VC); } public static boolean checkVc (Page pg) { return checkVc(pg.getData()); } private static boolean checkVc (byte [] raw) { return Arrays.equals(Arrays.copyOfRange(raw, OF_VC, OF_VC + LEN_VC), Arrays.copyOfRange(raw, OF_VC + LEN_VC, OF_VC + 2 * LEN_VC)); } }
其他页面 一个普通页面大小为8KB, 使用最前面的2B用来记录空闲位置的偏移量, 剩下的存储数据
[FSO][Data] [0-1] [2-8k]
FSO: Free Space Offset
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class PageX { private static final short OF_FREE = 0 ; private static final short OF_DATA = 2 ; public static final int MAX_FREE_SPACE = Page.PAGE_SIZE - OF_DATA; public static byte [] initRaw() { byte [] raw = new byte [Page.PAGE_SIZE]; setFSO(raw, OF_DATA); return raw; } private static void setFSO (byte [] raw, short ofData) { System.arraycopy(Parser.short2Byte(ofData), 0 , raw, OF_FREE, OF_DATA); } public static short getFSO (Page pg) { return getFSO(pg.getData()); } private static short getFSO (byte [] raw) { return Parser.parseShort(Arrays.copyOfRange(raw, 0 , 2 )); } public static short insert (Page pg, byte [] raw) { pg.setDirty(true ); short offset = getFSO(pg.getData()); System.arraycopy(raw, 0 , pg.getData(), offset, raw.length); setFSO(pg.getData(), (short ) (offset + raw.length)); return offset; } public static int getFreeSpace (Page pg) { return Page.PAGE_SIZE - (int ) getFSO(pg.getData()); } public static void recoverInsert (Page pg, byte [] raw, short offset) { pg.setDirty(true ); System.arraycopy(raw, 0 , pg.getData(), offset, raw.length); short rawFSO = getFSO(pg.getData()); if (rawFSO < offset + raw.length) { setFSO(pg.getData(), (short ) (offset + raw.length)); } } public static void recoverUpdate (Page pg, byte [] raw, short offset) { pg.setDirty(true ); System.arraycopy(raw, 0 , pg.getData(), offset, raw.length); } }
其中的两个方法recoverInsert()和recoverUpdate()用于在数据库崩溃后重新打开时, 恢复例程直接插入数据以及修改数据使用
日志管理 日志的主要作用是当数据库发生错误崩溃时恢复数据
每次插入或更新时都会进行日志的记录, 当恢复数据时只需要读取日志并redo和undo即可
日志结构 日志文件
[XCheckSum][Log1][Log2]…[LogN][BadTail]
XCheckSum用来校验整个文件, 占4个字节大小, 后面为一条条的日志, BadTail为意外没有写入成功的日志, 在恢复的时候首先要把BadTail给去掉
每条日志的结构如下
[Size][CheckSum][Data]
Size和CheckSum各占4个字节, Size标识的时Data的长度, CheckSum是该条日志的校验和
日志校验和的计算(SEED为自己设置的值, 类似于hash的盐)
1 2 3 4 5 6 7 private int calChecksum (int xCheck, byte [] log) { for (byte b : log) { xCheck = xCheck * SEED + b; } return xCheck; }
所有每条日志的校验和的总和即为日志文件文件头XCheckSum
校验 每次打开日志时, 都需要对其进行校验, 并且还要移除BadTail(未写入完成的日志)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class LoggerImpl implements Logger { ... private void checkAndRemoveTail () { rewind(); int xCheck = 0 ; while (true ) { byte [] log = internNext(); if (log == null ) { break ; } xCheck = calChecksum(xCheck, log); } if (xCheck != xChecksum) { Panic.panic(Error.BadLogFileException); } try { truncate(position); } catch (Exception e) { Panic.panic(e); } try { file.seek(position); } catch (IOException e) { Panic.panic(e); } rewind(); } @Override public void truncate (long x) throws Exception { lock.lock(); try { fc.truncate(x); } finally { lock.unlock(); } } ... }
日志写入 写入日志时, 先计算出该条日志的校验和以及size, 并将这些数据封装成日志格式, 再写入到日志文件里, 最后还要把日志文件头的校验和更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class LoggerImpl implements Logger { ... @Override public void log (byte [] data) { byte [] log = wrapLog(data); ByteBuffer buf = ByteBuffer.wrap(log); lock.lock(); try { fc.position(fc.size()); fc.write(buf); } catch (IOException e) { Panic.panic(e); } finally { lock.unlock(); } updateXChecksum(log); } private void updateXChecksum (byte [] log) { this .xChecksum = calChecksum(this .xChecksum, log); try { fc.position(0 ); fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum))); fc.force(false ); } catch (IOException e) { Panic.panic(e); } } private byte [] wrapLog(byte [] data) { byte [] checksum = Parser.int2Byte(calChecksum(0 , data)); byte [] size = Parser.int2Byte(data.length); return Bytes.concat(size, checksum, data); } ... }
日志读取 日志的读取依靠next方法, 每执行一次next方法就会对读取到一条日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class LoggerImpl implements Logger { ... @Override public byte [] next() { lock.lock(); try { byte [] log = internNext(); if (log == null ) { return null ; } return Arrays.copyOfRange(log, OF_DATA, log.length); } finally { lock.unlock(); } } private byte [] internNext() { if (position + OF_DATA >= fileSize) { return null ; } ByteBuffer tmp = ByteBuffer.allocate(4 ); try { fc.position(position); fc.read(tmp); } catch (IOException e) { Panic.panic(e); } int size = Parser.parseInt(tmp.array()); if (position + size + OF_DATA > fileSize) { return null ; } ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size); try { fc.position(position); fc.read(buf); } catch (IOException e) { Panic.panic(e); } byte [] log = buf.array(); int checkSum1 = calChecksum(0 , Arrays.copyOfRange(log, OF_DATA, log.length)); int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA)); if (checkSum1 != checkSum2) { return null ; } position += log.length; return log; } ... }
日志的恢复 读取日志, 检查每一条日志的事务状态, 如果已经完成就将该条日志记录的操作重做(redo), 否则进行撤销(undo)
redo 正序扫描日志
如果是插入操作 (Ti, I, A, x), 就将x重新插入A位置
如果是更新操作 (Ti, U, A, oldx, newx), 就将A位置的值设置为newx
undo 倒序扫描日志
如果是插入操作 (Ti, I, A, x), 就A位置的数据删除
如果是更新操作 (Ti, U, A, oldx, newx), 就将A位置的值设置为oldx
日志格式 Insert [LogType] [XID] [Pgno] [Offset] [Raw]
1 8 4 2 字节
Update [LogType] [XID] [UID] [OldRaw] [NewRaw]
1 8 8 字节
多线程 规定未提交事务产生的数据不会被其他事务读取和修改
恢复 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class Recover { private static final byte LOG_TYPE_INSERT = 0 ; private static final byte LOG_TYPE_UPDATE = 1 ; private static final int REDO = 0 ; private static final int UNDO = 1 ; static class InsertLogInfo { long xid; int pgno; short offset; byte [] raw; } static class UpdateLogInfo { long xid; int pgno; short offset; byte [] oldRaw; byte [] newRaw; } public static void recover (TransactionManager tm, Logger lg, PageCache pc) { System.out.println("Recovering..." ); lg.rewind(); int maxPgno = 0 ; while (true ) { byte [] log = lg.next(); if (log == null ) { break ; } int pgno; if (isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); pgno = li.pgno; } else { UpdateLogInfo li = parseUpdateLog(log); pgno = li.pgno; } if (pgno > maxPgno) { maxPgno = pgno; } } if (maxPgno == 0 ) { maxPgno = 1 ; } pc.truncateByBgno(maxPgno); System.out.println("Truncate to " + maxPgno + " pages." ); redoTranscations(tm, lg, pc); System.out.println("Redo Transactions Over." ); undoTranscations(tm, lg, pc); System.out.println("Undo Transactions Over." ); System.out.println("Recovery Over." ); } ... }
redoTranscations() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Recover { ... private static void redoTranscations (TransactionManager tm, Logger lg, PageCache pc) { lg.rewind(); while (true ) { byte [] log = lg.next(); if (log == null ) { break ; } if (isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if (!tm.isActive(xid)) { doInsertLog(pc, log, REDO); } } else { UpdateLogInfo xi = parseUpdateLog(log); long xid = xi.xid; if (!tm.isActive(xid)) { doUpdateLog(pc, log, REDO); } } } } ... }
undoTranscations() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class Recover { ... private static void undoTranscations (TransactionManager tm, Logger lg, PageCache pc) { Map<Long, List<byte []>> logCache = new HashMap <>(); lg.rewind(); while (true ) { byte [] log = lg.next(); if (log == null ) { break ; } if (isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if (tm.isActive(xid)) { if (!logCache.containsKey(xid)) { logCache.put(xid, new ArrayList <>()); } logCache.get(xid).add(log); } } else { UpdateLogInfo xi = parseUpdateLog(log); long xid = xi.xid; if (tm.isActive(xid)) { if (!logCache.containsKey(xid)) { logCache.put(xid, new ArrayList <>()); } logCache.get(xid).add(log); } } } for (Map.Entry<Long, List<byte []>> entry : logCache.entrySet()) { List<byte []> logs = entry.getValue(); for (int i = logs.size()-1 ; i >= 0 ; i--) { byte [] log = logs.get(i); if (isInsertLog(log)) { doInsertLog(pc, log, UNDO); } else { doUpdateLog(pc, log, UNDO); } } tm.abort(entry.getKey()); } } ... }
doUpdateLog() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class Recover { ... private static final int OF_TYPE = 0 ; private static final int OF_XID = OF_TYPE + 1 ; private static final int OF_UPDATE_UID = OF_XID + 8 ; private static final int OF_UPDATE_RAW = OF_UPDATE_UID + 8 ; public static byte [] updateLog(long xid, DataItem di) { byte [] logType = {LOG_TYPE_UPDATE}; byte [] xidRaw = Parser.long2Byte(xid); byte [] uidRaw = Parser.long2Byte(di.getUid()); byte [] oldRaw = di.getOldRaw(); SubArray raw = di.getRaw(); byte [] newRaw = Arrays.copyOfRange(raw.raw, raw.start, raw.end); return Bytes.concat(logType, xidRaw, uidRaw, oldRaw, newRaw); } private static UpdateLogInfo parseUpdateLog (byte [] log) { UpdateLogInfo li = new UpdateLogInfo (); li.xid = Parser.parseLong(Arrays.copyOfRange(log, OF_XID, OF_UPDATE_UID)); long uid = Parser.parseLong(Arrays.copyOfRange(log, OF_UPDATE_UID, OF_UPDATE_RAW)); li.offset = (short )(uid & ((1L << 16 ) - 1 )); uid >>>= 32 ; li.pgno = (int )(uid & ((1L << 32 ) - 1 )); int length = (log.length - OF_UPDATE_RAW) / 2 ; li.oldRaw = Arrays.copyOfRange(log, OF_UPDATE_RAW, OF_UPDATE_RAW+length); li.newRaw = Arrays.copyOfRange(log, OF_UPDATE_RAW+length, OF_UPDATE_RAW+length*2 ); return li; } private static void doUpdateLog (PageCache pc, byte [] log, int flag) { int pgno; short offset; byte [] raw; if (flag == REDO) { UpdateLogInfo xi = parseUpdateLog(log); pgno = xi.pgno; offset = xi.offset; raw = xi.newRaw; } else { UpdateLogInfo xi = parseUpdateLog(log); pgno = xi.pgno; offset = xi.offset; raw = xi.oldRaw; } Page pg = null ; try { pg = pc.getPage(pgno); } catch (Exception e) { Panic.panic(e); } try { PageX.recoverUpdate(pg, raw, offset); } finally { pg.release(); } } ... }
doInsertLog() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class Recover { ... private static final int OF_INSERT_PGNO = OF_XID+8 ; private static final int OF_INSERT_OFFSET = OF_INSERT_PGNO+4 ; private static final int OF_INSERT_RAW = OF_INSERT_OFFSET+2 ; public static byte [] insertLog(long xid, Page pg, byte [] raw) { byte [] logTypeRaw = {LOG_TYPE_INSERT}; byte [] xidRaw = Parser.long2Byte(xid); byte [] pgnoRaw = Parser.int2Byte(pg.getPageNumber()); byte [] offsetRaw = Parser.short2Byte(PageX.getFSO(pg)); return Bytes.concat(logTypeRaw, xidRaw, pgnoRaw, offsetRaw, raw); } private static InsertLogInfo parseInsertLog (byte [] log) { InsertLogInfo li = new InsertLogInfo (); li.xid = Parser.parseLong(Arrays.copyOfRange(log, OF_XID, OF_INSERT_PGNO)); li.pgno = Parser.parseInt(Arrays.copyOfRange(log, OF_INSERT_PGNO, OF_INSERT_OFFSET)); li.offset = Parser.parseShort(Arrays.copyOfRange(log, OF_INSERT_OFFSET, OF_INSERT_RAW)); li.raw = Arrays.copyOfRange(log, OF_INSERT_RAW, log.length); return li; } private static void doInsertLog (PageCache pc, byte [] log, int flag) { InsertLogInfo li = parseInsertLog(log); Page pg = null ; try { pg = pc.getPage(li.pgno); } catch (Exception e) { Panic.panic(e); } try { if (flag == UNDO) { DataItem.setDataItemRawInvalid(li.raw); } PageX.recoverInsert(pg, li.raw, li.offset); } finally { pg.release(); } } ... }
页面索引 页面索引的作用是缓存每一页的可用空间, 在插入数据的时候能够根据页面索引快速查找出适合插入的空闲空间
索引结构 页面索引的实现是通过一个list数组实现, 每个list里存放的时PageInfo, 包含页号和空闲空间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class PageIndex { private static final int INTERVALS_NO = 40 ; private static final int THRESHOLD = PageCache.PAGE_SIZE / INTERVALS_NO; private Lock lock; private List<PageInfo>[] lists; public PageIndex () { lock = new ReentrantLock (); lists = new List [INTERVALS_NO + 1 ]; for (int i = 0 ; i < INTERVALS_NO + 1 ; i++) { lists[i] = new ArrayList <>(); } } }
PageInfo 1 2 3 4 5 6 7 8 9 public class PageInfo { public int pgno; public int freeSpace; public PageInfo (int pgno, int freeSpace) { this .pgno = pgno; this .freeSpace = freeSpace; } }
添加索引 1 2 3 4 5 6 7 8 9 10 11 12 13 public class PageIndex { ... public void add (int pgno, int freeSpace) { lock.lock(); try { int number = freeSpace / THRESHOLD; lists[number].add(new PageInfo (pgno, freeSpace)); } finally { lock.unlock(); } } ... }
通过索引选择页面
注意选到页面后要将该页面从当前索引中移除, 同一个页面不允许并发写, 当上层模块使用完这个页面之后需要将这个页面重新插入索引
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class PageIndex { ... public PageInfo select (int spaceSize) { lock.lock(); try { int number = spaceSize / THRESHOLD; if (number < INTERVALS_NO) { number++; } while (number<= INTERVALS_NO) { if (lists[number].size() == 0 ) { number++; continue ; } return lists[number].remove(0 ); } return null ; } finally { lock.unlock(); } } ... }
DataItem DataItem就是DM对数据的抽象, 以供上层模块使用
DataItem结构: [ValidFlag(1B)][DataSize(2B)][Data]
ValidFlag标记了数据是否有效, 类似于逻辑删除, 将其值设为0即代表着删除该条数据
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DataItemImpl implements DataItem { static final int OF_VALID = 0 ; static final int OF_SIZE = 1 ; static final int OF_DATA = 3 ; private SubArray raw; private byte [] oldRaw; private Lock rLock; private Lock wLock; private DataManagerImpl dm; private long uid; private Page pg; }
上层模块在获取到DataItem后, 可以通过data()方法, 该方法返回的数组是数据共享的, 而不是拷贝实现的, 所以使用了SubArray
1 2 3 4 5 6 7 8 public class DataItemImpl implements DataItem { ... @Override public SubArray data () { return new SubArray (raw.raw, raw.start+OF_DATA, raw.end); } ... }
在上层模块对DataItem进行修改时, 要先执行before()方法, 想要撤销修改就执行unBefore()方法, 修改完成之后调用after()方法, after方法中会操作日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class DataItemImpl implements DataItem { ... @Override public void before () { wLock.lock(); pg.setDirty(true ); System.arraycopy(raw.raw, raw.start, oldRaw, 0 , oldRaw.length); } @Override public void unBefore () { System.arraycopy(oldRaw, 0 , raw.raw, raw.start, oldRaw.length); wLock.unlock(); } @Override public void after (long xid) { dm.logDataItem(xid, this ); wLock.unlock(); } ... }
DM实现 DataManager是对上层模块提供方法的类, 同时也集成了DataItem的缓存
DataItem的缓存 DataItem中存储着一个uid, uid是由页号和页内偏移组成的8字节整数, 页号和偏移各占4字节
通过解析uid的前32位得到页号, 后32位得到页内偏移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... @Override protected DataItem getForCache (long uid) throws Exception { short offset = (short )(uid & ((1L << 16 ) - 1 )); uid >>>= 32 ; int pgno = (int )(uid & ((1L << 32 ) - 1 )); Page pg = pc.getPage(pgno); return DataItem.parseDataItem(pg, offset, this ); } ... }
缓存释放
调用page的release即可写回数据源并释放缓存
1 2 3 4 5 6 7 8 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... @Override protected void releaseForCache (DataItem di) { di.page().release(); } ... }
创建与打开 创建 从空文件创建首先需要对第一页进行初始化
1 2 3 4 5 6 7 8 9 10 11 12 public interface DataManager { ... public static DataManager create (String path, long mem, TransactionManager tm) { PageCache pc = PageCache.create(path, mem); Logger lg = Logger.create(path); DataManagerImpl dm = new DataManagerImpl (pc, lg, tm); dm.initPageOne(); return dm; } ... }
初始化第一页
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... void initPageOne () { int pgno = pc.newPage(PageOne.InitRaw()); assert pgno = = 1 ; try { pageOne = pc.getPage(pgno); } catch (Exception e) { Panic.panic(e); } pc.flushPage(pageOne); } ... }
打开 从已有文件创建, 则是需要对第一页进行校验, 来判断是否需要执行恢复流程, 并重新对第一页生成随机字节, 还要生成页面索引
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface DataManager { ... public static DataManager open (String path, long mem, TransactionManager tm) { PageCache pc = PageCache.open(path, mem); Logger lg = Logger.open(path); DataManagerImpl dm = new DataManagerImpl (pc, lg, tm); if (!dm.loadCheckPageOne()) { Recover.recover(tm, lg, pc); } dm.fillPageIndex(); PageOne.setVcOpen(dm.pageOne); dm.pc.flushPage(dm.pageOne); return dm; } ... }
校验第一页
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... boolean loadCheckPageOne () { try { pageOne = pc.getPage(1 ); } catch (Exception e) { Panic.panic(e); } return PageOne.checkVc(pageOne); } ... }
生成页面索引
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... void fillPageIndex () { int pageNumber = pc.getPageNumber(); for (int i = 2 ; i <= pageNumber; i ++) { Page pg = null ; try { pg = pc.getPage(i); } catch (Exception e) { Panic.panic(e); } pIndex.add(pg.getPageNumber(), PageX.getFreeSpace(pg)); pg.release(); } } ... }
对外接口 DM 层提供了三个功能供上层使用, 分别是读, 插入和修改, 修改是通过读出的DataItem实现的, 于是DataManager只需要提供read()和insert()方法
read()通过uid从缓存中获取DataItem
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... @Override public DataItem read (long uid) throws Exception { DataItemImpl di = (DataItemImpl)super .get(uid); if (!di.isValid()) { di.release(); return null ; } return di; } ... }
insert()先在页面索引中获取到可以插入的页号, 首先插入日志, 在通过PageX来插入数据, 最后把页面信息重新添加到页面索引
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... @Override public long insert (long xid, byte [] data) throws Exception { byte [] raw = DataItem.wrapDataItemRaw(data); if (raw.length > PageX.MAX_FREE_SPACE) { throw Error.DataTooLargeException; } PageInfo pi = null ; for (int i = 0 ; i < 5 ; i ++) { pi = pIndex.select(raw.length); if (pi != null ) { break ; } else { int newPgno = pc.newPage(PageX.initRaw()); pIndex.add(newPgno, PageX.MAX_FREE_SPACE); } } if (pi == null ) { throw Error.DatabaseBusyException; } Page pg = null ; int freeSpace = 0 ; try { pg = pc.getPage(pi.pgno); byte [] log = Recover.insertLog(xid, pg, raw); logger.log(log); short offset = PageX.insert(pg, raw); pg.release(); return Types.addressToUid(pi.pgno, offset); } finally { if (pg != null ) { pIndex.add(pi.pgno, PageX.getFreeSpace(pg)); } else { pIndex.add(pi.pgno, freeSpace); } } } ... }
DataManager正常关闭时, 需要执行缓存和日志的关闭流程, 关闭时要鸡蛋设置第一页的字节校验即拷贝随机数
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DataManagerImpl extends AbstractCache <DataItem> implements DataManager { ... @Override public void close () { super .close(); logger.close(); PageOne.setVcClose(pageOne); pageOne.release(); pc.close(); } ... }
总结 DM层各个子类的功能
AbstractCache: 引用计数法的缓存框架, 留了两个从数据源获取数据和释放缓存的抽象方法给具体实现类去实现
PageImpl: 数据页的数据结构, 包含页号、是否脏数据页、数据内容、所在的PageCache缓存
PageOne: 校验页面, 用于启动DM的时候进行文件校验
PageX: 每个数据页的管理器。initRaw()新建一个数据页并设置FSO值, FSO后面存的其实就是一个个DataItem数据包
PageCacheImpl: 数据页的缓存具体实现类, 除了重写获取 和释放两个方法外, 还完成了所有数据页的统一管理:
获取数据库中的数据页总数; getPageNumber()
新建一个数据页并写入数据库文件; newPage(byte[] initData)
从缓存中获取指定的数据页; getPage(int pgno)
删除指定位置后面的数据页; truncateByBgno(int maxPgno)
PageIndex: 方便DataItem的快速定位插入, 其实现原理可以理解为HashMap那种数组+链表结构(实际实现是 List+ArrayList), 先是一个大小为41的数组 存的是区间号(区间号从1>开始), 然后每个区间号数组后面跟一个数组存满足空闲大小的所有数据页信息(PageInfo)
Recover: 日志恢复策略, 主要维护两个日志: updateLog和insertLog, 重做所有已完成事务 redo, 撤销所有未完成事务undo
DataManager: 统揽全局的类, 主要方法也就是读写和修改, 全部通过DataItem进行。
DM层执行流程 DataManager的所有功能(主要功能就是CRUD), 进行数据的读写修改都是靠DataItem进行操作的 , 所以PageX管理页面的时候FSO后面的DATA其实就是一个个的DataItem包
首先从DataManager进去创建DM(打开DM就不谈了, 只是多了个检验PageOne 和更新PageIndex), 需要执行的操作是:
新建PageCache, DM里面有页面缓存和DataItem缓存两个实现; DataItem缓存也是在PageCache中获取的, DataItem缓存不存在的时候就去PageCache缓存获取, PageCache缓存没有才去数据库文件中获取
新建日志
构建DM管理器
初始化校验页面 initPageOne() 和 启动时候进行校验: loadCheckPageOne()
读取数据 read(long uid): 从DataItem缓存中读取一个DataItem数据包并进行校验, 如果DataItem缓存中没有就会调用 DataManager下的getForCache(long uid)从PageCache缓存中读取DataItem数据包并加入DataItem缓存(其实PageCache缓存和DataItem缓存都是共用的一个cache Map存的, 只是key不一样, page的key是页号, DataItem的key是uid, 页号+偏移量), 如果PgeCache也没有就去数据库文件读取。
插入数据 insert(long xid, byte[] data): 先把数据打包成DataItem格式, 然后在 pageIndex 中获取一个足以存储插入内容的页面的页号; 获取页面后, 需要先写入插入日志Recover.insertLog(xid, pg, raw), 接着才可以通过 pageX 在目标数据页插入数据PageX.insert(pg, raw), 并返回插入位置的偏移。如果在pageIndex中没有空闲空间足够插入数据了, 就需要新建一个数据页pc.newPage(PageX.initRaw()), 最后需要将页面信息重新插入 pageIndex
修改数据就是先读取数据, 然后修改DataItem内容, 再插入DataItem数据。但是在修改数据操作的前后需要调用DataItemImp.after()进行解写锁并记录更新日志, 这里需要依赖DataManager里面的logDataItem(long xid, DataItem di)方法
释放缓存: 释放DataItem的缓存, 实质上就是释放DataItem所在页的PageCache缓存
Version Manager VM基于两段锁协议实现了调度序列的可串行化, 并实现了 MVCC 以消除读写阻塞, 同时实现了两种隔离级别
记录和版本 使用Entry来保存一条条的记录, 每个Entry中保存着一个DataItem
结构 [XMIN] [XMAX] [data]
XMIN表示创建该条记录的事务编号, XMAX是删除或者更新该条记录的事务编号, data存储数据
1 2 3 4 5 6 7 8 9 public class Entry { private static final int OF_XMIN = 0 ; private static final int OF_XMAX = OF_XMIN + 8 ; private static final int OF_DATA = OF_XMAX + 8 ; private long uid; private DataItem dataItem; private VersionManager vm; }
封装 把事务号和数据包装成Entry
1 2 3 4 5 6 7 public class Entry { public static byte [] wrapEntryRaw(long xid, byte [] data) { byte [] xmin = Parser.long2Byte(xid); byte [] xmax = new byte [8 ]; return Bytes.concat(xmin, xmax, data); } }
获取数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Entry { public byte [] data() { dataItem.rLock(); try { SubArray sa = dataItem.data(); byte [] data = new byte [sa.end - sa.start - OF_DATA]; System.arraycopy(sa.raw, sa.start + OF_DATA, data, 0 , data.length); return data; } finally { dataItem.rUnLock(); } } }
修改 如果要修改的话, 需要对DataItem执行before()和after()方法
1 2 3 4 5 6 7 8 9 10 11 public class Entry { public void setXmax (long xid) { dataItem.before(); try { SubArray sa = dataItem.data(); System.arraycopy(Parser.long2Byte(xid), 0 , sa.raw, sa.start+OF_XMAX, 8 ); } finally { dataItem.after(xid); } } }
事务的隔离级别 xdb支持两种事务隔离: 读提交, 可重复读
版本可见性: 如果一个记录的最新版本被加锁, 当另一个事务想要修改或读取这条记录时, MYDB 就会返回一个较旧的版本的数据, 这时就可以认为, 最新的被加锁的版本, 对于另一个事务来说,是不可见的
读提交 “读提交”就是保证事务在读取数据时只能读取已经提交事务产生的数据
为每个版本, 维护两个特殊的变量, XMIN, XMAX, 他们含义如下:
XMIN: 创建该版本的事务XID;
XMAX: 删除该版本的事务XID;
XMIN自然是在版本被建立的时候填写. XMAX是在该版本被删除, 或是有新版本出现时, 被填写.
1 2 3 4 5 6 7 8 (XMIN == Ti and // created by Ti itself and XMAX == NULL // not deleted now ) or // or (XMIN is commited and // created by a commited transaction and (XMAX == NULL or // not deleted now or (XMAX != Ti and XMAX is not commited) // deleted by a uncommited transaction ))
如果上述逻辑为true, 则该版本对Ti可见
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class Visibility { private static boolean readCommitted (TransactionManager tm, Transaction t, Entry e) { long xid = t.xid; long xmin = e.getXmin(); long xmax = e.getXmax(); if (xmin == xid && xmax == 0 ) { return true ; } if (tm.isCommitted(xmin)) { if (xmax == 0 ) { return true ; } if (xmax != xid) { if (!tm.isCommitted(xmax)) { return true ; } } } return false ; } }
可重复读 “读提交”可能会使一个事务在执行期间对同一个数据项的读取得到不同结果, 例如
1 2 3 4 5 6 7 (假如X一开始为0) T1 begin R1(X) // T1读得0 T2 begin U2(X) // 将X修改为1 T2 commit R1(X) // T1读的1
在上面的两次R1(X)中, 读出的X是不一样的. 有些事务是不希望出现这种情况的, 为此, 我们提供一个更加严格的隔离度”可重复读”, “可重复读”在”读提交”的基础上, 还保证事务执行期间, 多次对同一记录的读取, 将会是一致
规定: 事务只能读取它开始时, 就已经结束的那些事务产生的数据版本, 也就是忽略下面两种情况的数据:
比Ti后开始的事务的数据;
Ti开始时还为active状态的事务的数据;
对于第一条, 我们直接通过比较版本的XID, 即可完成, 既事务只能读取XID比他小的事务产生的数据版本
对于第二条, 引入”快照”(snapshot)技术, 在Ti开始时, 我们记录下当前所有活跃的事务, 并存在SP(Ti)中. 那么, 如果某个版本的XMIN在SP(Ti)中的话, 那么该版本应该对Ti不可见
1 2 3 4 5 6 7 8 9 10 11 12 13 (XMIN == Ti and // 由Ti创建且 (XMAX == NULL or // 尚未被删除 )) or // 或 (XMIN is commited and // 由一个已提交的事务创建且 XMIN < XID and // 这个事务小于Ti且 XMIN is not in SP(Ti) and // 这个事务在Ti开始前提交且 (XMAX == NULL or // 尚未被删除或 (XMAX != Ti and // 由其他事务删除但是 (XMAX is not commited or // 这个事务尚未提交或 XMAX > Ti or // 这个事务在Ti开始之后才开始或 XMAX is in SP(Ti) // 这个事务在Ti开始前还未提交 ))))
需要提供一个结构以保存快照数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Transaction { public long xid; public int level; public Map<Long, Boolean> snapshot; public Exception err; public boolean autoAborted; public static Transaction newTransaction (long xid, int level, Map<Long, Transaction> active) { Transaction t = new Transaction (); t.xid = xid; t.level = level; if (level != 0 ) { t.snapshot = new HashMap <>(); for (Long x : active.keySet()) { t.snapshot.put(x, true ); } } return t; } public boolean isInSnapshot (long xid) { if (xid == TransactionManagerImpl.SUPER_XID) { return false ; } return snapshot.containsKey(xid); } }
一个版本是否对事务可见的逻辑判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class Visibility { private static boolean repeatableRead (TransactionManager tm, Transaction t, Entry e) { long xid = t.xid; long xmin = e.getXmin(); long xmax = e.getXmax(); if (xmin == xid && xmax == 0 ) { return true ; } if (tm.isCommitted(xmin) && xmin < xid && !t.isInSnapshot(xmin)) { if (xmax == 0 ) { return true ; } if (xmax != xid) { if (!tm.isCommitted(xmax) || xmax > xid || t.isInSnapshot(xmax)) { return true ; } } } return false ; } }
版本跳跃问题 “多版本”使得VM的”删除”和”撤销”操作变得意外的简单, 只需要将这个事务标记为aborted即可
但是”多版本”却会导致”版本跳跃”, 为了解决它, VM会限制出现版本跳跃的事务回滚
考虑如下: 假设两个”可重复读”的事务, 有如下的执行序列:
1 2 3 4 5 6 7 8 9 (假设X一开始只有x0版本) T1 begin T2 begin R1(X) // T1读取x0 R2(X) // T2读取x0 U1(X) // T1将X更新到x1 T1 commit U2(X) // T2将X更新到x2 T2 commit
上述操作本身不会有任何问题, 但是逻辑上却有不妥. T1是将X从x0->x1, 这没问题. 但T2实际上是想将X从x0->x2, 跳过了x1这个版本, 因为T2读取时, X还只有x0. 这是有问题的
于是, 我们对两个隔离度的事务, 分别做出如下的规定:
“读提交”的事务允许出现版本跳跃;
“可重复读”的事务不允许出现版本跳跃;
解决”可重复读”隔离度下的版本跳跃问题的思路: 如果Ti想要修改X, 但X已经被某个Ti不可见的事务Tj修改过了, 那么要求Ti回滚
即: 如果Ti想要修改X, 但X的最新版本是Tj创建的, 且XID(Tj) > XID(Ti) 或者 Tj在SP(Ti)中, 则令Ti回滚, 以防止版本跳跃
实现的时候也非常简单, 直接取出X的最新版本, 如果该版本对Ti不可见, 那么Ti要求修改X则一定会发生版本跳跃, 于是要求Ti回滚
1 2 3 4 5 6 7 8 9 10 11 public class Visibility { public static boolean isVersionSkip (TransactionManager tm, Transaction t, Entry e) { long xmax = e.getXmax(); if (t.level == 0 ) { return false ; } else { return tm.isCommitted(xmax) && (xmax > t.xid || t.isInSnapshot(xmax)); } } }
死锁问题 VM利用2PL来实现”可串行化”调度, 2PL处理除了会阻塞事务外, 更严重的, 会造成死锁问题
如果Ti锁定了X, 现在Tj准备更新X, 那么Tj会被阻塞, 等待Ti释放X的锁. 这种等待关系可以用有向图来表示, 比如上述关系就可以表示为”Tj -> Ti”. 于是, 将所有这样的关系, 转化为图后, 死锁判断就简单了. 如果图中有环, 则有死锁; 否则无死锁
VM使用一个LockTable对象, 在内存中维护这张图, 每当出现等待时, 则向图中加入一条边. 每向图中加入一条边, 便进行一次死锁检测. 如果加入某条边后检测到了死锁, 则撤销加入这条边的事务
1 2 3 4 5 6 7 8 9 10 public class LockTable { private Map<Long, List<Long>> x2u; private Map<Long, Long> u2x; private Map<Long, List<Long>> wait; private Map<Long, Lock> waitLock; private Map<Long, Long> waitU; private Lock lock; ... }
当出现等待的情况时, 就向图中添加一条边
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class LockTable { public Lock add (long xid, long uid) throws Exception { lock.lock(); try { if (isInList(x2u, xid, uid)) { return null ; } if (!u2x.containsKey(uid)) { u2x.put(uid, xid); putIntoList(x2u, xid, uid); return null ; } waitU.put(xid, uid); putIntoList(wait, xid, uid); if (hasDeadLock()) { waitU.remove(xid); removeFromList(wait, uid, xid); throw Error.DeadlockException; } Lock l = new ReentrantLock (); l.lock(); waitLock.put(xid, l); return l; } finally { lock.unlock(); } } }
调用 add, 如果需要等待的话, 会返回一个上了锁的Lock对象, 调用方在获取到该对象时, 需要尝试获取该对象的锁, 由此实现阻塞线程的目的, 例如:
1 2 3 4 5 6 7 Lock l = lt.add(xid, uid);if (l != null ) { l.lock(); l.unlock(); }
检测图中是否有环: dfs
查找图中是否有环的算法也非常简单, 就是一个深搜, 只是需要注意这个图不一定是连通图, 思路就是为每个节点设置一个访问戳, 都初始化为 -1, 随后遍历所有节点, 以每个非 -1 的节点作为根进行深搜, 并将深搜该连通图中遇到的所有节点都设置为同一个数字, 不同的连通图数字不同, 这样, 如果在遍历某个图时, 遇到了之前遍历过的节点, 说明出现了环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class LockTable { private boolean hasDeadLock () { xidStamp = new HashMap <>(); stamp = 1 ; for (long xid : x2u.keySet()) { Integer s = xidStamp.get(xid); if (s != null && s > 0 ) { continue ; } stamp++; if (dfs(xid)) { return true ; } } return false ; } private boolean dfs (long xid) { Integer stp = xidStamp.get(xid); if (stp != null && stp == stamp) { return true ; } if (stp != null && stp < stamp) { return false ; } xidStamp.put(xid, stamp); Long uid = waitU.get(xid); if (uid == null ) { return false ; } Long x = u2x.get(uid); assert x != null ; return dfs(x); } }
在一个事务commit或者abort时, 就可以释放所有它持有的锁, 并将自身从等待图中删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class LockTable { public void remove (long xid) { lock.lock(); try { List<Long> l = x2u.get(xid); if (l != null ) { while (l.size() > 0 ) { Long uid = l.remove(0 ); selectNewXID(uid); } } waitU.remove(xid); x2u.remove(xid); waitLock.remove(xid); } finally { lock.unlock(); } } }
VM实现 VM接口 1 2 3 4 5 6 7 8 9 public interface VersionManager { byte [] read(long xid, long uid) throws Exception; long insert (long xid, byte [] data) throws Exception; boolean delete (long xid, long uid) throws Exception; long begin (int level) ; void commit (long xid) throws Exception; void abort (long xid) ; }
实现 缓存 VM实现类还实现了Entry的缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override protected Entry getForCache (long uid) throws Exception { Entry entry = Entry.loadEntry(this , uid); if (entry == null ) { throw Error.NullEntryException; } return entry; } @Override protected void releaseForCache (Entry entry) { entry.remove(); } }
begin() 开启一个事务, 并初始化事务的结构, 将其存放在activeTransaction中, 用于检查和快照使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override public long begin (int level) { lock.lock(); try { long xid = tm.begin(); Transaction t = Transaction.newTransaction(xid, level, activeTransaction); activeTransaction.put(xid, t); return xid; } finally { lock.unlock(); } } }
commit() 提交一个事务, 主要就是free掉相关的结构, 并且释放持有的锁, 并修改 TM 状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override public void commit (long xid) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); try { if (t.err != null ) { throw t.err; } } catch (NullPointerException n) { System.out.println(xid); System.out.println(activeTransaction.keySet()); Panic.panic(n); } lock.lock(); activeTransaction.remove(xid); lock.unlock(); lt.remove(xid); tm.commit(xid); } }
abort abort事务的方法则有两种, 手动和自动, 手动指的是调用abort()方法, 而自动, 则是在事务被检测出出现死锁时, 会自动撤销回滚事务, 或者出现版本跳跃时, 也会自动回滚
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override public void abort (long xid) { internAbort(xid, false ); } private void internAbort (long xid, boolean autoAborted) { lock.lock(); Transaction t = activeTransaction.get(xid); if (!autoAborted) { activeTransaction.remove(xid); } lock.unlock(); if (t.autoAborted) { return ; } lt.remove(xid); tm.abort(xid); } }
read() 读取一个Entry, 注意判断可见性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override public byte [] read(long xid, long uid) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); if (t.err != null ) { throw t.err; } Entry entry = null ; try { entry = super .get(uid); } catch (Exception e) { if (e == Error.NullEntryException) { return null ; } else { throw e; } } try { if (Visibility.isVisible(tm, t, entry)) { return entry.data(); } else { return null ; } } finally { entry.release(); } } }
insert() 将数据包装为Entry, 交给DM插入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override public long insert (long xid, byte [] data) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); if (t.err != null ) { throw t.err; } byte [] raw = Entry.wrapEntryRaw(xid, data); return dm.insert(xid, raw); } }
delete() 删除的操作只是设置 XMAX即可
但是有三个需要判断
可见性判断
获取资源的锁
版本跳跃判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class VersionManagerImpl extends AbstractCache <Entry> implements VersionManager { @Override public boolean delete (long xid, long uid) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); if (t.err != null ) { throw t.err; } Entry entry = null ; try { entry = super .get(uid); } catch (Exception e) { if (e == Error.NullEntryException) { return false ; } else { throw e; } } try { if (!Visibility.isVisible(tm, t, entry)) { return false ; } Lock l = null ; try { l = lt.add(xid, uid); } catch (Exception e) { t.err = Error.ConcurrentUpdateException; internAbort(xid, true ); t.autoAborted = true ; throw t.err; } if (l != null ) { l.lock(); l.unlock(); } if (entry.getXmax() == xid) { return false ; } if (Visibility.isVersionSkip(tm, t, entry)) { t.err = Error.ConcurrentUpdateException; internAbort(xid, true ); t.autoAborted = true ; throw t.err; } entry.setXmax(xid); return true ; } finally { entry.release(); } } }
总结
为了实现”可串行化”调度, VM使用了2PL;
为了减少2PL的阻塞率, VM实现了MVCC;
为了实现MVCC, VM抽象出了”记录”和”版本”;
为了对应”版本”和事务, VM使用了可见性判断;
为了实现更严格”可重复读”隔离度的可见性, VM引入了”快照”技术;
“多版本”使得VM的”删除”和”撤销”操作变得意外的简单;
但是”多版本”却会导致”版本跳跃”, 为了解决它, VM会限制出现版本跳跃的事务回滚;
VM还解决了2PL带来的”死锁”问题.
Index Manager IM基于DM, 维护了索引的结构, xdb实现了基于B+树索引结构
IM 对上层模块主要提供两种能力:插入索引和搜索节点
结点结构 二叉树由一个个 Node 组成,每个 Node 都存储在一条 DataItem 中。结构如下:
1 2 [LeafFlag][KeyNumber][SiblingUid] [Son0][Key0][Son1][Key1]...[SonN][KeyN]
其中 LeafFlag 标记了该节点是否是个叶子节点, KeyNumber 为该节点中 key 的个数, SiblingUid 是其兄弟节点存储在 DM 中的 UID, 后续是穿插的子节点(SonN)和KeyN, 最后的一个 KeyN 始终为 MAX_VALUE,以此方便查找
Node 类持有了其 B+ 树结构的引用, DataItem 的引用和 SubArray 的引用, 用于方便快速修改数据和释放数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class Node { static final int IS_LEAF_OFFSET = 0 ; static final int NO_KEYS_OFFSET = IS_LEAF_OFFSET + 1 ; static final int SIBLING_OFFSET = NO_KEYS_OFFSET + 2 ; static final int NODE_HEADER_SIZE = SIBLING_OFFSET + 8 ; static final int BALANCE_NUMBER = 32 ; static final int NODE_SIZE = NODE_HEADER_SIZE + (2 *8 )*(BALANCE_NUMBER*2 +2 ); BPlusTree tree; DataItem dataItem; SubArray raw; long uid; ... }
根节点生成 根节点的初始两个子节点为 left 和 right, 初始键值为 key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Node { ... static byte [] newRootRaw(long left, long right, long key) { SubArray raw = new SubArray (new byte [NODE_SIZE], 0 , NODE_SIZE); setRawIsLeaf(raw, false ); setRawNoKeys(raw, 2 ); setRawSibling(raw, 0 ); setRawKthSon(raw, left, 0 ); setRawKthKey(raw, key, 0 ); setRawKthSon(raw, right, 1 ); setRawKthKey(raw, Long.MAX_VALUE, 1 ); return raw.raw; } ... }
空根节点生成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Node { ... static byte [] newNilRootRaw() { SubArray raw = new SubArray (new byte [NODE_SIZE], 0 , NODE_SIZE); setRawIsLeaf(raw, true ); setRawNoKeys(raw, 0 ); setRawSibling(raw, 0 ); return raw.raw; } ... }
插入操作 searchNext 寻找对应 key 的 UID, 如果找不到, 则返回兄弟节点的 UID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class Node { ... class SearchNextRes { long uid; long siblingUid; } public SearchNextRes searchNext (long key) { dataItem.rLock(); try { SearchNextRes res = new SearchNextRes (); int noKeys = getRawNoKeys(raw); for (int i = 0 ; i < noKeys; i ++) { long ik = getRawKthKey(raw, i); if (key < ik) { res.uid = getRawKthSon(raw, i); res.siblingUid = 0 ; return res; } } res.uid = 0 ; res.siblingUid = getRawSibling(raw); return res; } finally { dataItem.rUnLock(); } } ... }
搜索操作 leafSearchRange 方法在当前节点进行范围查找,范围是 [leftKey, rightKey],这里约定如果 rightKey 大于等于该节点的最大的 key, 则还同时返回兄弟节点的 UID,方便继续搜索下一个节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class Node { ... class LeafSearchRangeRes { List<Long> uids; long siblingUid; } public LeafSearchRangeRes leafSearchRange (long leftKey, long rightKey) { dataItem.rLock(); try { int noKeys = getRawNoKeys(raw); int kth = 0 ; while (kth < noKeys) { long ik = getRawKthKey(raw, kth); if (ik >= leftKey) { break ; } kth ++; } List<Long> uids = new ArrayList <>(); while (kth < noKeys) { long ik = getRawKthKey(raw, kth); if (ik <= rightKey) { uids.add(getRawKthSon(raw, kth)); kth ++; } else { break ; } } long siblingUid = 0 ; if (kth == noKeys) { siblingUid = getRawSibling(raw); } LeafSearchRangeRes res = new LeafSearchRangeRes (); res.uids = uids; res.siblingUid = siblingUid; return res; } finally { dataItem.rUnLock(); } } ... }
BPlusTree 由于 B+ 树在插入删除时, 会动态调整, 根节点不是固定节点, 于是设置一个 bootDataItem, 该 DataItem 中存储了根节点的 UID, 可以注意到, IM在操作DM时, 使用的事务都是SUPER_XID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class BPlusTree { DataItem bootDataItem; private long rootUid () { bootLock.lock(); try { SubArray sa = bootDataItem.data(); return Parser.parseLong(Arrays.copyOfRange(sa.raw, sa.start, sa.start+8 )); } finally { bootLock.unlock(); } } private void updateRootUid (long left, long right, long rightKey) throws Exception { bootLock.lock(); try { byte [] rootRaw = Node.newRootRaw(left, right, rightKey); long newRootUid = dm.insert(TransactionManagerImpl.SUPER_XID, rootRaw); bootDataItem.before(); SubArray diRaw = bootDataItem.data(); System.arraycopy(Parser.long2Byte(newRootUid), 0 , diRaw.raw, diRaw.start, 8 ); bootDataItem.after(TransactionManagerImpl.SUPER_XID); } finally { bootLock.unlock(); } } }
删除操作 IM没有Delete的接口, 下面解释原因, 和VM有关.
如果用户想要删除某条记录X, 那么实际的执行过程大致是:
TBM解析语句;
TBM利用IM, 查询到X的地址;
TBM调用VM, 将X删除;
VM将X对该事务可见的那个版本的XMAX设置为该事务的XID;
删除结束.
上述过程并没有要求从IM中删除X对应的索引, 假如T1删除X之后, 又有T5事务, 尝试读取X, 那会怎样呢? 过程大致如下:
TBM解析语句;
TBM利用IM, 查询到X的地址;
TBM利用VM读取X;
由于X已经被删除, VM将找不到合适的版本, 于是返回nil;
TBM接受到nil, 当做该条记录不存在, 返回给用户这个结果.
可见, 由于VM的存在, 使得IM不用删除这些索引也没关系.
但是如果IM一直不删除这些索引的话, 索引树又会变得极其的庞大, 占用资源, 也降低NYADB效率. 所以, 在必要的时候, 可以让IM对索引树进行整理.
B+树并发控制协议 本小节描述B+树上的并发控制协议, 并证明该协议是无死锁的.
我们用u来表示B+树上的某个节点, 用s(u)来表示它右边的那个兄弟节点. 每个节点都有一个读写锁, 并规定, 在对u做任意读取之前, 都必须要调用u.RLock(), 在对u做任意修改之前, 都必须要调用u.WLock(). 且u.Leaf()能够返回该节点是否为叶子节点.
另外, 我们规定, 任意的事务Ti, 在某一时刻, 最多只能取得一个节点的锁. 也就是说, 现在Ti取得u的锁, 如果它想访问下一个节点v, 那么它必须先释放掉u的锁. 该协议能够保证在并发访问的情况下, 不会出现死锁.
(下面的过程叙述需要你熟知B+树的算法, 请自行查阅资料.)
下面先描述I(k, v)操作的过程, 假设初始u为B+树根节点:
1)u.RLock();
2)如果u.Leaf() == false, 则在u中查找下一个需要迭代的子节点v;
2.1)如果查找失败(失败原因见后文), u.RUnlock(), u:=s(u), 重复2.
2.2)如果查找成功, u.RUnlock(), 则另u:=v, 重复1.
3)如果u.Leaf() == true, u.RUnlock(), u.WLock(), 并尝试向u中插入(k, v);
3.1)如果插入失败(失败原因见后文), u.WUnlock(), 则另u:=s(u), 重复3.
3.2)如果插入成功, 检测u是否需要分裂;
3.2.1)如果不需要, u.WUnlock(), 插入成功, 直接返回.
3.2.2)如果需要, 则依照B+树算法, 创建新节点v, 另s(u):=v, u.WUnlock().
4)递归的向父亲节点插入新增加的节点(如果需要的话), 直至根节点.
现在说明插入失败, 和查找失败的原因. 假设T1准备向B+树中插入(10, 10), 并有如下的执行序列:
T1当前在u节点, 并查询到它下一个需要访问的节点是v; // 注意此时T1并没有v的锁
T2向v中插入了(8, 10), 使得v从原来的[(1, 1), (2, 2), (10, 10)], 变成了[(1, 1), (2, 2), (8, 8), (10, 10)], 并被分裂成为v[(1, 1), (2, 2)], s(v)[(8, 8), (10, 10)]
T2执行v.WUnlock()
T1取得v, 并执行v.RLock(), 接着进行查询
可见, 在T1对v进行查询时, 便会发生查询失败, 原因也很显而易见: 在T1得知要查询v, 到T1对v进行查询期间, 有其他事务对v操作并让其产生了分裂.
v被分裂过后, 它原本的一部分数据就会被移动到s(v)中, 于是则需要在查询失败后, 继续对s(v)进行查询. 插入操作也是同理.
对于S(key)的操作就不用赘述了, 就是I(key, value)中的1), 2), 3)步, 只不过把对应的插入操作改为查询操作.
B+树的事务无关和错误处理 B+树是事务无关的, 既B+树直接以超级事务在执行. 试想如果某个事务, 在B+树中插入了很多索引, 然后又被回滚了, 会怎么样? 结果就是该事务的索引依然留在B+树种, 但是VM却”读不出来”, 和B+树没有Delete操作类似, 因此不会对其他事务造成影响, 这些废弃的索引是安全的.
现在再来看看如果B+树在执行过程中, 发生了崩溃会怎么样? 如果Ti在对u进行修改时, 发生了崩溃, 那么节点u的内部结构就被破坏了, 但是由于B+树是建立在DM上的, 在下一次数据库重启时, u就会被恢复成修改之前的状态. 也就是说, 由于DM的保护, 对B+树节点的操作, 是原子性的!
于是我们现在就不用考虑节点内部错误的情况了, 只需要考虑节点间错误的情况, 而这样的错误情况只有一种: 某次对u的插入操作创建了新节点v, 此时s(u)=v, 但是v却并没有被插入到他的父节点中. 于是成了大致如下的状态:
1 2 3 4 [parent] | v [u] -> [v]
而正确的状态应该如下:
1 2 3 4 [ parent ] | | v v [u] -> [v]
和正确状态相比, 错误状态下, 少了一支从父亲节点到v的指针. 但是这样是没问题的! 因为在插入和查询操作中, 如果失败, 就会不断的向右兄弟节点迭代. 因此, 在错误的状态下, 如果想找v中的内容, 那么情况是: 1)找到parent, 2)通过parent找到u, 3)在u中查找失败, 4)通过u找到v, 5)查找成功.
于是, 在DM的原子性保护下, 结合B+树本身的算法过程, 能够证明B+树是完全能够应对数据库崩坏的.
总结
B+树基于DM.
B+树只提供了Insert和Search两种操作.
和TBM依赖VM不同, B+树自己管理锁来进行并发控制.
B+树的并发协议是不会产生死锁的.
B+树本身的算法性质和DM原子性保证,使得它能够应对错误情况.