
ClickHouse Source Code Analysis (3): The Life of a SQL Query, Part 3

2023-02-18 16:33:54

This ClickHouse source code analysis series is written against ClickHouse version 22.10.1. Since ClickHouse iterates quickly, the code may differ slightly from newer versions, but the overall flow stays largely the same. Reading raw source code is tedious and not very helpful for later review, so when walking through the logic I use mind maps or flowcharts to describe how classes call one another, and in the second half I pick out the core source code for analysis.


Overview

The previous article, ClickHouse Source Code Analysis (2): The Life of a SQL Query, Part 2, analyzed the initialization of the ExecutingGraph in its source code section and ended with a diagram of how scheduled nodes transition between states, showing intuitively how a node goes from Ready to Finished. In this article we continue from the source code perspective and look at how the ExecutingGraph is scheduled and how node states rotate within it.

Logic Walkthrough

For easier review, here is the mind map drawn earlier: ClickHouse Source Code Analysis (1): The Life of a SQL Query, Part 1.

We focus on the pipeline-execution part and study how a SelectQuery is scheduled.

The Pipeline Mechanism

Let's first look at what the QueryPipeline built from a QueryPlan looks like, taking a SelectQuery with a join as an example:

SELECT * FROM customer AS c
INNER JOIN order AS o ON c.C_CUSTKEY = o.C_CUSTKEY;

Running EXPLAIN PIPELINE shows the pipeline built for this SQL, as shown below:

For example, (ReadFromMergeTree) is one step in the QueryPlan, and MergeTreeInOrder is the Processor that this step adds to the pipeline. The numbers after a Processor, such as 0 -> 1, mean it has 0 InputPorts and 1 OutputPort; if a Processor has no numbers after it, both the InputPort and OutputPort counts default to 1. This may still feel abstract, so let's look at the figure below, which shows the QueryPlan of this SQL:

Then let's look at the QueryPipeline built from this QueryPlan:

Note: a Source has only OutputPorts, and a Sink has only InputPorts.

InputPort and OutputPort are subclasses of Port. Port has a member variable std::shared_ptr<State> state, which holds the data shared between two connected ports:

Port.h source (some code omitted):

class Port
{
protected:
    /// Shared state of two connected ports.
    class State
    {
    public:
        struct Data
        {
            /// Note: std::variant can be used. But move constructor for it can't be inlined.
            Chunk chunk;
            std::exception_ptr exception;
        };
    private:
        std::atomic<Data *> data;
    };

    /// The state shared with the connected port.
    std::shared_ptr<State> state;
};

Data is passed between Processors through this State, like this:

After a Processor finishes executing, it pushes its data into the shared State; when the next Processor runs, it pulls the data out of the shared State, processes it, and then pushes the result into the next Port. This is how data flows through the entire pipeline.
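To make this handoff concrete, here is a minimal, self-contained sketch of the idea (simplified stand-in types of my own, not the actual ClickHouse Port/State classes): two connected ports share a single State slot; the upstream side pushes a chunk into it and the downstream side pulls it out.

#include <atomic>
#include <iostream>
#include <memory>
#include <vector>

/// Simplified stand-ins for ClickHouse's Chunk and Port::State (illustration only).
struct Chunk { std::vector<int> rows; };

struct SharedState
{
    /// One slot holding the chunk in flight between two connected ports.
    std::atomic<Chunk *> data{nullptr};

    void push(Chunk * chunk) { data.store(chunk, std::memory_order_release); }
    Chunk * pull() { return data.exchange(nullptr, std::memory_order_acquire); }
};

int main()
{
    auto state = std::make_shared<SharedState>();

    /// The "upstream" processor finishes work() and pushes its output chunk.
    Chunk produced{{1, 2, 3}};
    state->push(&produced);

    /// The "downstream" processor pulls the chunk from the shared state and processes it.
    if (Chunk * chunk = state->pull())
        std::cout << "pulled a chunk with " << chunk->rows.size() << " rows\n";
}

As the real State also keeps an atomic pointer (see the Port.h snippet above), handing a chunk from one processor to the next is essentially a pointer swap rather than a data copy.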

ExecutingGraph Scheduling

As I understand it, the pipeline is only the data structure through which data flows; the ExecutingGraph records the execution state of each Processor and schedules the whole graph. Its rough structure is as follows:

/// Graph of executing pipeline.
class ExecutingGraph
{
public:
    /// Edge represents connection between OutputPort and InputPort.
    /// For every connection, two edges are created: direct and backward (it is specified by backward flag).
    struct Edge
    {
        /// Port numbers. They are same for direct and backward edges.
        uint64_t input_port_number;
        uint64_t output_port_number;
        
        /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
        /// To compare version with prev_version we can decide if neighbour processor need to be prepared.
        Port::UpdateInfo update_info;
    };

    /// Graph node. Represents single Processor.
    struct Node
    {
        /// Direct edges are for output ports, back edges are for input ports.
        Edges direct_edges;
        Edges back_edges;
    };

    Nodes nodes;
};

In short, an ExecutingGraph consists of multiple Nodes, where each Node corresponds to one Processor. Nodes are connected by Edges, and each Node records direct_edges and back_edges, which correspond to its OutputPorts and InputPorts respectively. Converting the QueryPipeline above into an ExecutingGraph gives roughly the picture below:

LimitsCheckingTransform and LazyOutputFormat are Processors added after the QueryPipeline is built to handle common logic; they come into play during the subsequent scheduling.
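To make the edge bookkeeping concrete, here is a small self-contained sketch (simplified types of my own, not the real ExecutingGraph construction code) of how connecting an output port of one node to an input port of another creates two edges with the same port numbers: a direct edge on the upstream node and a back edge on the downstream node.

#include <cstdint>
#include <iostream>
#include <vector>

/// Simplified stand-ins for ExecutingGraph::Edge and ExecutingGraph::Node (illustration only).
struct Edge
{
    uint64_t to;                   /// index of the neighbour node
    bool backward;                 /// false: direct edge (output side), true: back edge (input side)
    uint64_t input_port_number;
    uint64_t output_port_number;
};

struct Node
{
    std::vector<Edge> direct_edges;   /// one per connected output port
    std::vector<Edge> back_edges;     /// one per connected input port
};

/// Connect output port `out` of node `from` to input port `in` of node `to`.
void connect(std::vector<Node> & nodes, uint64_t from, uint64_t out, uint64_t to, uint64_t in)
{
    nodes[from].direct_edges.push_back({to, false, in, out});
    nodes[to].back_edges.push_back({from, true, in, out});
}

int main()
{
    std::vector<Node> nodes(2);   /// e.g. node 0 = MergeTreeInOrder, node 1 = ExpressionTransform
    connect(nodes, /*from*/ 0, /*out*/ 0, /*to*/ 1, /*in*/ 0);

    std::cout << "node 0 direct edges: " << nodes[0].direct_edges.size()
              << ", node 1 back edges: " << nodes[1].back_edges.size() << std::endl;
}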

Now that the ExecutingGraph is built, scheduling can finally begin. Before scheduling starts, the nodes that are processed first need to have their state initialized. ClickHouse defines the following six states for a Processor:

enum class Status
{
    /// Processor needs some data at its inputs to proceed.
    /// You need to run another processor to generate required input and then call 'prepare' again.
    NeedData,

    /// Processor cannot proceed because output port is full or not isNeeded().
    /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
    PortFull,

    /// All work is done (all data is processed or all output are closed), nothing more to do.
    Finished,

    /// No one needs data on output ports.
    /// Unneeded,

    /// You may call 'work' method and processor will do some work synchronously.
    Ready,

    /// You may call 'schedule' method and processor will return descriptor.
    /// You need to poll this descriptor and call work() afterwards.
    Async,

    /// Processor wants to add other processors to pipeline.
    /// New processors must be obtained by expandPipeline() call.
    ExpandPipeline,
};
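These statuses drive a prepare()/work() contract: prepare() only moves chunks across ports and reports a status, while work() does the actual processing. The following toy example (my own simplified sketch, not ClickHouse code) shows how an executor-style loop reacts to Ready, PortFull and Finished for a single source-like processor.

#include <iostream>
#include <optional>

/// Toy illustration of the prepare()/work() contract (not the real IProcessor classes).
enum class Status { NeedData, PortFull, Finished, Ready };

struct ToySource
{
    int remaining = 3;            /// pretend we have three chunks left to produce
    std::optional<int> output;    /// the "output port" slot

    /// prepare() never processes data: it only inspects ports and reports what to do next.
    Status prepare()
    {
        if (output.has_value())
            return Status::PortFull;   /// downstream has not consumed the last chunk yet
        if (remaining == 0)
            return Status::Finished;
        return Status::Ready;          /// work() may now be called
    }

    /// work() does the synchronous processing: here, "produce one chunk".
    void work() { output = remaining--; }
};

int main()
{
    ToySource source;
    while (true)
    {
        Status status = source.prepare();
        if (status == Status::Finished)
            break;
        if (status == Status::Ready)
        {
            source.work();
            continue;
        }
        if (status == Status::PortFull)
        {
            /// In the real executor a downstream processor would pull this chunk; simulate it.
            std::cout << "downstream consumed chunk " << *source.output << std::endl;
            source.output.reset();
        }
    }
}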

After the ExecutingGraph initializes its nodes, their states are as shown in the figure below:

A Node in the Ready state calls its Processor's work() method to process data. Once that finishes, MergeTree calls prepare() to push the data to its OutputPort, and the node states become:

Next, Expression calls prepare() to pull the data and enters the Ready state; the node states are now:

The remaining nodes proceed the same way, calling prepare() and work() to update Node states, which completes the scheduling of the whole ExecutingGraph. A fairly complete scheduling state-transition diagram for some of the operators is drawn at the end of this article.

Source Code Analysis: ExecutingGraph Scheduling

Continuing with the earlier example, after the ExecutingGraph is initialized the node states look like this:

With ExecutingGraph initialization done, we come back to PipelineExecutor::initializeExecution(); the code is as follows:

void PipelineExecutor::initializeExecution(size_t num_threads)
{
    ...
    
    Queue queue;
    // Initialize the ExecutingGraph.
    graph->initializeExecution(queue);
    
    // Initialize ExecutorTasks.
    tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get());
    // Put nodes in the Ready state into ExecutorTasks' task_queue, where they wait to be scheduled.
    tasks.fill(queue);

    std::unique_lock lock{threads_mutex};
    threads.reserve(num_threads);
}

ExecutorTasks is used by PipelineExecutor and manages the tasks that are ready for execution. Its rough structure:

/// Manage tasks which are ready for execution. Used in PipelineExecutor.
class ExecutorTasks
{
    /// Contexts for every executing thread.
    std::vector<std::unique_ptr<ExecutionThreadContext>> executor_contexts;

    /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
    /// Stores processors need to be prepared. Preparing status is already set for them.
    TaskQueue<ExecutingGraph::Node> task_queue;

    /// Queue which stores tasks where processors returned Async status after prepare.
    /// If multiple threads are using, main thread will wait for async tasks.
    /// For single thread, will wait for async tasks only when task_queue is empty.
    PollingQueue async_task_queue;

    /// Maximum amount of threads. Constant after initialization, based on `max_threads` setting.
    size_t num_threads = 0;

    /// Started thread count (allocated by `ConcurrencyControl`). Can increase during execution up to `num_threads`.
    size_t use_threads = 0;

    /// This is the total number of waited async tasks which are not executed yet.
    /// sum(executor_contexts[i].async_tasks.size())
    size_t num_waiting_async_tasks = 0;

    /// A set of currently waiting threads.
    ThreadsQueue threads_queue;
};

ExecutorTasks keeps the already-prepared processors in task_queue, and every thread can concurrently take tasks from task_queue and execute them.
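As a rough mental model of that queue (a minimal sketch of my own, not ClickHouse's actual TaskQueue class), prepared nodes are queued once their status has been set, and worker threads concurrently pop and "execute" them until the finished flag is set and the queue is drained:

#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

/// Minimal sketch of the idea behind task_queue (illustration only).
struct SimpleTaskQueue
{
    std::mutex mutex;
    std::condition_variable cv;
    std::deque<int> tasks;        /// here a task is just a node id
    bool finished = false;

    void push(int node_id)
    {
        {
            std::lock_guard lock(mutex);
            tasks.push_back(node_id);
        }
        cv.notify_one();
    }

    /// Returns false once the queue is drained and marked finished.
    bool pop(int & node_id)
    {
        std::unique_lock lock(mutex);
        cv.wait(lock, [&] { return finished || !tasks.empty(); });
        if (tasks.empty())
            return false;
        node_id = tasks.front();
        tasks.pop_front();
        return true;
    }

    void finish()
    {
        {
            std::lock_guard lock(mutex);
            finished = true;
        }
        cv.notify_all();
    }
};

int main()
{
    SimpleTaskQueue queue;
    for (int i = 0; i < 4; ++i)
        queue.push(i);            /// four nodes whose prepare() returned Ready
    queue.finish();

    std::vector<std::thread> workers;
    for (int t = 0; t < 2; ++t)
        workers.emplace_back([&]
        {
            int node;
            while (queue.pop(node))
                std::cout << "executing node " << node << "\n";   /// would call work() here
        });
    for (auto & worker : workers)
        worker.join();
}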

Continuing down from PipelineExecutor::executeImpl(): because the test data set is small, a single thread is used and we reach PipelineExecutor::executeSingleThread(), which in turn calls PipelineExecutor::executeStepImpl(). This method is the core of the ExecutingGraph scheduling loop; the code is as follows:

void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
{
    auto & context = tasks.getThreadContext(thread_num);
    bool yield = false;

    while (!tasks.isFinished() && !yield)
    {
        /// First, find any processor to execute.
        /// Just traverse graph and prepare any processor.
        while (!tasks.isFinished() && !context.hasTask())
            // 1. Try to get a task for the current thread and put it into the context.
            tasks.tryGetTask(context);

        while (context.hasTask() && !yield)
        {
            if (tasks.isFinished())
                break;
            
            // 2. The thread executes the task.
            if (!context.executeTask())
                cancel();

            if (tasks.isFinished())
                break;

            if (!checkTimeLimitSoft())
                break;

            /// Try to execute neighbour processor.
            {
                Queue queue;
                Queue async_queue;

                /// Prepare processor after execution.
                // 3. Update the states of the nodes adjacent to the node that was just executed.
                if (!graph->updateNode(context.getProcessorID(), queue, async_queue))
                    finish();

                /// Push other tasks to global queue.
                // 4. Push the newly prepared tasks to the global queue.
                tasks.pushTasks(queue, async_queue, context);
            }

            /// Upscale if possible.
            spawnThreads();

            /// We have executed single processor. Check if we need to yield execution.
            if (yield_flag && *yield_flag)
                yield = true;
        }
    }
}

This function is the thread's scheduling loop: it executes a Processor and then updates the states of the adjacent nodes (updateNode() was covered in the previous article, so it is not repeated here). The part to focus on is context.executeTask(): ExecutionThreadContext::executeTask() calls executeJob(node, read_progress_callback), whose code is as follows:

static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback)
{
    try
    {
        node->processor->work();

        /// Update read progress only for source nodes.
        bool is_source = node->back_edges.empty();

        if (is_source && read_progress_callback)
        {
            if (auto read_progress = node->processor->getReadProgress())
            {
                if (read_progress->counters.total_rows_approx)
                    read_progress_callback->addTotalRowsApprox(read_progress->counters.total_rows_approx);

                if (!read_progress_callback->onProgress(read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits))
                    node->processor->cancel();
            }
        }
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("While executing " + node->processor->getName());
        throw;
    }
}

In this function, the node's IProcessor::work() method is called; work() is the key method in which an operator processes and passes on data. After the operator has run, the function checks whether the node is a source operator (it has no back edges); if so, it invokes the read progress callback to report progress information.

Take the first operator, MergeTree, as an example: it is an ISource, which inherits from IProcessor, so ISource::work() is called. The code is as follows:

void ISource::work()
{
    try
    {
        read_progress_was_set = false;

        if (auto chunk = tryGenerate())
        {
            current_chunk.chunk = std::move(*chunk);
            if (current_chunk.chunk)
            {
                has_input = true;
                if (auto_progress && !read_progress_was_set)
                    progress(current_chunk.chunk.getNumRows(), current_chunk.chunk.bytes());
            }
        }
        else
            finished = true;

        if (isCancelled())
            finished = true;
    }
    catch (...)
    {
        finished = true;
        throw;
    }
}

A storage engine implements a source by inheriting from ISource and overriding tryGenerate() or generate() to produce data and return a Chunk. With that, the first operator in the pipeline finishes executing and the generated data is passed down the pipeline to the next operator, and so on until every operator has processed the data and returned; at that point one round of ExecutingGraph scheduling is complete.
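As a rough, self-contained sketch of that pattern (simplified classes of my own, not compiled against the real ClickHouse headers), a base class drives work() while a derived source only implements a generate()-style hook that returns chunks until it is exhausted:

#include <iostream>
#include <optional>
#include <vector>

/// Simplified sketch of the ISource pattern (illustration only, not the ClickHouse classes):
/// the base class drives work(); derived sources only implement generate() to produce chunks.
struct Chunk { std::vector<int> rows; };

class SimpleSourceBase
{
public:
    virtual ~SimpleSourceBase() = default;

    /// Mirrors the idea of ISource::work(): ask the derived class for a chunk,
    /// stash it as the current output, or mark the source finished.
    void work()
    {
        if (auto chunk = generate())
            current_chunk = std::move(*chunk);
        else
            finished = true;
    }

    bool isFinished() const { return finished; }
    const Chunk & output() const { return current_chunk; }

protected:
    virtual std::optional<Chunk> generate() = 0;

private:
    Chunk current_chunk;
    bool finished = false;
};

/// A toy "storage-backed" source: emits three chunks, then signals it is done.
class NumbersSource : public SimpleSourceBase
{
    int emitted = 0;
protected:
    std::optional<Chunk> generate() override
    {
        if (emitted == 3)
            return std::nullopt;
        ++emitted;
        return Chunk{{emitted, emitted * 10}};
    }
};

int main()
{
    NumbersSource source;
    while (true)
    {
        source.work();
        if (source.isFinished())
            break;
        std::cout << "chunk with " << source.output().rows.size() << " rows\n";
    }
}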

The next article will look at how the execution layer builds the bridge to the storage layer, and how the storage layer processes and passes on data.

Scheduling State-Transition Diagram