《另一种选择》读后感

今天刚阅读完谢丽尔·桑德伯格的《另一种选择》,有几点感想想要分享。

  1. 我们应该勇敢的去面对失败,出错。
  2. 看完书不得不感慨作者的厉害,不仅仅是在google和facebook成功,也包括了如此勇敢的去面对痛苦并且去努力的帮助别人。并且书中体现一些素养也确实觉得她值得这么成功。
  3. 我觉得成功的人无非两种,对于宏观经济和自己行业的绝对了解,另一种是,对于自己或用户的人性的了解。能很好理解和掌握自己和用户的人性上的需求在事业不会差到哪里。
  4. 也许,我们很多时候想的和做的决定都不是最好的,甚至是错的。这可能也是我们需要从别人和从从前的自己学的原因。

最近整个工作的节奏和完成速度确实让我觉得有点失望,不过整个swift项目和python的难度确实在预料范围之外。应该更努力的解决问题。

给自己一个设想,如果自己未来只有五十年,三十年,十年,五年甚至一年的时间。我们还是如此散漫的挥霍宝贵的周末时光吗?以后一定要尽量让自己不要处于如此散漫的地步。

DFSOutputStream分析

private void waitAndQueueCurrentPacket()
调用者:

1
2
3
private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
public synchronized void close()

public class DFSOutputStream extends FSOutputSummer

1
2
3
4
5
6
7
implements Syncable, CanSetDropBehind {
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>(); // dataQueue是数据队列,用于保存等待发送给datanode的数据包
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); // ackQueue是确认队列,保存还没有被datanode确认接收的数据包
private DataStreamer streamer; // streamer线程,不停的从dataQueue中取出数据包,发送给datanode
...
}

1
2
3
4
5
6
7
8
9
class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
private ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream; // socket的输出流(client->datanode),用于将数据传输给datanode
private DataInputStream blockReplyStream; // socket的输入流(datanode->client),用户收到datanode的确认包
private ResponseProcessor response = null; // response线程,用于接收从datanode返回的反馈信息
...
}

block > packet > chunk(带chunksum)

主要看看writeChunk方法。

  1. 构造currentPacket,写入data和checksum,并且的chunk数++,bytesCurBlock增加数据的长度。
  2. 如果chunk数足够多。则调用waitAndQueueCurrentPacket
  3. 如果如果block的大小足够大的话,

其中一个处理时添加chunk到缓存中。一下具体处理为判断是否可以添加入发送给datanode的缓存中。lastQueuedSeqno保存了最后一个需要处理的序号。
/**

  • 如果没有足够空间话就等待,然后
    */
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
    try {
    // If queue is full, then wait till we have enough space
    while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
    try {
    dataQueue.wait();
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    break;
    }
    }
    checkClosed();
    queueCurrentPacket();
    } catch (ClosedChannelException e) {
    ...
    }
    }
    }
    /**
  • 添加dataQueue,设置lastQueuedSeqno
    */
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void queueCurrentPacket() {
    synchronized (dataQueue) {
    if (currentPacket == null) return;
    dataQueue.addLast(currentPacket);
    lastQueuedSeqno = currentPacket.seqno; //在waitForAckedSeqno方法使用,判断是否所有数据都被写入并接受到ack了。
    if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
    }
    currentPacket = null;
    dataQueue.notifyAll();
    }
    }

DataStreamer是处理添加入缓存dataQueue的数据。将他们发送然后添加到ackQueue中。
在while中有processDatanodeError的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
// write out data to remote datanode
try {
one.writeTo(blockStream);
blockStream.flush();
}

还需要一个线程接受ack,为ResponseProcessor
//获取返回的序号
ack.readFields(blockReplyStream);
long seqno = ack.getSeqno();
//获取dataQueue里第一个包的序号
Packet one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.seqno != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.seqno + " but received " + seqno);
}
//移除ackQueuede里的第一个,
synchronized (dataQueue) {
lastAckedSeqno = seqno; //lastAckedSeqno在waitForAckedSeqno有使用
ackQueue.removeFirst();
dataQueue.notifyAll();
}

toWaitFor = lastQueuedSeqno;
waitForAckedSeqno(toWaitFor)在flushInternal和flushOrSync会调用。

一个奇怪的rwlock锁卡住的case

遇到一个case,有关于读写锁。原因路径如下:

  1. 线上有一个datanode节点宕机,导致与此节点的连接出现问题。
  2. 有hdfs客户端进程在处理此连接超时部分的代码有问题,没有将外层的超市传入至底层socket里,出问题的线程持有了无法超市推出,一直hang住,此线程还持有了一个读写锁的读锁。
  3. 由于内部某些机制另一个线程需要写锁,同时此锁的读锁会不停有人由于请求需要拿到和释放。
  4. 这时候出现了其他线程的读锁也无法获取到。

从线程栈看,就是一个线程获取了读锁,阻塞了别的线程获取读锁和写锁。

还原现场例子代码地址:https://github.com/liubinbin/sta/tree/master/rwlock/src/rwlock

1
看看ReentrantReadWriteLock,注意构造函数里可以传一个布尔值,区分公平锁和非公平锁。

NonfairSync和FairSync分别是两种实现,其中都实现了readerShouldBlock方法。这问题分两种情况:

  1. 公平锁:这种情况下对AQS的请求是需要安装时间顺序,如果新来的读请求在写请求之后就需要等待。
  2. 非公平锁:为了防止写请求饥饿,读请求会先判断等待队列头是否是写请求,所以在这种情况下,写请求会隔离读锁和读锁。

此问题最后解决办法是让那个出问题的读锁尽快的释放,不要一直占着读锁就可以把影响降低。

最近使用docker的一些小记录

mysql出了需要grant之外,还要修改bind-address

docker 基本命令

docker 中container基于image制作,image类似一个模版,基于某个image制作的contaner都具有和image一样的内容

我们可以根据REPOSITORY来判断这个镜像是来自哪个服务器,如果没有 / 则表示官方镜像,并且在search的时候会标有OFFICIAL,类似于ip:port/repos_name则表示的是私服。

docker pull username/repository<:tag_name> 或者 docker pull repository,pull和push相对应。

5.2 运行出一个container放到后台运行

1
2
3
# docker run -d ubuntu /bin/sh -c "while true; do echo hello world; sleep 2; done"
ae60c4b642058fefcc61ada85a610914bed9f5df0e2aa147100eab85cea785dc

它将直接把启动的container挂起放在后台运行(这才叫saas),并且会输出一个CONTAINER ID,通过docker ps可以看到这个容器的信息,可在container外面查看它的输出docker logs ae60c4b64205,也可以通过docker attach ae60c4b64205连接到这个正在运行的终端,此时在Ctrl+C退出container就消失了,按ctrl-p ctrl-q可以退出到宿主机,而保持container仍然在运行

端口映射

Docker中运行的程序的端口是不能直接访问的,需要映射到本地,通过-p参数实现,例如将6379端口映射到本机的6378端口

容器日志

查看当前容器的日志

docker logs container-name/container-id

我们可以查看之前redis镜像的容器

docker logs test-redis

可以看到redis启动的日志

运行中的容器其实就是一个完备的Linux操作系统,我们可以登录访问当前容器,登录后可以在容器中进行常规的Linux操作。
docker exec -it container-id/container-name bash

系统版本

[root@bogon yum.repos.d]# uname -a
Linux bogon 2.6.32-642.el6.x86_64 #1 SMP Tue May 10 17:27:01 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
[root@bogon yum.repos.d]# cat /etc/redhat-release
CentOS release 6.8 (Final)

安装EPEL

因为系统自带的repo中不带docker需要安装epel

rpm -Uvh http://ftp.riken.jp/Linux/fedora/epel/6Server/x86_64/epel-release-6-8.noarch.rpm

安装Docker

yum install -y docker-io

开机自启动与启动Docker

[root@bogon yum.repos.d]# service docker start
Starting cgconfig service: [ OK ]
Starting docker: [ OK ]
[root@bogon yum.repos.d]# chkconfig docker on
[root@bogon yum.repos.d]# chkconfig docker –list
docker 0:off 1:off 2:on 3:on 4:on 5:on 6:off
[root@bogon yum.repos.d]#

至此docker已经安装完成

docker rm 删除container

docker rmi 删除image

http://blog.csdn.net/permike/article/details/51879578 总结的非常好

已完成
docker pull morrisjobke/docker-swift-onlyone
docker pull kahing/docker-swift
docker pull debian

docker pull [选项] [Docker Registry 地址[:端口号]/]仓库名[:标签]

启动container
docker run -d -P -v /srv/node/sdb1/docker/:/swift/nodes -t bouncestorage/swift-aio
通过docker ps 获取端口
然后通过http://127.0.0.1:/auth/v1.0 进行系统的操作和访问
具体的命令形式如下
swift -A http://127.0.0.1:32770/auth/v1.0 -U test:tester -K testing

swift-aio-docker

docker run -it –rm debian bash
-it: 这是两个参数,一个是 -i:交互式操作,一个是 -t 终端。
–rm: 这个参数是说容器退出后随之将其删除

bash:放在镜像名后的是命令,这里我们希望有个交互式 Shell

1
docker pull [选项] [Docker Registry 地址[:端口号]/]仓库名[:标签]

Docker Registry 地址[:端口号] 默认docker hub

仓库名为<用户名>/<软件名> ,默认为 library

latest为默认标签

最近感想

最近在读萨缪尔森的《经济学》,其实很早此书就在淘宝的购物车里,一直没有买来读。大概是自己懒的原因吧。整本书很大并且很厚,很有教材的感觉,读完真的很累,但是觉得很值。现在渐渐感觉看到一件事,一个政策,一个评论能想到一些东西。

之前微博里在说保险相关的信息,其实我们每个人都应该读点经济学和金融的书,使自己能更好的理解发布的各种信息背后可能隐藏的一些更深层次的信息,以便我们能更好的做出更加理智的判断。

点赞学长,前几天下载的《Linux System Prorgramming》中文翻译的pdf是工大的一些学长做的。我真的感慨自己表达出的观点真的太少,并且没有做能影响别人的事情,以后我一定要多在此博客中发表自己的想法,并且希望以后能做一些影响别人的事情,至少是一些有趣的事情。

经济学

那本书里面其实说了很多原理性的东西,读着很枯燥,但是原理又是理解一切的基础。在那本书中,我最感兴趣的其实是货币政策和财政政策这些部分,那两部分是比较接近有用,能用于生活判断一些东西。

等过段时间像把货币相关的一些东西再找几本书捋一捋,希望可以到时候能更加准确的理解这些事情。等更加理解了之后再写点东西。

大概一个月之前,看了《大逃港》这本书,给我的感觉是,每个人最终应该是为了利益在做事情,一件事情也只有有足够的利益诱惑才能推行直至完成。嗨,不能讲太多,不是很确定line在哪里。

##最近想法

最近看完那本《经济学》之后,越来越感觉自己应该找些事情做。当然基本应该是和计算机相关的,下面列一些最近想到的一些东西。

写一个数据库

在大学时,个人对数据库其实也挺感兴趣,最近三年的工作其实很多也是和数据库相关。现在的想法是自己写个数据库应该是一件有意思的事情,另一个动因是买了《代码大全》这本书,感觉看完之后应该能够有东西来实践。两者可以结合在一起做。应该是件有意思的事情。

获取跑步路线

之前在北京的时候,跑步很多时候是在公园进行的。跑完看着那个路线图,如果自己跑了一个2017等数字的形状的,应该是件有意思的事情。

最近稍微想了这个事情,这功能的实现很依赖地图数据的样子。

水果店买水果

有一次去水果店买东西的时候,店里工作让加入一个群,说里面有各种优惠。到现在也对此种产品信息的推送感觉不是很理想。

  1. 水果店的水果信息没法完全展示在顾客面前,因为每天在群里只能看到一些优惠的水果。

  2. 通过每个人把自己想买的东西加入一个列表里,然后在群里不停的发列表。这种方式其实是对店主友好,对群里的顾客不友好。

  3. 仔细想这件事,本质很团购或会员卡没什么区别,但是这其中店主和顾客之间信息的交流很不顺畅。

    信息的顺畅交流才能更好地提升效率,利用好互联网这个工具和提高营收。

阅读英文查阅的很不方便和音乐软件的割裂

英文阅读方面,扇贝做的不错。看到有不懂的词汇或词组可以直接长摁然后显示意思,可以非常快的回到阅读中,对整体的阅读流畅性影响不是很大,有一个问题是文章比较尴尬。如果是别的文章质量更加好的app或网址的话,生词查阅方面做的没这么好。

网易云音乐貌似遇到的版权问题,我看了一下我收藏的音乐,基本没有变灰无法听的,突然感觉自己的音乐品味是不是和大众不太一样。

研报,公告的信息获取

研报和公告信息相对来说比较难。

vmware安装centos

此文记录自己安装centos时的一些步骤纯粹为了下次安装更加快速的完成,因为在前段时间删除虚拟机之后发现一下子想不起来是怎么安装的centos,需要求助搜索引擎和一步步的探索才能完成。

第0步 安装vmware和下载centos

下载centos从之前的6版本切换到了7版本,此处为了保持和目前线上机器的版本一致,可以使自己能更适应新版的centos。

第1步 创建centos虚拟机

选择Create a custom virtual machine

选择对应的版本CentOS 64-bit

选择Create a new virtual disk

选择Customize Settings,然后选择Save.

然后开始设置配置,主要如下:

  • Processors & Memory (4core 8G)
  • Hard Disk (80G)
  • CD/DVD (IDE) (此处一定需要设置,将下载的iso加入配置中,并且需要连接DVD)

然后开始安装,更改mini 安装模式,添加gnome桌面,然后进入正常的linux安装界面。之后一切就比较顺利。

最后可能需要安装图形界面:yum groupinstall “GNOME Desktop”,重启进入root执行“init 5”

##注意事项

###新版系统需要改变设置

Please check System Preferences ==>Security&Privacy==>Privacy==>Accessibility, make sure you have added Fusion into the list.

###键盘切换快捷键:

cmd + G : 切到虚拟机

cmd + control:切到mac系统

rit in HBase

以前在别人博客中看到看到hbase中RIT的蛋疼,最近也确实感觉这个状态是比较尴尬的,别人会来找你。最近会放一些精力来解析hbase的rit状态,同时也是希望自己能对hbase有更全面和深入的了解。感觉过去两个月对hbase的的检索和加载流程看的比较多了。

遇到一个case:

一个region处于pending_open的状态。

大概逻辑,本来master给某个节点发送open此region的请求,过程进行的很好。

HBase源码系列之HFile

本文讨论0.98版本的hbase里v2版本。其实对于HFile能有一个大体的较深入理解是在我去查看”到底是不是一条记录不能垮block“的时候突然意识到的。

首先说一个对HFile很直观的感觉,我觉得HFile的整个设计中很重要的一点是为减少内容占用。首先写时候可以把一个个block按顺序写入,满足一个chunk写入一个元数据(包括bloomfilter),最后是一些HFile的元数据。对于HFile,我个人觉得主要把握好几个问题。

  1. block的组织
  2. bf和block的关系
  3. index和block的关系
  4. 写入顺序和一些基本的元数据信息结构
  5. 记录能不能跨block

明白这四个问题感觉基本可以大致的描绘出HFile了。

HFileWriterV2

首先,我们知道会引起下HFile的操作有flush和compaction。在此,我们就选择从flush这个入口跟进去看。

在StoreFile中,以下方法主要是为了Store书写到一个HFile中。

1
long org.apache.hadoop.hbase.regionserver.StoreFlusher.performFlush(InternalScanner scanner, CellSink sink, long smallestReadPoint) throws IOException

在此方法会调用如下方法

1
2
3
4
5
6
7
8
9
public void append(final KeyValue kv) throws IOException {
//如下两行主要是来加入到bf中的,下面的这个就是我们经常说的bf索引。
appendGeneralBloomfilter(kv);
appendDeleteFamilyBloomFilter(kv);
//这行是重点
writer.append(kv);
//这行先不管,处理时间戳
trackTimestamps(kv);
}

以下分解append方法

1
2
3
4
5
6
7
8
9
10
//检查key是否有问题,是否按顺序(memstore使用ConcurrentSkipListMap存储,应该不会有此问题)。
//并且返回key是否重复
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
//如果不重复,则不检查边界,答案不能,因为如果有重复,不会检查边界更不会新建一个block。***问题5***
if (!dupKey) {
//此出会检查block的大小,并且有一处需要注意,在里面的代码中有一些记录block信息的,这个以后会有用。
//此处会写出chunk,处理readyChunks
checkBlockBoundary();
}

上面注释中说的那个代码如下

1
2
byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);

append下面是一些很正常的数据写入(都是对stream的添加操作),元数据记录(firstKeyInBlock)等。

回到appendGeneralBloomfilter(kv)方法,此方法里面有一个判断是值得注意的。

1
2
3
4
5
//在此代码中会判断key的个数,如果key的个数达到了一定程度就新建一个chunk,放入readyChunks(这个会在checkBlockBoundary中处理),此出会写bf。***问题2***
enqueueReadyChunk(false);
... 这种是处理chunk被写出的时候的操作。重置一些值 ...
//真正的添加到bf中
chunk.add(bloomKey, keyOffset, keyLength);

在enqueueReadyChunk(false)中有

1
2
3
4
5
ReadyChunk readyChunk = new ReadyChunk();
readyChunk.chunkId = numChunks - 1;
readyChunk.chunk = chunk;
readyChunk.firstKey = firstKeyInChunk;
readyChunks.add(readyChunk);

然后时间很快就到了close环节。

1
2
//此处组织了block,将加入到此HFile的chunk生成树的结构。
long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);

block组织也分两类,一个chunk里组织block(他们共生存啊,用了一个bf),另外是root index和intermedia index的组织,实际这个更多感觉是组织chunk。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void writeInlineBlocks(boolean closing) throws IOException {
//inlineBlockWriters 应该就3个,两个bf和一个block(待确定)
for (InlineBlockWriter ibw : inlineBlockWriters) {
while (ibw.shouldWriteBlock(closing)) {
long offset = outputStream.getPos();
boolean cacheThisBlock = ibw.getCacheOnWrite();
ibw.writeInlineBlock(fsBlockWriter.startWriting(
ibw.getInlineBlockType()));
fsBlockWriter.writeHeaderAndData(outputStream);
//此处添加leaf index block
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

if (cacheThisBlock) {
doCacheOnWrite(offset);
}
}
}
}

ibw.shouldWriteBlock(closing)方法的判断如下,实际是判断是否有chunk

1
2
3
4
5
public boolean shouldWriteBlock(boolean closing) {
enqueueReadyChunk(closing);
//readyChunks中保存的是chunk,也就是lead index block
return !readyChunks.isEmpty();
}

下面是写入bloom meta index,感觉就是chunk的那些。

1
bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");

其实还有部分元数据(各种offset和树的生成)没有分析。以后在说吧。

HFileReaderV2

由上述的代码分析来看,其实读取的时候最主要要解决的是是否读此block。决定了读此block之后已经没有太多需要在此文章中分析了,因为那是检索流程的事情(组织memstore和storefile)

  1. 读block index和bloom filter信息
  2. 使用这两种索引过滤block

HFileReader主要涉及到的几个方法,包括获取和open。发生在在检索获取scanner和过滤scanner时。

在List HStore.getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)中如下代码,获取此store中的file对应的scanner。

1
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);

此方法调用了如下方法。

1
2
//此方法会调用Open方法
StoreFile.Reader r = file.createReader(canUseDrop);

接着调用open方法,方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
   if (this.reader != null) {
throw new IllegalAccessError("Already open");
}

// Open the StoreFile.Reader
this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);

// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

// Read in our metadata.
byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
if (b != null) {
// By convention, if halfhfile, top half has a sequence number > bottom
// half. Thats why we add one in below. Its done for case the two halves
// are ever merged back together --rare. Without it, on open of store,
// since store files are distinguished by sequence id, the one half would
// subsume the other.
this.sequenceid = Bytes.toLong(b);
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}

if (isBulkLoadResult()){
// generate the sequenceId from the fileName
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
String fileName = this.getPath().getName();
// Use lastIndexOf() to get the last, most recent bulk load seqId.
int startPos = fileName.lastIndexOf("SeqId_");
if (startPos != -1) {
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));
// Handle reference files as done above.
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
this.reader.setBulkLoaded(true);
}
this.reader.setSequenceID(this.sequenceid);

b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b);
}

b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);
if (this.majorCompaction == null) {
this.majorCompaction = new AtomicBoolean(mc);
} else {
this.majorCompaction.set(mc);
}
} else {
// Presume it is not major compacted if it doesn't explicity say so
// HFileOutputFormat explicitly sets the major compacted key.
this.majorCompaction = new AtomicBoolean(false);
}

b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

//此出会读取bloom filter
BloomType hfileBloomType = reader.getBloomFilterType();
if (cfBloomType != BloomType.NONE) {
reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
if (hfileBloomType != cfBloomType) {
LOG.info("HFile Bloom filter type for "
+ reader.getHFileReader().getName() + ": " + hfileBloomType
+ ", but " + cfBloomType + " specified in column family "
+ "configuration");
}
} else if (hfileBloomType != BloomType.NONE) {
LOG.info("Bloom filter turned off by CF config for "
+ reader.getHFileReader().getName());
}

// load delete family bloom filter
reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

try {
this.reader.timeRange = TimeRangeTracker.getTimeRange(metadataMap.get(TIMERANGE_KEY));
} catch (IllegalArgumentException e) {
LOG.error("Error reading timestamp range data from meta -- " +
"proceeding without", e);
this.reader.timeRange = null;
}
return this.reader;

判断的一个文件是否需要读取时,在伟大的 boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) 方法中的如下方法使用了bloomfilter。

1
2
//此处使用bloomfilter过滤。在此方法中会调用bloomFilter.contains,在此contains会先使用block index 判断。
reader.passesBloomFilter(scan, columns)

里面会调用一个contains

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
   //判断读取哪个block,rootBlockContaingKey里的blockKeys为chunk的个数。
//index是从bloommeta中读取,DataInput bloomMeta = reader.getGeneralBloomFilterMetadata(); 代码获取。
int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
if (block < 0) {
result = false; // This key is not in the file.
} else {
HFileBlock bloomBlock;
try {
// We cache the block and use a positional read.
//读取那个chunk的bf
bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
index.getRootBlockDataSize(block), true, true, false, true,
BlockType.BLOOM_CHUNK);
} catch (IOException ex) {
// The Bloom filter is broken, turn it off.
throw new IllegalArgumentException(
"Failed to load Bloom block for key "
+ Bytes.toStringBinary(key, keyOffset, keyLength), ex);
}

ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
result = ByteBloomFilter.contains(key, keyOffset, keyLength,
bloomBuf.array(), bloomBuf.arrayOffset() + bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
}

在如下方法(感觉时seekTO时,用于scan时指定了开始的rowkey,这样解释就合理了。在reader.passesBloomFilter中有判断是否时scan)中使用block index过滤了。

1
BlockWithScanInfo org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader.loadDataBlockWithScanInfo(byte[] key, int keyOffset, int keyLength, HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction) throws IOException

CompoundBloomFilter构造方法中读取Block index的数据。

HBase源码系列之compaction

像hbase这种基于LSM的架构,compaction是其中很重要一个环节。

触发compaction

  1. 手工出发

  2. chore线程

    1. CompactionChecker里判断 「s.needsCompaction() 和 s.isMajorCompaction()」
  3. flush触发

执行compaction

​ 此处才是本文的重点,上面的触发更多是条件的判断,不过也很重要(对于线上系统的运维和问题定位解决)此张章节会来较详细讨论HBase的compaction的有关源码方面的一些记录。

​ 首先调用的是如下方法,生成CompactionContext构造CompactionRunner放入线程池中。

1
2
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request, boolean selectNow, User user)

​ 在里面调用

1
boolean completed = region.compact(compaction, store, compactionThroughputController, user);

​ 然后调用相应store的compact方法

1
store.compact(compaction, throughputController, user);

​ 里面一个compact操作具体的步骤如下:

​ 1. 开始合并返回合并的结果。

1
List<Path> newFiles = compaction.compact(throughputController, user);

​ 此方法中一个判断是否是major

1
ScanType scanType = scannerFactory.getScanType(request);

​ 此ScanType传递给了ScanQueryMatcher来做scan类型的判断。

​ 2. 结果写入对应的cf的目录中。

1
sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);

​ 3. 将本次compaction写入HLOG中。

1
writeCompactionWalRecord(filesToCompact, sfs);

​ 4. 更新StoreFileManager的storefiles,去除旧的文件,加入新的文件

1
replaceStoreFiles(filesToCompact, sfs);

​ 5. 此时已经可以安心的文件读取了,最后一步骤就是删除旧的数据

1
completeCompaction(filesToCompact); 

HBase的一些坑

最近遇到了一些hbase的坑,先记录一下。

  1. mvcc的坑:在前几天的同事的一次调试,使用的put的指定了ts => 想再测一次就删除了 => 这些数据再次加入hbase查不到。 因为hbase的多版本方式不删除数据,所以在major_compact之前,delete的这个操作是保存在hbase中的。在这种情况下,如果新加入的数据的ts不必delete大时,会被hbase认为这些数据应该被删除。(不过这个感觉很比较,可以使用mvcc的版本来解决操作请求的顺序,而不出现delete后加入的数据的不能被显示出来)

  2. 有时候重启regionserver需要有些region没有上线,需要执行hbase hbck -repair,此命令很危险。