Is there a bulk partial update in Elasticsearch?

Mustafa Çağatay Kızıltan • 6 years ago • 908 views

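I have a model with 50 properties that I index into Elasticsearch. My Elasticsearch alias holds up to 150,000 documents, and I want to update 3 properties on those documents with a bulk partial update. I know that bulk updates and partial updates exist separately, but is there a partial bulk update in Elasticsearch?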

Article URL: http://www.python88.com/topic/37963
Reply 1 • Russ Cam • 7 years ago

You can send partial updates with the bulk API. Here is an example:

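// Namespaces this snippet needs. The calls used here (IndexExists, DeleteIndex)
// suggest it was written against NEST 6.x; that version is an inference.
using System;
using System.Linq;
using Elasticsearch.Net;
using Nest;

private static void Main()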
{
    var defaultIndex = "documents";
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var settings = new ConnectionSettings(pool)
        .DefaultIndex(defaultIndex);

    var client = new ElasticClient(settings);

    if (client.IndexExists(defaultIndex).Exists)
        client.DeleteIndex(defaultIndex);

    var docs = Enumerable.Range(1, 10).Select(i => new MyDocument(i) 
        {
            Message = $"message {i}"
        });

    // bulk index the documents   
    var bulkResponse = client.Bulk(b => b
        .IndexMany(docs)
        .Refresh(Refresh.WaitFor)
    );

    var searchResponse = client.Search<MyDocument>(s => s
        .Sort(so => so.Ascending("_id"))
    );

    // update the documents
    bulkResponse = client.Bulk(b => b
        .UpdateMany<MyDocument, object>(docs, (bu, doc) => 
        {
            if (doc.Id % 3 == 0)
            {
                // use script to update
                bu.Id(doc.Id).Script(s => s
                    .Source("ctx._source.message = 'message ' + (Integer.parseInt(ctx._id) * 2);")
                );
            }
            else if (doc.Id % 2 == 0)
            {
                // use partial document to update
                bu.Id(doc.Id).Doc(new { message = "updated message" });
            }
            else
            {
                // send the original document to update
                bu.Doc(doc);
            }

            return bu;
        })
        .Refresh(Refresh.WaitFor)
    );

    searchResponse = client.Search<MyDocument>(s => s
        .Sort(so => so.Ascending("_id"))
    );    
}


public class MyDocument 
{
    public MyDocument(int id) => Id = id;

    public int Id { get; set; }  

    public string Message { get; set; }
}
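
Independently of the client library, the second bulk call above boils down to a `_bulk` request whose body is newline-delimited JSON: one action line per document, followed by a line carrying either a `script` or a partial `doc`. The following is a minimal sketch of that body in Python (standard library only), not what NEST emits byte for byte. The function names and the batch size are illustrative assumptions; the index name, field values, and script source are taken from the example. It also omits `_type`; on a cluster that still uses mapping types, the action line or the request path would need to name the type as well.

```python
import json

# Script source copied from the C# example above.
SCRIPT_SOURCE = "ctx._source.message = 'message ' + (Integer.parseInt(ctx._id) * 2);"


def update_payload(doc_id):
    """Return the line that follows the action line for one document."""
    if doc_id % 3 == 0:
        # Scripted update.
        return {"script": {"source": SCRIPT_SOURCE}}
    if doc_id % 2 == 0:
        # Partial document: only the listed fields are merged into _source.
        return {"doc": {"message": "updated message"}}
    # Resend the values that are already stored.
    return {"doc": {"id": doc_id, "message": f"message {doc_id}"}}


def build_bulk_body(doc_ids, index="documents"):
    """Build the newline-delimited JSON body for one _bulk request."""
    lines = []
    for doc_id in doc_ids:
        # Action line: names the operation and the target document.
        lines.append(json.dumps({"update": {"_index": index, "_id": str(doc_id)}}))
        lines.append(json.dumps(update_payload(doc_id)))
    # The bulk body has to end with a newline.
    return "\n".join(lines) + "\n"


def batches(doc_ids, size):
    """Yield consecutive slices of doc_ids with at most `size` items each."""
    for start in range(0, len(doc_ids), size):
        yield doc_ids[start:start + size]
```

For the 150,000 documents mentioned in the question, one option is to send several smaller requests, for example `for chunk in batches(all_ids, 1000): body = build_bulk_body(chunk)`, and post each `body` to `_bulk`. The batch size of 1000 is an arbitrary starting point and would need tuning for the actual document size and cluster.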

The final search response (`searchResponse` after the second bulk call) returns:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "1",
        "_score" : null,
        "_source" : {
          "id" : 1,
          "message" : "message 1"
        },
        "sort" : [
          "1"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "10",
        "_score" : null,
        "_source" : {
          "id" : 10,
          "message" : "updated message"
        },
        "sort" : [
          "10"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "2",
        "_score" : null,
        "_source" : {
          "id" : 2,
          "message" : "updated message"
        },
        "sort" : [
          "2"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "3",
        "_score" : null,
        "_source" : {
          "id" : 3,
          "message" : "message 6"
        },
        "sort" : [
          "3"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "4",
        "_score" : null,
        "_source" : {
          "id" : 4,
          "message" : "updated message"
        },
        "sort" : [
          "4"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "5",
        "_score" : null,
        "_source" : {
          "id" : 5,
          "message" : "message 5"
        },
        "sort" : [
          "5"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "6",
        "_score" : null,
        "_source" : {
          "id" : 6,
          "message" : "message 12"
        },
        "sort" : [
          "6"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "7",
        "_score" : null,
        "_source" : {
          "id" : 7,
          "message" : "message 7"
        },
        "sort" : [
          "7"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "8",
        "_score" : null,
        "_source" : {
          "id" : 8,
          "message" : "updated message"
        },
        "sort" : [
          "8"
        ]
      },
      {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "9",
        "_score" : null,
        "_source" : {
          "id" : 9,
          "message" : "message 18"
        },
        "sort" : [
          "9"
        ]
      }
    ]
  }
}

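Note that the source documents have been updated:

  1. Documents with an _id divisible by 3 were updated with a script.
  2. Documents with an _id divisible by 2 (and not by 3, since that branch is checked first) were updated with a partial document.
  3. The remaining documents were updated by sending the original document; this results in a noop in the bulk response.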
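
To see which items were actually changed and which were no-ops, the per-item results in the bulk response can be tallied. The sketch below assumes the response has already been parsed into a Python dict with the usual `items` list, where each item is keyed by its action name and carries either a `result` or an `error`. The function name is hypothetical, and the sample is hand-written to show the shape; it is not output captured from the example above.

```python
from collections import Counter


def summarize_bulk_response(response):
    """Count per-item results of a _bulk response and collect failed items."""
    results = Counter()
    failures = []
    for item in response.get("items", []):
        # Each item has exactly one key naming the action, e.g. "update".
        _action, detail = next(iter(item.items()))
        if "error" in detail:
            failures.append((detail.get("_id"), detail["error"]))
        else:
            results[detail.get("result", "unknown")] += 1
    return results, failures


# Hand-written sample shaped like a bulk response (illustrative only).
sample = {
    "errors": False,
    "items": [
        {"update": {"_id": "1", "result": "noop", "status": 200}},
        {"update": {"_id": "2", "result": "updated", "status": 200}},
        {"update": {"_id": "3", "result": "updated", "status": 200}},
    ],
}

results, failures = summarize_bulk_response(sample)
```

With this sample, `results` counts two `updated` items and one `noop`, and `failures` is empty. A large share of `noop` results would suggest that unchanged values are being resent.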