ability-in-profession

能力

所有的事情最终都会落到成事的能力,证明成事能力的途径感觉只有两种途径。

  1. 领导给一件事就成一件事。
  2. 在专业领域展示能力,形成自己的影响力。

其实公司不是一个让人积极向上,让人自动进步的地方,否则就不会出现这么多空降,否则就不会出现你解决不了一个项目直接裁掉从外面招人解决。能积极向上的只有自己。

paldb解读

github地址:https://github.com/linkedin/PalDB.git

相关内容可以建项目的readme。

主要分析函数如下:

StoreWriter的put方法

paldb的数据按key对应的byte数组的长度散列。不同key长度会有不同index file和data file。

此方法是paldb写入数据的主流程。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
 public void put(byte[] key, byte[] value)
throws IOException {
int keyLength = key.length;

//获取indexfile的stream,如果在indexFiles数组和indexStreams数组没有对应key长度索引文件则创建。
DataOutputStream indexStream = getIndexStream(keyLength);

// 在stream中写入key。
indexStream.write(key);

// 判断此key对应长度的最后一个插入的值是否和目前值一致。不明白此处的判断意义。
byte[] lastValue = lastValues[keyLength];
boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);

// 获取对应的长度已写入的数据的长度,这个值在后续close时会使用到。
long dataLength = dataLengths[keyLength];
if (sameValue) {
dataLength -= lastValuesLength[keyLength];
}

// 在indexStream写入的长度(就是的value的offset),获取offset的长度,,并在maxOffsetLengths记录当前key长度的最大offset的长度,此数值在后面的计算slot时会使用到。
int offsetLength = LongPacker.packLong(indexStream, dataLength);
System.out.println("offsetLength: " + offsetLength + " maxOffsetLengths[keyLength]: " + maxOffsetLengths[keyLength]);
maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]);

// 只有sameValue为false时才执行下面的命令:
if (!sameValue) {
// 获取datafile的stream,如果在dataFiles数组和dataStreams数组没有对应key长度索引文件则创建。
DataOutputStream dataStream = getDataStream(keyLength);

// 在datastream写入value长度和value值
int valueSize = LongPacker.packInt(dataStream, value.length);
dataStream.write(value);

// 更新数据长度,此处可以认为是在datastream里的offset,在后续close时会使用到。
dataLengths[keyLength] += valueSize + value.length;

// 更新最后一个插入值的信息,在lastValues和lastValuesLength中。
lastValues[keyLength] = value;
lastValuesLength[keyLength] = valueSize + value.length;
// 更新所有插入的value的count计数
valueCount++;
}
// 更新key的count计数, 此数和valueCount不一定一致。
keyCount++;
keyCounts[keyLength]++;
}

StoreWriter的close方法

此方法为close方法,主要整理了从打卡到目前为止的临时数据。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public void close()
throws IOException {
// 关闭data file和index file的stream
for (DataOutputStream dos : dataStreams) {
if (dos != null) {
dos.close();
}
}
for (DataOutputStream dos : indexStreams) {
if (dos != null) {
dos.close();
}
}

// Stats
LOGGER.log(Level.INFO, "Number of keys: {0}", keyCount);
LOGGER.log(Level.INFO, "Number of values: {0}", valueCount);

// 新建一个list变量filesToMerge来存储要merge的文件,包括meta文件,index文件,data文件
List<File> filesToMerge = new ArrayList<File>();

try {

// 写metadata文件(以metadata.dat结尾)
File metadataFile = new File(tempFolder, "metadata.dat");
metadataFile.deleteOnExit();
FileOutputStream metadataOututStream = new FileOutputStream(metadataFile);
DataOutputStream metadataDataOutputStream = new DataOutputStream(metadataOututStream);
writeMetadata(metadataDataOutputStream);
metadataDataOutputStream.close();
metadataOututStream.close();
filesToMerge.add(metadataFile);

// 创建索引文件
for (int i = 0; i < indexFiles.length; i++) {
if (indexFiles[i] != null) {
// 按的key长度索引文件。
filesToMerge.add(buildIndex(i));
}
}

// Stats collisions
LOGGER.log(Level.INFO, "Number of collisions: {0}", collisions);

// 把数据文件加入到filesToMerge文件
for (File dataFile : dataFiles) {
if (dataFile != null) {
filesToMerge.add(dataFile);
}
}

// 检查磁盘空间
checkFreeDiskSpace(filesToMerge);
// merge三类文件值一个文件outputStream中。
mergeFiles(filesToMerge, outputStream);
} finally {
// 关闭outputStream,删除临时文件(filesToMerge)
outputStream.close();
cleanup(filesToMerge);
// 最终所有的数据合并在outputStream(也就是我们指出的文件中)中。
}
}

StoreWriter的writeMetadata方法

此方法为写meta部分的代码,从中可以看到meta部分的结构。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private void writeMetadata(DataOutputStream dataOutputStream)
throws IOException {
// 写入版本信息
dataOutputStream.writeUTF(FormatVersion.getLatestVersion().name());

// 写入当前时间
dataOutputStream.writeLong(System.currentTimeMillis());

// 获取对应长度有值的个数和目前key长度的最大
int keyLengthCount = getNumKeyCount();
int maxKeyLength = keyCounts.length - 1;

// 写入key个数
dataOutputStream.writeInt(keyCount);

// 写入获取到的有值的key长度的个数keyLengthCount
dataOutputStream.writeInt(keyLengthCount);

// 写入获取到key长度最大值maxKeyLength
dataOutputStream.writeInt(maxKeyLength);

// 初始化datasLength
long datasLength = 0l;
// 针对每个key长度有值的长度执行如下命令
for (int i = 0; i < keyCounts.length; i++) {
// 判断是否有此长度的key
if (keyCounts[i] > 0) {
// 写入长度
dataOutputStream.writeInt(i);

// 写入key的个数
dataOutputStream.writeInt(keyCounts[i]);

// 写入slot个数(有key的个数除以loadFactor,loadFactor默认为0.75),此slot用于包含数据。
int slots = (int) Math.round(keyCounts[i] / loadFactor);
dataOutputStream.writeInt(slots);

// 以下四步记录各个部分key对应的index和data的offset。
// 写入slot大小(key长度 + 最大offset的长度)
int offsetLength = maxOffsetLengths[i];
dataOutputStream.writeInt(i + offsetLength);

// 写入index的offset
dataOutputStream.writeInt((int) indexesLength);

// 偏移此长度对应的index需要的offset偏移
indexesLength += (i + offsetLength) * slots;

// 写入数据长度
dataOutputStream.writeLong(datasLength);

// 偏移本长度对应的data需要的offset偏移
datasLength += dataLengths[i];
}
}

// 写入序列化信息
try {
Serializers.serialize(dataOutputStream, config.getSerializers());
} catch (Exception e) {
throw new RuntimeException();
}

// 写入最终文件中index部分的offset和data部分的offset。index通过目前再加两个值的offset开始,data部分从在index之后再写完所有index的长度开始。
int indexOffset = dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + (Long.SIZE / Byte.SIZE);
dataOutputStream.writeInt(indexOffset);
dataOutputStream.writeLong(indexOffset + indexesLength);
}

StoreWriter的buildIndex方法

此方法为buildindex,可以看到paldb怎么获取offset,怎么做冲突等。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
private File buildIndex(int keyLength)
throws IOException {
// 初始化count(本长度key的个数),slots个数,offsetLength,slotSize(key长度+offset长度)
long count = keyCounts[keyLength];
int slots = (int) Math.round(count / loadFactor);
int offsetLength = maxOffsetLengths[keyLength];
int slotSize = keyLength + offsetLength;

// 初始化index文件,此index文件没有temp
File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
try {
indexAccessFile.setLength(slots * slotSize);
FileChannel indexChannel = indexAccessFile.getChannel();
// 使用了一个MappedByteBuffer
MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

// 初始化临时index文件的用于读的流
File tempIndexFile = indexFiles[keyLength];
DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
try {
byte[] keyBuffer = new byte[keyLength];
byte[] slotBuffer = new byte[slotSize];
byte[] offsetBuffer = new byte[offsetLength];

// 处理所有的key,由于slot为key个数除以一个系数,系统小于1,所以slot肯定可以放下所有的数据,如果slot过于,冲突会少,但是更多多余的空间会被浪费。
for (int i = 0; i < count; i++) {
// 读取key
tempIndexStream.readFully(keyBuffer);

// 读取的offset
long offset = LongPacker.unpackLong(tempIndexStream);

// 计算hash值
long hash = (long) hashUtils.hash(keyBuffer);

boolean collision = false;
// 用一个探针去试探,检测冲突,这同时也是在散列。
for (int probe = 0; probe < count; probe++) {
int slot = (int) ((hash + probe) % slots);
byteBuffer.position(slot * slotSize);
byteBuffer.get(slotBuffer);

long found = LongPacker.unpackLong(slotBuffer, keyLength);
// 去试探的对应的位置是否有数据
if (found == 0) {
// 如果没有值,则说明没有冲突,写入key和offset。
byteBuffer.position(slot * slotSize);
byteBuffer.put(keyBuffer);
int pos = LongPacker.packLong(offsetBuffer, offset);
byteBuffer.put(offsetBuffer, 0, pos);
break;
} else {
// 如果有值说明有冲突,则标记。
collision = true;
// Check for duplicates
if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
throw new RuntimeException(
String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
}
}
}
// 统计冲突个数
if (collision) {
collisions++;
}
}

String msg = " Max offset length: " + offsetLength + " bytes" +
"\n Slot size: " + slotSize + " bytes";

LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName());
} finally {
// 关闭临时index文件输入流
tempIndexStream.close();

// 关闭index文件的channel,释放资源。
indexChannel.close();
indexChannel = null;
byteBuffer = null;

// 删除临时index文件
if (tempIndexFile.delete()) {
LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName());
}
}
} finally{
// 关闭index文件
indexAccessFile.close();
indexAccessFile = null;
// 手动执行一次gc,所以在这时候不能在jdk参数中关闭手动gc。
System.gc();
}

return indexFile;
}

StoreReader的get方法

此方法是paldb读取数据的主流程。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 public byte[] get(byte[] key)
throws IOException {
// 获取key的长度,检测key长度是否有数据在
int keyLength = key.length;
if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
return null;
}
// 计算hash值
long hash = (long) hashUtils.hash(key);
// 获取对应的key长度的slot个数
int numSlots = slots[keyLength];
// 获取对应的key长度的slot大小
int slotSize = slotSizes[keyLength];
// 获取index部分的offset
int indexOffset = indexOffsets[keyLength];
// 获取data部分的offset
long dataOffset = dataOffsets[keyLength];
// 用一个探针去试探对应位置上是否有数据。
for (int probe = 0; probe < numSlots; probe++) {
int slot = (int) ((hash + probe) % numSlots);
// slotBuffer 为slotSize的byte数组, 将所有的key+offset打到一个固定大小slot的好处就是我们可以使用slotSize来定位。
indexBuffer.position(indexOffset + slot * slotSize);
indexBuffer.get(slotBuffer, 0, slotSize);
// 解析读取出来的对应slot中的数据
long offset = LongPacker.unpackLong(slotBuffer, keyLength);
// 判断此slot里是否有数据
if (offset == 0) {
return null;
}
// 比较slotBuffer和key是否完全相等
if (isKey(slotBuffer, key)) {
// mMapData来标记的是否使用MMap
byte[] value = mMapData ? getMMapBytes(dataOffset + offset) : getDiskBytes(dataOffset + offset);
return value;
}
}
return null;
}

StoreReader的getMMapBytes方法

此方法为启用mmap时读取真实value的方法。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private byte[] getMMapBytes(long offset)
throws IOException {
// 一些bytebuffer的操作,暂不详细分析, 主要的是数据不能一下子读取出来,比较麻烦。
//Read the first 4 bytes to get the size of the data
ByteBuffer buf = getDataBuffer(offset);
int maxLen = (int) Math.min(5, dataSize - offset);
// 获取的size
int size;
if (buf.remaining() >= maxLen) {
//Continuous read
int pos = buf.position();
size = LongPacker.unpackInt(buf);

// Used in case of data is spread over multiple buffers
offset += buf.position() - pos;
} else {
//The size of the data is spread over multiple buffers
int len = maxLen;
int off = 0;
sizeBuffer.reset();
while (len > 0) {
buf = getDataBuffer(offset + off);
int count = Math.min(len, buf.remaining());
buf.get(sizeBuffer.getBuf(), off, count);
off += count;
len -= count;
}
size = LongPacker.unpackInt(sizeBuffer);
offset += sizeBuffer.getPos();
buf = getDataBuffer(offset);
}

// 初始化输出结果
byte[] res = new byte[size];

//Check if the data is one buffer
if (buf.remaining() >= size) {
//Continuous read
buf.get(res, 0, size);
} else {
int len = size;
int off = 0;
while (len > 0) {
buf = getDataBuffer(offset);
int count = Math.min(len, buf.remaining());
buf.get(res, off, count);
offset += count;
off += count;
len -= count;
}
}

return res;
}

StoreReader的getDiskBytes方法

此方法为没有启用mmap时读取真实value的方法。

相关代码和一些注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private byte[] getDiskBytes(long offset)
throws IOException {
// seek到对应长度的offset
mappedFile.seek(dataOffset + offset);

// 从对应长度读取size
int size = LongPacker.unpackInt(mappedFile);

// 初始化输出结果
byte[] res = new byte[size];

// 读取数据
if (mappedFile.read(res) == -1) {
throw new EOFException();
}

return res;
}

重要成员变量

1
2
3
4
5
6
indexOffset: index部分的偏移量
dataOffset:data部分的偏移量
slots(int[]):slot个数,用于hash计算
slotSizes(int[]):slot的大小,用于计算offset
indexOffsets(int[]):index部分中每个key长度对应的offset
dataOffsets(int[]):data部分中每个key长度对应的offset

文件格式图

描述

从图中可以知道,paldb的文件主要的分为三个部分,metadata,index和data。

metadata:主要用于描述paldb文件,其中包括了重要成员变量。

index:主要保存了key和对应的offset。在paldb中数据按key长度区分,并提出了一个slot的概念。一个slot包含key和对应的offset(此offset用于在data部分找到真实数据)。slot的长度是固定,所以在index去获取offset时可以,直接使用下标和slotSize的乘积。下标通过对key的hash计算得到,并使用探针的方式去不停尝试获取到正确的下标。相当于在文件中实现了一个hash函数。

data:主要保存了value长度和value具体的值。具体的offset由在index部分获取。在这里的length的字节数是固定的,所以可以拿到offset后就可以解析处value长度和value值。

可能的问题

  • 在创建slot时,不好把握factor计算slot个数。
  • 如果hash特别不均衡,会导致探针试探次数很多,由于每次试探会读取磁盘,代价略高。

在macbook的os上直接安装hadoop&hbase

今天决定抛弃vmare+centos虚拟机的形式安装hadoop&hbase了,开始直接在macbook的os上安装hadoop&hbase。理由:

  1. 需要配资源,如果安装在os x上会更加直接全面地利用机器的资源。
  2. 两个系统之间倒来倒去不是一件高效的事情。
  3. 今天发现这台macbook的大部分使用场景都是和工作有关,那就不如直接使用macbook了。

预备

现在安装的是hbase,提前需要做的就是:

  • 给macbook安装jdk

  • 在Sharing里打开远程登录,使机器能免密执行ssh localhost。

  • 下载zookeeper-3.4.12.tar.gz, hadoop-3.1.1.tar.gz和hbase-2.1.1-bin.tar.gz。

zookeeper安装

将zoo_sample.cfg拷贝一份为zoo.cfg,就可以通过zkServer.sh的命令来启动。

然后可以通过zkCli.sh来对zk服务进行访问。

##hdfs安装

  1. 修改core-site.xml,hdfs-site.xml文件。
  2. hadoop namenode -format。
  3. 通过sbin/start-dfs.sh启动服务。

hbase安装

几个坑:

  1. htrace-core-3.1.0-incubating.jar需要自己下载,并且hadoop中htrace-core4-4.0.1-incubating.jar是不行的。有不兼容的情况。

  2. 需要自己在配置文件中注定hbase.master.ipc.address,否则在mac中会报Can’t assign requested address 无法启动rpc,但是没有日志显示这个配置是啥。

    hbase.master.ipc.addresslocalhost

##集成验证

在hbase shell中创建一个表,写入一条数据,检索。

  1. create ‘test’, ‘cf1’
  2. put ‘test’, ‘key1’, ‘cf1:col1’ , ‘value’
  3. scan ‘test’

《银行估值与价值管理》读后感

这是一本关于银行的书,书名为《银行估值与价值管理 - 存贷款定价,绩效评估与风险管理》,作者为杰恩•德米内。总体上来说,这书比较难读,而且可能由于翻译或门槛太高,阅读起来感觉有些累,但是值得读,因为从中可以感觉到作者在银行的业务和管理方面有很深的造诣。

本书分为如下四个部分。

基础知识

讲述了最基本的各种利率有关的现值的计算;一些基本的统计学知识;银行的意义及两张表的简化形式。这部分内容比较基础。

银行价值

介绍了几种估值方式:1. 市值倍数(市盈率,市净率)。2. 未来股利折现法。 3. 未来经济利润折现法。4. 价值驱动因子法。作者提的这个的第四种方法其中很有用。因为第四种可以归结到一些重要数据,我们在估值时可以从几个重要数据去判断。第三种从某种程度上来说也是一个可行的办法,不过得是那种利润增长比较稳健并且管理很稳健的银行。

在价值驱动因子法中,作者做了如下公式:

​ 贷款业务的银行价值 = 清算价值 + 存款特许权价值 + 贷款特许权价值 + 营业费用现值 + 固定资产(不太考虑) + 税收罚金 。

从上述公式中我们可以看到几个非常重要的变量,净利差,营业费用支出比例

中间业务估值两种方法:1. 现金流方法。 2. 基于资产价值的方法。中间业务比较抽象,不好评价。

价值管理

作者讲到了怎么在内部去评价员工,这从某种程度上来说也是一种怎么去让一家银行更加稳健运行的方向。

  • 讨论了资金转移定价来评估存贷款的盈利能力,包括基础法和高级法。
  • 讨论了存款定价。
  • 还讨论了有关巴塞尔协议的部分内容。
  • 以及非常重要的违约损失率和不良贷款率,这些也是从巴塞尔协议引出来。

对于外部人士来说,感觉不需要过多的考虑细节,抓住主要矛盾,关注银行的不良贷款率和一些资本充足率等事项就可以了。最后讲了一个资产证券化的主题。

##风险管理

这章是最让我感觉神奇的地方,里面讲到了各种怎么操作风险的手段。

  • 在银行账户利率风险章节中提到了在线收益和在险经济价值两个概念。
  • 交易账户的在险价值。
  • 其他的讨论到的风险还包括流动性风险,信用风险,边际风险贡献等。
  • 包括了一些分散风险的方式,包括远期,互换等。
  • 最后还说了操作风险。

这部分给我的感觉是知道了一些概念,能大概去判断一些事情对银行可能影响。

总结

从这篇文章来看的加粗字体的文字来看,对于一个外部人士看银行,主要有如下几个重要变量。

  1. 规模,净利差
  2. 营业费用支出比例
  3. 中间业务
  4. 不良贷款率
  5. 资本充足率等监管项目

后续如果有时间,可以讨论一下招商银行等我国优秀重要的银行。

《跳着踢踏舞去上班》读后感

本书从整体上来看是值得阅读的。

读下来主要是包含了几部分:巴菲特对股票和市场的看法,巴菲特对钱和慈善的看法。

##巴菲特对股票和市场的看法

他喜欢的投资标的有这么几个特征:

  • 可以有很强的现金流
  • 可以不费太大的力气就跟着时代
  • 有自己欣赏和认可的人来领导

除了这些,巴菲特对市场情绪十分的了解,例如后视镜的观点,并且能够利用这种情绪。

巴菲特也是比较早的了解到投资的真谛,并且一直恪守着这写原则。

从某种程度上看,巴菲特在保险和会计等领域应该是做到全行业非常靠前的地步。

##巴菲特对钱和慈善的看法

这种事情,总之是他有钱他厉害。

盖茨能和巴菲特这么相投,估计也验证了聪明的人会在一起玩耍,和绝大部分人不在一个层级。

其他

很理性的对待很多事情,很多时候的会很仔细的计算的者成本和收益,然后做比较;自己提出一些想法,还会讲故事来让人明白这件事情,这点和芒格很像。

《程序员修炼之道》读后感

此书总整体上来说是本值得读的书。

此书分为编程生产,整理工具,自我管理,团队协作,走进公司,留意你的企业和改善七个章节。具体编码部分并不占本书最大篇幅,相反,本书用了很大篇幅写了程序员绝大部分时间不会注意的内容,比如怎么去解决人和人之间的问题,给人的性格分类等绝大部分程序员根本瞧不起的事情。

离开学校四年,确实感觉代码只是我们工作的一小部分内容。我们需要去读很多软件工程,与人交流,其他行业的专业知识。这些才是我们区别单纯编码人员的重点。

以下是一些读此书时书中觉得不错的语句,特此记录一下。

  • 当把观点从“它太烂了”转变成“要是。。。它岂不是很酷”的时候,你便从一种失败主义者的思维变成了一种创新性的思维方式。– 我们遇到觉得很烂的事情,去把它变酷绝对是件酷的事情。
  • 真正的艺术家能让产品上市。 – 所有我们很好的想法,说辞等都比不上一个产品。产品才是实际的,有说服力的。产品可能是个app,可能是段代码,可能是个硬件产品,也可能是实际的投资决策。
  • 一种是从市场获取信息(数据分析等)发现机会,另一种是一个想法去找市场。

此书也许未来可以再翻看一次。

长跑

最近两周跑了四次左右的八公里或十公里,需要记录一下感受。

  • 肯定是有助于减肥,而且随着最近跑量的增加,感觉在6分钟配速跑五公里也没这么累,跑八公里的时候也没以前觉得累,不是一直跑五公里,选几次跑到八公里和十公里是个好的选择。
  • 跑步过程中会有一些的不好的感受,比如孤独,疼痛。但是跑了几次之后,现在感觉长跑的过程和其他很多东西很像,比如解决一个问题,需要深入理解问题,中间可能会有烦躁,有迷惘,但是坚持到最后得到的满足感是足够强烈。比如买股票,最近证券市场行情不太好,能理性的看待目前的状态并且不随大众恐慌性的操作账户,确实内心会承受一些煎熬,这种煎熬相当的痛苦,不过现在的策略就是少去看股票账户,专注到工作就好了,心态就会好很多。
  • 很多大佬有长跑的习惯,我现在感觉到我们在面对困难时都需要承受一些的不好的感受,甚至到能让你放弃的境界。我们可能没办法一直让自己经历这种面对困难的痛苦,但是我们可以在习惯中加入长跑等运动,在此过程中让自己能比较频繁的面对这种痛苦的感受,也许在面对生活中真正的痛苦的时候会更加的淡定和从容。从这个角度看,长跑不是为加入习惯列表好的选项。

《另一种选择》读后感

今天刚阅读完谢丽尔·桑德伯格的《另一种选择》,有几点感想想要分享。

  1. 我们应该勇敢的去面对失败,出错。
  2. 看完书不得不感慨作者的厉害,不仅仅是在google和facebook成功,也包括了如此勇敢的去面对痛苦并且去努力的帮助别人。并且书中体现一些素养也确实觉得她值得这么成功。
  3. 我觉得成功的人无非两种,对于宏观经济和自己行业的绝对了解,另一种是,对于自己或用户的人性的了解。能很好理解和掌握自己和用户的人性上的需求在事业不会差到哪里。
  4. 也许,我们很多时候想的和做的决定都不是最好的,甚至是错的。这可能也是我们需要从别人和从从前的自己学的原因。

最近整个工作的节奏和完成速度确实让我觉得有点失望,不过整个swift项目和python的难度确实在预料范围之外。应该更努力的解决问题。

给自己一个设想,如果自己未来只有五十年,三十年,十年,五年甚至一年的时间。我们还是如此散漫的挥霍宝贵的周末时光吗?以后一定要尽量让自己不要处于如此散漫的地步。

DFSOutputStream分析

private void waitAndQueueCurrentPacket()
调用者:

1
2
3
private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
public synchronized void close()

public class DFSOutputStream extends FSOutputSummer

1
2
3
4
5
6
7
implements Syncable, CanSetDropBehind {
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>(); // dataQueue是数据队列,用于保存等待发送给datanode的数据包
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); // ackQueue是确认队列,保存还没有被datanode确认接收的数据包
private DataStreamer streamer; // streamer线程,不停的从dataQueue中取出数据包,发送给datanode
...
}

1
2
3
4
5
6
7
8
9
class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
private ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream; // socket的输出流(client->datanode),用于将数据传输给datanode
private DataInputStream blockReplyStream; // socket的输入流(datanode->client),用户收到datanode的确认包
private ResponseProcessor response = null; // response线程,用于接收从datanode返回的反馈信息
...
}

block > packet > chunk(带chunksum)

主要看看writeChunk方法。

  1. 构造currentPacket,写入data和checksum,并且的chunk数++,bytesCurBlock增加数据的长度。
  2. 如果chunk数足够多。则调用waitAndQueueCurrentPacket
  3. 如果如果block的大小足够大的话,

其中一个处理时添加chunk到缓存中。一下具体处理为判断是否可以添加入发送给datanode的缓存中。lastQueuedSeqno保存了最后一个需要处理的序号。
/**

  • 如果没有足够空间话就等待,然后
    */
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
    try {
    // If queue is full, then wait till we have enough space
    while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
    try {
    dataQueue.wait();
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    break;
    }
    }
    checkClosed();
    queueCurrentPacket();
    } catch (ClosedChannelException e) {
    ...
    }
    }
    }
    /**
  • 添加dataQueue,设置lastQueuedSeqno
    */
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void queueCurrentPacket() {
    synchronized (dataQueue) {
    if (currentPacket == null) return;
    dataQueue.addLast(currentPacket);
    lastQueuedSeqno = currentPacket.seqno; //在waitForAckedSeqno方法使用,判断是否所有数据都被写入并接受到ack了。
    if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
    }
    currentPacket = null;
    dataQueue.notifyAll();
    }
    }

DataStreamer是处理添加入缓存dataQueue的数据。将他们发送然后添加到ackQueue中。
在while中有processDatanodeError的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
// write out data to remote datanode
try {
one.writeTo(blockStream);
blockStream.flush();
}

还需要一个线程接受ack,为ResponseProcessor
//获取返回的序号
ack.readFields(blockReplyStream);
long seqno = ack.getSeqno();
//获取dataQueue里第一个包的序号
Packet one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.seqno != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.seqno + " but received " + seqno);
}
//移除ackQueuede里的第一个,
synchronized (dataQueue) {
lastAckedSeqno = seqno; //lastAckedSeqno在waitForAckedSeqno有使用
ackQueue.removeFirst();
dataQueue.notifyAll();
}

toWaitFor = lastQueuedSeqno;
waitForAckedSeqno(toWaitFor)在flushInternal和flushOrSync会调用。