基本语法
可以在一个请求中同时处理删除、更新操作,可以大大增加索引速度。接口语法是/_bulk,它期望使用以下换行分隔的JSON (NDJSON)结构:
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
注意:最后一行必须以\n结尾,每个新的行必须以\r开头,请求头的Content-Type必须是application/x-ndjson。
支持的操作有:index、create、delete、update。index和create操作后面一行必须跟source属性(create的时候如果文档存在那么会创建失败,index会新增或者替换索引),delete行后面不需要跟source,update后面需要更修改的doc、upsert、script,来看下示例:
curl -XPOST "http://127.0.0.1:9200/_bulk?pretty" -H "Content-Type:application/x-ndjson" -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"}}
'
返回值为:
{
"took" : 234,
"errors" : true,
"items" : [
{
"index" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 3,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 16,
"_primary_term" : 8,
"status" : 200
}
},
{
"delete" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"result" : "not_found",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 17,
"_primary_term" : 8,
"status" : 404
}
},
{
"create" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "3",
"status" : 409,
"error" : {
"type" : "version_conflict_engine_exception",
"reason" : "[3]: version conflict, document already exists (current version [1])",
"index_uuid" : "sNvURoEgQX-a2VLHwJt59Q",
"shard" : "0",
"index" : "test"
}
}
},
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 4,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 18,
"_primary_term" : 8,
"status" : 200
}
}
]
}
也可以直接下url中带上index,这样不需要在请求体中带上index。
为什么使用这个格式?作者的想法是使这个过程尽可能快,由于一些操作将被重定向到其他节点上的其他分片,因此只有action_meta_data在接收节点端被解析。使用此协议的客户端库应该尝试并努力在客户端做类似的事情,并尽可能减少缓冲。
对批量操作的响应是一个大型JSON结构,每个操作的结果都按照与请求中出现的操作相同的顺序执行。单个操作的失败不影响其余操作。
es没有说大概多少执行数据比较合理,这个是取决于你的es服务器还有需要进行不同大小的压测,才能最终确认大概需要多少执行数据,可以达到性能最大化。
如果使用HTTP API,请确保客户端不发送HTTP块,因为这会降低速度。
乐观并发控制
在bluk api中的index和delete操作,都会维护if_seq_no和if_primary_term参数,来进行并发操作的控制,可以确保索引或者删除的都是最新的。
版本控制
每个bulk选项都可以使用version属性,它自动遵循基于_version映射的索引/删除操作的行为,还支持version_type属性。
路由
每个bulk选项都可以使用routing属性,它自动遵循基于_routing映射的索引/删除操作的行为。
等待可用分片
当使用批量调用时,可以使用wait_for_active_shards参数,在开始处理大容量请求之前,要激活的碎片副本的最小数量。
刷新
控制什么时候可以被搜索到,只有接收到批量请求的碎片才会受到refresh的影响,假设接口是_bulk?refresh=wait_for,并且有3个文档需要路由到5个不同的分片中,请求将只等待3个分片的刷新,另外两个分片不参与_bulk请求。
更新
当使用update操作的时候可以设置retry_on_conflict属性,指定版本冲突的时候最多可以执行多少次。
update操作支持以下属性:doc(部分文档)、upsert、doc_as_upsert、script、params(为script提供)、_source、lang(为script提供),示例如下:
curl -XPOST "http://127.0.0.1:9200/_bulk?pretty" -H "Content-Type:application/x-ndjson" -d'
{ "update" : {"_id" : "1", "_index" : "test", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "test", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "test", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "test", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "test"} }
{ "doc" : {"field" : "value"}, "_source": true}
'
返回值为:
{
"took" : 109,
"errors" : true,
"items" : [
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 5,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 19,
"_primary_term" : 8,
"status" : 200
}
},
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "0",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 20,
"_primary_term" : 8,
"status" : 201
}
},
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 21,
"_primary_term" : 8,
"status" : 201
}
},
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "3",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 22,
"_primary_term" : 8,
"get" : {
"_seq_no" : 22,
"_primary_term" : 8,
"found" : true,
"_source" : {
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch",
"field" : "value"
}
},
"status" : 200
}
},
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "4",
"status" : 404,
"error" : {
"type" : "document_missing_exception",
"reason" : "[_doc][4]: document missing",
"index_uuid" : "sNvURoEgQX-a2VLHwJt59Q",
"shard" : "0",
"index" : "test"
}
}
}
]
}
部分响应
为了确保快速响应,如果一个或多个碎片失败,则bulk API将响应部分结果。
注意:本文归作者所有,未经作者允许,不得转载