HBase源码系列之server端的get请求处理

scanner的层次

一个Region对应一个RegionScannerImpl


  • RegionScannerImpl (下面缩进代表了层级)

    • StoreScanner (组成storeHeap 和 joinedHeap, 这俩都是KeyValueHeap, 用的是region的Comparator)

      • MemStoreScanner
      • StoreFileScanner

      ​ (MemStoreScanner 和 StoreFileScanner 组成heap ,也是一个KeyValueHeap, 用的是store的Comparator)

对scanner预处理(裁剪等)

  1. 获取此region中store里的mem和file能组成的scanner,并做一定程度的过滤(rang,bf和ts)

    获取所有的scanner

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //对此region中每个store(列族)都需要获取scanner
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    //此方法是获取了scanner,过滤了一些scanner,并且还粗略的检索到需要的位置。
    //此scanner是一个store对应的
    KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
    if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
    } else {
    //比如有filter的情况
    joinedScanners.add(scanner);
    }
    }
    //生成俩KeyValueHeap,此处的比较器不知道啥意思。真他妈尴尬
    initializeKVHeap(scanners, joinedScanners, region);

    1.1 跟入store.getScanner方法,现在次问题只在一个store中讨论。store间的协调后面会提到。

    1
    2
    3
    4
    5
    6
    //此方法和1.1.4节中想呼应。
    this.store.addChangedReaderObserver(this);
    //获取所有的file,打开文件,做一些过滤等。
    List<KeyValueScanner> scanners = getScannersNoCompaction();
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);
    resetKVHeap(scanners, store.getComparator());

    1.1.1 getScannersNoCompaction()里面有重要的方法如下。

    1
    2
    selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
    isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));

    ​ 主要两层,store.getScanners中是获取所有的file,打开文件。

    ​ selectScannersFrom主要做过滤,主要是如下方法

    1
    2
    3
    kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)
    此方法两个分支,mem和file值得关注,此处就不说了。反正用了一些range和bf(bf只针对单行和单行列族)
    bf的过滤有一个判断(!scan.isGetScan()),如果是get,并且isStartRowAndEqualsStopRow为true才走bf。

    1.1.2 seekScanners

    ​ bf判断,时间判断,伪造kv使其跳过此个检索吧。

    1.1.3 resetKVHeap

    ​ 组织成一个KeyValueHeap,比较器有代表时间的id等。

    1.1.4另外如下方法也会让检索重新打开scanner,一般是有compact,bulkload等操作。需要去重新,应该为更改了一些东西。

    1
    notifyChangedReadersObservers()
  2. 裁剪的storefile和block的方式

    我个人觉得总共几个部分,包括time裁剪,key裁剪,bf和索引

    获取scanner分两个,一个memstore的,另一个是file的。

    获取有关file的Scanner如下:

    1
    2
    3
    4
    this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);

    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
    cacheBlocks, usePread, isCompaction, false, matcher, readPt);

    获取有关mem的Scanner如下:

    1
    memStoreScanners = this.memstore.getScanners(readPt);

    然后在返回之前做了筛选,注意那个selectScannerFrom方法

    1
    2
    selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
    isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));

    selectScannerFrom还是分两个分支,有两个实现,file的和mem的。

    file的过滤分三种,time,key和bf。file过滤实现如下:

    1
    2
    3
    4
    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
    return reader.passesTimerangeFilter(scan.getTimeRange(), oldestUnexpiredTS)
    && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
    }

    memstore的实现如下:

    1
    2
    3
    4
    5
    6
    7
    public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
    TimeRange timeRange = scan.getTimeRange();
    return (timeRangeTracker.includesTimeRange(timeRange) ||
    snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
    (Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax())
    >= oldestUnexpiredTS);
    }

    从文件格式看,主要是几个过滤,主要代码如下

    主要的实现在如下方法中

    1
    2
    3
    BlockWithScanInfo blockWithScanInfo =
    indexReader.loadDataBlockWithScanInfo(key, offset, length, block, 、
    cacheBlocks, pread, isCompaction);

    在HFileReaderV2中,由索引过滤和blockcache的实现,在

    根据索引信息过滤block

    1
    int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);

    方法实现如下

    1
    2
    3
    4
    5
    public int rootBlockContainingKey(final byte[] key, int offset,
    int length) {
    int pos = Bytes.binarySearch(blockKeys, key, offset, length,
    comparator);
    ....}

    如果需要读取,会先从blockcache中读取,减少io。blcokcache中读取block,代码如下

    1
    2
    3
    block = cachingBlockReader.readBlock(currentOffset,
    currentOnDiskSize, shouldCache, pread, isCompaction, true,
    expectedBlockType);

    两处使用bf索引过滤,一处是isLazy==true时

    1
    boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.requestSeek(KeyValue kv, boolean forward, boolean useBloom)

    bf的粒度是chunk。

协调一个cf下的检索

  1. 我们讲获取到的scanner组织成heap,然后从heap中获取对应的数据。这个heap需要有一定的组织,比如检索到了数据,需要关闭;对于同一个key,需要在所有的scanner中获取到等

    获取到scanner之后就开始获取数据结果了。

    1
    scanner.next(results);

    里面一堆逻辑的,实际获取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
    byte[] currentRow, int offset, short length) throws IOException {
    KeyValue nextKv;
    do {
    heap.next(results, limit - results.size());
    if (limit > 0 && results.size() == limit) {
    return KV_LIMIT;
    }
    nextKv = heap.peek();
    } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
    return nextKv;
    }

StoreFileScanner中检索相关的处理

。。。

协调不同cf下的检索

暂时不看,暂时在业务中不需要。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69





===================== 以下太乱 ======================

hbase检索在region中的检索流程,其中主要需要分析的是在一个store中的检索的操作,分两个层面:一个是到scanner的上层的处理,另一个是scanner的内部的处理(包括men和file)两种。

## 处理scanner

## scanner内部

### memscanner

要从memstore中获取想要的数据,感觉不会很难,因为memstore是个有一定顺序。

### filescanner

对于filestore的检索,hbase提供了集中方式来加速检索。

1. bloomfilter的过滤,见”处理scanner“
2. index的过滤
3. blockcache的使用








## 对一个get的请求怎么判断结束

boolean org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl.nextInternal(List<Cell> results, int limit)

只执行一次。一个keyvalue只会在一个block中,其实并不需要再去检索,只要检索一次就可以了,感觉是对的。没想到前几天想的有问题。



## scan判断结束

scan的判断结束

```java
while (i < rows) {
// Stop collecting results if maxScannerResultSize is set and we have exceeded it
if ((maxResultSize < Long.MAX_VALUE) &&
(currentScanResultSize >= maxResultSize)) {
builder.setMoreResultsInRegion(true);
break;
}
// Collect values to be returned here
moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
for (Cell cell : values) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
currentScanResultSize += kv.heapSizeWithoutTags();
totalKvSize += kv.getLength();
}
results.add(Result.create(values));
i++;
}
if (!moreRows) {
//结束了
break;
}
values.clear();
}

检索keyvalue

1
int org.apache.hadoop.hbase.io.hfile.HFileReaderV2.AbstractScannerV2.seekTo(byte[] key, int offset, int length, boolean rewind) throws IOException

实现获取block

​ index 去检索

检索到指定位置

bloom index的位置

第一层的root index就是chunk的位置。

但是root index指向的是imtermediate或leaf,理论上来说leaf对应chunk

HFile中除了Data Block需要索引之外,上一篇文章提到过Bloom Block也需要索引,索引结构实际上就是采用了single-level结构,文中Bloom Index Block就是一种Root Index Block。

bloom block 和 data block的索引不一致。bloom block采用单层结构,data block采用多层结构。

bloom block的单层结构的叶子结点是chunk。

get请求读取block的种类和情况

  1. 正常读取block的

get 流程真是复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  public boolean seek(KeyValue key) throws IOException {
if (seekCount != null) seekCount.incrementAndGet();

try {
try {
if(!seekAtOrAfter(hfs, key)) {
LOG.info("liubb seekAtOrAfter false");
close();
return false;
} else {
LOG.info("liubb seekAtOrAfter true");
}

cur = hfs.getKeyValue();

return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
} catch (FileNotFoundException e) {
throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
}
里面的seekAtOrAfter很重要。
1
2
3
4
5
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
qcode = optimize(qcode, kv);
switch(qcode) {

此处各种code的解释对于不同storefile的理解也很好。

可以看到一些问题和关注点

  1. RegionScannerImpl里使用StoreScanner 时,各种heap的操作主要需要关注一个记录的组织,因为一条记录的不同列族保存在不同的store中。还要关注什么时候结束。
  2. StoreScanner中的两种scanner(StoreFileScanner可能有多个)需要协调在不同scanner中的不同的版本的问题。还要关注什么时候结束。