Kafka Part 7: SinkTask in Depth
2023-06-13 09:20:26
This post uses the Kafka Connect framework to implement a sink task that moves data from Kafka into MySQL.
1: Implement the JdbcSinkConnector class
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class JdbcSinkConnector extends SinkConnector {

    private String url;
    private String driven;
    private String userName;
    private String passwd;

    @Override
    public void start(Map<String, String> props) {
        // PropertyVerify and Constant_Global are project-specific helpers:
        // read a property, fall back to a default, fail with a message if absent.
        this.url = PropertyVerify.getOrElse(props, Constant_Global.URL, "jdbc:mysql://localhost/test", "URL is null");
        this.driven = PropertyVerify.getOrElse(props, Constant_Global.DRIVEN, "com.mysql.jdbc.Driver", "DRIVEN is null");
        this.userName = PropertyVerify.getOrElse(props, Constant_Global.USERNAME, "root", "USERNAME is null");
        this.passwd = PropertyVerify.getOrElse(props, Constant_Global.PASSED, "root", "PASSED is null");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return JdbcSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same JDBC settings.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> conf = new HashMap<>();
            conf.put("url", url);
            conf.put("driven", driven);
            conf.put("userName", userName);
            conf.put("passwd", passwd);
            configs.add(conf);
        }
        return configs;
    }

    @Override
    public ConfigDef config() {
        // Required by Connect API versions that declare config() abstract;
        // validation here is handled in start() instead.
        return new ConfigDef();
    }

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    @Override
    public void stop() {
        // nothing to release
    }
}
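PropertyVerify and Constant_Global are not shown in the original post. A minimal sketch of what getOrElse presumably does, inferred only from the call sites above (the body is an assumption):

import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;

// Hypothetical helper; only the call shape (props, key, default, message) is from the original.
public class PropertyVerify {

    public static String getOrElse(Map<String, String> props, String key,
                                   String defaultValue, String errorMessage) {
        String value = props.get(key);
        if (value != null && !value.trim().isEmpty()) {
            return value;              // explicit configuration wins
        }
        if (defaultValue != null) {
            return defaultValue;       // otherwise fall back to the built-in default
        }
        throw new ConnectException(errorMessage);
    }
}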
2: Implement the JdbcSinkTask class
import java.beans.PropertyVetoException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

import javax.mail.MessagingException;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSinkTask extends SinkTask {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkTask.class);

    public String schema;
    private String email;   // alert recipient; referenced but not declared in the original excerpt
    private JdbcDbWriter writer;

    @Override
    public String version() {
        return new JdbcSinkConnector().version();
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        LOG.info("================ flush start ================");
    }

    @Override
    public void put(Collection<SinkRecord> sinkRecords) {
        if (sinkRecords.isEmpty()) {
            return;
        }
        try {
            writer.write(sinkRecords, schema, email);
        } catch (SQLException | IOException e) {
            // On a write failure, send an alert mail, then surface the error to the framework.
            try {
                EmailUtil.init(Constant_Global.STMP, Constant_Global.EMAILUSER,
                        Constant_Global.EMAILPASSWD, Constant_Global.EMAILTITAL,
                        Constant_Global.EMAILADREE, email);
                EmailUtil.send("kafka sink: failed to write data");
            } catch (MessagingException e1) {
                e1.printStackTrace();
            }
            throw new JDBCConntorException("failed to write data");
        }
    }

    @Override
    public void start(Map<String, String> pro) {
        try {
            DbPool.init(pro);
            writer = new JdbcDbWriter();
        } catch (PropertyVetoException e1) {
            e1.printStackTrace();
            LOG.info("database configuration error");
        }
    }

    @Override
    public void stop() {
    }
}
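JdbcDbWriter, DbPool, EmailUtil, and JDBCConntorException are project classes the post does not show. A rough sketch of the writer, assuming DbPool hands out pooled JDBC connections and each record value is stored as a string (the method signature comes from the call in put(); everything else, including DbPool.getConnection() and the table layout, is an assumption):

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;

import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch; a real implementation would map record fields to table columns.
public class JdbcDbWriter {

    public void write(Collection<SinkRecord> records, String schema, String email)
            throws SQLException, IOException {
        try (Connection conn = DbPool.getConnection();   // assumed pool accessor
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO " + schema + ".sink_table (payload) VALUES (?)")) {
            for (SinkRecord record : records) {
                ps.setString(1, String.valueOf(record.value()));
                ps.addBatch();                           // batch inserts for throughput
            }
            ps.executeBatch();
        }
    }
}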
3: Package and run
3.1 Running in standalone mode; the configuration files live under kafka/config
a: cp connect-file-sink.properties connect-jdbc-sink.properties
b: vim connect-jdbc-sink.properties and configure it as follows
# kafka connector properties
name=canal-sink-connector    # task name
connector.
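The file is cut off after "connector." in the source. A plausible complete version, assuming the connector is packaged under com.example.connect and reads the same url/driven/userName/passwd keys that taskConfigs() forwards (the package, tasks.max, and topic name are assumptions; only name=canal-sink-connector comes from the original):

# kafka connector properties
name=canal-sink-connector
connector.class=com.example.connect.JdbcSinkConnector
tasks.max=1
topics=canal-topic
url=jdbc:mysql://localhost/test
driven=com.mysql.jdbc.Driver
userName=root
passwd=root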
c: startup command
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-jdbc-sink.properties
3.2 Standalone mode is only suitable for testing; for production, distributed mode is recommended.
a: configuration file: vim jdbc-sink-distributed.properties
bootstrap.servers=node1:6667,node2:6667,node3:6667
group.id=test-consumer-group
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
config.storage.topic=configs-topic
status.storage.topic=connect-status
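Each node first starts a distributed worker with this file; a typical invocation from kafka/bin (the relative config path is an assumption):

./connect-distributed.sh ../config/jdbc-sink-distributed.properties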
b: startup command: register the connector through the Connect REST interface (the workers listen on port 8083 by default):
curl -X POST -H "Content-Type: application/json" --data @connector.json http://node1:8083/connectors
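A sketch of the connector.json payload (the file name is illustrative; the config keys mirror the standalone properties file above, and the connector package is again an assumption):

{
  "name": "canal-sink-connector",
  "config": {
    "connector.class": "com.example.connect.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "canal-topic",
    "url": "jdbc:mysql://localhost/test",
    "driven": "com.mysql.jdbc.Driver",
    "userName": "root",
    "passwd": "root"
  }
}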