Py学习  »  Elasticsearch

是否有人能够使用elasticsearch xpack sql和Spark?

codeBarer • 4 年前 • 424 次点击  

使用PySpark我试图从elasticsearch读取数据。通常,我会将查询设置为行上的某个内容(请参见下面的查询),并将es.resource设置为索引,例如“my_index/doc”,这样我就可以将数据读入spark:

q ="""{
          "query": {
              "match_all": {}
          }  
      }"""

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: 
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
invalid_index_name_exception: Invalid index name [_xpack], must not start with '_'.
null

有没有人试过使用xpack或者知道如何从Elasticsearch hadoop插件执行Elasticsearch SQL查询?

q = """{"query": "select * from eg_flight limit 1"}"""

es_read_conf = {
    "es.nodes" : "192.168.1.71,192.168.1.72,192.168.1.73",
    "es.port" : "9200",
    "es.resource" :  "_xpack/sql",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/53808
 
424 次点击  
文章 [ 1 ]  |  最新文章 4 年前
codeBarer
Reply   •   1 楼
codeBarer    5 年前

我认为这个功能不受支持。PySpark中的另一个解决方案是使用JDBC驱动程序,我确实尝试过。我尝试了以下方法:

es_df = spark.read.jdbc(url="jdbc:es://http://192.168.1.71:9200", table = "(select * from eg_flight) mytable")

Py4JJavaError: An error occurred while calling o2488.jdbc.
: java.sql.SQLFeatureNotSupportedException: Found 1 problem(s)
line 1:8: Unexecutable item

...

另一种方法是使用核心Python和请求,但我不建议对大型数据集使用它。

import requests as r
import json


es_template = {
    "query": "select * from eg_flight"
}

es_link = "http://192.168.1.71:9200/_xpack/sql"
headers = {'Content-type': 'application/json'}


if __name__ == "__main__":

    load = r.post(es_link, data=json.dumps(es_template), headers=headers)
    if load.status_code == 200:
        load = load.json()
        #do something with it