使用scrolling来查询/导出超过10000行纪录

一直以来,想从 elasticsearch 中导出超过 10000 行纪录都是一个问题,因为深分页容易触发 OOM,所以 es 默认不允许导出 10000 条以上的纪录,如果你查询的时候指定的 from+size 超过 10000,会得到一条这样的报错:

1
Result window is too large, from + size must be less than or equal to:[10000] but was

这个可以通过设置索引 的 max_result_window 来修改,但是不推荐,且这个方法只对新的索引生效。
另一个办法是使用 scrolling:在查询的时候传入一个 scroll={time} ,time 参数 的格式像是 30m 这样,
在结果返回中会带有一个_scroll_id 字段,记录下这个值,然后请求 /_search/scroll 即可。

如下 ruby 代码演示了连接 elasticsearch 并按 from_id 查询导出的操作,新建对象并调用 start 方法即可。(示例中用到了 faraday 和 elasticsearch-dsl,需要 gem install elasticsearch-dsl faraday 一下)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

# frozen_string_literal: true

class EsData
include Elasticsearch::DSL
def initialize(name="some-user",password='some-password')
@name = name
@password = password
end
def connect
@client = Faraday.new(url: 'http://some-elasticsearch:9200') do |conn|
conn.request :authorization, :basic, @name,@password
end
self
end
def json_header
{ 'Accept' => 'application/json', 'Content-Type' => 'application/json' }
end


def start(from_id,index_name="some-index-name")
t1 = Time.new.to_f
es_query_body =JSON.dump( query_body_of_from_id from_id, 100)

puts "query_body:#{es_query_body}"
first = query_es((es_query_body), "15m", index_name)
es_res = JSON.parse(first)
scroll_id = es_res["_scroll_id"]
(puts "error:#{first} " and return ) if es_res.nil?

es_res["hits"]["hits"].each do |item|
puts "new item:"
puts item
end unless es_res["hits"].nil?

0.upto 1000 do |i|
es_res_body = scroll("15m",scroll_id)
es_res_obj = JSON.parse(es_res_body)
unless es_res_obj.has_key? "hits"
puts "index_name #{index_name} has no hits"
puts es_res_body
return
end
if es_res_obj["hits"]["hits"].size==0
puts "index #{index_name} cost #{Time.new.to_f - t1} seconds"
return
end
es_res_obj["hits"]["hits"].each do |item|
save item
end
end
end

# @author renlu
# @param query_body 提交给es的查询体
# @return [String,nil]
def query_es(query_body,scroll_time,index_name)
resp = @client.post(
"/#{index_name}/_search?scroll=#{scroll_time}",
query_body,
json_header
).body
end
#@return [String,nil]
def scroll(scroll_time,scroll_id)
body = JSON.dump({ "scroll":scroll_time, "scroll_id":scroll_id })
@client.post("/_search/scroll", body,json_header).body
end

def query_body_of_from_id(from_id,query_size=100)
definition = search do
query do
bool do
must do
match from_id:from_id
end
end
size query_size
end
end
definition.to_hash
end


end


欢迎前往原文讨论:https://github.com/xurenlu/404ms/issues/5


使用scrolling来查询/导出超过10000行纪录
https://404.ms/2023/03/02/post-5/
作者
rocky.xander
发布于
2023年3月3日
许可协议