社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

Apache Flink(v1.6.0)验证ElasticSearch接收器(v6.4)

mmkd • 4 年前 • 1476 次点击  

我使用的是ApacheFlink v1.6.0版,我正试图写入ElasticSearchv6.4.0,它托管在 Elastic Cloud . 在对弹性云集群进行身份验证时,我遇到了一些问题。

我已经能够让Flink写入本地ElasticSearchV6.4.0节点,该节点没有使用以下代码进行加密:

/*
    Elasticsearch Configuration
*/
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<ObjectNode> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<ObjectNode>() {
            private IndexRequest createIndexRequest(ObjectNode payload) {

                // remove the value node so the fields are at the base of the json payload
                JsonNode jsonOutput = payload.get("value");

                return Requests.indexRequest()
                        .index("raw-payload")
                        .type("payload")
                        .source(jsonOutput.toString(), XContentType.JSON);
            }

            @Override
            public void process(ObjectNode payload, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(payload));
            }
        }
);

// set number of events to be seen before writing to Elasticsearch
esSinkBuilder.setBulkFlushMaxActions(1);

// finally, build and add the sink to the job's pipeline
stream.addSink(esSinkBuilder.build());

但是,当我尝试将身份验证添加到代码库中时,如文档所示 here 在Flink文档中 here 关于相应的弹性搜索Java文档。看起来是这样的:

// provide a RestClientFactory for custom configuration on the internally created REST client
Header[] defaultHeaders = new Header[]{new BasicHeader("username", "password")};
esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
            restClientBuilder.setDefaultHeaders(defaultHeaders);
        }
);

执行作业时出现以下错误:

14:49:54,700 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: org.elasticsearch.ElasticsearchStatusException: method [HEAD], host [https://XXXXXXXXXXXXXX.europe-west1.gcp.cloud.es.io:9243], URI [/], status line [HTTP/1.1 401 Unauthorized]
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at com.downuk.AverageStockSalePrice.main(AverageStockSalePrice.java:146)
Caused by: org.elasticsearch.ElasticsearchStatusException: method [HEAD], host [https://XXXXXXXXXXXXXX.europe-west1.gcp.cloud.es.io:9243], URI [/], status line [HTTP/1.1 401 Unauthorized]
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:625)

有人能帮我指出哪里出了问题吗?

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/37960
 
1476 次点击  
文章 [ 2 ]  |  最新文章 4 年前
TT. Mansoor
Reply   •   1 楼
TT. Mansoor    5 年前
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        // TODO Additional rest client args go here - authentication headers for secure connections etc...
      }
    })

我希望这能对你有所帮助。

mmkd
Reply   •   2 楼
mmkd    5 年前

我看了一个打火的例子,就知道了。 here 以及ElasticSearch文档 here .

结果我试图在上面设置错误的配置:

restClientBuilder.setDefaultHeaders(...);

不是实际需要的设置:

restClientBuilder.setHttpClientConfigCallback(...);

一旦您使用了正确的自定义配置,剩下的就非常简单了。所以我错过的部分是:

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

                // elasticsearch username and password
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("$USERNAME", "$PASSWORD"));

                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
    }
);

最后是一个完整的ElasticSearch接收器的片段:

/*
    Elasticsearch Configuration
*/
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<ObjectNode> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<ObjectNode>() {
            private IndexRequest createIndexRequest(ObjectNode payload) {

                // remove the value node so the fields are at the base of the json payload
                JsonNode jsonOutput = payload.get("value");

                return Requests.indexRequest()
                        .index("raw-payload")
                        .type("payload")
                        .source(jsonOutput.toString(), XContentType.JSON);
            }

            @Override
            public void process(ObjectNode payload, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(payload));
            }
        }
);

// set number of events to be seen before writing to Elasticsearch
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

                // elasticsearch username and password
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("$USERNAME", "$PASSWORD"));

                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
    }
);

// finally, build and add the sink to the job's pipeline
stream.addSink(esSinkBuilder.build());

我希望这能帮助其他被困在同一个地方的人!