社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

是否有人能够使用elasticsearch xpack sql和Spark?

codeBarer • 4 年前 • 442 次点击  

使用PySpark我试图从elasticsearch读取数据。通常,我会将查询设置为行上的某个内容(请参见下面的查询),并将es.resource设置为索引,例如“my_index/doc”,这样我就可以将数据读入spark:

q ="""{
          "query": {
              "match_all": {}
          }  
      }"""

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: 
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
invalid_index_name_exception: Invalid index name [_xpack], must not start with '_'.
null

有没有人试过使用xpack或者知道如何从Elasticsearch hadoop插件执行Elasticsearch SQL查询?

q = """{"query": "select * from eg_flight limit 1"}"""

es_read_conf = {
    "es.nodes" : "192.168.1.71,192.168.1.72,192.168.1.73",
    "es.port" : "9200",
    "es.resource" :  "_xpack/sql",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/53808
 
442 次点击  
文章 [ 1 ]  |  最新文章 4 年前
codeBarer
Reply   •   1 楼
codeBarer    5 年前

我认为这个功能不受支持。PySpark中的另一个解决方案是使用JDBC驱动程序,我确实尝试过。我尝试了以下方法:

es_df = spark.read.jdbc(url="jdbc:es://http://192.168.1.71:9200", table = "(select * from eg_flight) mytable")

Py4JJavaError: An error occurred while calling o2488.jdbc.
: java.sql.SQLFeatureNotSupportedException: Found 1 problem(s)
line 1:8: Unexecutable item

...

另一种方法是使用核心Python和请求,但我不建议对大型数据集使用它。

import requests as r
import json


es_template = {
    "query": "select * from eg_flight"
}

es_link = "http://192.168.1.71:9200/_xpack/sql"
headers = {'Content-type': 'application/json'}


if __name__ == "__main__":

    load = r.post(es_link, data=json.dumps(es_template), headers=headers)
    if load.status_code == 200:
        load = load.json()
        #do something with it