【异常】Flink整合ES出错,The implementation of the provided ElasticsearchSinkFunction is not serializable.
2023-09-14 09:14:14 时间
一、异常内容
Exception in thread "main" java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:215)
at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink.<init>(ElasticsearchSink.java:72)
at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink.<init>(ElasticsearchSink.java:61)
at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.build(ElasticsearchSink.java:209)
at com.desaysv.analyze.consumer.DataStreamSinkServiceImpl.startDataStreamSink(DataStreamSinkServiceImpl.java:64)
at com.desaysv.analyze.LuckyLogAnalyzeApplication.main(LuckyLogAnalyzeApplication.java:24)
二、错误说明
flink输出(sink)到elasticsearch,ElasticsearchSinkFunction无法序列化问题
原因是由于该类传入了外部参数,导致无法序列化。
三、错误解决
自定义一个类去实现ElasticsearchSinkFunction这个类,可以参考以下代码
public class MyElasticSearchSinkFunction implements ElasticsearchSinkFunction<String>, Serializable {
private String index;
MyElasticSearchSinkFunction(String index) {
this.index = index;
}
public IndexRequest createIndexRequest(String element) {
JSONObject jsonObject = new JSONObject();
jsonObject.put(columns[i],element);
return Requests.indexRequest()
.index(index)
.type(type)
.source(jsonObject, XContentType.JSON);
}
@Override
public void process(String element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
System.out.println(element);
requestIndexer.add(createIndexRequest(element));
}
}
然后用构造函数去接收参数,这样在elasticsearchSink就可以直接使用了
ElasticsearchSink.Builder<String> elasticSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,new MyElasticSearchSinkFunction(xxxx.getIndex(),
xxxx.getType(),xxxx.getColumns)
);
相关文章
- es 其实是不是就是数据库_初识ES数据库「建议收藏」
- 每天3分钟,重学ES6-ES12(十八)ES Module
- ES 聚合查询
- es 在数据量很大的情况下(数十亿级别)如何提高查询效率
- ES系列三、基本知识准备
- ES系列五、ES6.3常用api之搜索类api
- 【ES三周年】基于ELK的日志分析服务
- 【ES三周年】搜索在计算机中的地位十分重要
- IOS – OpenGL ES 调节图像对比度 GPUImageContrastFilter
- IOS – OpenGL ES 绘制十字 GPUImageCrosshairGenerator
- IOS – OpenGL ES 设置图像反遮罩锐化 GPUImageUnsharpMaskFilter
- IOS – OpenGL ES 图像凹面镜移动效果 GPUImagePinchDistortionFilter
- 【ES三周年】Win10安装ElasticSearch笔记
- 【ES三周年】linux-centos7安装elasticsearch-head插件
- 【ES三周年】elasticsearch 常用数据类型详解和范例
- ES与MySQL的完美同步解决方案(es同步mysql)
- 使用ES连接Oracle一种简单的解决方案(es连接oracle)
- ES来打破传统从Oracle走向 NoSQL(es替代oracle)
- 借助ES技术可靠地迁移数据至Oracle(es 数据到oracle)
- 使用ES导入Oracle数据库一招制胜(es 导入 Oracle)
- 数据存储看您选择ES还是Redis(数据存es还是redis)
- 比较ES和Redis的数据写入功能(写入es和写入redis)
- Oracle与ES协同同步实现最佳性能(oracle与es同步)