GitHub address: https://github.com/linkedin/PalDB.git
For background, see the project's README.
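For orientation, the basic read/write API, as shown in the README, looks like this:

```java
// Write a store, then read it back (from the project README)
StoreWriter writer = PalDB.createWriter(new File("store.paldb"));
writer.put("foo", "bar");
writer.close();

StoreReader reader = PalDB.createReader(new File("store.paldb"));
String bar = reader.get("foo");
reader.close();
```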
The main functions analyzed are the following:
StoreWriter's put method. PalDB buckets data by the length of the key's byte array: each distinct key length gets its own index file and data file.
This method is the main write path of PalDB.
The relevant code, with some comments:
```java
public void put(byte[] key, byte[] value) throws IOException {
  int keyLength = key.length;

  // Each key length has its own temporary index stream
  DataOutputStream indexStream = getIndexStream(keyLength);

  // Write the key into the index stream
  indexStream.write(key);

  // If this value equals the previous value written for this key
  // length, point the key at the previous copy instead of rewriting it
  byte[] lastValue = lastValues[keyLength];
  boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);

  // Offset of the value within this key length's data file
  long dataLength = dataLengths[keyLength];
  if (sameValue) {
    dataLength -= lastValuesLength[keyLength];
  }

  // Pack the offset as a variable-length long right after the key;
  // the widest offset seen so far determines the final slot size
  int offsetLength = LongPacker.packLong(indexStream, dataLength);
  // Debug output added for this analysis
  System.out.println("offsetLength: " + offsetLength + " maxOffsetLengths[keyLength]: " + maxOffsetLengths[keyLength]);
  maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]);

  // Write the value only when it differs from the previous one
  if (!sameValue) {
    DataOutputStream dataStream = getDataStream(keyLength);

    // A value is stored as <varint length><value bytes>
    int valueSize = LongPacker.packInt(dataStream, value.length);
    dataStream.write(value);

    dataLengths[keyLength] += valueSize + value.length;
    lastValues[keyLength] = value;
    lastValuesLength[keyLength] = valueSize + value.length;
    valueCount++;
  }

  keyCount++;
  keyCounts[keyLength]++;
}
```
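Both the index offset and the value length are written with LongPacker, a variable-length integer encoding. Below is a minimal sketch of a standard base-128 varint in the same spirit; the real LongPacker may differ in details such as byte order:

```java
import java.io.*;

// Sketch of base-128 varint packing, the kind of variable-length
// encoding LongPacker performs (illustrative, not PalDB's exact code)
public class VarintSketch {

  // Writes 1 to 9 bytes; small values take fewer bytes
  static int packLong(DataOutputStream out, long value) throws IOException {
    int written = 0;
    while ((value & ~0x7FL) != 0) {
      out.writeByte((int) ((value & 0x7F) | 0x80)); // high bit: more bytes follow
      value >>>= 7;
      written++;
    }
    out.writeByte((int) value); // final byte, high bit clear
    return written + 1;
  }

  static long unpackLong(DataInputStream in) throws IOException {
    long result = 0;
    int shift = 0;
    while (true) {
      int b = in.readUnsignedByte();
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    System.out.println(packLong(out, 300)); // 2: the value 300 needs two bytes
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(unpackLong(in));     // 300
  }
}
```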
StoreWriter's close method. This method consolidates the temporary data accumulated since the store was opened.
The relevant code, with some comments:
```java
public void close() throws IOException {
  // Close all per-key-length data and index streams
  for (DataOutputStream dos : dataStreams) {
    if (dos != null) {
      dos.close();
    }
  }
  for (DataOutputStream dos : indexStreams) {
    if (dos != null) {
      dos.close();
    }
  }

  LOGGER.log(Level.INFO, "Number of keys: {0}", keyCount);
  LOGGER.log(Level.INFO, "Number of values: {0}", valueCount);

  // The final store file is the concatenation of:
  // metadata, then all index files, then all data files
  List<File> filesToMerge = new ArrayList<File>();

  try {
    // Write the metadata section to a temporary file
    File metadataFile = new File(tempFolder, "metadata.dat");
    metadataFile.deleteOnExit();
    FileOutputStream metadataOutputStream = new FileOutputStream(metadataFile);
    DataOutputStream metadataDataOutputStream = new DataOutputStream(metadataOutputStream);
    writeMetadata(metadataDataOutputStream);
    metadataDataOutputStream.close();
    metadataOutputStream.close();
    filesToMerge.add(metadataFile);

    // Build the final (hashed) index file for each key length
    for (int i = 0; i < indexFiles.length; i++) {
      if (indexFiles[i] != null) {
        filesToMerge.add(buildIndex(i));
      }
    }

    LOGGER.log(Level.INFO, "Number of collisions: {0}", collisions);

    // Append the data files
    for (File dataFile : dataFiles) {
      if (dataFile != null) {
        filesToMerge.add(dataFile);
      }
    }

    // Make sure there is enough disk space, then merge everything
    checkFreeDiskSpace(filesToMerge);
    mergeFiles(filesToMerge, outputStream);
  } finally {
    outputStream.close();
    cleanup(filesToMerge);
  }
}
```
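mergeFiles itself is not shown here, but its job is simple: concatenate the temporary files into the final store file in order. A minimal sketch, assuming plain stream copying:

```java
import java.io.*;
import java.util.List;

// Plausible sketch of mergeFiles: append each temporary file to the
// final output in order (metadata, index files, data files).
// Illustrative only; the real implementation may buffer differently.
class MergeSketch {
  static void mergeFiles(List<File> files, OutputStream output) throws IOException {
    byte[] buffer = new byte[8192];
    for (File file : files) {
      InputStream in = new BufferedInputStream(new FileInputStream(file));
      try {
        int read;
        while ((read = in.read(buffer)) != -1) {
          output.write(buffer, 0, read);
        }
      } finally {
        in.close();
      }
    }
  }
}
```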
StoreWriter's writeMetadata method. This writes the metadata section; its code reveals the structure of that section.
The relevant code, with some comments:
```java
private void writeMetadata(DataOutputStream dataOutputStream) throws IOException {
  // Format version and creation timestamp
  dataOutputStream.writeUTF(FormatVersion.getLatestVersion().name());
  dataOutputStream.writeLong(System.currentTimeMillis());

  // Global counts
  int keyLengthCount = getNumKeyCount();
  int maxKeyLength = keyCounts.length - 1;
  dataOutputStream.writeInt(keyCount);
  dataOutputStream.writeInt(keyLengthCount);
  dataOutputStream.writeInt(maxKeyLength);

  // One block per key length that actually has keys
  long datasLength = 0L;
  for (int i = 0; i < keyCounts.length; i++) {
    if (keyCounts[i] > 0) {
      // Key length and number of keys of that length
      dataOutputStream.writeInt(i);
      dataOutputStream.writeInt(keyCounts[i]);

      // Number of slots, derived from the load factor
      int slots = (int) Math.round(keyCounts[i] / loadFactor);
      dataOutputStream.writeInt(slots);

      // Slot size = key length + max offset length
      int offsetLength = maxOffsetLengths[i];
      dataOutputStream.writeInt(i + offsetLength);

      // Offset of this key length's sub-index within the index section
      dataOutputStream.writeInt((int) indexesLength);
      indexesLength += (i + offsetLength) * slots;

      // Offset of this key length's data within the data section
      dataOutputStream.writeLong(datasLength);
      datasLength += dataLengths[i];
    }
  }

  // Serializer definitions
  try {
    Serializers.serialize(dataOutputStream, config.getSerializers());
  } catch (Exception e) {
    throw new RuntimeException(e);
  }

  // Absolute offsets of the index and data sections; the two trailing
  // fields themselves (an int and a long) are included in the offset
  int indexOffset = dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + (Long.SIZE / Byte.SIZE);
  dataOutputStream.writeInt(indexOffset);
  dataOutputStream.writeLong(indexOffset + indexesLength);
}
```
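To make the layout concrete, here is a sketch of how a reader could walk this metadata, simply mirroring the writes above. The field names are illustrative; this is not PalDB's actual reader code:

```java
import java.io.DataInputStream;
import java.io.IOException;

// Mirror of writeMetadata: reads the fields back in the same order
class MetadataSketch {
  static void readMetadata(DataInputStream in) throws IOException {
    String version = in.readUTF();      // format version
    long created = in.readLong();       // creation timestamp
    int keyCount = in.readInt();        // total number of keys
    int keyLengthCount = in.readInt();  // number of distinct key lengths
    int maxKeyLength = in.readInt();
    for (int k = 0; k < keyLengthCount; k++) {
      int keyLength = in.readInt();
      int keysOfThisLength = in.readInt();
      int slots = in.readInt();         // round(count / loadFactor)
      int slotSize = in.readInt();      // keyLength + max offset length
      int indexOffset = in.readInt();   // offset inside the index section
      long dataOffset = in.readLong();  // offset inside the data section
    }
    // ... serializer definitions ...
    int indexSectionOffset = in.readInt();  // absolute offset of index section
    long dataSectionOffset = in.readLong(); // absolute offset of data section
  }
}
```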
StoreWriter's buildIndex method. This builds the final index; it shows how PalDB stores offsets and how it handles hash collisions.
The relevant code, with some comments:
```java
private File buildIndex(int keyLength) throws IOException {
  long count = keyCounts[keyLength];
  int slots = (int) Math.round(count / loadFactor);
  int offsetLength = maxOffsetLengths[keyLength];
  int slotSize = keyLength + offsetLength;

  // The final index is a fixed-size slot array, memory-mapped for writing
  File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
  RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
  try {
    indexAccessFile.setLength(slots * slotSize);
    FileChannel indexChannel = indexAccessFile.getChannel();
    MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

    // Temporary index written by put(): a flat list of <key><offset> entries
    File tempIndexFile = indexFiles[keyLength];
    DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
    try {
      byte[] keyBuffer = new byte[keyLength];
      byte[] slotBuffer = new byte[slotSize];
      byte[] offsetBuffer = new byte[offsetLength];

      for (int i = 0; i < count; i++) {
        // Read one <key, offset> entry from the temporary index
        tempIndexStream.readFully(keyBuffer);
        long offset = LongPacker.unpackLong(tempIndexStream);

        // Hash the key, then linear-probe for a free slot
        long hash = (long) hashUtils.hash(keyBuffer);
        boolean collision = false;
        for (int probe = 0; probe < count; probe++) {
          int slot = (int) ((hash + probe) % slots);
          byteBuffer.position(slot * slotSize);
          byteBuffer.get(slotBuffer);
          long found = LongPacker.unpackLong(slotBuffer, keyLength);
          if (found == 0) {
            // Empty slot: write <key><packed offset>
            byteBuffer.position(slot * slotSize);
            byteBuffer.put(keyBuffer);
            int pos = LongPacker.packLong(offsetBuffer, offset);
            byteBuffer.put(offsetBuffer, 0, pos);
            break;
          } else {
            collision = true;
            // The same key appearing twice is an error
            if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
              throw new RuntimeException(
                  String.format("A duplicate key has been found for key bytes %s", Arrays.toString(keyBuffer)));
            }
          }
        }
        if (collision) {
          collisions++;
        }
      }

      String msg = "  Max offset length: " + offsetLength + " bytes"
          + "\n  Slot size: " + slotSize + " bytes";
      LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName());
    } finally {
      tempIndexStream.close();
      indexChannel.close();
      indexChannel = null;
      byteBuffer = null;
      if (tempIndexFile.delete()) {
        LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName());
      }
    }
  } finally {
    indexAccessFile.close();
    indexAccessFile = null;
    // Encourage unmapping of the MappedByteBuffer so the file can be deleted
    System.gc();
  }
  return indexFile;
}
```
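A small worked example of the probe sequence, showing that (hash + probe) % slots eventually visits every slot:

```java
// With 5 slots and hash 13, linear probing visits slots 3, 4, 0, 1, 2,
// so every slot is examined before the search gives up
public class ProbeDemo {
  public static void main(String[] args) {
    int slots = 5;
    long hash = 13;
    for (int probe = 0; probe < slots; probe++) {
      System.out.print((hash + probe) % slots + " "); // prints: 3 4 0 1 2
    }
  }
}
```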
StoreReader's get method. This is the main read path of PalDB.
The relevant code, with some comments:
```java
public byte[] get(byte[] key) throws IOException {
  int keyLength = key.length;

  // No keys of this length were stored
  if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
    return null;
  }

  long hash = (long) hashUtils.hash(key);
  int numSlots = slots[keyLength];
  int slotSize = slotSizes[keyLength];
  int indexOffset = indexOffsets[keyLength];
  long dataOffset = dataOffsets[keyLength];

  // Linear probing, mirroring buildIndex
  for (int probe = 0; probe < numSlots; probe++) {
    int slot = (int) ((hash + probe) % numSlots);
    indexBuffer.position(indexOffset + slot * slotSize);
    indexBuffer.get(slotBuffer, 0, slotSize);

    // Offset 0 marks an empty slot: the key is absent
    long offset = LongPacker.unpackLong(slotBuffer, keyLength);
    if (offset == 0) {
      return null;
    }

    // Slot is occupied: compare the stored key with the lookup key
    if (isKey(slotBuffer, key)) {
      byte[] value = mMapData
          ? getMMapBytes(dataOffset + offset)
          : getDiskBytes(dataOffset + offset);
      return value;
    }
  }
  return null;
}
```
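isKey is not shown above; given the slot layout, it plausibly just compares the first keyLength bytes of the slot with the lookup key. A sketch under that assumption (the real implementation may differ):

```java
// The first keyLength bytes of a slot hold the stored key,
// so a byte-by-byte comparison decides whether this slot matches
static boolean isKey(byte[] slotBuffer, byte[] key) {
  for (int i = 0; i < key.length; i++) {
    if (slotBuffer[i] != key[i]) {
      return false;
    }
  }
  return true;
}
```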
StoreReader's getMMapBytes method. This reads the actual value when mmap is enabled.
The relevant code, with some comments:
```java
private byte[] getMMapBytes(long offset) throws IOException {
  ByteBuffer buf = getDataBuffer(offset);

  // The value length is a varint of at most 5 bytes
  int maxLen = (int) Math.min(5, dataSize - offset);

  int size;
  if (buf.remaining() >= maxLen) {
    // Fast path: the whole varint fits in the current buffer
    int pos = buf.position();
    size = LongPacker.unpackInt(buf);
    offset += buf.position() - pos;
  } else {
    // Slow path: the varint straddles a buffer boundary, so copy
    // its bytes into sizeBuffer and decode from there
    int len = maxLen;
    int off = 0;
    sizeBuffer.reset();
    while (len > 0) {
      buf = getDataBuffer(offset + off);
      int count = Math.min(len, buf.remaining());
      buf.get(sizeBuffer.getBuf(), off, count);
      off += count;
      len -= count;
    }
    size = LongPacker.unpackInt(sizeBuffer);
    offset += sizeBuffer.getPos();
    buf = getDataBuffer(offset);
  }

  // Read the value itself, possibly spanning several buffers
  byte[] res = new byte[size];
  if (buf.remaining() >= size) {
    buf.get(res, 0, size);
  } else {
    int len = size;
    int off = 0;
    while (len > 0) {
      buf = getDataBuffer(offset);
      int count = Math.min(len, buf.remaining());
      buf.get(res, off, count);
      offset += count;
      off += count;
      len -= count;
    }
  }
  return res;
}
```
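getDataBuffer is the interesting helper here. Since a single MappedByteBuffer is limited to 2 GB, the data section is presumably mapped as multiple segments, and getDataBuffer selects the segment containing the offset. A sketch under that assumption; the names buffers and segmentSize are mine, not PalDB's:

```java
import java.nio.ByteBuffer;

// Pick the mapped segment containing the given absolute offset and
// position it at the right place (illustrative sketch only)
class DataBufferSketch {
  static ByteBuffer getDataBuffer(ByteBuffer[] buffers, long segmentSize, long offset) {
    ByteBuffer buf = buffers[(int) (offset / segmentSize)]; // which segment
    buf.position((int) (offset % segmentSize));             // where inside it
    return buf;
  }
}
```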
StoreReader's getDiskBytes method. This reads the actual value when mmap is not enabled.
The relevant code, with some comments:
```java
private byte[] getDiskBytes(long offset) throws IOException {
  // Seek directly to the value, then read <varint length><value bytes>
  mappedFile.seek(dataOffset + offset);
  int size = LongPacker.unpackInt(mappedFile);
  byte[] res = new byte[size];
  if (mappedFile.read(res) == -1) {
    throw new EOFException();
  }
  return res;
}
```
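Which of the two read paths is used depends on configuration. A hypothetical example of disabling data mmap; the constant and key names here are recalled from memory and may not match your PalDB version:

```java
// Hypothetical configuration sketch: force the getDiskBytes path
// (constant name is an assumption, verify against your PalDB version)
Configuration config = PalDB.newConfiguration();
config.set(Configuration.MMAP_DATA_ENABLED, "false");
StoreReader reader = PalDB.createReader(new File("store.paldb"), config);
```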
Key member variables:
indexOffset: offset of the index section
dataOffset: offset of the data section
slots (int[]): number of slots for each key length, used in the hash computation
slotSizes (int[]): slot size for each key length, used to compute slot positions
indexOffsets (int[]): per-key-length offset within the index section
dataOffsets (int[]): per-key-length offset within the data section
File format diagram
Description: as the diagram shows, a PalDB file is divided into three main sections: metadata, index, and data.
metadata: describes the PalDB file; it contains the important member variables listed above.
index: stores the keys and their offsets. Data is partitioned by key length, and PalDB introduces the concept of a slot: one slot holds a key plus its offset (the offset locates the real data in the data section). Slots have a fixed size, so the byte position of a slot within the index is simply the slot index times slotSize. The slot index is derived from the key's hash, with linear probing to keep trying until the right slot is found. In effect, PalDB implements a hash table directly in the file.
data: stores each value's length followed by its bytes. The offset into this section comes from the index. The length prefix is a variable-length int (at most 5 bytes, as getMMapBytes shows), so once the reader has the offset it can decode the value's length and then read the value itself.
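Putting the two sections together, a lookup reduces to two address computations. A worked example with made-up numbers:

```java
// All numbers are illustrative, chosen only to make the arithmetic concrete
public class OffsetDemo {
  public static void main(String[] args) {
    int keyLength = 4;                       // key bytes
    int offsetLength = 2;                    // widest packed offset for this key length
    int slotSize = keyLength + offsetLength; // 6 bytes per slot
    int numSlots = 100;

    long hash = 123456789L;
    int slot = (int) (hash % numSlots);      // 89 (probe 0)

    long indexSection = 512;                 // indexOffset, from metadata
    long subIndex = 0;                       // indexOffsets[keyLength]
    long slotPos = indexSection + subIndex + (long) slot * slotSize;
    System.out.println(slotPos);             // 1046: where the slot's bytes live

    long dataSection = 4096;                 // dataOffset, from metadata
    long packedOffset = 73;                  // varint read from the slot
    long valuePos = dataSection + packedOffset;
    System.out.println(valuePos);            // 4169: <varint length><value bytes>
  }
}
```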
Possible issues
When creating slots, it is hard to choose a good load factor for deriving the slot count (see the sketch after this list).
If the hash distribution is very uneven, many probe attempts may be needed, and since each probe can touch the disk, the cost is relatively high.
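On the first point, the load factor directly trades memory for probe length: a lower factor allocates more, mostly empty, slots but reduces collisions. An illustrative calculation using the same formula as writeMetadata and buildIndex:

```java
// slots = round(count / loadFactor); numbers are illustrative
public class LoadFactorDemo {
  public static void main(String[] args) {
    int count = 1000;
    for (double loadFactor : new double[] {0.5, 0.75, 0.9}) {
      int slots = (int) Math.round(count / loadFactor);
      System.out.println(loadFactor + " -> " + slots + " slots");
      // 0.5 -> 2000 slots, 0.75 -> 1333 slots, 0.9 -> 1111 slots
    }
  }
}
```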