HDFS源码分析之EditLogTailer
editLogTailer = new EditLogTailer(this, conf); editLogTailer.start();利用当前FSNamesystem实例this和配置信息conf实例化一个EditLogTailer对象,然后调用其start()方法启动它。
接下来我们看看EditLogTailer的实现,先来看下其成员变量,代码如下:
// 编辑日志跟踪线程EditLogTailerThread实例tailerThread private final EditLogTailerThread tailerThread; // HDFS配置信息Configuration实例conf private final Configuration conf; // 文件系统命名空间FSNamesystem实例namesystem private final FSNamesystem namesystem; // 文件系统编辑日志FSEditLog实例editLog private FSEditLog editLog; // Active NameNode地址InetSocketAddress private InetSocketAddress activeAddr; // 名字节点通信接口NamenodeProtocol private NamenodeProtocol cachedActiveProxy = null; * The last transaction ID at which an edit log roll was initiated. * 一次编辑日志滚动开始时的最新事务ID private long lastRollTriggerTxId = HdfsConstants.INVALID_TXID; * The highest transaction ID loaded by the Standby. * StandBy NameNode加载的最高事务ID private long lastLoadedTxnId = HdfsConstants.INVALID_TXID; * The last time we successfully loaded a non-zero number of edits from the * shared directory. * 最后一次我们从共享目录成功加载一个非零编辑的时间 private long lastLoadTimestamp; * How often the Standby should roll edit logs. Since the Standby only reads * from finalized log segments, the Standby will only be as up-to-date as how * often the logs are rolled. * StandBy NameNode滚动编辑日志的时间间隔。 private final long logRollPeriodMs; * How often the Standby should check if there are new finalized segment(s) * available to be read from. * StandBy NameNode检查是否存在可以读取的新的最终日志段的时间间隔 private final long sleepTimeMs;其中,比较重要的几个变量如下:
1、EditLogTailerThread tailerThread:它是编辑日志跟踪线程,
我们再来看下EditLogTailer的构造方法,如下:
public EditLogTailer(FSNamesystem namesystem, Configuration conf) { // 实例化编辑日志追踪线程EditLogTailerThread this.tailerThread = new EditLogTailerThread(); // 根据入参初始化配置信息conf和文件系统命名系统namesystem this.conf = conf; this.namesystem = namesystem; // 从namesystem中获取editLog this.editLog = namesystem.getEditLog(); // 最新加载edit log时间lastLoadTimestamp初始化为当前时间 lastLoadTimestamp = now(); // StandBy NameNode滚动编辑日志的时间间隔logRollPeriodMs // 取参数dfs.ha.log-roll.period,参数未配置默认为2min logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000; // 如果logRollPeriodMs大于等于0 if (logRollPeriodMs = 0) { // 调用getActiveNodeAddress()方法初始化Active NameNode地址activeAddr this.activeAddr = getActiveNodeAddress(); Preconditions.checkArgument(activeAddr.getPort() 0, "Active NameNode must have an IPC port configured. " + "Got address %s", activeAddr); LOG.info("Will roll logs on active node at " + activeAddr + " every " + (logRollPeriodMs / 1000) + " seconds."); } else { LOG.info("Not going to trigger log rolls on active node because " + DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative."); // StandBy NameNode检查是否存在可以读取的新的最终日志段的时间间隔sleepTimeMs // 取参数dfs.ha.tail-edits.period,参数未配置默认为1min sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000; LOG.debug("logRollPeriodMs=" + logRollPeriodMs + " sleepTime=" + sleepTimeMs); }下面,我们再看下这个十分重要的编辑日志追踪线程EditLogTailerThread的实现,它的构造方法很简单,没有什么可说的,我们着重看下它的run()方法,代码如下:
@Override public void run() { SecurityUtil.doAsLoginUserOrFatal( new PrivilegedAction Object () { @Override public Object run() { doWork(); return null; }run()方法内继而调用了doWork()方法,代码如下:
private void doWork() { // 标志位shouldRun为true时一直循环 while (shouldRun) { try { // Theres no point in triggering a log roll if the Standby hasnt // read any more transactions since the last time a roll was // triggered. // 自从上次日志滚动触发以来,如果StandBy NameNode没有读到任何事务的话,没有点触发一次日志滚动, // 如果是自从上次加载后过了太长时间,并且上次编辑日志滚动开始时的最新事务ID小于上次StandBy NameNode加载的最高事务ID if (tooLongSinceLastLoad() lastRollTriggerTxId lastLoadedTxnId) { // 触发Active NameNode进行编辑日志滚动 triggerActiveLogRoll(); * Check again in case someone calls {@link EditLogTailer#stop} while * were triggering an edit log roll, since ipc.Client catches and * ignores {@link InterruptedException} in a few places. This fixes * the bug described in HDFS-2823. // 判断标志位shouldRun,如果其为false的话,退出循环 if (!shouldRun) { break; // 调用doTailEdits()方法执行日志追踪 doTailEdits(); } catch (EditLogInputException elie) { LOG.warn("Error while reading edits from disk. Will try again.", elie); } catch (InterruptedException ie) { // interrupter should have already set shouldRun to false continue; } catch (Throwable t) { LOG.fatal("Unknown error encountered while tailing edits. " + "Shutting down standby NN.", t); terminate(1, t); // 线程休眠sleepTimeMs时间后继续工作 try { Thread.sleep(sleepTimeMs); } catch (InterruptedException e) { LOG.warn("Edit log tailer interrupted", e); }当标志位shouldRun为true时,doWork()方法一直在while循环内执行,其处理逻辑如下:
1、如果是自从上次加载后过了太长时间,并且上次编辑日志滚动开始时的最新事务ID小于上次StandBy NameNode加载的最高事务ID,触发Active NameNode进行编辑日志滚动:
自从上次加载后过了太长时间是根据tooLongSinceLastLoad()方法判断的,而触发Active NameNode进行编辑日志滚动则是通过triggerActiveLogRoll()方法来完成的;
2、判断标志位shouldRun,如果其为false的话,退出循环;
3、调用doTailEdits()方法执行日志追踪;
4、线程休眠sleepTimeMs时间后继续执行上述工作。
我们先来看下如果确定自从上次加载后过了太长时间,tooLongSinceLastLoad()方法代码如下:
/** * @return true if the configured log roll period has elapsed. private boolean tooLongSinceLastLoad() { // StandBy NameNode滚动编辑日志的时间间隔logRollPeriodMs大于0, // 且最后一次我们从共享目录成功加载一个非零编辑的时间到现在的时间间隔大于logRollPeriodMs return logRollPeriodMs = 0 (now() - lastLoadTimestamp) logRollPeriodMs ; }它判断的主要依据就是,StandBy NameNode滚动编辑日志的时间间隔logRollPeriodMs大于0,且最后一次我们从共享目录成功加载一个非零编辑的时间到现在的时间间隔大于logRollPeriodMs。
触发Active NameNode进行编辑日志滚动的triggerActiveLogRoll()方法代码如下:
/** * Trigger the active node to roll its logs. * 触发Active NameNode滚动日志 private void triggerActiveLogRoll() { LOG.info("Triggering log roll on remote NameNode " + activeAddr); try { // 获得Active NameNode的代理,并调用其rollEditLog()方法滚动编辑日志 getActiveNodeProxy().rollEditLog(); // 将上次StandBy NameNode加载的最高事务ID,即lastLoadedTxnId,赋值给上次编辑日志滚动开始时的最新事务ID,即lastRollTriggerTxId, // 这么做是为了方便进行日志回滚 lastRollTriggerTxId = lastLoadedTxnId; } catch (IOException ioe) { LOG.warn("Unable to trigger a roll of the active NN", ioe); }它首先会获得Active NameNode的代理,并调用其rollEditLog()方法滚动编辑日志,然后将上次StandBy NameNode加载的最高事务ID,即lastLoadedTxnId,赋值给上次编辑日志滚动开始时的最新事务ID,即lastRollTriggerTxId,这么做是为了方便进行日志回滚以及逻辑判断。
好了,最后我们看下最重要的执行日志追踪的doTailEdits()方法吧,代码如下:
@VisibleForTesting void doTailEdits() throws IOException, InterruptedException { // Write lock needs to be interruptible here because the // transitionToActive RPC takes the write lock before calling // tailer.stop() -- so if were not interruptible, it will // deadlock. // namesystem加写锁 namesystem.writeLockInterruptibly(); try { // 通过namesystem获取文件系统镜像FSImage实例image FSImage image = namesystem.getFSImage(); // 通过文件系统镜像FSImage实例image获取最新的事务ID long lastTxnId = image.getLastAppliedTxId(); if (LOG.isDebugEnabled()) { LOG.debug("lastTxnId: " + lastTxnId); Collection EditLogInputStream streams; try { // 从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据 streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false); } catch (IOException ioe) { // This is acceptable. If we try to tail edits in the middle of an edits // log roll, i.e. the last one has been finalized but the new inprogress // edits file hasnt been started yet. LOG.warn("Edits tailer failed to find any streams. Will try again " + "later.", ioe); return; if (LOG.isDebugEnabled()) { LOG.debug("edit streams to load from: " + streams.size()); // Once we have streams to load, errors encountered are legitimate cause // for concern, so we dont catch them here. Simple errors reading from // disk are ignored. long editsLoaded = 0; try { // 调用文件系统镜像FSImage实例image的loadEdits(), // 利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage, // 并获得编辑日志加载的大小editsLoaded editsLoaded = image.loadEdits(streams, namesystem); } catch (EditLogInputException elie) { editsLoaded = elie.getNumEditsLoaded(); throw elie; } finally { if (editsLoaded 0 || LOG.isDebugEnabled()) { LOG.info(String.format("Loaded %d edits starting from txid %d ", editsLoaded, lastTxnId)); if (editsLoaded 0) {// 如果editsLoaded大于0 // 最后一次我们从共享目录成功加载一个非零编辑的时间lastLoadTimestamp更新为当前时间 lastLoadTimestamp = now(); // 上次StandBy NameNode加载的最高事务ID更新为image中最新事务ID lastLoadedTxnId = image.getLastAppliedTxId(); } finally { // namesystem去除写锁 namesystem.writeUnlock(); }大体处理流程如下:
1、首先,namesystem加写锁;
2、通过namesystem获取文件系统镜像FSImage实例image;
3、通过文件系统镜像FSImage实例image获取最新的事务ID,即lastTxnId;
4、从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据:
ps:注意,这个编辑日志输入流集合streams并非读取的是editLog对象中的数据,毕竟editLog也是根据namesystem来获取的,如果从其中读取数据再加载到namesystem中的fsimage中,没有多大意义,这个日志输入流实际上是通过Hadoop HA中的JournalNode来获取的,这个我们以后再分析。
5、调用文件系统镜像FSImage实例image的loadEdits(),利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage,并获得编辑日志加载的大小editsLoaded;
6、如果editsLoaded大于0,最后一次我们从共享目录成功加载一个非零编辑的时间lastLoadTimestamp更新为当前时间;
7、上次StandBy NameNode加载的最高事务ID更新为image中最新事务ID;
8、namesystem去除写锁。
部分涉及FSImage、FSEditLog、JournalNode等的细节,限于篇幅,我们以后再分析!
Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1) HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等众多角色的分工与合作。 首先上一段代码,客户端是如何写文件的: Configuration conf = new Configuration(); FileSystem fs = FileSystem.
HDFS源码分析之FSImage文件内容(一)总体格式 FSImage文件是HDFS中名字节点NameNode上文件/目录元数据在特定某一时刻的持久化存储文件。它的作用不言而喻,在HA出现之前,NameNode因为各种原因宕机后,若要恢复或在其他机器上重启NameNode,重新组织元数据,就需要加载对应的FSImage文件、FSEditLog文件,并在内存中重做FSEditLog文件中的事务条目。
HDFS源码分析数据块校验之DataBlockScanner DataBlockScanner是运行在数据节点DataNode上的一个后台线程。它为所有的块池管理块扫描。针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描、校验数据块。
数据块的复制当然需要一个源数据节点,从其上拷贝数据块至目标数据节点。那么数据块复制是如何选取复制源节点的呢?本文我们将针对这一问题进行研究。 在BlockManager中,chooseSourceDatanode()方法就是用来选取数据块复制时的源节点的,它负责解析数据块所属数据节点列表,并选择一个,用它作为数据块的复制源。
ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列。其定义及作为线程核心的run()方法如下: * Periodically calls computeReplicationWork().
相关文章
- 最新酒桌小游戏喝酒小程序源码_带流量主源码下载
- SwiftUI 4 新功能 之 网格视图Gridview组件 (教程含源码)
- SwiftUI 复杂界面之任务管理界面支卡片选择 (教程含源码)
- SwiftUI iOS 精品项目之动物世界应用支持视频播放自定义Map Annotation(教程含源码)
- SwiftUI 新属性InlinePickerStyle 教程含源码
- macOS SwiftUI 封装组件之图片组件NSImageView(教程含源码)
- macOS SwiftUI 教程之左右分栏ListStyle 折叠功能Section(教程含源码)
- DragGesture (SwiftUI 中文文档手册 教程含源码)
- springboot+vue准妈妈孕期交流平台(源码+文档)
- spring源码分析之spring-web remoting模块概况及基本概念
- hdfs源码分析第二弹
- hdfs源码分析第一弹
- solr源码分析之数据导入DataImporter追溯。