【Elasticsearch7.0】文档接口之bulk接口

star2017 1年前 ⋅ 547 阅读

基本语法

可以在一个请求中同时处理删除、更新操作,可以大大增加索引速度。接口语法是/_bulk,它期望使用以下换行分隔的JSON (NDJSON)结构:

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

注意:最后一行必须以\n结尾,每个新的行必须以\r开头,请求头的Content-Type必须是application/x-ndjson。
支持的操作有:index、create、delete、update。index和create操作后面一行必须跟source属性(create的时候如果文档存在那么会创建失败,index会新增或者替换索引),delete行后面不需要跟source,update后面需要更修改的doc、upsert、script,来看下示例:

curl -XPOST "http://127.0.0.1:9200/_bulk?pretty"  -H "Content-Type:application/x-ndjson" -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"}}
'

返回值为:

{
  "took" : 234,
  "errors" : true,
  "items" : [
    {
      "index" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 3,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 16,
        "_primary_term" : 8,
        "status" : 200
      }
    },
    {
      "delete" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "2",
        "_version" : 1,
        "result" : "not_found",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 17,
        "_primary_term" : 8,
        "status" : 404
      }
    },
    {
      "create" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "3",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[3]: version conflict, document already exists (current version [1])",
          "index_uuid" : "sNvURoEgQX-a2VLHwJt59Q",
          "shard" : "0",
          "index" : "test"
        }
      }
    },
    {
      "update" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 4,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 18,
        "_primary_term" : 8,
        "status" : 200
      }
    }
  ]
}

也可以直接下url中带上index,这样不需要在请求体中带上index。
为什么使用这个格式?作者的想法是使这个过程尽可能快,由于一些操作将被重定向到其他节点上的其他分片,因此只有action_meta_data在接收节点端被解析。使用此协议的客户端库应该尝试并努力在客户端做类似的事情,并尽可能减少缓冲。
对批量操作的响应是一个大型JSON结构,每个操作的结果都按照与请求中出现的操作相同的顺序执行。单个操作的失败不影响其余操作。
es没有说大概多少执行数据比较合理,这个是取决于你的es服务器还有需要进行不同大小的压测,才能最终确认大概需要多少执行数据,可以达到性能最大化。
如果使用HTTP API,请确保客户端不发送HTTP块,因为这会降低速度。

乐观并发控制

在bluk api中的index和delete操作,都会维护if_seq_no和if_primary_term参数,来进行并发操作的控制,可以确保索引或者删除的都是最新的。

版本控制

每个bulk选项都可以使用version属性,它自动遵循基于_version映射的索引/删除操作的行为,还支持version_type属性。

路由

每个bulk选项都可以使用routing属性,它自动遵循基于_routing映射的索引/删除操作的行为。

等待可用分片

当使用批量调用时,可以使用wait_for_active_shards参数,在开始处理大容量请求之前,要激活的碎片副本的最小数量。

刷新

控制什么时候可以被搜索到,只有接收到批量请求的碎片才会受到refresh的影响,假设接口是_bulk?refresh=wait_for,并且有3个文档需要路由到5个不同的分片中,请求将只等待3个分片的刷新,另外两个分片不参与_bulk请求。

更新

当使用update操作的时候可以设置retry_on_conflict属性,指定版本冲突的时候最多可以执行多少次。
update操作支持以下属性:doc(部分文档)、upsert、doc_as_upsert、script、params(为script提供)、_source、lang(为script提供),示例如下:

curl -XPOST "http://127.0.0.1:9200/_bulk?pretty" -H "Content-Type:application/x-ndjson" -d'
{ "update" : {"_id" : "1", "_index" : "test", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "test", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "test", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "test", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "test"} }
{ "doc" : {"field" : "value"}, "_source": true}
'

返回值为:

{
  "took" : 109,
  "errors" : true,
  "items" : [
    {
      "update" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 5,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 19,
        "_primary_term" : 8,
        "status" : 200
      }
    },
    {
      "update" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "0",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 20,
        "_primary_term" : 8,
        "status" : 201
      }
    },
    {
      "update" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "2",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 21,
        "_primary_term" : 8,
        "status" : 201
      }
    },
    {
      "update" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "3",
        "_version" : 2,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 22,
        "_primary_term" : 8,
        "get" : {
          "_seq_no" : 22,
          "_primary_term" : 8,
          "found" : true,
          "_source" : {
            "user" : "kimchy",
            "post_date" : "2009-11-15T14:12:12",
            "message" : "trying out Elasticsearch",
            "field" : "value"
          }
        },
        "status" : 200
      }
    },
    {
      "update" : {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "4",
        "status" : 404,
        "error" : {
          "type" : "document_missing_exception",
          "reason" : "[_doc][4]: document missing",
          "index_uuid" : "sNvURoEgQX-a2VLHwJt59Q",
          "shard" : "0",
          "index" : "test"
        }
      }
    }
  ]
}

部分响应

为了确保快速响应,如果一个或多个碎片失败,则bulk API将响应部分结果。

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: