zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Impala 从 Fragment -> DataSink -> RowBatch 粗讲

2023-04-18 16:25:18 时间

Impala

在 Impala 中,每个 Fragment 的执行按 Prepare -> Open -> Read -> Close 的顺序一步一步进行。

Fragment -> DataSink -> RowBatch

PlanNode

/// Instantiates the PlanNode subclass that corresponds to 'tnode.node_type'.
///
/// @param pool   Object pool the new node is added to (via ObjectPool::Add()).
/// @param tnode  Thrift description of the plan node; only 'node_type' and, for
///               sort nodes, 'sort_node.type' are inspected here.
/// @param node   Out-parameter that receives the newly created node. It is left
///               untouched when an error is returned.
/// @return Status::OK() on success. For a node type without a case below, an
///         error Status whose message is "<type name> not implemented", where
///         the type name falls back to "unknown node type" if the value has no
///         entry in _TPlanNodeType_VALUES_TO_NAMES.
Status PlanNode::CreatePlanNode(ObjectPool * pool, const TPlanNode & tnode, PlanNode ** node)
{
    switch (tnode.node_type)
    {
        // HDFS scans have a dedicated plan node class.
        case TPlanNodeType::HDFS_SCAN_NODE:
            *node = pool->Add(new HdfsScanPlanNode());
            break;

        // All remaining scan flavours share the generic ScanPlanNode.
        case TPlanNodeType::HBASE_SCAN_NODE:
        case TPlanNodeType::DATA_SOURCE_NODE:
        case TPlanNodeType::KUDU_SCAN_NODE:
            *node = pool->Add(new ScanPlanNode());
            break;

        case TPlanNodeType::AGGREGATION_NODE:
            *node = pool->Add(new AggregationPlanNode());
            break;

        case TPlanNodeType::HASH_JOIN_NODE:
            *node = pool->Add(new PartitionedHashJoinPlanNode());
            break;

        case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
            *node = pool->Add(new NestedLoopJoinPlanNode());
            break;

        case TPlanNodeType::EMPTY_SET_NODE:
            *node = pool->Add(new EmptySetPlanNode());
            break;

        case TPlanNodeType::EXCHANGE_NODE:
            *node = pool->Add(new ExchangePlanNode());
            break;

        case TPlanNodeType::SELECT_NODE:
            *node = pool->Add(new SelectPlanNode());
            break;

        case TPlanNodeType::SORT_NODE:
            // The plan node class for a sort depends on the requested sort type.
            switch (tnode.sort_node.type)
            {
                case TSortType::PARTIAL:
                    *node = pool->Add(new PartialSortPlanNode());
                    break;
                case TSortType::TOPN:
                case TSortType::PARTITIONED_TOPN:
                    *node = pool->Add(new TopNPlanNode());
                    break;
                default:
                    // Anything that is neither partial nor top-N must be a total sort.
                    DCHECK(tnode.sort_node.type == TSortType::TOTAL);
                    *node = pool->Add(new SortPlanNode());
                    break;
            }
            break;

        case TPlanNodeType::UNION_NODE:
            *node = pool->Add(new UnionPlanNode());
            break;

        case TPlanNodeType::ANALYTIC_EVAL_NODE:
            *node = pool->Add(new AnalyticEvalPlanNode());
            break;

        case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
            *node = pool->Add(new SingularRowSrcPlanNode());
            break;

        case TPlanNodeType::SUBPLAN_NODE:
            *node = pool->Add(new SubplanPlanNode());
            break;

        case TPlanNodeType::UNNEST_NODE:
            *node = pool->Add(new UnnestPlanNode());
            break;

        case TPlanNodeType::CARDINALITY_CHECK_NODE:
            *node = pool->Add(new CardinalityCheckPlanNode());
            break;

        default:
        {
            // Unsupported type: report it by name when the Thrift name table knows it.
            const auto name_it = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
            const bool has_name = name_it != _TPlanNodeType_VALUES_TO_NAMES.end();
            stringstream msg;
            msg << (has_name ? name_it->second : "unknown node type") << " not implemented";
            return Status(msg.str());
        }
    }
    return Status::OK();
}

DataSinkConfig 创建

/// Creates and initializes the DataSinkConfig subclass selected by
/// 'thrift_sink.type' (and, for table sinks, by 'thrift_sink.table_sink.type').
///
/// @param thrift_sink  Thrift description of the sink.
/// @param row_desc     Row descriptor forwarded unchanged to the config's Init().
/// @param state        Fragment state; supplies the object pool the config is
///                     added to and is forwarded to Init().
/// @param data_sink    Out-parameter. Reset to nullptr on entry, then set to the
///                     new config once one has been created.
/// @return Status::OK() on success. An error Status if the required Thrift
///         sub-struct is not set, the sink/table-sink type has no case here,
///         CheckKuduAvailability() fails, or Init() fails.
Status DataSinkConfig::CreateConfig(
    const TDataSink & thrift_sink, const RowDescriptor * row_desc, FragmentState * state, DataSinkConfig ** data_sink)
{
    ObjectPool * const sink_pool = state->obj_pool();
    *data_sink = nullptr;

    switch (thrift_sink.type)
    {
        case TDataSinkType::DATA_STREAM_SINK:
        {
            if (!thrift_sink.__isset.stream_sink)
            {
                return Status("Missing data stream sink.");
            }
            // TODO: figure out good buffer size based on size of output row
            *data_sink = sink_pool->Add(new KrpcDataStreamSenderConfig());
            break;
        }

        case TDataSinkType::TABLE_SINK:
        {
            if (!thrift_sink.__isset.table_sink)
            {
                return Status("Missing table sink.");
            }

            // Pick the config class from the table sink's storage type.
            const auto & table_kind = thrift_sink.table_sink.type;
            if (table_kind == TTableSinkType::HDFS)
            {
                *data_sink = sink_pool->Add(new HdfsTableSinkConfig());
            }
            else if (table_kind == TTableSinkType::KUDU)
            {
                // The Kudu availability check runs before the Kudu config is created.
                RETURN_IF_ERROR(CheckKuduAvailability());
                *data_sink = sink_pool->Add(new KuduTableSinkConfig());
            }
            else if (table_kind == TTableSinkType::HBASE)
            {
                *data_sink = sink_pool->Add(new HBaseTableSinkConfig());
            }
            else
            {
                // Unsupported table sink: name it via the Thrift name table if possible.
                stringstream table_err;
                const auto name_it = _TTableSinkType_VALUES_TO_NAMES.find(table_kind);
                const bool has_name = name_it != _TTableSinkType_VALUES_TO_NAMES.end();
                table_err << (has_name ? name_it->second : "Unknown table sink") << " not implemented.";
                return Status(table_err.str());
            }
            break;
        }

        case TDataSinkType::PLAN_ROOT_SINK:
            *data_sink = sink_pool->Add(new PlanRootSinkConfig());
            break;

        case TDataSinkType::HASH_JOIN_BUILDER:
            *data_sink = sink_pool->Add(new PhjBuilderConfig());
            break;

        case TDataSinkType::NESTED_LOOP_JOIN_BUILDER:
            *data_sink = sink_pool->Add(new NljBuilderConfig());
            break;

        default:
        {
            // Unsupported sink type: name it via the Thrift name table if possible.
            stringstream sink_err;
            const auto name_it = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
            const bool has_name = name_it != _TDataSinkType_VALUES_TO_NAMES.end();
            sink_err << (has_name ? name_it->second : "Unknown data sink type ") << " not implemented.";
            return Status(sink_err.str());
        }
    }

    // Every path that reaches this point has assigned *data_sink above.
    RETURN_IF_ERROR((*data_sink)->Init(thrift_sink, row_desc, state));
    return Status::OK();
}

ScanNode

/// Creates the ExecNode that corresponds to this scan plan node's Thrift node
/// type and adds it to the runtime state's object pool.
///
/// @param state  Runtime state; supplies the object pool, the descriptor table
///               passed to the node constructors, and the query options checked
///               for Kudu scans.
/// @param node   Out-parameter. Reset to nullptr on entry and set to the new
///               exec node on success.
/// @return Status::OK() when a node was created; an error Status when
///         'tnode_->node_type' is not one of the scan types handled here.
Status ScanPlanNode::CreateExecNode(RuntimeState * state, ExecNode ** node) const
{
    ObjectPool * pool = state->obj_pool();
    // Give the out-parameter a defined value on every path, including failure.
    *node = nullptr;
    switch (tnode_->node_type)
    {
        case TPlanNodeType::HBASE_SCAN_NODE:
            *node = pool->Add(new HBaseScanNode(pool, *this, state->desc_tbl()));
            break;
        case TPlanNodeType::DATA_SOURCE_NODE:
            *node = pool->Add(new DataSourceScanNode(pool, *this, state->desc_tbl()));
            break;
        case TPlanNodeType::KUDU_SCAN_NODE:
            // The Thrift node says whether the multi-threaded Kudu scan node is used.
            if (tnode_->kudu_scan_node.use_mt_scan_node)
            {
                DCHECK_GT(state->query_options().mt_dop, 0);
                *node = pool->Add(new KuduScanNodeMt(pool, *this, state->desc_tbl()));
            }
            else
            {
                DCHECK(state->query_options().mt_dop == 0 || state->query_options().num_scanner_threads == 1);
                *node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl()));
            }
            break;
        default:
            // DCHECK only fires in debug builds (glog compiles it out under NDEBUG).
            // Without the explicit error return below, a release build would fall
            // through to Status::OK() with no node created.
            DCHECK(false) << "Unexpected scan node type: " << tnode_->node_type;
            return Status("Unexpected scan node type");
    }
    return Status::OK();
}