Skip to content

Commit 5a82745

Browse files
committed
Extend aggregation pushdown tests
1 parent fd388e5 commit 5a82745

3 files changed

Lines changed: 218 additions & 11 deletions

File tree

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,24 @@
11
#!/bin/bash
#
# Seed the Elasticsearch instance on localhost:9200 with the "account" index:
#   1. create the index with an explicit mapping for every field;
#   2. bulk-load the documents stored in /accounts.json.

# Create the index. The URL is quoted so that the "?" in "?pretty" is always
# passed to curl literally instead of being considered for filename expansion.
curl -XPUT 'localhost:9200/account?pretty' -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "account_number": {"type": "integer"},
      "balance": {"type": "integer"},
      "firstname": {"type": "keyword"},
      "lastname": {"type": "keyword"},
      "age": {"type": "integer"},
      "gender": {"type": "keyword"},
      "address": {"type": "keyword"},
      "employer": {"type": "keyword"},
      "email": {"type": "keyword"},
      "city": {"type": "keyword"},
      "state": {"type": "keyword"}
    }
  }
}
'

# Load the documents through the bulk endpoint of the index created above.
curl -XPUT http://localhost:9200/account/_bulk -H 'Content-Type: application/json' \
  --data-binary @/accounts.json

test/splitgraph/commands/test_aggregation_pushdown.py

Lines changed: 194 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,20 @@
33
from decimal import Decimal
44

55
from test.splitgraph.conftest import _mount_elasticsearch
6-
from splitgraph.core.repository import Repository
7-
from splitgraph.engine import get_engine
6+
from splitgraph.engine import get_engine, ResultShape
87

98

109
def _extract_query_from_explain(result):
11-
query_str = "{"
10+
query_str = ""
1211

13-
for o in result[3:]:
14-
query_str += o[0] + "\n"
12+
for o in result:
13+
if query_str != "":
14+
query_str += o[0] + "\n"
15+
elif "Multicorn: Query:" in o[0]:
16+
query_str = "{"
17+
18+
if o == ("}",):
19+
break
1520

1621
return json.loads(query_str)
1722

@@ -45,3 +50,187 @@ def test_elasticsearch_aggregation_functions_only(local_engine_empty):
4550

4651
# Assert aggregation result
4752
assert result[0] == (999, 25714.837, 49989, 25714837, 20, Decimal("30.171"))
53+
54+
55+
@pytest.mark.mounting
56+
def test_elasticsearch_gropuing_clauses_only(local_engine_empty):
57+
_mount_elasticsearch()
58+
59+
# Single column grouping
60+
query = "SELECT state FROM es.account GROUP BY state"
61+
62+
# Ensure grouping is going to be pushed down
63+
result = get_engine().run_sql("EXPLAIN " + query)
64+
assert len(result) > 2
65+
assert _extract_query_from_explain(result) == {
66+
"aggs": {
67+
"group_buckets": {
68+
"composite": {"sources": [{"state": {"terms": {"field": "state"}}}], "size": 5}
69+
}
70+
}
71+
}
72+
73+
# Ensure results are correct
74+
result = get_engine().run_sql(query, return_shape=ResultShape.MANY_ONE)
75+
assert len(result) == 51
76+
77+
# Assert aggregation result
78+
assert result == [
79+
"AK",
80+
"AL",
81+
"AR",
82+
"AZ",
83+
"CA",
84+
"CO",
85+
"CT",
86+
"DC",
87+
"DE",
88+
"FL",
89+
"GA",
90+
"HI",
91+
"IA",
92+
"ID",
93+
"IL",
94+
"IN",
95+
"KS",
96+
"KY",
97+
"LA",
98+
"MA",
99+
"MD",
100+
"ME",
101+
"MI",
102+
"MN",
103+
"MO",
104+
"MS",
105+
"MT",
106+
"NC",
107+
"ND",
108+
"NE",
109+
"NH",
110+
"NJ",
111+
"NM",
112+
"NV",
113+
"NY",
114+
"OH",
115+
"OK",
116+
"OR",
117+
"PA",
118+
"RI",
119+
"SC",
120+
"SD",
121+
"TN",
122+
"TX",
123+
"UT",
124+
"VA",
125+
"VT",
126+
"WA",
127+
"WI",
128+
"WV",
129+
"WY",
130+
]
131+
132+
# Multi-column grouping
133+
query = "SELECT gender, age FROM es.account GROUP BY age, gender"
134+
135+
# Ensure grouping is going to be pushed down
136+
result = get_engine().run_sql("EXPLAIN " + query)
137+
assert len(result) > 2
138+
assert _extract_query_from_explain(result) == {
139+
"aggs": {
140+
"group_buckets": {
141+
"composite": {
142+
"sources": [
143+
{"gender": {"terms": {"field": "gender"}}},
144+
{"age": {"terms": {"field": "age"}}},
145+
],
146+
"size": 5,
147+
}
148+
}
149+
}
150+
}
151+
152+
# Ensure results are correct
153+
result = get_engine().run_sql(query)
154+
assert len(result) == 42
155+
156+
# Assert aggregation result
157+
gender = "F"
158+
age = 20
159+
for row in result:
160+
assert row == (gender, age)
161+
162+
age += 1
163+
if age == 41:
164+
age = 20
165+
gender = "M"
166+
167+
168+
@pytest.mark.mounting
def test_elasticsearch_gropuing_and_aggregations_bare(local_engine_empty):
    """Check a GROUP BY query with aggregates: the explained query and the returned rows."""
    _mount_elasticsearch()

    # Two averages grouped by a single column
    sql = "SELECT gender, avg(balance), avg(age) FROM es.account GROUP BY gender"

    # The EXPLAIN output must carry the grouping source plus both averages
    plan = get_engine().run_sql("EXPLAIN " + sql)
    assert len(plan) > 2

    expected_buckets = {
        "composite": {"sources": [{"gender": {"terms": {"field": "gender"}}}], "size": 5},
        "aggregations": {
            "avg.balance": {"avg": {"field": "balance"}},
            "avg.age": {"avg": {"field": "age"}},
        },
    }
    assert _extract_query_from_explain(plan) == {"aggs": {"group_buckets": expected_buckets}}

    # One row per gender, with the expected averages
    rows = get_engine().run_sql(sql)
    assert len(rows) == 2

    female_row = ("F", 25623.34685598377, Decimal("30.3184584178499"))
    male_row = ("M", 25803.800788954635, Decimal("30.027613412228796"))
    assert rows == [female_row, male_row]
199+
200+
201+
@pytest.mark.mounting
def test_elasticsearch_not_pushed_down(local_engine_empty):
    """Check that the listed queries explain to a plain filter or scan with no "aggs" section."""
    _mount_elasticsearch()

    # Query expected whenever only the "age > 30" qualifier is present
    _bare_filtering_query = {"query": {"bool": {"must": [{"range": {"age": {"gt": 30}}}]}}}
    # Query expected when there is no qualifier at all
    _bare_sequential_scan = {"query": {"bool": {"must": []}}}

    cases = [
        # Aggregates combined with a WHERE clause
        (
            "EXPLAIN SELECT max(balance), avg(age) FROM es.account WHERE age > 30",
            _bare_filtering_query,
        ),
        # Grouping combined with a WHERE clause
        (
            "EXPLAIN SELECT age FROM es.account WHERE age > 30 GROUP BY age",
            _bare_filtering_query,
        ),
        # Aggregates and grouping combined with a WHERE clause
        (
            "EXPLAIN SELECT age, min(balance) FROM es.account WHERE age > 30 GROUP BY age",
            _bare_filtering_query,
        ),
        # SELECT *
        ("EXPLAIN SELECT * FROM es.account", _bare_sequential_scan),
        # DISTINCT inside an aggregate
        ("EXPLAIN SELECT COUNT(DISTINCT city) FROM es.account", _bare_sequential_scan),
    ]

    for explain_sql, expected_query in cases:
        plan = get_engine().run_sql(explain_sql)
        assert len(plan) > 2
        assert _extract_query_from_explain(plan) == expected_query

test/splitgraph/conftest.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
PG_MNT_PULL = R("test_pg_mount_pull")
3333
MG_MNT = R("test_mg_mount")
3434
MYSQL_MNT = R("test/mysql_mount")
35-
ES_MNT = R("test/es_mount")
3635
OUTPUT = R("output")
3736

3837
# On the host, mapped into localhost; on the local engine works as intended.
@@ -179,7 +178,6 @@ def _mount_elasticsearch():
179178
R("output_stage_2"),
180179
Repository(READONLY_NAMESPACE, "pg_mount"),
181180
MYSQL_MNT,
182-
ES_MNT,
183181
]
184182

185183

@@ -199,7 +197,7 @@ def healthcheck_mounting():
199197
_mount_postgres(PG_MNT)
200198
_mount_mongo(MG_MNT)
201199
_mount_mysql(MYSQL_MNT)
202-
_mount_elasticsearch(ES_MNT)
200+
_mount_elasticsearch()
203201
try:
204202
assert (
205203
get_engine().run_sql(
@@ -224,13 +222,13 @@ def healthcheck_mounting():
224222
)
225223
assert (
226224
get_engine().run_sql(
227-
'SELECT COUNT(*) FROM "test/es_mount".account',
225+
"SELECT COUNT(*) FROM es.account",
228226
return_shape=ResultShape.ONE_ONE,
229227
)
230228
is not None
231229
)
232230
finally:
233-
for mountpoint in [PG_MNT, MG_MNT, MYSQL_MNT, ES_MNT]:
231+
for mountpoint in [PG_MNT, MG_MNT, MYSQL_MNT]:
234232
mountpoint.delete()
235233

236234

0 commit comments

Comments
 (0)