try { // 获取需要执行sync的场景 while (true) { takeSyncFuture = null; // We have to process what we 'take' from the queue takeSyncFuture = this.syncFutures.take(); currentSequence = this.sequence; longsyncFutureSequence= takeSyncFuture.getTxid(); if (syncFutureSequence > currentSequence) { thrownewIllegalStateException("currentSequence=" + currentSequence + ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. // 掠过一些已经sync过的sequence,可能在别syncRunner已经执行过了,或当时多加的部分。 longcurrentHighestSyncedSequence= highestSyncedTxid.get(); if (currentSequence < currentHighestSyncedSequence) { syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); // Done with the 'take'. Go around again and do a new 'take'. continue; } break; } try { // 执行sync writer.sync(useHsync); // 更新highestSyncedTxid,此处记录了目前sync的最高Sequence。 currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { ... } finally { // First release what we 'took' from the queue. // release目前取到的SyncFuture。 syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); // Can we release other syncs? // release所有目前小于当前sequence的SyncFuture。这步感觉好像是没有必要的。 syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { requestLogRoll(); } else { checkLogRoll(); } } postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { ... } }