社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

ElasticSearch中是否有批量部分更新?

Mustafa Çağatay Kızıltan • 4 年前 • 530 次点击  

我有一个50个属性的模型用于ElasticSearch,我正在将数据传输到ElasticSearch。但是,我的ElasticSearch别名中最多有150000个文档,我希望通过批量部分更新来更新这些文档的3个属性。我知道批量更新和部分更新是分开的,但是在弹性搜索中有部分批量更新吗?

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/37963
 
530 次点击  
文章 [ 1 ]  |  最新文章 4 年前
Russ Cam
Reply   •   1 楼
Russ Cam    5 年前

您可以使用批量API发送部分更新。下面是一个例子

private static void Main()
{
    var defaultIndex = "documents";
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var settings = new ConnectionSettings(pool)
        .DefaultIndex(defaultIndex);

    var client = new ElasticClient(settings);

    if (client.IndexExists(defaultIndex).Exists)
        client.DeleteIndex(defaultIndex);

    var docs = Enumerable.Range(1, 10).Select(i => new MyDocument(i) 
        {
            Message = $"message {i}"
        });

    // bulk index the documents   
    var bulkResponse = client.Bulk(b => b
        .IndexMany(docs)
        .Refresh(Refresh.WaitFor)
    );

    var searchResponse = client.Search<MyDocument>(s => s
        .Sort(so => so.Ascending("_id"))
    );

    // update the documents
    bulkResponse = client.Bulk(b => b
        .UpdateMany<MyDocument, object>(docs, (bu, doc) => 
        {
            if (doc.Id % 3 == 0)
            {
                // use script to update
                bu.Id(doc.Id).Script(s => s
                    .Source("ctx._source.message = 'message ' + (Integer.parseInt(ctx._id) * 2);")
                );
            }
            else if (doc.Id % 2 == 0)
            {
                // use partial document to update
                bu.Id(doc.Id).Doc(new { message = "updated message" });
            }
            else
            {
                // send the original document to update
                bu.Doc(doc);
            }

            return bu;
        })
        .Refresh(Refresh.WaitFor)
    );

    searchResponse = client.Search<MyDocument>(s => s
        .Sort(so => so.Ascending("_id"))
    );    
}


public class MyDocument 
{
    public MyDocument(int id) => Id = id;

    public int Id { get; set; }  

    public string Message { get; set; }
}

最终搜索响应返回

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "1",
        "_score" : null,
        "_source" : {
          "id" : 1,
          "message" : "message 1"
        },
        "sort" : [
          "1"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "10",
        "_score" : null,
        "_source" : {
          "id" : 10,
          "message" : "updated message"
        },
        "sort" : [
          "10"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "2",
        "_score" : null,
        "_source" : {
          "id" : 2,
          "message" : "updated message"
        },
        "sort" : [
          "2"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "3",
        "_score" : null,
        "_source" : {
          "id" : 3,
          "message" : "message 6"
        },
        "sort" : [
          "3"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "4",
        "_score" : null,
        "_source" : {
          "id" : 4,
          "message" : "updated message"
        },
        "sort" : [
          "4"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "5",
        "_score" : null,
        "_source" : {
          "id" : 5,
          "message" : "message 5"
        },
        "sort" : [
          "5"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "6",
        "_score" : null,
        "_source" : {
          "id" : 6,
          "message" : "message 12"
        },
        "sort" : [
          "6"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "7",
        "_score" : null,
        "_source" : {
          "id" : 7,
          "message" : "message 7"
        },
        "sort" : [
          "7"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "8",
        "_score" : null,
        "_source" : {
          "id" : 8,
          "message" : "updated message"
        },
        "sort" : [
          "8"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "9",
        "_score" : null,
        "_source" : {
          "id" : 9,
          "message" : "message 18"
        },
        "sort" : [
          "9"
        ]
      }
    ]
  }
}

注意源文档已更新

  1. 带有 _id 可被3整除已使用脚本更新更新文档
  2. 文档 圣婴 可被2除尽的部分更新了文档。
  3. 其余文档已通过传递原始文档进行了更新;这将导致 noop 在批量响应中。