Kafka Part 7: SinkTask in Detail


Use the Kafka Connect framework to implement a sink task that writes data from Kafka into MySQL.

Part 1: Implement the JdbcSinkConnector class

 

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class JdbcSinkConnector extends SinkConnector {

    private String url;
    private String driven;
    private String userName;
    private String passwd;

    @Override
    public void start(Map<String, String> props) {
        // Read the JDBC settings from the connector configuration, falling back to defaults.
        this.url = PropertyVerify.getOrElse(props, Constant_Global.URL, "jdbc:mysql://localhost/test", "URL is null");
        this.driven = PropertyVerify.getOrElse(props, Constant_Global.DRIVEN, "com.mysql.jdbc.Driver", "DRIVEN is null");
        this.userName = PropertyVerify.getOrElse(props, Constant_Global.USERNAME, "root", "USERNAME is null");
        this.passwd = PropertyVerify.getOrElse(props, Constant_Global.PASSED, "root", "PASSED is null");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return JdbcSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task receives the same JDBC connection settings.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> conf = new HashMap<>();
            conf.put("url", url);
            conf.put("driven", driven);
            conf.put("userName", userName);
            conf.put("passwd", passwd);
            configs.add(conf);
        }
        return configs;
    }

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    @Override
    public void stop() {
        // nothing to release
    }
}
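PropertyVerify and Constant_Global are the author's own helper classes and are not shown in the article. Based purely on how getOrElse is called above, a minimal sketch might look like the following (the exact behaviour is an assumption):

import java.util.Map;

// Hypothetical sketch: return the configured value for `key`, otherwise the default;
// fail with `errMsg` when neither is available.
public class PropertyVerify {

    public static String getOrElse(Map<String, String> props, String key,
                                   String defaultValue, String errMsg) {
        String value = props.get(key);
        if (value != null && !value.trim().isEmpty()) {
            return value;
        }
        if (defaultValue != null) {
            return defaultValue;
        }
        throw new IllegalArgumentException(errMsg);
    }
}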

Part 2: Implement the JdbcSinkTask class

 

import java.beans.PropertyVetoException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

import javax.mail.MessagingException;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSinkTask extends SinkTask {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkTask.class);

    // Target schema and alert-mail recipient; presumably populated from the task config
    // (not shown in the original article).
    public String schema;
    private String email;

    private JdbcDbWriter writer;

    @Override
    public String version() {
        return new JdbcSinkConnector().version();
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        LOG.info("================ flush start ================");
    }

    @Override
    public void put(Collection<SinkRecord> sinkRecords) {
        if (sinkRecords.isEmpty()) {
            return;
        }
        try {
            writer.write(sinkRecords, schema, email);
        } catch (SQLException | IOException e) {
            // Writing failed: send an alert mail, then fail the task.
            try {
                EmailUtil.init(Constant_Global.STMP, Constant_Global.EMAILUSER, Constant_Global.EMAILPASSWD,
                        Constant_Global.EMAILTITAL, Constant_Global.EMAILADREE, email);
                EmailUtil.send("kafka sink: failed to write data");
            } catch (MessagingException e1) {
                e1.printStackTrace();
            }
            throw new JDBCConntorException("failed to write data");
        }
    }

    @Override
    public void start(Map<String, String> pro) {
        try {
            // Initialise the connection pool and the JDBC writer from the task configuration.
            DbPool.init(pro);
            writer = new JdbcDbWriter();
        } catch (PropertyVetoException e1) {
            e1.printStackTrace();
            LOG.info("database configuration error");
        }
    }

    @Override
    public void stop() {
    }
}
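DbPool, JdbcDbWriter, EmailUtil and JDBCConntorException are also the author's own classes and are not shown in the article. Purely as an illustration, a minimal JdbcDbWriter could batch-insert each record's key and value into a fixed table; the table layout and the DbPool.getConnection() call are assumptions, not the article's actual implementation:

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;

import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch of a JDBC writer: one batched INSERT per put() call.
public class JdbcDbWriter {

    public void write(Collection<SinkRecord> records, String schema, String email)
            throws SQLException, IOException {
        // DbPool.getConnection() is assumed to hand out pooled JDBC connections.
        try (Connection conn = DbPool.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO " + schema + ".sink_table (topic, record_key, record_value) VALUES (?, ?, ?)")) {
            for (SinkRecord record : records) {
                ps.setString(1, record.topic());
                ps.setString(2, String.valueOf(record.key()));
                ps.setString(3, String.valueOf(record.value()));
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}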

Part 3: Package and run

  3.1 Run in standalone mode; the configuration files are under the kafka/config directory

  a: cp   connect-file-sink.properties  connect-jdbc-sink.properties

  b: vim connect-jdbc-sink.properties, with the following configuration

 

# kafka connector properties
# connector (task) name
name=canal-sink-connector
# the SinkConnector class implemented above (use the fully qualified name of your build)
connector.class=JdbcSinkConnector
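The article truncates the rest of this file. A typical sink connector file also needs tasks.max and the topics to consume, plus, in this design, the JDBC settings read by JdbcSinkConnector.start(). The topic, package name and exact JDBC key names (they depend on the Constant_Global constants) below are placeholders:

# hypothetical complete example -- package name, topic and JDBC key names are placeholders
name=canal-sink-connector
connector.class=com.example.connect.JdbcSinkConnector
tasks.max=1
topics=canal-topic
url=jdbc:mysql://localhost/test
driven=com.mysql.jdbc.Driver
userName=root
passwd=root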

  c: start command

 
./connect-standalone.sh   ../config/connect-standalone.properties  ../config/connect-jdbc-sink.properties
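Note that before starting, the packaged connector jar (plus its dependencies such as the MySQL driver) must be visible to the Connect worker, for example by copying it into kafka/libs or by pointing plugin.path in connect-standalone.properties at its directory.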

  3.2 Standalone mode is only for testing; distributed mode is recommended for production

  a: configuration file: vim jdbc-sink-distributed.properties

bootstrap.servers=node1:6667,node2:6667,node3:6667 

group.id=test-consumer-group 

key.converter=org.apache.kafka.connect.storage.StringConverter 

value.converter=org.apache.kafka.connect.json.JsonConverter 

key.converter.schemas.enable=false 

value.converter.schemas.enable=false 

internal.key.converter=org.apache.kafka.connect.json.JsonConverter 

internal.value.converter=org.apache.kafka.connect.json.JsonConverter 

internal.key.converter.schemas.enable=false 

internal.value.converter.schemas.enable=false 

offset.storage.topic=connect-offsets 

offset.flush.interval.ms=10000 

config.storage.topic=configs-topic 

status.storage.topic=connect-status
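These are worker-level properties. Assuming the file sits under kafka/config like the standalone configs above, the distributed worker is started with connect-distributed.sh; connectors are then created through the REST API rather than being passed on the command line:

./connect-distributed.sh ../config/jdbc-sink-distributed.properties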

  b: start command: create the connector through the Connect REST interface

 

curl -X POST -H "Content-Type: application/json" --data '{...}' http://localhost:8083/connectors
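A fuller sketch of the request, assuming the worker's default REST port 8083; the connector name, topic and fully qualified class name are placeholders that must match your build:

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d '{
           "name": "canal-sink-connector",
           "config": {
             "connector.class": "com.example.connect.JdbcSinkConnector",
             "tasks.max": "1",
             "topics": "canal-topic"
           }
         }'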