2023-01-28 clickhouse: Execution Analysis of the Aggregate Function sum


Abstract:

Taking the aggregate function sum as the entry point, this article analyzes clickhouse's aggregation execution flow.

Building a debug version of clickhouse:

mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=DEBUG -DPARALLEL_COMPILE_JOBS=`nproc` -DPARALLEL_LINK_JOBS=`nproc`
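cmake only configures the build; the actual compilation (slow for a debug build) can then be done with the generator-agnostic command below (ninja clickhouse also works if Ninja is the generator):

cmake --build . --target clickhouse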

Starting clickhouse and connecting a client:

Start the server:

clickhouse server

Connect with the client:

clickhouse client

DDL:

CREATE DATABASE test;
USE test;
create table t1 (id Int32, name String) engine=TinyLog;
insert into t1 (id, name) values (1, 'abc'), (2, 'bbbb'), (3, 'sdfg');

Script to insert data:

#!/bin/bash

for ((i = 0; i < 100000; i++)); do
    echo clickhouse client --database test --query="insert into t1 (id, name) values($i, '$i')"
    clickhouse client --database test --query="insert into t1 (id, name) values($i, '$i')"
done
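Looping clickhouse client 100000 times forks one process per row and is very slow. For this analysis a handful of rows is already enough (frame #2 of the backtrace below aggregates a block of only 6 rows); a faster single-statement alternative using the standard numbers() table function:

clickhouse client --database test --query="insert into t1 (id, name) select number, toString(number) from numbers(100000)"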

Query SQL:

select sum(id) from t1 where id > 1;
clickhouse client --database test --query="select sum(id) from t1 where id > 1;"

Core flow:

The hot loop of this query is DB::AggregateFunctionSumData<long>::addMany<int> (Int32 values accumulated into an Int64 sum); on CPUs with AVX2 it dispatches to the addManyImplAVX2 specialization, as frame #1 of the stack below shows.
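One way to capture the call stack (a sketch, assuming a local debug server): attach gdb to the server process, set a breakpoint inside the sum implementation (the file and line match frame #0 below), and run the query from another terminal:

gdb -p "$(pidof clickhouse-server)"    # the process may also be named just clickhouse
(gdb) break AggregateFunctionSum.h:38
(gdb) continue
# ... run the query in another shell; when the breakpoint hits:
(gdb) bt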

Call stack (captured in gdb; note that some frame arguments show as <optimized out> or with implausible values):

(gdb) bt
#0  DB::AggregateFunctionSumAddOverflowImpl<long>::add (lhs=<optimized out>, rhs=<optimized out>) at ../src/AggregateFunctions/AggregateFunctionSum.h:38
#1  DB::AggregateFunctionSumData<long>::addManyImplAVX2<int> (this=0x7f0caeb98000, ptr=0x7f0dcc0ff300, start=<optimized out>, end=<optimized out>)
    at ../src/AggregateFunctions/AggregateFunctionSum.h:62
#2  0x00000000162bf6df in DB::Aggregator::executeWithoutKeyImpl<false> (this=this@entry=0x7f0cb6fd7c10, res=@0x7f0cafe62660: 0x7f0caeb98000 "", row_begin=row_begin@entry=0, row_end=row_end@entry=6, 
    aggregate_instructions=<optimized out>, arena=0x0) at ../src/Interpreters/Aggregator.cpp:1328
#3  0x00000000162428d3 in DB::Aggregator::executeOnBlock (this=0x7f0cb6fd7c10, columns=..., row_begin=1, row_end=139697234899712, result=..., key_columns=..., aggregate_columns=..., 
    no_more_keys=@0x7f0cb6fd7730: false) at ../src/Interpreters/Aggregator.cpp:1544
#4  0x0000000017af92a0 in DB::AggregatingTransform::consume (this=this@entry=0x7f0cb6fd7618, chunk=...) at ../src/Processors/Transforms/AggregatingTransform.cpp:533
#5  0x0000000017af7776 in DB::AggregatingTransform::work (this=0x7f0cb6fd7618) at ../src/Processors/Transforms/AggregatingTransform.cpp:492
#6  0x00000000179687c0 in DB::executeJob (node=0x7f0d1abb2600, read_progress_callback=0x7f0d0e083060) at ../src/Processors/Executors/ExecutionThreadContext.cpp:47
#7  DB::ExecutionThreadContext::executeTask (this=0x7f0cd53a3180) at ../src/Processors/Executors/ExecutionThreadContext.cpp:92
#8  0x000000001795ee7a in DB::PipelineExecutor::executeStepImpl (this=this@entry=0x7f0cafd24218, thread_num=<optimized out>, yield_flag=yield_flag@entry=0x0)
    at ../src/Processors/Executors/PipelineExecutor.cpp:229
#9  0x000000001796036f in DB::PipelineExecutor::executeSingleThread (this=0x7f0cafd24218, thread_num=139697234899712) at ../src/Processors/Executors/PipelineExecutor.cpp:195
#10 DB::PipelineExecutor::spawnThreads()::$_0::operator()() const (this=0x7f0cbd7f39f0) at ../src/Processors/Executors/PipelineExecutor.cpp:320
#11 std::__1::__invoke[abi:v15000]<DB::PipelineExecutor::spawnThreads()::$_0&> (__f=...) at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:394
#12 std::__1::__apply_tuple_impl[abi:v15000]<DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&>(DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&, std::__1::__tuple_indices<>) (__f=..., __t=...) at ../contrib/llvm-project/libcxx/include/tuple:1789
#13 std::__1::apply[abi:v15000]<DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&>(DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&) (__f=..., __t=...)
    at ../contrib/llvm-project/libcxx/include/tuple:1798
#14 ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}::operator()() (this=0x7f0cb3a410d0)
    at ../src/Common/ThreadPool.h:196
#15 std::__1::__invoke[abi:v15000]<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}&> (
    __f=...) at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:394
#16 std::__1::__invoke_void_return_wrapper<void, true>::__call<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}&>(ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}&) (__args=...)
    at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:479
#17 std::__1::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}, void ()>::operator()[abi:v15000]() (this=0x7f0cb3a410d0) at ../contrib/llvm-project/libcxx/include/__functional/function.h:235
#18 std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}, void ()> >(std::__1::__function::__policy_storage const*) (__buf=<optimized out>)
    at ../contrib/llvm-project/libcxx/include/__functional/function.h:716
#19 0x000000000c1bb546 in std::__1::__function::__policy_func<void ()>::operator()[abi:v15000]() const (this=0x7f0cbd7f3cc0) at ../contrib/llvm-project/libcxx/include/__functional/function.h:848
#20 std::__1::function<void ()>::operator()() const (this=0x7f0cbd7f3cc0) at ../contrib/llvm-project/libcxx/include/__functional/function.h:1187
#21 ThreadPoolImpl<std::__1::thread>::worker (this=0x7f0dcde3e800, thread_it=...) at ../src/Common/ThreadPool.cpp:295
#22 0x000000000c1be86e in ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}::operator()() const (
    this=0x7f0dcbcff608) at ../src/Common/ThreadPool.cpp:144
#23 std::__1::__invoke[abi:v15000]<ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}> (__f=...)
    at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:394
#24 std::__1::__thread_execute[abi:v15000]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}>&, std::__1::__tuple_indices<>) (__t=...)
    at ../contrib/llvm-project/libcxx/include/thread:284
#25 std::__1::__thread_proxy[abi:v15000]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}> >(void*) (__vp=0x7f0dcbcff600) at ../contrib/llvm-project/libcxx/include/thread:295
#26 0x00007f0dcec68802 in start_thread () from /lib64/libc.so.6
#27 0x00007f0dcec08450 in clone3 () from /lib64/libc.so.6

Key functions:

The rest of the article walks down that stack, from the processors layer to the tight sum loop: AggregatingTransform::consume -> Aggregator::executeOnBlock -> Aggregator::prepareAggregateInstructions -> Aggregator::executeWithoutKeyImpl -> AggregateFunctionSumData::addMany.

AggregatingTransform::consume

consume is invoked once per input chunk. In the normal (non-merge-only) path it hands the chunk's columns to Aggregator::executeOnBlock over the full row range [0, num_rows); a false return value marks consumption as finished.


void AggregatingTransform::consume(Chunk chunk)
{
    const UInt64 num_rows = chunk.getNumRows();

    if (num_rows == 0 && params->params.empty_result_for_aggregation_by_empty_set)
        return;

    if (!is_consume_started)
    {
        LOG_TRACE(log, "Aggregating");
        is_consume_started = true;
    }

    src_rows += num_rows;
    src_bytes += chunk.bytes();

    if (params->params.only_merge)
    {
        auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
        block = materializeBlock(block);
        if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys))
            is_consume_finished = true;
    }
    else
    {
        if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys))
            is_consume_finished = true;
    }
}

Aggregator::executeOnBlock

On first use it initializes the aggregation data variants and logs the chosen method; it then materializes constant (and, where needed, sparse or low-cardinality) key columns, builds the per-aggregate instruction array via prepareAggregateInstructions, and branches: the keyless case (our sum query) goes to executeWithoutKeyImpl<false>, everything else to executeImpl. Finally it considers converting the result to a two-level hash table and spilling to disk when memory limits are exceeded.

bool Aggregator::executeOnBlock(Columns columns,
    size_t row_begin, size_t row_end,
    AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns,
    AggregateColumns & aggregate_columns,
    bool & no_more_keys) const
{
    /// `result` will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    /// How to perform the aggregation?
    if (result.empty())
    {
        initDataVariantsWithSizeHint(result, method_chosen, params);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
    }

    /** Constant columns are not supported directly during aggregation.
      * To make them work anyway, we materialize them.
      */
    Columns materialized_columns;

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
        key_columns[i] = materialized_columns.back().get();

        if (!result.isLowCardinality())
        {
            auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
            if (column_no_lc.get() != key_columns[i])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                key_columns[i] = materialized_columns.back().get();
            }
        }
    }

    NestedColumnsHolder nested_columns_holder;
    AggregateFunctionInstructions aggregate_functions_instructions;
    prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions, nested_columns_holder);

    if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
    {
        AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
        createAggregateStates(place);
        result.without_key = place;
    }

    /// We select one of the aggregation methods and call it.

    /// For the case when there are no keys (all aggregate into one row).
    if (result.type == AggregatedDataVariants::Type::without_key)
    {
        /// TODO: Enable compilation after investigation
// #if USE_EMBEDDED_COMPILER
//         if (compiled_aggregate_functions_holder)
//         {
//             executeWithoutKeyImpl<true>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
//         }
//         else
// #endif
        {
            executeWithoutKeyImpl<false>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
        }
    }
    else
    {
        /// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
        AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
        executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);
    }

    size_t result_size = result.sizeWithoutOverflowRow();
    Int64 current_memory_usage = 0;
    if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
        if (auto * memory_tracker = memory_tracker_child->getParent())
            current_memory_usage = memory_tracker->get();

    /// Here all the results in the sum are taken into account, from different threads.
    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

    bool worth_convert_to_two_level = worthConvertToTwoLevel(
        params.group_by_two_level_threshold, result_size, params.group_by_two_level_threshold_bytes, result_size_bytes);

    /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
      */
    if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
        result.convertToTwoLevel();

    /// Checking the constraints.
    if (!checkLimits(result_size, no_more_keys))
        return false;

    /** Flush data to disk if too much RAM is consumed.
      * Data can only be flushed to disk if a two-level aggregation structure is used.
      */
    if (params.max_bytes_before_external_group_by
        && result.isTwoLevel()
        && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
        && worth_convert_to_two_level)
    {
        size_t size = current_memory_usage + params.min_free_disk_space;
        writeToTemporaryFile(result, size);
    }

    return true;
}
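Two-level conversion is irrelevant to our keyless sum (there is no hash table to split), but for GROUP BY queries the decision above reduces to two thresholds. A compilable sketch of the logic behind worthConvertToTwoLevel (not the verbatim source; the values in main are illustrative):

#include <cstddef>
#include <cstdint>
#include <cstdio>

/// Sketch (not verbatim source): convert to the two-level layout once either
/// configured (non-zero) limit is exceeded.
static bool worthConvertToTwoLevelSketch(
    size_t threshold_rows, size_t result_size,
    size_t threshold_bytes, int64_t result_size_bytes)
{
    return (threshold_rows && result_size >= threshold_rows)
        || (threshold_bytes && result_size_bytes >= static_cast<int64_t>(threshold_bytes));
}

int main()
{
    /// 100000 keys is the shape of the default group_by_two_level_threshold setting
    std::printf("%d\n", worthConvertToTwoLevelSketch(100000, 150000, 0, 0));   /// prints 1
}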

Aggregator::prepareAggregateInstructions

For each aggregate function this materializes the argument columns (converting constants to full columns and stripping sparseness and low cardinality where required) and fills an AggregateFunctionInstruction: argument pointers, the state_offset of the function's state inside the aggregate data block, and the function itself with trailing -State combinators unwrapped (plus special handling for -Array).

void Aggregator::prepareAggregateInstructions(
    Columns columns,
    AggregateColumns & aggregate_columns,
    Columns & materialized_columns,
    AggregateFunctionInstructions & aggregate_functions_instructions,
    NestedColumnsHolder & nested_columns_holder) const
{
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].argument_names.size());

    aggregate_functions_instructions.resize(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        bool allow_sparse_arguments = aggregate_columns[i].size() == 1;
        bool has_sparse_arguments = false;

        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            const auto pos = header.getPositionByName(params.aggregates[i].argument_names[j]);
            materialized_columns.push_back(columns.at(pos)->convertToFullColumnIfConst());
            aggregate_columns[i][j] = materialized_columns.back().get();

            auto full_column = allow_sparse_arguments
                ? aggregate_columns[i][j]->getPtr()
                : recursiveRemoveSparse(aggregate_columns[i][j]->getPtr());

            full_column = recursiveRemoveLowCardinality(full_column);
            if (full_column.get() != aggregate_columns[i][j])
            {
                materialized_columns.emplace_back(std::move(full_column));
                aggregate_columns[i][j] = materialized_columns.back().get();
            }

            if (aggregate_columns[i][j]->isSparse())
                has_sparse_arguments = true;
        }

        aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments;
        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

        const auto * that = aggregate_functions[i];
        /// Unnest consecutive trailing -State combinators
        while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
            that = func->getNestedFunction().get();
        aggregate_functions_instructions[i].that = that;

        if (const auto * func = typeid_cast<const AggregateFunctionArray *>(that))
        {
            /// Unnest consecutive -State combinators before -Array
            that = func->getNestedFunction().get();
            while (const auto * nested_func = typeid_cast<const AggregateFunctionState *>(that))
                that = nested_func->getNestedFunction().get();
            auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
            nested_columns_holder.push_back(std::move(nested_columns));
            aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
            aggregate_functions_instructions[i].offsets = offsets;
        }
        else
            aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();

        aggregate_functions_instructions[i].batch_that = that;
    }
}
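Note the extra slot at index params.aggregates_size whose that is nullptr: it is a sentinel, letting downstream loops walk the instruction array without carrying a length. A minimal standalone illustration of the pattern (all names invented for the example):

#include <cstdio>

/// Minimal illustration of a sentinel-terminated instruction array.
struct Instruction
{
    void (*fn)(int);   /// nullptr in the last slot marks the end
    int arg;
};

static void show(int x) { std::printf("arg=%d\n", x); }

int main()
{
    Instruction instructions[] = {{show, 1}, {show, 2}, {nullptr, 0}};
    for (Instruction * inst = instructions; inst->fn; ++inst)   /// stop at the sentinel
        inst->fn(inst->arg);
}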

Aggregator::executeWithoutKeyImpl

With no GROUP BY keys there is exactly one aggregate state, so the whole row range is fed into each function's batch method at a single "place" (res + inst->state_offset). Three paths exist: the -Array combinator path driven by offsets, the sparse-argument path, and the plain addBatchSinglePlace path taken by our sum query.

template <bool use_compiled_functions>
void NO_INLINE Aggregator::executeWithoutKeyImpl(
    AggregatedDataWithoutKey & res,
    size_t row_begin, size_t row_end,
    AggregateFunctionInstruction * aggregate_instructions,
    Arena * arena) const
{
    if (row_begin == row_end)
        return;

#if USE_EMBEDDED_COMPILER
    if constexpr (use_compiled_functions)
    {
        std::vector<ColumnData> columns_data;

        for (size_t i = 0; i < aggregate_functions.size(); ++i)
        {
            if (!is_aggregate_function_compiled[i])
                continue;

            AggregateFunctionInstruction * inst = aggregate_instructions + i;
            size_t arguments_size = inst->that->getArgumentTypes().size();

            for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index)
            {
                columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
            }
        }

        auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
        add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), res);

#if defined(MEMORY_SANITIZER)

        /// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place.
        for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index)
        {
            if (!is_aggregate_function_compiled[aggregate_function_index])
                continue;

            auto aggregate_data_with_offset = res + offsets_of_aggregate_states[aggregate_function_index];
            auto data_size = params.aggregates[aggregate_function_index].function->sizeOfData();
            __msan_unpoison(aggregate_data_with_offset, data_size);
        }
#endif
    }
#endif

    /// Adding values
    for (size_t i = 0; i < aggregate_functions.size(); ++i)
    {
        AggregateFunctionInstruction * inst = aggregate_instructions + i;

#if USE_EMBEDDED_COMPILER
        if constexpr (use_compiled_functions)
            if (is_aggregate_function_compiled[i])
                continue;
#endif

        if (inst->offsets)
            inst->batch_that->addBatchSinglePlace(
                inst->offsets[static_cast<ssize_t>(row_begin) - 1],
                inst->offsets[row_end - 1],
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
        else if (inst->has_sparse_arguments)
            inst->batch_that->addBatchSparseSinglePlace(
                row_begin, row_end,
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
        else
            inst->batch_that->addBatchSinglePlace(
                row_begin, row_end,
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
    }
}
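"Single place" means every row updates the same aggregate state, as opposed to one state per GROUP BY key. A minimal self-contained model of addBatchSinglePlace for sum (names and data are illustrative, not ClickHouse's):

#include <cstddef>
#include <cstdio>

/// Minimal model of "single place" batch aggregation: one aggregate state
/// for the whole batch.
struct SumState { long sum = 0; };

static void addBatchSinglePlace(size_t row_begin, size_t row_end,
                                SumState & place, const int * column)
{
    for (size_t row = row_begin; row < row_end; ++row)
        place.sum += column[row];
}

int main()
{
    const int ids[] = {2, 3, 4, 5, 6, 7};   /// rows that survived `where id > 1`
    SumState state;
    addBatchSinglePlace(0, 6, state, ids);  /// row_end = 6 matches the backtrace
    std::printf("%ld\n", state.sum);        /// prints 27
}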

AggregateFunctionSumData::addMany

For sum, addBatchSinglePlace bottoms out in AggregateFunctionSumData::addMany, which at runtime picks the widest SIMD specialization the CPU supports (AVX2, then SSE4.2) and otherwise falls back to the generic loop:

    /// Vectorized version
    template <typename Value>
    void NO_INLINE addMany(const Value * __restrict ptr, size_t start, size_t end)
    {
#if USE_MULTITARGET_CODE
        if (isArchSupported(TargetArch::AVX2))
        {
            addManyImplAVX2(ptr, start, end);
            return;
        }
        else if (isArchSupported(TargetArch::SSE42))
        {
            addManyImplSSE42(ptr, start, end);
            return;
        }
#endif

        addManyImpl(ptr, start, end);
    }
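The MULTITARGET machinery compiles addManyImpl several times with different instruction sets and selects one at runtime. A standalone x86 GCC/Clang sketch of the same runtime-dispatch idea (not ClickHouse's actual macros):

#include <cstddef>
#include <cstdio>

/// Compiled with AVX2 enabled; the compiler may auto-vectorize this loop.
__attribute__((target("avx2"))) static long sumAVX2(const int * p, size_t n)
{
    long s = 0;
    for (size_t i = 0; i < n; ++i)
        s += p[i];
    return s;
}

static long sumGeneric(const int * p, size_t n)
{
    long s = 0;
    for (size_t i = 0; i < n; ++i)
        s += p[i];
    return s;
}

static long sumDispatch(const int * p, size_t n)
{
    if (__builtin_cpu_supports("avx2"))   /// runtime CPU feature check
        return sumAVX2(p, n);
    return sumGeneric(p, n);
}

int main()
{
    const int v[] = {2, 3, 4, 5, 6, 7};
    std::printf("%ld\n", sumDispatch(v, 6));   /// prints 27
}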


    /// Vectorized version
    MULTITARGET_FUNCTION_AVX2_SSE42(
    MULTITARGET_FUNCTION_HEADER(
    template <typename Value>
    void NO_SANITIZE_UNDEFINED NO_INLINE
    ), addManyImpl, MULTITARGET_FUNCTION_BODY((const Value * __restrict ptr, size_t start, size_t end) /// NOLINT
    {
        ptr += start;
        size_t count = end - start;
        const auto * end_ptr = ptr + count;

        if constexpr (std::is_floating_point_v<T>)
        {
            /// Compiler cannot unroll this loop, do it manually.
            /// (at least for floats, most likely due to the lack of -fassociative-math)

            /// Something around the number of SSE registers * the number of elements fit in register.
            constexpr size_t unroll_count = 128 / sizeof(T);
            T partial_sums[unroll_count]{};

            const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);

            while (ptr < unrolled_end)
            {
                for (size_t i = 0; i < unroll_count; ++i)
                    Impl::add(partial_sums[i], ptr[i]);
                ptr += unroll_count;
            }

            for (size_t i = 0; i < unroll_count; ++i)
                Impl::add(sum, partial_sums[i]);
        }

        /// clang cannot vectorize the loop if accumulator is class member instead of local variable.
        T local_sum{};
        while (ptr < end_ptr)
        {
            Impl::add(local_sum, *ptr);
            ++ptr;
        }
        Impl::add(sum, local_sum);
    })
    )

Finally, Impl::add in the plain (non-Kahan) sum is AggregateFunctionSumAddOverflowImpl: a bare += with UB sanitization disabled, so signed integer overflow simply wraps:

template <typename T>
struct AggregateFunctionSumAddOverflowImpl
{
    static void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(T & lhs, const T & rhs)
    {
        lhs += rhs;
    }
};
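The floating-point branch of addManyImpl deserves a closer look: without -fassociative-math the compiler may not reorder float additions, so the loop is unrolled manually into independent partial sums that the compiler can keep in SIMD registers. A minimal standalone version of the trick (illustrative, not the ClickHouse source):

#include <cstddef>
#include <cstdio>

/// Manual unrolling into independent partial accumulators: float addition is
/// not associative, so the compiler will not reorder a single-accumulator
/// loop; N independent sums give it a legal way to use SIMD registers.
template <typename T, size_t unroll_count = 128 / sizeof(T)>
static T unrolledSum(const T * ptr, size_t count)
{
    T partial_sums[unroll_count]{};
    const T * unrolled_end = ptr + count / unroll_count * unroll_count;
    const T * end = ptr + count;

    while (ptr < unrolled_end)
    {
        for (size_t i = 0; i < unroll_count; ++i)
            partial_sums[i] += ptr[i];
        ptr += unroll_count;
    }

    T sum{};
    for (size_t i = 0; i < unroll_count; ++i)
        sum += partial_sums[i];

    while (ptr < end)   /// tail that did not fill a whole unroll block
        sum += *ptr++;
    return sum;
}

int main()
{
    float data[100];
    for (int i = 0; i < 100; ++i)
        data[i] = 0.5f;
    std::printf("%f\n", unrolledSum(data, 100));   /// prints 50.000000
}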