zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Datax 往 hdfs 写数据配置 HA 高可用

配置数据HDFS 可用 HA datax
2023-09-14 09:14:50 时间

Datax 往 hdfs 写数据配置 HA 高可用

配置脚本(在 ~/bin 目录下创建,后文的全表生成脚本通过 ~/bin/gen_import_config_ha.py 调用它)

vim gen_import_config_ha.py

内容 :

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust to match your environment.
# mysql_port is kept as a string: it is concatenated into the JDBC URL and
# converted with int() when opening the MySQLdb connection.
mysql_host = "cpu102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xxxxx"

# HDFS HA settings; adjust to match your cluster.
my_nameservices = "mycluster"
my_namenodes_1 = "nn1"
my_namenodes_2 = "nn2"
my_rpc_address_1 = "cpu101:8020"
my_rpc_address_2 = "cpu102:8020"

# Target directory for the generated config files; adjust as needed.
output_path = "/opt/module/datax/job/import"

def get_connection():
    """Return a new MySQLdb connection built from the module-level MySQL settings."""
    connect_kwargs = {
        "host": mysql_host,
        # The port is configured as a string; MySQLdb expects an integer.
        "port": int(mysql_port),
        "user": mysql_user,
        "passwd": mysql_passwd,
    }
    return MySQLdb.connect(**connect_kwargs)

def get_mysql_meta(database, table):
    """Fetch column metadata for one MySQL table.

    Queries information_schema.COLUMNS and returns one (COLUMN_NAME, DATA_TYPE)
    row per column of ``database.table``, ordered by ORDINAL_POSITION.
    """
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    connection = get_connection()
    try:
        cursor = connection.cursor()
        try:
            # Parameters are bound by the driver, not interpolated into the SQL.
            cursor.execute(sql, [database, table])
            return cursor.fetchall()
        finally:
            # Release the cursor even if the query fails.
            cursor.close()
    finally:
        # Always close the connection so a failed query does not leak it.
        connection.close()

def get_mysql_columns(database, table):
    """Return the column names of ``database.table`` as a list, in table order."""
    # Build a real list: on Python 3 map() is lazy and json.dump() cannot
    # serialize a map object.
    return [column[0] for column in get_mysql_meta(database, table)]

def get_hive_columns(database, table):
    """Return the hdfswriter column list for ``database.table``.

    Each entry is a dict ``{"name": <column name>, "type": <Hive type>}``,
    where the Hive type is derived from the column's MySQL DATA_TYPE
    (compared case-insensitively).

    Raises:
        KeyError: if a column uses a MySQL type that has no mapping below.
    """
    # MySQL DATA_TYPE -> Hive type; built once per call, not once per column.
    mappings = {
        "bigint": "bigint",
        "int": "bigint",
        "smallint": "bigint",
        "tinyint": "bigint",
        "decimal": "string",
        "double": "double",
        "float": "float",
        "binary": "string",
        "char": "string",
        "varchar": "string",
        "datetime": "string",
        "time": "string",
        "timestamp": "string",
        "date": "string",
        "text": "string",
    }

    columns = []
    for row in get_mysql_meta(database, table):
        column_name = row[0]
        mysql_type = row[1].lower()
        if mysql_type not in mappings:
            # Still a KeyError, but now it says which column is the problem.
            raise KeyError(
                "no Hive type mapping for MySQL type %r (column %r of %s.%s)"
                % (mysql_type, column_name, database, table)
            )
        columns.append({"name": column_name, "type": mappings[mysql_type]})
    # A concrete list (not a lazy map) so json.dump() works on Python 3 too.
    return columns

def generate_json(source_database, source_table):
    """Write the DataX job file that copies one MySQL table to HDFS (HA).

    The job pairs a ``mysqlreader`` for ``source_database.source_table`` with
    an ``hdfswriter`` that addresses the cluster through the configured HA
    nameservice. The result is saved as
    ``<output_path>/<source_database>.<source_table>.json``; ``output_path``
    is created if it does not exist.

    The writer's ``path`` is emitted as the literal placeholder
    ``${targetdir}`` (presumably substituted when the job is launched --
    confirm against how the job is started).
    """
    # Materialize the column lists: json.dump() cannot serialize a lazy
    # iterator, which is what map() yields on Python 3.
    reader_columns = list(get_mysql_columns(source_database, source_table))
    writer_columns = list(get_hive_columns(source_database, source_table))

    # Client-side HDFS HA settings: the nameservice, its two NameNodes, their
    # RPC addresses and the failover proxy provider.
    hadoop_config = {
        "dfs.nameservices": my_nameservices,
        "dfs.ha.namenodes." + my_nameservices: my_namenodes_1 + "," + my_namenodes_2,
        "dfs.namenode.rpc-address." + my_nameservices + "." + my_namenodes_1: my_rpc_address_1,
        "dfs.namenode.rpc-address." + my_nameservices + "." + my_namenodes_2: my_rpc_address_2,
        "dfs.client.failover.proxy.provider." + my_nameservices: "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    }

    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": reader_columns,
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        # Address the logical nameservice, not a single NameNode.
                        "defaultFS": "hdfs://" + my_nameservices,
                        "hadoopConfig": hadoop_config,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": writer_columns,
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }

    if not os.path.exists(output_path):
        os.makedirs(output_path)
    job_file = os.path.join(output_path, ".".join([source_database, source_table, "json"]))
    with open(job_file, "w") as f:
        json.dump(job, f)


def main(args):
    """Parse command-line options and generate the DataX job file for one table.

    Args:
        args: argument list without the program name (e.g. ``sys.argv[1:]``).
            Recognized options: ``-d``/``--sourcedb`` for the source database
            and ``-t``/``--sourcetbl`` for the source table.

    Exits with status 2 and a usage message on stderr when an option is not
    recognized or when the database or table is missing.
    """
    usage = "usage: gen_import_config_ha.py -d <database> -t <table>\n"
    source_database = ""
    source_table = ""

    try:
        # Short options are plain letters; a trailing ':' means the option
        # takes a value. Dashes do not belong in the shortopts string.
        options, _ = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    except getopt.GetoptError as err:
        sys.stderr.write("%s\n%s" % (err, usage))
        sys.exit(2)

    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        elif opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    # Without both values the generator would write a meaningless "..json"
    # file with empty column lists, so fail early instead.
    if not source_database or not source_table:
        sys.stderr.write(usage)
        sys.exit(2)

    generate_json(source_database, source_table)


# Run the generator only when executed as a script; the program name
# (sys.argv[0]) is stripped before the arguments are handed to main().
if __name__ == '__main__':
    main(sys.argv[1:])

赋予脚本执行权限:

chmod 777 gen_import_config_ha.py

配置全表生成脚本

vim gen_import_config_ha.sh
#!/bin/bash

# Generate one DataX import job config per gmall table, in the order listed.
tables=(
    activity_info
    activity_rule
    base_category1
    base_category2
    base_category3
    base_dic
    base_province
    base_region
    base_trademark
    cart_info
    coupon_info
    sku_attr_value
    sku_info
    sku_sale_attr_value
    spu_info
)

for table in "${tables[@]}"; do
    python ~/bin/gen_import_config_ha.py -d gmall -t "$table"
done

在这里插入图片描述

赋予脚本执行权限:

chmod 777 gen_import_config_ha.sh

在这里插入图片描述