zl程序教程

您现在的位置是:首页 >  工具

当前栏目

【异常】Flink整合ES出错,The implementation of the provided ElasticsearchSinkFunction is not serializable.

ESflink异常 The not of is 整合
2023-09-14 09:14:14 时间

一、异常内容

Exception in thread "main" java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:215)
	at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink.<init>(ElasticsearchSink.java:72)
	at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink.<init>(ElasticsearchSink.java:61)
	at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.build(ElasticsearchSink.java:209)
	at com.desaysv.analyze.consumer.DataStreamSinkServiceImpl.startDataStreamSink(DataStreamSinkServiceImpl.java:64)
	at com.desaysv.analyze.LuckyLogAnalyzeApplication.main(LuckyLogAnalyzeApplication.java:24)

二、错误说明

flink输出(sink)到elasticsearch,ElasticsearchSinkFunction无法序列化问题
原因是由于该类传入了外部参数,导致无法序列化。

三、错误解决

自定义一个类去实现ElasticsearchSinkFunction这个类,可以参考以下代码

public class MyElasticSearchSinkFunction implements ElasticsearchSinkFunction<String>, Serializable {
    private String index;

    MyElasticSearchSinkFunction(String index) {
        this.index = index;
    }
    public IndexRequest createIndexRequest(String element) {
        JSONObject jsonObject = new JSONObject(); 
        jsonObject.put(columns[i],element);  
        return Requests.indexRequest()
                .index(index)
                .type(type)
                .source(jsonObject, XContentType.JSON);
    }

    @Override
    public void process(String element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        System.out.println(element);
        requestIndexer.add(createIndexRequest(element));
    }
}

然后用构造函数去接收参数,这样在elasticsearchSink就可以直接使用了

ElasticsearchSink.Builder<String> elasticSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,new MyElasticSearchSinkFunction(xxxx.getIndex(),
        xxxx.getType(),xxxx.getColumns)
);