一个奇怪的rwlock锁卡住的case

遇到一个case,有关于读写锁。原因路径如下:

  1. 线上有一个datanode节点宕机,导致与此节点的连接出现问题。
  2. 有hdfs客户端进程在处理此连接超时部分的代码有问题,没有将外层的超市传入至底层socket里,出问题的线程持有了无法超市推出,一直hang住,此线程还持有了一个读写锁的读锁。
  3. 由于内部某些机制另一个线程需要写锁,同时此锁的读锁会不停有人由于请求需要拿到和释放。
  4. 这时候出现了其他线程的读锁也无法获取到。

从线程栈看,就是一个线程获取了读锁,阻塞了别的线程获取读锁和写锁。

还原现场例子代码地址:https://github.com/liubinbin/sta/tree/master/rwlock/src/rwlock

1
看看ReentrantReadWriteLock,注意构造函数里可以传一个布尔值,区分公平锁和非公平锁。

NonfairSync和FairSync分别是两种实现,其中都实现了readerShouldBlock方法。这问题分两种情况:

  1. 公平锁:这种情况下对AQS的请求是需要安装时间顺序,如果新来的读请求在写请求之后就需要等待。
  2. 非公平锁:为了防止写请求饥饿,读请求会先判断等待队列头是否是写请求,所以在这种情况下,写请求会隔离读锁和读锁。

此问题最后解决办法是让那个出问题的读锁尽快的释放,不要一直占着读锁就可以把影响降低。

最近使用docker的一些小记录

mysql出了需要grant之外,还要修改bind-address

docker 基本命令

docker 中container基于image制作,image类似一个模版,基于某个image制作的contaner都具有和image一样的内容

我们可以根据REPOSITORY来判断这个镜像是来自哪个服务器,如果没有 / 则表示官方镜像,并且在search的时候会标有OFFICIAL,类似于ip:port/repos_name则表示的是私服。

docker pull username/repository<:tag_name> 或者 docker pull repository,pull和push相对应。

5.2 运行出一个container放到后台运行

1
2
3
# docker run -d ubuntu /bin/sh -c "while true; do echo hello world; sleep 2; done"
ae60c4b642058fefcc61ada85a610914bed9f5df0e2aa147100eab85cea785dc

它将直接把启动的container挂起放在后台运行(这才叫saas),并且会输出一个CONTAINER ID,通过docker ps可以看到这个容器的信息,可在container外面查看它的输出docker logs ae60c4b64205,也可以通过docker attach ae60c4b64205连接到这个正在运行的终端,此时在Ctrl+C退出container就消失了,按ctrl-p ctrl-q可以退出到宿主机,而保持container仍然在运行

端口映射

Docker中运行的程序的端口是不能直接访问的,需要映射到本地,通过-p参数实现,例如将6379端口映射到本机的6378端口

容器日志

查看当前容器的日志

docker logs container-name/container-id

我们可以查看之前redis镜像的容器

docker logs test-redis

可以看到redis启动的日志

运行中的容器其实就是一个完备的Linux操作系统,我们可以登录访问当前容器,登录后可以在容器中进行常规的Linux操作。
docker exec -it container-id/container-name bash

系统版本

[root@bogon yum.repos.d]# uname -a
Linux bogon 2.6.32-642.el6.x86_64 #1 SMP Tue May 10 17:27:01 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
[root@bogon yum.repos.d]# cat /etc/redhat-release
CentOS release 6.8 (Final)

安装EPEL

因为系统自带的repo中不带docker需要安装epel

rpm -Uvh http://ftp.riken.jp/Linux/fedora/epel/6Server/x86_64/epel-release-6-8.noarch.rpm

安装Docker

yum install -y docker-io

开机自启动与启动Docker

[root@bogon yum.repos.d]# service docker start
Starting cgconfig service: [ OK ]
Starting docker: [ OK ]
[root@bogon yum.repos.d]# chkconfig docker on
[root@bogon yum.repos.d]# chkconfig docker –list
docker 0:off 1:off 2:on 3:on 4:on 5:on 6:off
[root@bogon yum.repos.d]#

至此docker已经安装完成

docker rm 删除container

docker rmi 删除image

http://blog.csdn.net/permike/article/details/51879578 总结的非常好

已完成
docker pull morrisjobke/docker-swift-onlyone
docker pull kahing/docker-swift
docker pull debian

docker pull [选项] [Docker Registry 地址[:端口号]/]仓库名[:标签]

启动container
docker run -d -P -v /srv/node/sdb1/docker/:/swift/nodes -t bouncestorage/swift-aio
通过docker ps 获取端口
然后通过http://127.0.0.1:/auth/v1.0 进行系统的操作和访问
具体的命令形式如下
swift -A http://127.0.0.1:32770/auth/v1.0 -U test:tester -K testing

swift-aio-docker

docker run -it –rm debian bash
-it: 这是两个参数,一个是 -i:交互式操作,一个是 -t 终端。
–rm: 这个参数是说容器退出后随之将其删除

bash:放在镜像名后的是命令,这里我们希望有个交互式 Shell

1
docker pull [选项] [Docker Registry 地址[:端口号]/]仓库名[:标签]

Docker Registry 地址[:端口号] 默认docker hub

仓库名为<用户名>/<软件名> ,默认为 library

latest为默认标签

最近感想

最近在读萨缪尔森的《经济学》,其实很早此书就在淘宝的购物车里,一直没有买来读。大概是自己懒的原因吧。整本书很大并且很厚,很有教材的感觉,读完真的很累,但是觉得很值。现在渐渐感觉看到一件事,一个政策,一个评论能想到一些东西。

之前微博里在说保险相关的信息,其实我们每个人都应该读点经济学和金融的书,使自己能更好的理解发布的各种信息背后可能隐藏的一些更深层次的信息,以便我们能更好的做出更加理智的判断。

点赞学长,前几天下载的《Linux System Prorgramming》中文翻译的pdf是工大的一些学长做的。我真的感慨自己表达出的观点真的太少,并且没有做能影响别人的事情,以后我一定要多在此博客中发表自己的想法,并且希望以后能做一些影响别人的事情,至少是一些有趣的事情。

经济学

那本书里面其实说了很多原理性的东西,读着很枯燥,但是原理又是理解一切的基础。在那本书中,我最感兴趣的其实是货币政策和财政政策这些部分,那两部分是比较接近有用,能用于生活判断一些东西。

等过段时间像把货币相关的一些东西再找几本书捋一捋,希望可以到时候能更加准确的理解这些事情。等更加理解了之后再写点东西。

大概一个月之前,看了《大逃港》这本书,给我的感觉是,每个人最终应该是为了利益在做事情,一件事情也只有有足够的利益诱惑才能推行直至完成。嗨,不能讲太多,不是很确定line在哪里。

##最近想法

最近看完那本《经济学》之后,越来越感觉自己应该找些事情做。当然基本应该是和计算机相关的,下面列一些最近想到的一些东西。

写一个数据库

在大学时,个人对数据库其实也挺感兴趣,最近三年的工作其实很多也是和数据库相关。现在的想法是自己写个数据库应该是一件有意思的事情,另一个动因是买了《代码大全》这本书,感觉看完之后应该能够有东西来实践。两者可以结合在一起做。应该是件有意思的事情。

获取跑步路线

之前在北京的时候,跑步很多时候是在公园进行的。跑完看着那个路线图,如果自己跑了一个2017等数字的形状的,应该是件有意思的事情。

最近稍微想了这个事情,这功能的实现很依赖地图数据的样子。

水果店买水果

有一次去水果店买东西的时候,店里工作让加入一个群,说里面有各种优惠。到现在也对此种产品信息的推送感觉不是很理想。

  1. 水果店的水果信息没法完全展示在顾客面前,因为每天在群里只能看到一些优惠的水果。

  2. 通过每个人把自己想买的东西加入一个列表里,然后在群里不停的发列表。这种方式其实是对店主友好,对群里的顾客不友好。

  3. 仔细想这件事,本质很团购或会员卡没什么区别,但是这其中店主和顾客之间信息的交流很不顺畅。

    信息的顺畅交流才能更好地提升效率,利用好互联网这个工具和提高营收。

阅读英文查阅的很不方便和音乐软件的割裂

英文阅读方面,扇贝做的不错。看到有不懂的词汇或词组可以直接长摁然后显示意思,可以非常快的回到阅读中,对整体的阅读流畅性影响不是很大,有一个问题是文章比较尴尬。如果是别的文章质量更加好的app或网址的话,生词查阅方面做的没这么好。

网易云音乐貌似遇到的版权问题,我看了一下我收藏的音乐,基本没有变灰无法听的,突然感觉自己的音乐品味是不是和大众不太一样。

研报,公告的信息获取

研报和公告信息相对来说比较难。

vmware安装centos

此文记录自己安装centos时的一些步骤纯粹为了下次安装更加快速的完成,因为在前段时间删除虚拟机之后发现一下子想不起来是怎么安装的centos,需要求助搜索引擎和一步步的探索才能完成。

第0步 安装vmware和下载centos

下载centos从之前的6版本切换到了7版本,此处为了保持和目前线上机器的版本一致,可以使自己能更适应新版的centos。

第1步 创建centos虚拟机

选择Create a custom virtual machine

选择对应的版本CentOS 64-bit

选择Create a new virtual disk

选择Customize Settings,然后选择Save.

然后开始设置配置,主要如下:

  • Processors & Memory (4core 8G)
  • Hard Disk (80G)
  • CD/DVD (IDE) (此处一定需要设置,将下载的iso加入配置中,并且需要连接DVD)

然后开始安装,更改mini 安装模式,添加gnome桌面,然后进入正常的linux安装界面。之后一切就比较顺利。

最后可能需要安装图形界面:yum groupinstall “GNOME Desktop”,重启进入root执行“init 5”

##注意事项

###新版系统需要改变设置

Please check System Preferences ==>Security&Privacy==>Privacy==>Accessibility, make sure you have added Fusion into the list.

###键盘切换快捷键:

cmd + G : 切到虚拟机

cmd + control:切到mac系统

rit in HBase

以前在别人博客中看到看到hbase中RIT的蛋疼,最近也确实感觉这个状态是比较尴尬的,别人会来找你。最近会放一些精力来解析hbase的rit状态,同时也是希望自己能对hbase有更全面和深入的了解。感觉过去两个月对hbase的的检索和加载流程看的比较多了。

遇到一个case:

一个region处于pending_open的状态。

大概逻辑,本来master给某个节点发送open此region的请求,过程进行的很好。

HBase源码系列之HFile

本文讨论0.98版本的hbase里v2版本。其实对于HFile能有一个大体的较深入理解是在我去查看”到底是不是一条记录不能垮block“的时候突然意识到的。

首先说一个对HFile很直观的感觉,我觉得HFile的整个设计中很重要的一点是为减少内容占用。首先写时候可以把一个个block按顺序写入,满足一个chunk写入一个元数据(包括bloomfilter),最后是一些HFile的元数据。对于HFile,我个人觉得主要把握好几个问题。

  1. block的组织
  2. bf和block的关系
  3. index和block的关系
  4. 写入顺序和一些基本的元数据信息结构
  5. 记录能不能跨block

明白这四个问题感觉基本可以大致的描绘出HFile了。

HFileWriterV2

首先,我们知道会引起下HFile的操作有flush和compaction。在此,我们就选择从flush这个入口跟进去看。

在StoreFile中,以下方法主要是为了Store书写到一个HFile中。

1
long org.apache.hadoop.hbase.regionserver.StoreFlusher.performFlush(InternalScanner scanner, CellSink sink, long smallestReadPoint) throws IOException

在此方法会调用如下方法

1
2
3
4
5
6
7
8
9
public void append(final KeyValue kv) throws IOException {
//如下两行主要是来加入到bf中的,下面的这个就是我们经常说的bf索引。
appendGeneralBloomfilter(kv);
appendDeleteFamilyBloomFilter(kv);
//这行是重点
writer.append(kv);
//这行先不管,处理时间戳
trackTimestamps(kv);
}

以下分解append方法

1
2
3
4
5
6
7
8
9
10
//检查key是否有问题,是否按顺序(memstore使用ConcurrentSkipListMap存储,应该不会有此问题)。
//并且返回key是否重复
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
//如果不重复,则不检查边界,答案不能,因为如果有重复,不会检查边界更不会新建一个block。***问题5***
if (!dupKey) {
//此出会检查block的大小,并且有一处需要注意,在里面的代码中有一些记录block信息的,这个以后会有用。
//此处会写出chunk,处理readyChunks
checkBlockBoundary();
}

上面注释中说的那个代码如下

1
2
byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);

append下面是一些很正常的数据写入(都是对stream的添加操作),元数据记录(firstKeyInBlock)等。

回到appendGeneralBloomfilter(kv)方法,此方法里面有一个判断是值得注意的。

1
2
3
4
5
//在此代码中会判断key的个数,如果key的个数达到了一定程度就新建一个chunk,放入readyChunks(这个会在checkBlockBoundary中处理),此出会写bf。***问题2***
enqueueReadyChunk(false);
... 这种是处理chunk被写出的时候的操作。重置一些值 ...
//真正的添加到bf中
chunk.add(bloomKey, keyOffset, keyLength);

在enqueueReadyChunk(false)中有

1
2
3
4
5
ReadyChunk readyChunk = new ReadyChunk();
readyChunk.chunkId = numChunks - 1;
readyChunk.chunk = chunk;
readyChunk.firstKey = firstKeyInChunk;
readyChunks.add(readyChunk);

然后时间很快就到了close环节。

1
2
//此处组织了block,将加入到此HFile的chunk生成树的结构。
long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);

block组织也分两类,一个chunk里组织block(他们共生存啊,用了一个bf),另外是root index和intermedia index的组织,实际这个更多感觉是组织chunk。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void writeInlineBlocks(boolean closing) throws IOException {
//inlineBlockWriters 应该就3个,两个bf和一个block(待确定)
for (InlineBlockWriter ibw : inlineBlockWriters) {
while (ibw.shouldWriteBlock(closing)) {
long offset = outputStream.getPos();
boolean cacheThisBlock = ibw.getCacheOnWrite();
ibw.writeInlineBlock(fsBlockWriter.startWriting(
ibw.getInlineBlockType()));
fsBlockWriter.writeHeaderAndData(outputStream);
//此处添加leaf index block
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

if (cacheThisBlock) {
doCacheOnWrite(offset);
}
}
}
}

ibw.shouldWriteBlock(closing)方法的判断如下,实际是判断是否有chunk

1
2
3
4
5
public boolean shouldWriteBlock(boolean closing) {
enqueueReadyChunk(closing);
//readyChunks中保存的是chunk,也就是lead index block
return !readyChunks.isEmpty();
}

下面是写入bloom meta index,感觉就是chunk的那些。

1
bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");

其实还有部分元数据(各种offset和树的生成)没有分析。以后在说吧。

HFileReaderV2

由上述的代码分析来看,其实读取的时候最主要要解决的是是否读此block。决定了读此block之后已经没有太多需要在此文章中分析了,因为那是检索流程的事情(组织memstore和storefile)

  1. 读block index和bloom filter信息
  2. 使用这两种索引过滤block

HFileReader主要涉及到的几个方法,包括获取和open。发生在在检索获取scanner和过滤scanner时。

在List HStore.getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)中如下代码,获取此store中的file对应的scanner。

1
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);

此方法调用了如下方法。

1
2
//此方法会调用Open方法
StoreFile.Reader r = file.createReader(canUseDrop);

接着调用open方法,方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
   if (this.reader != null) {
throw new IllegalAccessError("Already open");
}

// Open the StoreFile.Reader
this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);

// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

// Read in our metadata.
byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
if (b != null) {
// By convention, if halfhfile, top half has a sequence number > bottom
// half. Thats why we add one in below. Its done for case the two halves
// are ever merged back together --rare. Without it, on open of store,
// since store files are distinguished by sequence id, the one half would
// subsume the other.
this.sequenceid = Bytes.toLong(b);
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}

if (isBulkLoadResult()){
// generate the sequenceId from the fileName
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
String fileName = this.getPath().getName();
// Use lastIndexOf() to get the last, most recent bulk load seqId.
int startPos = fileName.lastIndexOf("SeqId_");
if (startPos != -1) {
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));
// Handle reference files as done above.
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
this.reader.setBulkLoaded(true);
}
this.reader.setSequenceID(this.sequenceid);

b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b);
}

b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);
if (this.majorCompaction == null) {
this.majorCompaction = new AtomicBoolean(mc);
} else {
this.majorCompaction.set(mc);
}
} else {
// Presume it is not major compacted if it doesn't explicity say so
// HFileOutputFormat explicitly sets the major compacted key.
this.majorCompaction = new AtomicBoolean(false);
}

b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

//此出会读取bloom filter
BloomType hfileBloomType = reader.getBloomFilterType();
if (cfBloomType != BloomType.NONE) {
reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
if (hfileBloomType != cfBloomType) {
LOG.info("HFile Bloom filter type for "
+ reader.getHFileReader().getName() + ": " + hfileBloomType
+ ", but " + cfBloomType + " specified in column family "
+ "configuration");
}
} else if (hfileBloomType != BloomType.NONE) {
LOG.info("Bloom filter turned off by CF config for "
+ reader.getHFileReader().getName());
}

// load delete family bloom filter
reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

try {
this.reader.timeRange = TimeRangeTracker.getTimeRange(metadataMap.get(TIMERANGE_KEY));
} catch (IllegalArgumentException e) {
LOG.error("Error reading timestamp range data from meta -- " +
"proceeding without", e);
this.reader.timeRange = null;
}
return this.reader;

判断的一个文件是否需要读取时,在伟大的 boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) 方法中的如下方法使用了bloomfilter。

1
2
//此处使用bloomfilter过滤。在此方法中会调用bloomFilter.contains,在此contains会先使用block index 判断。
reader.passesBloomFilter(scan, columns)

里面会调用一个contains

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
   //判断读取哪个block,rootBlockContaingKey里的blockKeys为chunk的个数。
//index是从bloommeta中读取,DataInput bloomMeta = reader.getGeneralBloomFilterMetadata(); 代码获取。
int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
if (block < 0) {
result = false; // This key is not in the file.
} else {
HFileBlock bloomBlock;
try {
// We cache the block and use a positional read.
//读取那个chunk的bf
bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
index.getRootBlockDataSize(block), true, true, false, true,
BlockType.BLOOM_CHUNK);
} catch (IOException ex) {
// The Bloom filter is broken, turn it off.
throw new IllegalArgumentException(
"Failed to load Bloom block for key "
+ Bytes.toStringBinary(key, keyOffset, keyLength), ex);
}

ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
result = ByteBloomFilter.contains(key, keyOffset, keyLength,
bloomBuf.array(), bloomBuf.arrayOffset() + bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
}

在如下方法(感觉时seekTO时,用于scan时指定了开始的rowkey,这样解释就合理了。在reader.passesBloomFilter中有判断是否时scan)中使用block index过滤了。

1
BlockWithScanInfo org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader.loadDataBlockWithScanInfo(byte[] key, int keyOffset, int keyLength, HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction) throws IOException

CompoundBloomFilter构造方法中读取Block index的数据。

HBase源码系列之compaction

像hbase这种基于LSM的架构,compaction是其中很重要一个环节。

触发compaction

  1. 手工出发

  2. chore线程

    1. CompactionChecker里判断 「s.needsCompaction() 和 s.isMajorCompaction()」
  3. flush触发

执行compaction

​ 此处才是本文的重点,上面的触发更多是条件的判断,不过也很重要(对于线上系统的运维和问题定位解决)此张章节会来较详细讨论HBase的compaction的有关源码方面的一些记录。

​ 首先调用的是如下方法,生成CompactionContext构造CompactionRunner放入线程池中。

1
2
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request, boolean selectNow, User user)

​ 在里面调用

1
boolean completed = region.compact(compaction, store, compactionThroughputController, user);

​ 然后调用相应store的compact方法

1
store.compact(compaction, throughputController, user);

​ 里面一个compact操作具体的步骤如下:

​ 1. 开始合并返回合并的结果。

1
List<Path> newFiles = compaction.compact(throughputController, user);

​ 此方法中一个判断是否是major

1
ScanType scanType = scannerFactory.getScanType(request);

​ 此ScanType传递给了ScanQueryMatcher来做scan类型的判断。

​ 2. 结果写入对应的cf的目录中。

1
sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);

​ 3. 将本次compaction写入HLOG中。

1
writeCompactionWalRecord(filesToCompact, sfs);

​ 4. 更新StoreFileManager的storefiles,去除旧的文件,加入新的文件

1
replaceStoreFiles(filesToCompact, sfs);

​ 5. 此时已经可以安心的文件读取了,最后一步骤就是删除旧的数据

1
completeCompaction(filesToCompact); 

HBase的一些坑

最近遇到了一些hbase的坑,先记录一下。

  1. mvcc的坑:在前几天的同事的一次调试,使用的put的指定了ts => 想再测一次就删除了 => 这些数据再次加入hbase查不到。 因为hbase的多版本方式不删除数据,所以在major_compact之前,delete的这个操作是保存在hbase中的。在这种情况下,如果新加入的数据的ts不必delete大时,会被hbase认为这些数据应该被删除。(不过这个感觉很比较,可以使用mvcc的版本来解决操作请求的顺序,而不出现delete后加入的数据的不能被显示出来)

  2. 有时候重启regionserver需要有些region没有上线,需要执行hbase hbck -repair,此命令很危险。

hdfs balance的一些解读和问题记录

##balance过程的解读

  1. 准备工作
    获取的namenode

  2. 调用Balancer.run L1422,开始迭代
    一个循环(除了2.1的返回结果ReturnStatus.IN_PROGRESS 继续之外,其他都结束)

2.1 ReturnStatus r = b.run L1352

​ 2.1.1 initNode获取需要移动的字节数bytesLeftToMove
​ 计算平均使用率
​ 计算出过载和未充分利用的节点需要移动的字节数,两者选取较大值(已排除在与平均值相差在threshold内的节点)
​ 2.1.2 chooseNode获取决定要移动的字节
chooseDatanodes: 三种匹配类型Matcher选取(同一组,同一机架,剩下的),每一类型做如下类型操作

​ 1. 处理过载到未好好利用的。 2. 处理过载到使用少的。 3. 处理多使用到少使用

​ chooseCandidate对于每个源节点,选个候选节点(如果能符合匹配规则Matcher就选择他)
​ matchSourceWithTargetToMove选择两者能移动的少的字节数,形成NodeTask
​ 2.1.3 dispatchBlockMoves L1103启动线程去移动数据,处理NodeTask
​ {两个条件会结束noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS 和Time.now()- startTime > MAX_ITERATION_TIME}
​ dispatchBlocks L614 为每个source启动一个线程去移动数据(线程放入dispatcherExecutor线程池),然后等待回复
​ 此处一轮迭代有时间限制
​ dispatcherExecutor.submit(source.new BlockMoveDispatcher());(调用dispatchBlocks())
​ chooseNextBlockToMove 获取下一步需要移动block,选择的代理节点是拥有此block但是传递到target比较好。
​ 此处有一个5个任务的限制,难怪增大timeout的时间之后不会quota is exceeded***
​ chooseBlockAndProxy 选择块和代理节点
​ isGoodBlockCandidate 选择好的块
​ chooseProxySource 在选中的块中选择一个和target比较好的location
​ scheduleBlockMove 发送到代理节点并给代理节点的发送复制请求
​ dispatch() 发送数据(此处看代码不是异步处理,果然不是)
​ 在目的节点DataXceiver.replaceBlock方法接受来自proxy的block发送流
​ 在此处有balanceThrottler限制
​ filterMovedBlocks 过滤
​ 判断需要不需要更多的block
​ waitForMoveCompletion()等待所有targets里的pending任务结束(如果没有结束此处会等待)
​ shouldContinue如果移动字节数(dispatchBlockMoves结果)大于零或等于零次数少于 MAX_NOT_CHANGED_ITERATIONS就是in_progress
2.2 resetData清楚数据,为再来一次循环做准备
2.3 根据r判断是否结束,判断条件在上面

简单概括

--> 计算那些需要移动的节点 
--> 在此节点中选择想要移动的block 
--> 对此block选择一个proxy(此proxy也有这个block的副本,并且传输起来比较好) 
--> 建立任务(向target节点发送replaceBlock请求,target节点向proxy节点发送copyBlock请求)拷贝数据

问题与解决

  1. balancer节点发送请求会有超时,在日志文件中报Read timed out -> 日志会报线程数超出配额 -> 提早退出balance过程,这种情况是balancer节点和target节点之间的rpc断开连接。只要改大超时设置就可以了(不清楚为啥不把超时统一到那个配置,并且hdfs把那些异常处理不打印问题,这个处理方式也很奇怪,可能新版本有改进)。

HBase源码系列之server端的get请求处理

scanner的层次

一个Region对应一个RegionScannerImpl


  • RegionScannerImpl (下面缩进代表了层级)

    • StoreScanner (组成storeHeap 和 joinedHeap, 这俩都是KeyValueHeap, 用的是region的Comparator)

      • MemStoreScanner
      • StoreFileScanner

      ​ (MemStoreScanner 和 StoreFileScanner 组成heap ,也是一个KeyValueHeap, 用的是store的Comparator)

对scanner预处理(裁剪等)

  1. 获取此region中store里的mem和file能组成的scanner,并做一定程度的过滤(rang,bf和ts)

    获取所有的scanner

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //对此region中每个store(列族)都需要获取scanner
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    //此方法是获取了scanner,过滤了一些scanner,并且还粗略的检索到需要的位置。
    //此scanner是一个store对应的
    KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
    if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
    } else {
    //比如有filter的情况
    joinedScanners.add(scanner);
    }
    }
    //生成俩KeyValueHeap,此处的比较器不知道啥意思。真他妈尴尬
    initializeKVHeap(scanners, joinedScanners, region);

    1.1 跟入store.getScanner方法,现在次问题只在一个store中讨论。store间的协调后面会提到。

    1
    2
    3
    4
    5
    6
    //此方法和1.1.4节中想呼应。
    this.store.addChangedReaderObserver(this);
    //获取所有的file,打开文件,做一些过滤等。
    List<KeyValueScanner> scanners = getScannersNoCompaction();
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);
    resetKVHeap(scanners, store.getComparator());

    1.1.1 getScannersNoCompaction()里面有重要的方法如下。

    1
    2
    selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
    isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));

    ​ 主要两层,store.getScanners中是获取所有的file,打开文件。

    ​ selectScannersFrom主要做过滤,主要是如下方法

    1
    2
    3
    kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)
    此方法两个分支,mem和file值得关注,此处就不说了。反正用了一些range和bf(bf只针对单行和单行列族)
    bf的过滤有一个判断(!scan.isGetScan()),如果是get,并且isStartRowAndEqualsStopRow为true才走bf。

    1.1.2 seekScanners

    ​ bf判断,时间判断,伪造kv使其跳过此个检索吧。

    1.1.3 resetKVHeap

    ​ 组织成一个KeyValueHeap,比较器有代表时间的id等。

    1.1.4另外如下方法也会让检索重新打开scanner,一般是有compact,bulkload等操作。需要去重新,应该为更改了一些东西。

    1
    notifyChangedReadersObservers()
  2. 裁剪的storefile和block的方式

    我个人觉得总共几个部分,包括time裁剪,key裁剪,bf和索引

    获取scanner分两个,一个memstore的,另一个是file的。

    获取有关file的Scanner如下:

    1
    2
    3
    4
    this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);

    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
    cacheBlocks, usePread, isCompaction, false, matcher, readPt);

    获取有关mem的Scanner如下:

    1
    memStoreScanners = this.memstore.getScanners(readPt);

    然后在返回之前做了筛选,注意那个selectScannerFrom方法

    1
    2
    selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
    isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));

    selectScannerFrom还是分两个分支,有两个实现,file的和mem的。

    file的过滤分三种,time,key和bf。file过滤实现如下:

    1
    2
    3
    4
    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
    return reader.passesTimerangeFilter(scan.getTimeRange(), oldestUnexpiredTS)
    && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
    }

    memstore的实现如下:

    1
    2
    3
    4
    5
    6
    7
    public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
    TimeRange timeRange = scan.getTimeRange();
    return (timeRangeTracker.includesTimeRange(timeRange) ||
    snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
    (Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax())
    >= oldestUnexpiredTS);
    }

    从文件格式看,主要是几个过滤,主要代码如下

    主要的实现在如下方法中

    1
    2
    3
    BlockWithScanInfo blockWithScanInfo =
    indexReader.loadDataBlockWithScanInfo(key, offset, length, block, 、
    cacheBlocks, pread, isCompaction);

    在HFileReaderV2中,由索引过滤和blockcache的实现,在

    根据索引信息过滤block

    1
    int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);

    方法实现如下

    1
    2
    3
    4
    5
    public int rootBlockContainingKey(final byte[] key, int offset,
    int length) {
    int pos = Bytes.binarySearch(blockKeys, key, offset, length,
    comparator);
    ....}

    如果需要读取,会先从blockcache中读取,减少io。blcokcache中读取block,代码如下

    1
    2
    3
    block = cachingBlockReader.readBlock(currentOffset,
    currentOnDiskSize, shouldCache, pread, isCompaction, true,
    expectedBlockType);

    两处使用bf索引过滤,一处是isLazy==true时

    1
    boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.requestSeek(KeyValue kv, boolean forward, boolean useBloom)

    bf的粒度是chunk。

协调一个cf下的检索

  1. 我们讲获取到的scanner组织成heap,然后从heap中获取对应的数据。这个heap需要有一定的组织,比如检索到了数据,需要关闭;对于同一个key,需要在所有的scanner中获取到等

    获取到scanner之后就开始获取数据结果了。

    1
    scanner.next(results);

    里面一堆逻辑的,实际获取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
    byte[] currentRow, int offset, short length) throws IOException {
    KeyValue nextKv;
    do {
    heap.next(results, limit - results.size());
    if (limit > 0 && results.size() == limit) {
    return KV_LIMIT;
    }
    nextKv = heap.peek();
    } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
    return nextKv;
    }

StoreFileScanner中检索相关的处理

。。。

协调不同cf下的检索

暂时不看,暂时在业务中不需要。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69





===================== 以下太乱 ======================

hbase检索在region中的检索流程,其中主要需要分析的是在一个store中的检索的操作,分两个层面:一个是到scanner的上层的处理,另一个是scanner的内部的处理(包括men和file)两种。

## 处理scanner

## scanner内部

### memscanner

要从memstore中获取想要的数据,感觉不会很难,因为memstore是个有一定顺序。

### filescanner

对于filestore的检索,hbase提供了集中方式来加速检索。

1. bloomfilter的过滤,见”处理scanner“
2. index的过滤
3. blockcache的使用








## 对一个get的请求怎么判断结束

boolean org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl.nextInternal(List<Cell> results, int limit)

只执行一次。一个keyvalue只会在一个block中,其实并不需要再去检索,只要检索一次就可以了,感觉是对的。没想到前几天想的有问题。



## scan判断结束

scan的判断结束

```java
while (i < rows) {
// Stop collecting results if maxScannerResultSize is set and we have exceeded it
if ((maxResultSize < Long.MAX_VALUE) &&
(currentScanResultSize >= maxResultSize)) {
builder.setMoreResultsInRegion(true);
break;
}
// Collect values to be returned here
moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
for (Cell cell : values) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
currentScanResultSize += kv.heapSizeWithoutTags();
totalKvSize += kv.getLength();
}
results.add(Result.create(values));
i++;
}
if (!moreRows) {
//结束了
break;
}
values.clear();
}

检索keyvalue

1
int org.apache.hadoop.hbase.io.hfile.HFileReaderV2.AbstractScannerV2.seekTo(byte[] key, int offset, int length, boolean rewind) throws IOException

实现获取block

​ index 去检索

检索到指定位置

bloom index的位置

第一层的root index就是chunk的位置。

但是root index指向的是imtermediate或leaf,理论上来说leaf对应chunk

HFile中除了Data Block需要索引之外,上一篇文章提到过Bloom Block也需要索引,索引结构实际上就是采用了single-level结构,文中Bloom Index Block就是一种Root Index Block。

bloom block 和 data block的索引不一致。bloom block采用单层结构,data block采用多层结构。

bloom block的单层结构的叶子结点是chunk。

get请求读取block的种类和情况

  1. 正常读取block的

get 流程真是复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  public boolean seek(KeyValue key) throws IOException {
if (seekCount != null) seekCount.incrementAndGet();

try {
try {
if(!seekAtOrAfter(hfs, key)) {
LOG.info("liubb seekAtOrAfter false");
close();
return false;
} else {
LOG.info("liubb seekAtOrAfter true");
}

cur = hfs.getKeyValue();

return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
} catch (FileNotFoundException e) {
throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
}
里面的seekAtOrAfter很重要。
1
2
3
4
5
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
qcode = optimize(qcode, kv);
switch(qcode) {

此处各种code的解释对于不同storefile的理解也很好。

可以看到一些问题和关注点

  1. RegionScannerImpl里使用StoreScanner 时,各种heap的操作主要需要关注一个记录的组织,因为一条记录的不同列族保存在不同的store中。还要关注什么时候结束。
  2. StoreScanner中的两种scanner(StoreFileScanner可能有多个)需要协调在不同scanner中的不同的版本的问题。还要关注什么时候结束。