HBase源码系列之memstore

主要结构

​ 主要结构包括「kvset,snapshot」,先说snapshot,主要适用于生成HFile时临时存放的kvset的一个快照。更改的snaphost一定条件判断可以生成HFile时调用

1
FlushResult org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HLog wal, long myseqid, MonitoredTask status)

此方法两种情况会调用

​ 一个是关闭时。

1
HRegion.doClose(boolean abort, MonitoredTask status)

​ 另一个时需要flush时。

1
FlushResult org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HLog wal, long myseqid, MonitoredTask status)

flush过程

  1. 准备工作
  2. 获取HRegion.updatesLock的写锁
  3. 获取mvcc事务,并努力前提读点
  4. 调用prepare()
    1. 将snapshot设置为的kvset,并重新生成一个kvset,用于写入。
  5. 释放写锁
  6. 同步wal日志
  7. 等到mvcc结束
  8. flushcache()
  9. commit()
    1. 才更新构造scanner使用的org.apache.hadoop.hbase.regionserver.DefaultStoreFileManager.storefiles

疑问与解释

问题:

最开始,我感觉逻辑有问题,可能会导致在某个时间点(flush)时,读取不到已经写入的数据。

回答:

有三个方面:

  1. 读取数据时会读取snapshot的数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Override
    public synchronized KeyValue next() {
    if (theNext == null) {
    return null;
    }

    final KeyValue ret = theNext;

    // Advance one of the iterators
    if (theNext == kvsetNextRow) {
    kvsetNextRow = getNext(kvsetIt);
    } else {
    snapshotNextRow = getNext(snapshotIt);
    }

    // Calculate the next value
    theNext = getLowest(kvsetNextRow, snapshotNextRow);

    //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
    //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
    // getLowest() + " threadpoint=" + readpoint);
    return ret;
    }
  2. 清除snapshot和更新文件时在一个锁内。

    insertNewFiles方法将刷写成的HFile加入Store的fileManger里storefiles里,此结构用语构造storefilescanner。

    clearSnapshot方法将snapshot设置为一个新的空的结构。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    this.lock.writeLock().lock();
    try {
    this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
    this.memstore.clearSnapshot(set);
    } finally {
    // We need the lock, as long as we are updating the storeFiles
    // or changing the memstore. Let us release it before calling
    // notifyChangeReadersObservers. See HBASE-4485 for a possible
    // deadlock scenario that could have happened if continue to hold
    // the lock.
    this.lock.writeLock().unlock();
    }
  3. 第三个方面,注意到第一个方面读取时或去的scanner方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    byte[] stopRow, long readPt) throws IOException {
    Collection<StoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
    storeFilesToScan =
    this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
    memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
    this.lock.readLock().unlock();
    }
    ....
    }

    snapshot里的数据要么被getScanner引用到要么清空并生成新文件,而且两者互斥(使用的是同一个锁)。这样就可以保证数据能被读取到了。

HBase源码系列之MVCC

关键点

​ KeyValue类中有long mvcc的变量,感觉此处可以跟踪。

​ MultiVersionConsistencyControl在一个Region有一个。

​ MultiVersionConsistencyControl{

​ memstoreRead:能够读取的序列号。

​ memstoreWrite:写操作的序列号。

​ writeQueue:写操作队列}

问题点

就是需要去回答的问题,这些问题十分有助于思考这个大问题

  1. 读取是版本的选择

    在org.apache.hadoop.hbase.regionserver.MemStore.MemStoreScanner.getNext(Iterator it) L777中判断了曲出来的KeyValue是否到了可以读的范围(v.getMvccVersion() <= this.readPoint)。否则掠过

    在构造scanner时会将readpoint传入到scanner,说明能读到的版本在构造开始就定好了。

  2. 写的时候的版本

    w = mvcc.beginMemstoreInsert();

    ​ 申请一个写操作的序列号,使用的变量是memstoreWrite

    mvcc.completeMemstoreInsert(w);

    ​ 标记操作结束,尽量更新memstoreRead,并且通知readWriters(在compaction和flush时会用到,因为这时需要将那时的版本都等待写完才能操作)。

    ​ 等待到此操作完成

  3. 回滚memstore的时候版本的控制

    回滚操作时找kv有版本比较MemStore.java L334。并且会滚后再执行mvcc.completeMemstoreInsert(w);

ps:long org.apache.hadoop.hbase.regionserver.HRegion.getReadpoint(IsolationLevel isolationLevel) L1115用了一个隔离级别。之后写一篇隔离级别的小文,加深理解。

去杭州

大学毕业在北京工作生活了近三年之后,即将回到自己的家乡的省会(杭州)来工作生活。明天就要去杭州正式开始我的杭州的生活了。有诸多感慨,从生活和工作两方面,在此小记一下。

生活

北京的生活方面貌似常被吐槽,具体有空气不好,节奏快等等。但是我对北京的感觉还不错,这是个非常适合学习和工作的城市,这也是个伟大的城市。

北京有十分发达的地铁网络并且有十分发达的文化资源,我在工作日可以非常便捷的去上班,在周末可以非常方便的去逛北京的博物馆,美术馆等,去了一些博物馆之后,可以感受到这个世界有很多值得我们去关注的,真是真心喜欢北京这座城市。相较之下,杭州就没这么好,很多地方没通地铁,哭。当然我隐约的感觉到杭州潜力很大,并且希望自己能参与到其中,能随着这个城市的成长而成长。

在去年清明假期时,一个来杭州呆了一个假期,晚上睡宾馆,白天到处走走停停看看,感觉十分惬意,只是略讨厌那一只下雨的天气。当时第一天就去了浙大的玉泉校区,走在曾经梦想的校园里,有种考上浙大来读书的感觉,当我在留学生食堂吃完晚饭之后听着校园里的音乐,走在校园的一个个角落,我感觉这就是天堂。之后去了杭州图书馆,太子湾,植物园等,可以说这很斌斌。不知道为什么,自己很喜欢这种略文艺的东西,也许以后我会用脚步去丈量杭州的每条路。

工作

毫无疑问,自己真的没法说喜欢在北京的工作。不仅仅是自己感觉没有激情的,而且做的事情也不是未来长期能发展的。去绿湾面试时,被问到在上一家公司的感受时,说了一句“不太开心,只是觉得数据量大,人比较有意思”,我猜这应该是我最真实的感受。

到了杭州开始找工作之后,有两个问题一直在心里徘徊:技术还要做多久?我还能不能在技术深度上去努力达到一个程度了。目前还无解。

最近倒是对python去分析金融信息有一定的兴趣,什么时候要搞一搞。

城市

谈谈对杭州的感觉,据说杭州市政府对应届毕业生来杭工作给很大补助,本人也感觉杭州最近人口的增长太慢了。其次,从面试来看隐约感觉杭州现在吸引了大量的人才,而且环境好,未来估计会好。但是在六月份市政府出了如此蛋疼的入户政策,简直无语。

IT和金融感觉是未来很有前途的。

PS

其实从来也没有在网上发表过自己很内心的想法,经过这几个月,也确实发现自己需要改变,需要有更多的表达,以后要多写一些东西,多发些东西。

写RCFile不停报的ArrayIndexOutOfBoundsException错误

场景:使用flume+自主开发的形式将数据按系统的要求写入hdfs中,并生成相应的索引
最近在一个试运行集群中经常发生数据无法入库情况,然后看了一下系统日志,发现日志在不停的抛出如下异常。

步骤1

登录及其的查看异常(不停歇的报此错)如下

1
2
3
4
java.lang.ArrayIndexOutOfBoundsException: 7107
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:76)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:50)
at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)

首先,我看到此日志,我内心第一反应,这错真是奇葩,没想到定位和解决这个问题用了好几天。

步骤2

看到此异常后,我第一反应是得先找到第一个报错的地方并且查看的此报错的原因。然后我就开始看此处涉及到hdfs的代码,想看看为什么会报错。

1
2
3
4
5
6
7
public void sync() throws IOException {
if (sync != null && lastSyncPos != out.getPos()) {
out.writeInt(SYNC_ESCAPE); // mark the start of the sync
out.write(sync); // write sync
lastSyncPos = out.getPos(); // update lastSyncPos
}
}

RCFile.java中如上代码,程序在out.writeInt就开始报错了,lastSyncPos无法的更新。但是在上层使用如下代码的判断的调用sync。

1
2
3
4
5
private void checkAndWriteSync() throws IOException {
if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
sync();
}
}

好吧,这writeInt报错了,对于这个我真是醉了。

步骤3

发现了此处无法理解的地方后,我将重点转向了第一个报错的地方。因为他是不确定时间报错,我就开始启动程序后,时常的查看日志,终于在之后的某个早上,我在查看日志时,我发现了第一个报错的地方。然后在那上面看到了如下异常。

1
2
3
4
java.io.IOException: All datanodes ...:50010 are bad. Aborting...
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1147)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:945)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:496)

然后在日志中找第一个报次错的地方,还发现了其他的报错

1
2
3
4
5
6
7
[ShortCircuitCache_SlotReleaser] (org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache$SlotReleaser.run:215)  - ShortCircuitCache(0x3fbdc007): failed to release short-circuit shared memory slot Slot(slotIdx=1, shm=DfsClientShm(7bf35643718a2bf0a9d85884afb8f4f8)) by sending ReleaseShortCircuitAccessRequestProto to /var/run/hdfs-sockets/dn.  Closing shared memory segment.

java.net.ConnectException: connect(2) error: Connection refused when trying to connect to '/var/run/hdfs-sockets/dn'

29 七月 2016 10:14:17,390 WARN [ShortCircuitCache_SlotReleaser] (org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.shutdown:380) - EndpointShmManager(10.139.90.136:50010, parent=ShortCircuitShmManager(5c89577b)): error shutting down shm: got IOException calling shutdown(SHUT_RDWR)

java.nio.channels.ClosedChannelException

步骤4

看到这些日志后,我感觉在10:14一定发生了什么,于是我开始寻找datanode中相关时间点日志STARTUP_MSG: Starting DataNode这种datanode启动信息,我内心一万只草泥马飘过。
在此日志的上面我看到了诸如的的日志。

1
2
2016-07-29 10:14:12,374 INFO org.apache.hadoop.util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 4519ms
No GCs detected

然后就是

1
2016-07-29 10:14:36,720 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: STARTUP_MSG:

中间隔了三十秒。
从这种情况看感觉应该是datanode进程死了,然后又重启了。

步骤5

然后我就开始全力找为什么此datanode会挂。列出几点信息,这些信息是感觉有点头疼。

  1. supervisord.log的日志中有2016-07-29 10:14:16,820 INFO exited: 411-hdfs-DATANODE (terminated by SIGKILL; not expected)此时间正好在10:14:12到10:14:36之间。
  2. 之前无聊时看过datanode虚拟机参数,记得有这么个参数。-XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh
    我开始怀疑是不是我这个参数给datanode进程发了kill信号,然后又重新启动。在/var/log/messages中也没发现oom的异常。

步骤6

开始看killparent.sh脚本,发现在调用此脚本时会将一个字符串写入一个文件,但是我在系统中没有发现这个文件,然后我就有点懵逼了。然后我在此脚本中加了一行我自己的代码,等再看看会不会再出现这种情况。

步骤7

跟了一个周末后,发现killparent.sh的脚本没有调用。然后开始在晚上搜索,看了一些hdfs源码,可以确定的是flume的hdfs客户端和datanode之间的连接断了,然后导致datanode报超时错误,最后在flume中报读取流出错。

步骤8

然后开始了两天完全懵逼的时光,在网上看到说可能ulimit对进程的限制,然后就开始在的集群中查看lsof中flume和datanode的链接使用数量。监控了一晚上什么收获都没有,然后就放弃了。

步骤9

然后开始猜测是不是flume进程的某些原因,因为我在成个几次重启的过程中没有重启过datanode,而只重启了flume,但是重启了之后集群就可以开始加载了。然后又开始做尝试,同时看到看tailf过滤datanode日志中的error和warn信息和flume的出错日志。不停等,终于等到了出错,发现只有datanode报连接异常的时候才会导致flume出错,这时候就断定是flume和datanode之间的连接,突然想到可能是gc,想到之前的看过datanode没有full gc,那就可能是flume,看了flumegc日志,尼玛在出错时间点之前还真有一次400多秒的gc。我内心是崩溃的。

步骤10

明天开始尝试将timeout时间再延长,将超时时间设置为3600,跑了周末两天都没什么问题。

暂时告一段落

因为接下来要做的是调整gc参数。