HBase源码系列之memstore

主要结构

​ 主要结构包括「kvset,snapshot」,先说snapshot,主要适用于生成HFile时临时存放的kvset的一个快照。更改的snaphost一定条件判断可以生成HFile时调用

1
FlushResult org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HLog wal, long myseqid, MonitoredTask status)

此方法两种情况会调用

​ 一个是关闭时。

1
HRegion.doClose(boolean abort, MonitoredTask status)

​ 另一个时需要flush时。

1
FlushResult org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HLog wal, long myseqid, MonitoredTask status)

flush过程

  1. 准备工作
  2. 获取HRegion.updatesLock的写锁
  3. 获取mvcc事务,并努力前提读点
  4. 调用prepare()
    1. 将snapshot设置为的kvset,并重新生成一个kvset,用于写入。
  5. 释放写锁
  6. 同步wal日志
  7. 等到mvcc结束
  8. flushcache()
  9. commit()
    1. 才更新构造scanner使用的org.apache.hadoop.hbase.regionserver.DefaultStoreFileManager.storefiles

疑问与解释

问题:

最开始,我感觉逻辑有问题,可能会导致在某个时间点(flush)时,读取不到已经写入的数据。

回答:

有三个方面:

  1. 读取数据时会读取snapshot的数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Override
    public synchronized KeyValue next() {
    if (theNext == null) {
    return null;
    }

    final KeyValue ret = theNext;

    // Advance one of the iterators
    if (theNext == kvsetNextRow) {
    kvsetNextRow = getNext(kvsetIt);
    } else {
    snapshotNextRow = getNext(snapshotIt);
    }

    // Calculate the next value
    theNext = getLowest(kvsetNextRow, snapshotNextRow);

    //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
    //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
    // getLowest() + " threadpoint=" + readpoint);
    return ret;
    }
  2. 清除snapshot和更新文件时在一个锁内。

    insertNewFiles方法将刷写成的HFile加入Store的fileManger里storefiles里,此结构用语构造storefilescanner。

    clearSnapshot方法将snapshot设置为一个新的空的结构。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    this.lock.writeLock().lock();
    try {
    this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
    this.memstore.clearSnapshot(set);
    } finally {
    // We need the lock, as long as we are updating the storeFiles
    // or changing the memstore. Let us release it before calling
    // notifyChangeReadersObservers. See HBASE-4485 for a possible
    // deadlock scenario that could have happened if continue to hold
    // the lock.
    this.lock.writeLock().unlock();
    }
  3. 第三个方面,注意到第一个方面读取时或去的scanner方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    byte[] stopRow, long readPt) throws IOException {
    Collection<StoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
    storeFilesToScan =
    this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
    memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
    this.lock.readLock().unlock();
    }
    ....
    }

    snapshot里的数据要么被getScanner引用到要么清空并生成新文件,而且两者互斥(使用的是同一个锁)。这样就可以保证数据能被读取到了。