jdb-usage

在 java 进程有一些莫名其妙的问题,可以通过 jdb 来处理问题。

在上周在通过 HttpClient 访问时,有发现代码里的 NPE,从源码日志看逻辑是没有问题,运行的进程的 jar 包又不好替换,就使用 jdb 命令发现 getContentLength 方法返回值为 -1(后来发现另一个可以使用的接口里没有判断这个值,并且这个值也为 -1,我猜想是从 stream中获取导致不需要这个值),会导致 NPE。在这种情况下使用 jdb 命令非常方便的看到是怎么回事。

具体使用方法有两个博客可以参考,这里记录一下:

  1. https://www.ibm.com/developerworks/cn/java/joy-jdb/index.html
  2. https://blog.csdn.net/arkblue/article/details/39718947

just-pick-the-first-one-from-todolist-and-finish-that

一件件的完成手头上的事

最近确实有些忙,渐渐的发现一个不然自己显得慌忙的方式:一件件的完成手头上的事。这话咋看起来是觉得比较莫名其妙。

这几天偶然看到好几年前自己的QQ签名,“当你犹豫不决,不知道应该先做好什么事情的时候,就静下心来,去做手头最近的事情。犹豫和切换事最浪费时间的事情”。

所以,最近开始疯狂的依赖便签,工作时间只从便签的第一个开始做,当然有时候会改变便签内事情的顺序。如果在做这件事的过程中,又插入一个件事,那就把它插入到便签里,这时候可能会考虑一下它到底在哪个顺位上。这么做了之后,发现两个效果:

  1. 不会出现该干什么事情好的时候,应该任何时候就去做第一件事情就可以了。
  2. 单把项从便签中删除时,会有比较高的成就感,会更有动力去做下一件事情。

timezone-in-spring-mybatis-mysql

spring,mybatis 和 mysql 里的时区问题

大概路径

客户端发起请求

  1. 前端请求
  2. spring 服务端
  3. mybatis 访问
  4. mysql 数据
  5. spring 服务端
  6. 前端显示

mysql

通过命令 show variables like “%time_zone%”; 可以获取 mysql 的时区。

1
2
3
4
5
6
7
8
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| test |
+--------------------+
2 rows in set (0.00 sec)

mybatis

在 spring.datasource.url 参数中需要制定 serverTimezone 和数据库应该对应起来。

1
serverTimezone=GMT%2b8

spring

可以通过如下代码获取时区

1
TimeZone.getDefault()

如果数据为 Date 类型我们可以在服务打印对应的数据来查看数据是否是对的(当然需要考虑时区)。

在对外显示时,我们需要 spring.jackson.time-zone 参数。比如我们如下制定东八区的话就可以看到Date类型被显示成 2019-09-23T00:32:12.000+0800。

1
spring.jackson.time-zone=GMT+8

序列化和反序列化

###反序列化

如果在 query ( 前端传给服务端 )中,默认格式是奇葩的 yyyy-MM-dd’T’HH:mm:ss.SSSZ 格式。一般情况我们会使用到 yyyy-MM-dd HH:mm:ss ,这时候我们需要自定义反序列化。

1
2
@JsonDeserialize(using = CustomJsonDateDeserializer.class)
private Date operateStartDate;

反序列化类

1
2
3
4
5
6
7
8
9
10
11
12
public class CustomJsonDateDeserializer extends JsonDeserializer<Date> {
@Override
public Date deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = jp.getText();
try {
return format.parse(date);
} catch (ParseException e1) {
throw new RuntimeException(e1);
}
}
}

序列化

如果在 dto (服务器端传给前端)中,默认的格式是奇葩的 2019-09-22T16:32:28.000+0800。一般情况下我们会使用到 yyyy-MM-dd HH:mm:ss ,这时候我们需要自定义序列化。

1
2
@JsonSerialize(using = CustomJsonDateSerializer.class)
private Date create_time;

序列化类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CustomJsonDateSerializer extends JsonSerializer<Date> {

@Override
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
String dateString = dateFormat.format(date);
jsonGenerator.writeString(dateString);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

多时区

如果是多时区,应该使用 UTC 进行存储,然后在显示的地方(前端)转换到各种地方对应的时区。

some-thoughts-about-recent-internal-job-transfer

最近公司内部转岗

最近做了一个比较大的转折,公司内部转岗到别的部门。工作内容也从原来的大数据转到了目前的业务系统。主要思考如下:

  1. 之前维护的服务比较稳定,并且可以预测到业务的发展并不需要我去做作很多事情。
  2. 工作了五年,确实需要在业务或更深的技术上做选择了。

刚好公司给了一个内部转岗到一个做 2B 业务的部门的机会,所以想先去试试,看看做业务的感觉之后再决定。

目前对 2B 业务的感受:

  1. 从质量包括应该是整体设计和关键技术点上去把控项目完成质量。所以看一个项目应该从更宏观的角度看,而不能过多的纠结与细节。
  2. 通过几轮测试和代码 review 去保证细节实现。类似的业务应该去提炼相似的内容然后脚本化,比如生成测试数据。

感觉现在对未来还是比较焦虑,也不知道很多年后回头看这种焦虑是否是对的。

jdk-src-treemap

wiki: https://zh.wikipedia.org/wiki/%E7%BA%A2%E9%BB%91%E6%A0%91

基本结构

static final class Entry<K, V> implements Map.Entry<K, V> {
    K key;
    V value;
    Entry<K, V> left;
    Entry<K, V> right;
    Entry<K, V> parent;
    boolean color = BLACK;
    ...
}

Entry 成员变量有 key,value,左子节点,右子节点和颜色(默认为黑色)。

查看源码小窍门

由于 TreeMap 的代码比较干净和独立,我从源码包 src.zip 中拷贝出 TreeMap.java 重命名加入自己的工程内。就你可以对源码直接编辑了。

核心思想

  • 把红节点往上推或者往另一个子树推。

写入数据

public V put(K key, V value) :

  1. 如果 root 为空,则先设置 root。

  2. 不停的按树的结构寻找需要放 key 的 Entry。

    2.1 如果数中有对应 key 的 Entry,直接设置并返回。

    2.2 如果没有,则找到 key 对应的 parent。

  3. 新建 key 和 value 的 Entry,加入到 parent 的子树中。

  4. 执行 fixAfterInsertion。

  5. 设置 size 和 modCount 值。

fixAfterInsertion代码解读注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private void fixAfterInsertion(Entry<K, V> x) {
x.color = RED;

while (x != null && x != root && x.parent.color == RED) {
// 如果是父节点是黑色的话,则不会改变红黑树性质,不做调整。如果父节点则说明无法满足性质,需要调整。
if (parentOf(x) == leftOf(parentOf(parentOf(x)))) {
// 父节点是祖父节点做子节点。else里和此对应。
// 获取叔父节点
Entry<K, V> y = rightOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
// 情形3
setColor(parentOf(x), BLACK);
setColor(y, BLACK);
setColor(parentOf(parentOf(x)), RED);
x = parentOf(parentOf(x));
} else {
if (x == rightOf(parentOf(x))) {
// 情形4(此情形是为5服务的)
x = parentOf(x);
// 左旋
rotateLeft(x);
}
// 情形5,
setColor(parentOf(x), BLACK);
setColor(parentOf(parentOf(x)), RED);
// 右旋,右旋的意义在于不改变平衡树和红黑树性质的情况下,把一个红色转移到另一子树中。应为子树不会影响性质。
rotateRight(parentOf(parentOf(x)));
}
} else {
// 与if内容类似,只是更改了左右方向。
Entry<K, V> y = leftOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
setColor(parentOf(x), BLACK);
setColor(y, BLACK);
setColor(parentOf(parentOf(x)), RED);
x = parentOf(parentOf(x));
} else {
if (x == leftOf(parentOf(x))) {
x = parentOf(x);
rotateRight(x);
}
setColor(parentOf(x), BLACK);
setColor(parentOf(parentOf(x)), RED);
rotateLeft(parentOf(parentOf(x)));
}
}
}
root.color = BLACK;
}

读取数据

和二叉树的搜索一样,根据大小的比较不停的往左子树和右子树搜索。知道找到对应的值或到叶子节点。

删除数据

删除逻辑的思路其实比较清晰,就是一步步去构造情形6,因为6可以通过旋转达到我们改变路径中黑色节点的个数。

deleteEntry代码解读注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void deleteEntry(Entry<K, V> p) {
modCount++;
size--;

// If strictly internal, copy successor's element to p and then make p
// point to successor.
// 如果有两个子节点,则获取后继,然后将后继的节点的内容拷贝至p中,接下来要处理的就变成后继节点。
if (p.left != null && p.right != null) {
Entry<K, V> s = successor(p);
p.key = s.key;
p.value = s.value;
p = s;
} // p has 2 children

// Start fixup at replacement node, if it exists.
/**
* 在这个时候我们可以保证的是,p节点最多只有一个子节点。分为三种情况。
* 1. 有一个节点
* 2. 此节点为ROOT
* 3. 没有子节点
* 此外,我们需要做当节点是黑色时 ,我们需要进行旋转,因为删除黑色会改变红黑树的性质。
*/

Entry<K, V> replacement = (p.left != null ? p.left : p.right);

if (replacement != null) {
// 1. 有一个节点
// Link replacement to parent
// 将replacement替换给节点p,替换之后,我们就少了一个节点,如果的节点p是黑色,我们需要做些调整。
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;

// Null out links so they are OK to use by fixAfterDeletion.
p.left = p.right = p.parent = null;

// Fix replacement
if (p.color == BLACK)
fixAfterDeletion(replacement);
} else if (p.parent == null) { // return if we are the only node.
// 2. 此节点为ROOT
root = null;
} else { // No children. Use self as phantom replacement and unlink.
// 3. 没有子节点
// 节点p是的黑色的话,需要做调整,然后才能把节点p移除。
if (p.color == BLACK)
fixAfterDeletion(p);
// 解除掉节点p
if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}

fixAfterDeletion代码解读注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/** From CLR
*
* x 节点需要调整,x节点的路径少了个黑色节点,需要平衡。
* 1. x下面的有一个黑色节点删除。
* 2. x会被删除,x是黑色。
*
* */
private void fixAfterDeletion(Entry<K, V> x) {
while (x != root && colorOf(x) == BLACK) {
if (x == leftOf(parentOf(x))) {
Entry<K, V> sib = rightOf(parentOf(x));

if (colorOf(sib) == RED) {
// 情形2
setColor(sib, BLACK);
setColor(parentOf(x), RED);
rotateLeft(parentOf(x));
sib = rightOf(parentOf(x));
}
// 到此为止,sib变为黑色
if (colorOf(leftOf(sib)) == BLACK && colorOf(rightOf(sib)) == BLACK) {
// 情形3
setColor(sib, RED);
x = parentOf(x);
} else {
if (colorOf(rightOf(sib)) == BLACK) {
// 情形5
setColor(leftOf(sib), BLACK);
setColor(sib, RED);
rotateRight(sib);
sib = rightOf(parentOf(x));
}
// 情形6
setColor(sib, colorOf(parentOf(x)));
setColor(parentOf(x), BLACK);
setColor(rightOf(sib), BLACK);
rotateLeft(parentOf(x));
x = root; // 可以终止算法,说明这种情形是我们最终的想要的。
}
} else { // symmetric
...
}
}

setColor(x, BLACK);
}

some-new-thoughts-about-career-development

职业发展的一些新思考

  1. 所有的事情应该是目标导向的,定的目标一定要全力完成,完不成时是不应该有借口的。
  2. 对于程序员或工程师而言,我们应该是介入一些非技术领域的内容,比如所在业务的行业知识等。这样会使我们能更清晰地去体会到实现目标的路径。否则一次目标的实现无法给我们带来做事的经验。
  3. 我们其实很清晰的看到不同层次的人做事风格是很不一样的。其实仔细地往深了想,应该是由于这些不同的做事风格导致了人在不同的阶层。
  4. 所有的事情能否完成最终都会落到成事的能力上,证明成事的能力:一是不停的完成目标;二是在专业领域展现出高水平的能力。
  5. 公司不是一个让人积极向上,让人进步的地方,能改变成积极向上的只有自己。

kafka-security-saslscram-acl

1. 介绍

kafka安全主题的内容主要是分为认证和权限。

认证为了证明你是alice,而权限是决定了你能做什么事情,比如能不能些topic,能不能读topic等之类的事情。

http://kafka.apache.org/documentation/#security 里介绍了好多种类型。如果我们想要在线上开启认证和权限,我们需要考虑好多东西。比如能否动态增加权限,客户端接入操作是否足够简单等等。基于各种讨论后,我们目前决定使用sasl/scram + acl的方式。

此文章主要分为两块:一个是配置,另一个是操作命令。

2. 配置

2.1 说明

在broker和zookeeper之间的认证不支持org.apache.kafka.common.security.scram.ScramLoginModule,所以引入org.apache.zookeeper.server.auth.DigestLoginModule。详见如下链接:

https://cwiki.apache.org/confluence/display/ZOOKEEPER/Client-Server+mutual+authentication

如果不配置的话也可以,但是会在日志限制No JAAS Configure ‘Client’

1
[2019-02-24 16:03:15,121] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/Users/liubinbin/Documents/install/kafka_2.12-2.1.0/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

如果配置之后启动broker会在日志里显示:

1
2
3
4
5
6
[2019-02-24 14:06:09,599] INFO Client successfully logged in. (org.apache.zookeeper.Login)
[2019-02-24 14:06:09,600] INFO Client will use DIGEST-MD5 as SASL mechanism. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2019-02-24 14:06:09,657] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will attempt to SASL-authenticate using Login Context section 'Client' (org.apache.zookeeper.ClientCnxn)
[2019-02-24 14:06:09,676] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-02-24 14:06:09,795] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100043c21d70000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-02-24 14:06:09,799] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)

2.2 broker

2.2.1 server.properties

listeners=SASL_PLAINTEXT://bin:9092

########### SASL/SCRAM ############################
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256

########### ACL ############################
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:Admin

######################### ZK ############################
zookeeper.set.acl=true

######################### ZK ############################
auto.create.topics.enable=false

2.2.2 kafka_server_jaas.conf

新增jaas文件,内容如下:

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username=”admin”
password=”admin-secret”;
};

Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username=”admin”
password=”admin-secret”;
};

此配置通过在kafka-server-start.sh中添加如下命令,将jaas文件加入broker的jvm中。

export KAFKA_OPTS=”-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf”

2.3 zookeeper

2.3.1 kafka_zk_jaas.conf

新增jaas文件,内容如下:

Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin=”admin-secret”;
};

需要和kafka_server_jaas.conf里Client对应。

此配置通过在zookeeper-server-start.sh中添加如下命令,将jaas文件加入zk的jvm中。

export KAFKA_OPTS=”-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_zk_jaas.conf”

2.3.2 zoo.cfg

添加如下配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

2.4 client

2.4.1 kafka_client_jaas.conf

新增jaas文件,内容如下:

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username=”alice”
password=”alice-secret”;
};

此配置通过在kafka-console-consumer.sh和kafka-console-producer.sh中添加如下命令,将jaas文件加入zk的jvm中。

export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/liubinbin/Documents/install/kafka_2.12-2.1.0/config/kafka_client_jaas.conf”

2.4.2 saslscram-producer.properties

添加如下配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

2.4.3 saslscram-consumer.properties

添加如下配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

3. 命令

3.1 认证

3.1.1 添加用户

1
2
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

3.1.2 列出用户

1
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users

3.1.3 查看用户

1
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

3.1.4 删除用户

1
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice

3.2 权限

3.2.1 增加权限

1
2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation Write --topic liubb
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation Read --topic liubb --group test-consumer-group

3.2.2 列出权限

1
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic liubb

3.2.3 删除权限

1
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:alice --operation Read --operation Write --topic liubb

3.3 客户端

ps:localhost 在虚拟机内可能需要换成 hostname

3.3.1 生产者

1
bin/kafka-console-producer.sh --broker-list localhost:9092  --topic liubb --producer.config config/saslscram-producer.properties

3.3.2 消费者

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic liubb --consumer.config config/saslscram-consumer.properties

HBase源码系列

在2018年即将结束之际,想开一个系列,名字为HBase源码系列。

先说一下对HBase的感觉,首先,一个很重要的原因,我本人对数据非常感兴趣。其次,个人觉得HBase是个非常重要的项目,在接下来的几年也会保持下去。最后,HBase是个可以学习的不错项目,里面很多的设计和特性值得学习,并且我有使用和维护经验。

  1. wal
  2. hfile
  3. compaction
  4. get
  5. memstore
  6. mvcc
  7. To be continued

HBase源码系列之wal

简介

wal在hbase中是为了持久化memstore中未flush到hfile的数据,以防rs宕机或异常退出导致数据的丢失。

wal实现的一头是多个handler线程处理put请求,另一头是针对hdfs写这种费时间的操作。并且需要实现两件事情:一是在写hdfs时不能出现混乱,二是写完hdfs之后需要有个机制通知到在等待hdfs写返回的处理写请求的线程。

wal用了一个ringbuffer,ringbuffer传递内容包括的sync标志(主要用于传递SyncFuture)和数据。分开可以用于控制。

主流程

  1. handler将entry和txid写入disruptor,然后通过sync函数等待,通过一个threadlocal的设置了txid的SyncFuture,调用get方法阻塞。
  2. 把SyncFuture和Sync标志写入到disruptor中。
  3. 在disruptor的Handler的onEvent里:
    1. 将entry给append到writer里。
    2. 设置SyncFuture,用于传递。
  4. 给syncRunners传递带txid的SyncFuture
  5. SyncRunner会在while里不停的跑,
    1. releaseSyncFuture已经sync过的数据(可能在别的syncRunner)
    2. sync数据,然后releaseSyncFuture

三个主要线程

整个写wal流程涉及到如下三个线程。

handler线程

handler通过netty接受来自客户端或thriftserver的写请求。

在private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException 总在执行写wal,写memstore,更改mvcc版本等操作。本篇文章主要集中于写wal(否则就跑题了),主要在如下方法中。

1
2
writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());

doWALAppend方法里主要内容如下:

1
2
3
4
5
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
}

append主要把数据加入到ringbuffer中,sync方法具体如下:

  1. 讲ThreadLocal cachedSyncFutures然后扔到ringbuffer里,然后讲syncFuture返回。
  2. 执行syncFuture.get(walSyncTimeoutNs)阻塞。

handler线程会卡在syncFuture.get(walSyncTimeoutNs)处。get具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
synchronized long get(long timeoutNs) throws InterruptedException,
ExecutionException, TimeoutIOException {
final long done = System.nanoTime() + timeoutNs;
while (!isDone()) {
wait(1000);
if (System.nanoTime() >= done) {
throw new TimeoutIOException(
"Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ " ms for txid=" + this.txid + ", WAL system stuck?");
}
}
if (this.throwable != null) {
throw new ExecutionException(this.throwable);
}
return this.doneTxid;
}

从上面的代码可以看到handler线程如果想要退出需要isDone方法返回false或者时间超过timeoutNs。

1
2
3
synchronized boolean isDone() {
return this.doneTxid != NOT_DONE;
}

handler线程部分到此我们只需要记住doneTxid的修改决定handler线程退出。

onEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (truck.type() == RingBufferTruck.Type.SYNC) {
// 给syncFutures其中一个设置。
this.syncFutures[this.syncFuturesCount.getAndIncrement()] = truck.unloadSync();
// Force flush of syncs if we are carrying a full complement of syncFutures.
if (this.syncFuturesCount.get() == this.syncFutures.length) {
endOfBatch = true;
}
} else if (truck.type() == RingBufferTruck.Type.APPEND) {
FSWALEntry entry = truck.unloadAppend();
try {
...
append(entry);
} catch (Exception e) {
....
return;
}
}

在onEvent里获取到的数据分为两种:

  1. SYNC,在syncFutures设置handler线程传递过来的syncFuture。
  2. APPEND,调用append。在这里我们可以看到ringbuffer的作用就是把多个线程的写入按时间顺序的append。

我们需要注意这里的endOfBatch,通过一个ringbuffer其实让我们更容易去控制batch,将难度从多个线程移到了一个线程里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (!endOfBatch || this.syncFuturesCount.get() <= 0) {
return;
}
this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
try {
// 给syncRunners其中一个设置,在offer方法内,将下标小于syncFuturesCount的所有syncFutures传递给其中一个的SyncRunners。
// 下标小于syncFuturesCount的所有syncFutures都传递过去其实就代表了一个batch,这个逻辑可以通过endOfBatch捋出来。 *** 此处很重要 ***
this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
this.syncFuturesCount.get());
} catch (Exception e) {
// Should NEVER get here.
requestLogRoll();
this.exception = new DamagedWALException("Failed offering sync", e);
}
....
this.syncFuturesCount.set(0);

这里通过轮询的方式向syncRunners传递syncFutures。

syncRunner

syncRunner是一些线程,总共syncRunnerCount个,此线程是一个while不停的执行。多个线程通过highestSyncedTxid来沟通到什么地步了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
   while (!isInterrupted()) {
int syncCount = 0;

try {
// 获取需要执行sync的场景
while (true) {
takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
long syncFutureSequence = takeSyncFuture.getTxid();
if (syncFutureSequence > currentSequence) {
throw new IllegalStateException("currentSequence=" + currentSequence
+ ", syncFutureSequence=" + syncFutureSequence);
}
// See if we can process any syncfutures BEFORE we go sync.
// 掠过一些已经sync过的sequence,可能在别syncRunner已经执行过了,或当时多加的部分。
long currentHighestSyncedSequence = highestSyncedTxid.get();
if (currentSequence < currentHighestSyncedSequence) {
syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
// Done with the 'take'. Go around again and do a new 'take'.
continue;
}
break;
}
try {
// 执行sync
writer.sync(useHsync);
// 更新highestSyncedTxid,此处记录了目前sync的最高Sequence。
currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) {
...
} finally {
// First release what we 'took' from the queue.
// release目前取到的SyncFuture。
syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
// Can we release other syncs?
// release所有目前小于当前sequence的SyncFuture。这步感觉好像是没有必要的。
syncCount += releaseSyncFutures(currentSequence, lastException);
if (lastException != null) {
requestLogRoll();
} else {
checkLogRoll();
}
}
postSync(System.nanoTime() - start, syncCount);
} catch (InterruptedException e) {
...
}
}

调用完releaseSyncFuture之后,handler阻塞住的的get方式才能顺利进行下去。

此处需要一张图

思考:syncRunner为多个,大概是为了隔离notifier和sync,两种操作不要在一起,最终可以减轻同步代价。

简单版代码实现

见地址 https://github.com/liubinbin/pan/tree/master/src/main/java/cn/liubinbin/pan/experiment/log/v3