2023-01-28 clickhouse-聚合函数sum执行分析
关键词:2023-01-28、聚合函数、sum、执行分析
时间:2023-09-27 14:25:42
摘要:
以聚合函数sum为切入点,分析clickhouse的聚合执行流程
clickhouse编译debug版本
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=DEBUG -DPARALLEL_COMPILE_JOBS=`nproc` -DPARALLEL_LINK_JOBS=`nproc`
clickhouse的启动与客户端连接:
启动:
clickhouse server
客户端连接:
clickhouse client
DDL 及初始数据:
-- Create the test database and table.
-- CREATE DATABASE does not change the session's current database, so the table
-- name is qualified with `test.`; an unqualified `t1` would be created in
-- `default`, while the load script and the query both run with `--database test`.
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE test.t1 (id Int32, name String) ENGINE = TinyLog;
-- A few seed rows.
INSERT INTO test.t1 (id, name) VALUES (1, 'abc'), (2, 'bbbb'), (3, 'sdfg');
插入数据脚本:
#!/bin/bash
# Load 100000 test rows into test.t1: id = 0..99999, name = the id as a string.
# A single INSERT ... SELECT over the numbers() table function produces the same
# rows as the original loop. The loop started a new clickhouse client process
# and sent a separate one-row INSERT for every i (100000 round trips).
clickhouse client --database test --query="INSERT INTO t1 (id, name) SELECT toInt32(number), toString(number) FROM numbers(100000)"
查询SQL:
-- Interactive form of the query; the table lives in database `test`.
select sum(id) from test.t1 where id > 1;
clickhouse client --database test --query="select sum(id) from t1 where id > 1;";
核心流程:
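DB::AggregateFunctionSumData<long>::addMany<int>
调用链(由上层到底层,取自下方调用堆栈):AggregatingTransform::consume → Aggregator::executeOnBlock → Aggregator::executeWithoutKeyImpl<false> → … → AggregateFunctionSumData<long>::addMany<int> → addManyImplAVX2<int> → AggregateFunctionSumAddOverflowImpl<long>::add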
调用堆栈:
(gdb) bt
#0 DB::AggregateFunctionSumAddOverflowImpl<long>::add (lhs=<optimized out>, rhs=<optimized out>) at ../src/AggregateFunctions/AggregateFunctionSum.h:38
#1 DB::AggregateFunctionSumData<long>::addManyImplAVX2<int> (this=0x7f0caeb98000, ptr=0x7f0dcc0ff300, start=<optimized out>, end=<optimized out>)
at ../src/AggregateFunctions/AggregateFunctionSum.h:62
#2 0x00000000162bf6df in DB::Aggregator::executeWithoutKeyImpl<false> (this=this@entry=0x7f0cb6fd7c10, res=@0x7f0cafe62660: 0x7f0caeb98000 "", row_begin=row_begin@entry=0, row_end=row_end@entry=6,
aggregate_instructions=<optimized out>, arena=0x0) at ../src/Interpreters/Aggregator.cpp:1328
#3 0x00000000162428d3 in DB::Aggregator::executeOnBlock (this=0x7f0cb6fd7c10, columns=..., row_begin=1, row_end=139697234899712, result=..., key_columns=..., aggregate_columns=...,
no_more_keys=@0x7f0cb6fd7730: false) at ../src/Interpreters/Aggregator.cpp:1544
#4 0x0000000017af92a0 in DB::AggregatingTransform::consume (this=this@entry=0x7f0cb6fd7618, chunk=...) at ../src/Processors/Transforms/AggregatingTransform.cpp:533
#5 0x0000000017af7776 in DB::AggregatingTransform::work (this=0x7f0cb6fd7618) at ../src/Processors/Transforms/AggregatingTransform.cpp:492
#6 0x00000000179687c0 in DB::executeJob (node=0x7f0d1abb2600, read_progress_callback=0x7f0d0e083060) at ../src/Processors/Executors/ExecutionThreadContext.cpp:47
#7 DB::ExecutionThreadContext::executeTask (this=0x7f0cd53a3180) at ../src/Processors/Executors/ExecutionThreadContext.cpp:92
#8 0x000000001795ee7a in DB::PipelineExecutor::executeStepImpl (this=this@entry=0x7f0cafd24218, thread_num=<optimized out>, yield_flag=yield_flag@entry=0x0)
at ../src/Processors/Executors/PipelineExecutor.cpp:229
#9 0x000000001796036f in DB::PipelineExecutor::executeSingleThread (this=0x7f0cafd24218, thread_num=139697234899712) at ../src/Processors/Executors/PipelineExecutor.cpp:195
#10 DB::PipelineExecutor::spawnThreads()::$_0::operator()() const (this=0x7f0cbd7f39f0) at ../src/Processors/Executors/PipelineExecutor.cpp:320
#11 std::__1::__invoke[abi:v15000]<DB::PipelineExecutor::spawnThreads()::$_0&> (__f=...) at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:394
#12 std::__1::__apply_tuple_impl[abi:v15000]<DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&>(DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&, std::__1::__tuple_indices<>) (__f=..., __t=...) at ../contrib/llvm-project/libcxx/include/tuple:1789
#13 std::__1::apply[abi:v15000]<DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&>(DB::PipelineExecutor::spawnThreads()::$_0&, std::__1::tuple<>&) (__f=..., __t=...)
at ../contrib/llvm-project/libcxx/include/tuple:1798
#14 ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}::operator()() (this=0x7f0cb3a410d0)
at ../src/Common/ThreadPool.h:196
#15 std::__1::__invoke[abi:v15000]<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}&> (
__f=...) at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:394
#16 std::__1::__invoke_void_return_wrapper<void, true>::__call<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}&>(ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}&) (__args=...)
at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:479
#17 std::__1::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}, void ()>::operator()[abi:v15000]() (this=0x7f0cb3a410d0) at ../contrib/llvm-project/libcxx/include/__functional/function.h:235
#18 std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::PipelineExecutor::spawnThreads()::$_0>(DB::PipelineExecutor::spawnThreads()::$_0&&)::{lambda()#1}, void ()> >(std::__1::__function::__policy_storage const*) (__buf=<optimized out>)
at ../contrib/llvm-project/libcxx/include/__functional/function.h:716
#19 0x000000000c1bb546 in std::__1::__function::__policy_func<void ()>::operator()[abi:v15000]() const (this=0x7f0cbd7f3cc0) at ../contrib/llvm-project/libcxx/include/__functional/function.h:848
#20 std::__1::function<void ()>::operator()() const (this=0x7f0cbd7f3cc0) at ../contrib/llvm-project/libcxx/include/__functional/function.h:1187
#21 ThreadPoolImpl<std::__1::thread>::worker (this=0x7f0dcde3e800, thread_it=...) at ../src/Common/ThreadPool.cpp:295
#22 0x000000000c1be86e in ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}::operator()() const (
this=0x7f0dcbcff608) at ../src/Common/ThreadPool.cpp:144
#23 std::__1::__invoke[abi:v15000]<ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}> (__f=...)
at ../contrib/llvm-project/libcxx/include/__functional/invoke.h:394
#24 std::__1::__thread_execute[abi:v15000]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}>&, std::__1::__tuple_indices<>) (__t=...)
at ../contrib/llvm-project/libcxx/include/thread:284
#25 std::__1::__thread_proxy[abi:v15000]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::{lambda()#2}> >(void*) (__vp=0x7f0dcbcff600) at ../contrib/llvm-project/libcxx/include/thread:295
--Type <RET> for more, q to quit, c to continue without paging--
#26 0x00007f0dcec68802 in start_thread () from /lib64/libc.so.6
#27 0x00007f0dcec08450 in clone3 () from /lib64/libc.so.6
核心函数:
AggregatingTransform::consume
/// Feeds one input chunk into the aggregation state held by this transform.
///
/// An empty chunk is ignored when params->params.empty_result_for_aggregation_by_empty_set
/// is set. Otherwise the chunk's row/byte counts are accumulated and its columns are
/// handed to the aggregator: Aggregator::mergeOnBlock when params->params.only_merge is
/// set, Aggregator::executeOnBlock otherwise. If that call returns false, the transform
/// marks consumption as finished.
void AggregatingTransform::consume(Chunk chunk)
{
    const UInt64 num_rows = chunk.getNumRows();

    const bool ignore_empty_chunk = num_rows == 0 && params->params.empty_result_for_aggregation_by_empty_set;
    if (ignore_empty_chunk)
        return;

    /// Log only once, for the first chunk that is actually consumed.
    if (!is_consume_started)
    {
        LOG_TRACE(log, "Aggregating");
        is_consume_started = true;
    }

    /// Counted before the columns are detached from the chunk below.
    src_rows += num_rows;
    src_bytes += chunk.bytes();

    bool can_continue = false;
    if (params->params.only_merge)
    {
        /// Rebuild a Block from the chunk using the input port's header, materialize it, then merge.
        auto block = materializeBlock(getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()));
        can_continue = params->aggregator.mergeOnBlock(block, variants, no_more_keys);
    }
    else
    {
        can_continue = params->aggregator.executeOnBlock(
            chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys);
    }

    if (!can_continue)
        is_consume_finished = true;
}
Aggregator::executeOnBlock
/// Aggregates rows [row_begin, row_end) of one input block into `result`.
///
/// Outline:
///   1. On the first call (result.empty()), initialises the aggregation data
///      structure selected by `method_chosen` and records the key layout.
///   2. Materialises the GROUP BY key columns: sparse and const wrappers are
///      removed, and LowCardinality too unless the chosen method handles it.
///   3. Builds one AggregateFunctionInstruction per aggregate function.
///   4. Allocates the key-less state block if needed, then dispatches to
///      executeWithoutKeyImpl (no keys) or executeImpl (with keys).
///   5. Possibly converts to a two-level table, checks limits, and may spill
///      the data to a temporary file.
///
/// @param columns           Columns of the input block (accessed by position).
/// @param row_begin         First row to aggregate.
/// @param row_end           One past the last row to aggregate.
/// @param result            Aggregation state; initialised on the first call.
/// @param key_columns       Out: raw pointers to the materialised key columns. It is
///                          written up to params.keys_size without being resized, so
///                          the caller is assumed to have sized it already.
/// @param aggregate_columns Out: raw pointers to the aggregate-function argument columns.
/// @param no_more_keys      Passed through to executeImpl and checkLimits.
/// @return false if checkLimits() rejects the current result size (AggregatingTransform::consume
///         then marks consumption as finished); true otherwise.
bool Aggregator::executeOnBlock(Columns columns,
    size_t row_begin, size_t row_end,
    AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns,
    AggregateColumns & aggregate_columns,
    bool & no_more_keys) const
{
    /// `result` will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    /// How to perform the aggregation?
    /// First call for this `result`: set up the chosen data structure and key layout.
    if (result.empty())
    {
        initDataVariantsWithSizeHint(result, method_chosen, params);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
    }

    /** Constant columns are not supported directly during aggregation.
      * To make them work anyway, we materialize them.
      */
    /// Owns every converted column, so the raw pointers stored in key_columns /
    /// aggregate_columns stay valid for the rest of this call.
    Columns materialized_columns;

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        /// Strip the sparse representation, then expand constants into full columns.
        materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
        key_columns[i] = materialized_columns.back().get();

        /// Unless the chosen method works on LowCardinality keys,
        /// use the key column's non-LowCardinality form.
        if (!result.isLowCardinality())
        {
            auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
            /// Keep a new column only if the conversion actually produced one.
            if (column_no_lc.get() != key_columns[i])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                key_columns[i] = materialized_columns.back().get();
            }
        }
    }

    NestedColumnsHolder nested_columns_holder;
    AggregateFunctionInstructions aggregate_functions_instructions;
    prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions, nested_columns_holder);

    /// Allocate (once) the state block used for key-less aggregation or for the
    /// overflow row, and construct all aggregate states in it.
    if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
    {
        AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
        createAggregateStates(place);
        result.without_key = place;
    }

    /// We select one of the aggregation methods and call it.

    /// For the case when there are no keys (all aggregate into one row).
    /// This is the path taken by a query without GROUP BY, e.g. `select sum(id) from t1 where id > 1`.
    if (result.type == AggregatedDataVariants::Type::without_key)
    {
        /// The JIT-compiled variant below is commented out, so the interpreted
        /// executeWithoutKeyImpl<false> is always used here.
        /// TODO: Enable compilation after investigation
// #if USE_EMBEDDED_COMPILER
//         if (compiled_aggregate_functions_holder)
//         {
//             executeWithoutKeyImpl<true>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
//         }
//         else
// #endif
        {
            executeWithoutKeyImpl<false>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
        }
    }
    else
    {
        /// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
        AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
        executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);
    }

    size_t result_size = result.sizeWithoutOverflowRow();

    /// Memory usage is read from the parent of this thread's memory tracker (0 if unavailable).
    Int64 current_memory_usage = 0;
    if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
        if (auto * memory_tracker = memory_tracker_child->getParent())
            current_memory_usage = memory_tracker->get();

    /// Here all the results in the sum are taken into account, from different threads.
    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

    bool worth_convert_to_two_level = worthConvertToTwoLevel(
        params.group_by_two_level_threshold, result_size, params.group_by_two_level_threshold_bytes, result_size_bytes);

    /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
      */
    if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
        result.convertToTwoLevel();

    /// Checking the constraints.
    if (!checkLimits(result_size, no_more_keys))
        return false;

    /** Flush data to disk if too much RAM is consumed.
      * Data can only be flushed to disk if a two-level aggregation structure is used.
      */
    if (params.max_bytes_before_external_group_by
        && result.isTwoLevel()
        && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
        && worth_convert_to_two_level)
    {
        /// Current memory usage plus params.min_free_disk_space; presumably the amount of
        /// disk space writeToTemporaryFile should reserve — confirm in its definition.
        size_t size = current_memory_usage + params.min_free_disk_space;
        writeToTemporaryFile(result, size);
    }

    return true;
}
Aggregator::prepareAggregateInstructions
/// Fills one AggregateFunctionInstruction per aggregate function. Each instruction
/// records which argument columns, state offset and (possibly unwrapped) function
/// object the execute*Impl routines should use.
///
/// @param columns                          Columns of the input block.
/// @param aggregate_columns                Out: per function, raw pointers to its argument columns.
/// @param materialized_columns             Receives any column created here by conversion; it keeps
///                                         the raw pointers above valid while it is alive.
/// @param aggregate_functions_instructions Out: resized to params.aggregates_size + 1.
/// @param nested_columns_holder            Owns the nested argument-pointer arrays built for
///                                         -Array combinators.
void Aggregator::prepareAggregateInstructions(
    Columns columns,
    AggregateColumns & aggregate_columns,
    Columns & materialized_columns,
    AggregateFunctionInstructions & aggregate_functions_instructions,
    NestedColumnsHolder & nested_columns_holder) const
{
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].argument_names.size());

    /// One extra trailing instruction whose `that` is nullptr
    /// (presumably an end marker for code that walks the array — confirm).
    aggregate_functions_instructions.resize(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        /// Sparse columns are kept as-is only for single-argument functions;
        /// otherwise they are converted to full columns below.
        bool allow_sparse_arguments = aggregate_columns[i].size() == 1;
        bool has_sparse_arguments = false;

        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            /// Arguments are located in the block by name, using the aggregator's header.
            const auto pos = header.getPositionByName(params.aggregates[i].argument_names[j]);
            materialized_columns.push_back(columns.at(pos)->convertToFullColumnIfConst());
            aggregate_columns[i][j] = materialized_columns.back().get();

            auto full_column = allow_sparse_arguments
                ? aggregate_columns[i][j]->getPtr()
                : recursiveRemoveSparse(aggregate_columns[i][j]->getPtr());

            full_column = recursiveRemoveLowCardinality(full_column);

            /// Store a new column only if a conversion actually produced one.
            if (full_column.get() != aggregate_columns[i][j])
            {
                materialized_columns.emplace_back(std::move(full_column));
                aggregate_columns[i][j] = materialized_columns.back().get();
            }

            if (aggregate_columns[i][j]->isSparse())
                has_sparse_arguments = true;
        }

        aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments;
        /// Points into aggregate_columns[i], which is not resized after this point.
        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
        /// Offset of this function's state in the aggregate-state block (used as `place + state_offset`).
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

        const auto * that = aggregate_functions[i];
        /// Unnest consecutive trailing -State combinators
        while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
            that = func->getNestedFunction().get();
        aggregate_functions_instructions[i].that = that;

        /// -Array combinator: the batch calls run the nested function directly on the
        /// arrays' nested columns, and `offsets` maps each row to its range of nested elements.
        if (const auto * func = typeid_cast<const AggregateFunctionArray *>(that))
        {
            /// Unnest consecutive -State combinators before -Array
            that = func->getNestedFunction().get();
            while (const auto * nested_func = typeid_cast<const AggregateFunctionState *>(that))
                that = nested_func->getNestedFunction().get();
            auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
            nested_columns_holder.push_back(std::move(nested_columns));
            /// NOTE(review): a later push_back may reallocate nested_columns_holder. This pointer
            /// stays valid only if the holder's elements are moved with their buffers intact
            /// (true for std::vector elements) — confirm NestedColumnsHolder's type.
            aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
            aggregate_functions_instructions[i].offsets = offsets;
        }
        else
            aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();

        /// The function object actually invoked by the batch add* calls.
        aggregate_functions_instructions[i].batch_that = that;
    }
}
Aggregator::executeWithoutKeyImpl
/// Aggregates rows [row_begin, row_end) into the single key-less state block `res`.
///
/// @tparam use_compiled_functions When true and USE_EMBEDDED_COMPILER is enabled, functions
///                                flagged in is_aggregate_function_compiled are run through the
///                                JIT-compiled routine; all others go through the loop below.
/// @param res                     Start of the aggregate-state block; each function's state lives
///                                at res + inst->state_offset.
/// @param row_begin               First row to aggregate.
/// @param row_end                 One past the last row; an empty range returns immediately.
/// @param aggregate_instructions  One instruction per aggregate function, as built by
///                                prepareAggregateInstructions.
/// @param arena                   Passed through to the add* calls.
template <bool use_compiled_functions>
void NO_INLINE Aggregator::executeWithoutKeyImpl(
    AggregatedDataWithoutKey & res,
    size_t row_begin, size_t row_end,
    AggregateFunctionInstruction * aggregate_instructions,
    Arena * arena) const
{
    if (row_begin == row_end)
        return;

#if USE_EMBEDDED_COMPILER
    if constexpr (use_compiled_functions)
    {
        /// Collect the argument data of every compiled function, in function order.
        std::vector<ColumnData> columns_data;

        for (size_t i = 0; i < aggregate_functions.size(); ++i)
        {
            if (!is_aggregate_function_compiled[i])
                continue;

            AggregateFunctionInstruction * inst = aggregate_instructions + i;
            size_t arguments_size = inst->that->getArgumentTypes().size();

            for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index)
            {
                columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
            }
        }

        /// One call into the compiled code handles all compiled functions;
        /// they are skipped in the loop below.
        auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
        add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), res);

#if defined(MEMORY_SANITIZER)

        /// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place.
        for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index)
        {
            if (!is_aggregate_function_compiled[aggregate_function_index])
                continue;

            auto aggregate_data_with_offset = res + offsets_of_aggregate_states[aggregate_function_index];
            auto data_size = params.aggregates[aggregate_function_index].function->sizeOfData();
            __msan_unpoison(aggregate_data_with_offset, data_size);
        }
#endif
    }
#endif

    /// Adding values
    for (size_t i = 0; i < aggregate_functions.size(); ++i)
    {
        AggregateFunctionInstruction * inst = aggregate_instructions + i;

#if USE_EMBEDDED_COMPILER
        if constexpr (use_compiled_functions)
            if (is_aggregate_function_compiled[i])
                continue;
#endif

        /// Three ways to feed the rows to the function:
        ///  - offsets set (-Array combinator): the row range is translated into the range of
        ///    nested elements [offsets[row_begin - 1], offsets[row_end - 1]). The index is
        ///    computed as signed so row_begin == 0 yields -1 instead of wrapping around.
        ///    NOTE(review): offsets[-1] must then be readable; presumably the offsets array is
        ///    padded with a zero before element 0 — confirm.
        ///  - sparse arguments: addBatchSparseSinglePlace over [row_begin, row_end).
        ///  - otherwise: addBatchSinglePlace over [row_begin, row_end).
        if (inst->offsets)
            inst->batch_that->addBatchSinglePlace(
                inst->offsets[static_cast<ssize_t>(row_begin) - 1],
                inst->offsets[row_end - 1],
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
        else if (inst->has_sparse_arguments)
            inst->batch_that->addBatchSparseSinglePlace(
                row_begin, row_end,
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
        else
            inst->batch_that->addBatchSinglePlace(
                row_begin, row_end,
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
    }
}
AggregateFunctionSumData::addMany
/// Vectorized version
/// Adds ptr[start, end) into the running sum. When multitarget code is enabled, it
/// dispatches at run time to the AVX2 or SSE4.2 build of addManyImpl, in that order of
/// preference. Otherwise it falls back to the generic addManyImpl.
template <typename Value>
void NO_INLINE addMany(const Value * __restrict ptr, size_t start, size_t end)
{
#if USE_MULTITARGET_CODE
    if (isArchSupported(TargetArch::AVX2))
        return addManyImplAVX2(ptr, start, end);

    if (isArchSupported(TargetArch::SSE42))
        return addManyImplSSE42(ptr, start, end);
#endif

    addManyImpl(ptr, start, end);
}
/// Vectorized version
/// Sums ptr[start, end) into the member `sum`.
/// MULTITARGET_FUNCTION_AVX2_SSE42 emits this body as addManyImpl and also as the
/// addManyImplAVX2 / addManyImplSSE42 variants that addMany dispatches to. Each variant is
/// presumably compiled with the matching target options — confirm against the macro definition.
/// `T`, `Impl` and `sum` belong to the enclosing AggregateFunctionSumData<T> (not shown).
/// In a gdb backtrace for sum(Int32), T = long and Impl::add resolves to
/// AggregateFunctionSumAddOverflowImpl<long>::add, so Int32 values are accumulated in Int64.
MULTITARGET_FUNCTION_AVX2_SSE42(
    MULTITARGET_FUNCTION_HEADER(
        template <typename Value>
        void NO_SANITIZE_UNDEFINED NO_INLINE
    ), addManyImpl, MULTITARGET_FUNCTION_BODY((const Value * __restrict ptr, size_t start, size_t end) /// NOLINT
    {
        ptr += start;
        size_t count = end - start;
        const auto * end_ptr = ptr + count;

        if constexpr (std::is_floating_point_v<T>)
        {
            /// Compiler cannot unroll this loop, do it manually.
            /// (at least for floats, most likely due to the lack of -fassociative-math)

            /// Something around the number of SSE registers * the number of elements fit in register.
            constexpr size_t unroll_count = 128 / sizeof(T);
            /// Independent per-lane accumulators. The floating-point additions therefore happen
            /// in a different order than in a plain left-to-right loop.
            T partial_sums[unroll_count]{};

            /// End of the longest prefix whose length is a multiple of unroll_count.
            const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);

            while (ptr < unrolled_end)
            {
                for (size_t i = 0; i < unroll_count; ++i)
                    Impl::add(partial_sums[i], ptr[i]);
                ptr += unroll_count;
            }

            for (size_t i = 0; i < unroll_count; ++i)
                Impl::add(sum, partial_sums[i]);
        }

        /// Remaining tail for floating-point T; the whole range for other types.
        /// clang cannot vectorize the loop if accumulator is class member instead of local variable.
        T local_sum{};
        while (ptr < end_ptr)
        {
            Impl::add(local_sum, *ptr);
            ++ptr;
        }
        Impl::add(sum, local_sum);
    })
)
/// Generic addition policy used by the sum() state: in-place `lhs += rhs`.
/// NO_SANITIZE_UNDEFINED presumably stops UBSan from reporting signed-integer
/// overflow when a sum wraps around — confirm against the macro definition.
template <typename T>
struct AggregateFunctionSumAddOverflowImpl
{
    static void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(T & lhs, const T & rhs) { lhs += rhs; }
};
相关文章
- L#脚本语言,直接把DLL当脚本执行(图解说明)
- Python 执行 shell命令 的几个方法小结
- C#执行异步操作的几种方式比较和总结
- js基础 js自执行函数、调用递归函数、圆括号运算符、函数声明的提升 js 布尔值 ASP.NET MVC中设置跨域
- 立即执行函数
- JS高级--函数进阶(原型、调用方法、this、bind、严格模式、闭包、垃圾回收、递归深拷贝、匿名函数、回调函数、立即执行函数)
- Sampler:Shell命令执行可视化和告警工具
- js回调函数:js先执行完一个函数后再执行下面的逻辑或者方法
- Hadoop入门程序WordCount的执行过程
- 学习总结【匿名函数,匿名自执行函数】
- JIT与JVM的三种执行模式:解释模式、编译模式、混合模式
- 《敏捷可执行需求说明 Scrum提炼及实现技术》—— 3.4 关注干系人的“愿求”
- Vue3实践指南:使用reactive函数声明数组如何正确赋值响应式、script setup语法糖中toRefs如何优雅呈现、Options API 与 Composition API 如何选择及混用是否对性能有影响、关于 setup 中没有 this 的问题及 setup 的执行时机
- 浅析Vue3相关基础知识点:setup()入口函数、ref()定义响应式数据、reactive()定义多个响应式数据-深层的、toRefs()转换为每个属性都是一个ref、computed()计算属性、watch()监听数据、watchEffect()监听数据变化执行回调、生命周期对比、provide/inject跨层级组件通信
- 【Unity3D小技巧】Unity3D中判断Animation以及Animator动画播放结束,以及动画播放结束之后执行函数
- Mysql学习---视图/触发器/存储过程/函数/执行计划/sql优化 180101
- kubectl get pod 状态为Completed 原因为 command: 或args 参数错误无法正常执行
- 正尝试在 OS 加载程序锁内执行托管代码。不要尝试在 DllMain 或映像初始化函数内运行托管代码,这样做会导致应用程序挂起。