scanner的层次
一个Region对应一个RegionScannerImpl
RegionScannerImpl (下面缩进代表了层级)
StoreScanner (组成storeHeap 和 joinedHeap, 这俩都是KeyValueHeap, 用的是region的Comparator)
- MemStoreScanner
- StoreFileScanner
(MemStoreScanner 和 StoreFileScanner 组成heap ,也是一个KeyValueHeap, 用的是store的Comparator)
对scanner预处理(裁剪等)
获取此region中store里的mem和file能组成的scanner,并做一定程度的过滤(rang,bf和ts)
获取所有的scanner
1
2
3
4
5
6
7
8
9
10
11
12
13
14//对此region中每个store(列族)都需要获取scanner
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
//此方法是获取了scanner,过滤了一些scanner,并且还粗略的检索到需要的位置。
//此scanner是一个store对应的
KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
//比如有filter的情况
joinedScanners.add(scanner);
}
}
//生成俩KeyValueHeap,此处的比较器不知道啥意思。真他妈尴尬
initializeKVHeap(scanners, joinedScanners, region);1.1 跟入store.getScanner方法,现在次问题只在一个store中讨论。store间的协调后面会提到。
1
2
3
4
5
6//此方法和1.1.4节中想呼应。
this.store.addChangedReaderObserver(this);
//获取所有的file,打开文件,做一些过滤等。
List<KeyValueScanner> scanners = getScannersNoCompaction();
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);
resetKVHeap(scanners, store.getComparator());1.1.1 getScannersNoCompaction()里面有重要的方法如下。
1
2selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); 主要两层,store.getScanners中是获取所有的file,打开文件。
selectScannersFrom主要做过滤,主要是如下方法
1
2
3kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)
此方法两个分支,mem和file值得关注,此处就不说了。反正用了一些range和bf(bf只针对单行和单行列族)
bf的过滤有一个判断(!scan.isGetScan()),如果是get,并且isStartRowAndEqualsStopRow为true才走bf。1.1.2 seekScanners
bf判断,时间判断,伪造kv使其跳过此个检索吧。
1.1.3 resetKVHeap
组织成一个KeyValueHeap,比较器有代表时间的id等。
1.1.4
另外
如下方法也会让检索重新打开scanner,一般是有compact,bulkload等操作。需要去重新,应该为更改了一些东西。1
notifyChangedReadersObservers()
裁剪的storefile和block的方式
我个人觉得总共几个部分,包括time裁剪,key裁剪,bf和索引
获取scanner分两个,一个memstore的,另一个是file的。
获取有关file的Scanner如下:
1
2
3
4this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
cacheBlocks, usePread, isCompaction, false, matcher, readPt);获取有关mem的Scanner如下:
1
memStoreScanners = this.memstore.getScanners(readPt);
然后在返回之前做了筛选,注意那个selectScannerFrom方法
1
2selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));selectScannerFrom还是分两个分支,有两个实现,file的和mem的。
file的过滤分三种,time,key和bf。file过滤实现如下:
1
2
3
4public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
return reader.passesTimerangeFilter(scan.getTimeRange(), oldestUnexpiredTS)
&& reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
}memstore的实现如下:
1
2
3
4
5
6
7public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
TimeRange timeRange = scan.getTimeRange();
return (timeRangeTracker.includesTimeRange(timeRange) ||
snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
(Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax())
>= oldestUnexpiredTS);
}从文件格式看,主要是几个过滤,主要代码如下
主要的实现在如下方法中
1
2
3BlockWithScanInfo blockWithScanInfo =
indexReader.loadDataBlockWithScanInfo(key, offset, length, block, 、
cacheBlocks, pread, isCompaction);在HFileReaderV2中,由索引过滤和blockcache的实现,在
根据索引信息过滤block
1
int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
方法实现如下
1
2
3
4
5public int rootBlockContainingKey(final byte[] key, int offset,
int length) {
int pos = Bytes.binarySearch(blockKeys, key, offset, length,
comparator);
....}如果需要读取,会先从blockcache中读取,减少io。blcokcache中读取block,代码如下
1
2
3block = cachingBlockReader.readBlock(currentOffset,
currentOnDiskSize, shouldCache, pread, isCompaction, true,
expectedBlockType);两处使用bf索引过滤,一处是isLazy==true时
1
boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.requestSeek(KeyValue kv, boolean forward, boolean useBloom)
bf的粒度是chunk。
协调一个cf下的检索
我们讲获取到的scanner组织成heap,然后从heap中获取对应的数据。这个heap需要有一定的组织,比如检索到了数据,需要关闭;对于同一个key,需要在所有的scanner中获取到等
获取到scanner之后就开始获取数据结果了。
1
scanner.next(results);
里面一堆逻辑的,实际获取数据
1
2
3
4
5
6
7
8
9
10
11
12private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
byte[] currentRow, int offset, short length) throws IOException {
KeyValue nextKv;
do {
heap.next(results, limit - results.size());
if (limit > 0 && results.size() == limit) {
return KV_LIMIT;
}
nextKv = heap.peek();
} while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
return nextKv;
}
StoreFileScanner中检索相关的处理
。。。
协调不同cf下的检索
1 |
|
检索keyvalue
1 | int org.apache.hadoop.hbase.io.hfile.HFileReaderV2.AbstractScannerV2.seekTo(byte[] key, int offset, int length, boolean rewind) throws IOException |
实现获取block
index 去检索
检索到指定位置
bloom index的位置
第一层的root index就是chunk的位置。
但是root index指向的是imtermediate或leaf,理论上来说leaf对应chunk
HFile中除了Data Block需要索引之外,上一篇文章提到过Bloom Block也需要索引,索引结构实际上就是采用了single-level结构,文中Bloom Index Block就是一种Root Index Block。
bloom block 和 data block的索引不一致。bloom block采用单层结构,data block采用多层结构。
bloom block的单层结构的叶子结点是chunk。
get请求读取block的种类和情况
- 正常读取block的
get 流程真是复杂。
1 | public boolean seek(KeyValue key) throws IOException { |
1 | ScanQueryMatcher.MatchCode qcode = matcher.match(kv); |
可以看到一些问题和关注点
- RegionScannerImpl里使用StoreScanner 时,各种heap的操作主要需要关注一个记录的组织,因为一条记录的不同列族保存在不同的store中。还要关注什么时候结束。
- StoreScanner中的两种scanner(StoreFileScanner可能有多个)需要协调在不同scanner中的不同的版本的问题。还要关注什么时候结束。