
Commit 74a9b76

Improvements related to deployment (#69)
* Add exponential backoff and retries to query execution
* Fix handling of nested OTel spans
* Bump up the max runtime of activities
* Improve documentation
1 parent f3c67d3 commit 74a9b76

7 files changed: 155 additions & 162 deletions


oonipipeline/Readme.md

Lines changed: 6 additions & 2 deletions
@@ -21,6 +21,10 @@ In order to run the pipeline you should setup the following dependencies:
 
 ### Quick start
 
+```
+git clone https://github.com/ooni/data
+```
+
 Start temporal dev server:
 
 ```
@@ -38,7 +42,7 @@ clickhouse server
 You can then start the desired workflow, for example to create signal observations for the US:
 
 ```
-hatch run oonipipeline mkobs --probe-cc US --test-name signal --start-day 2024-01-01 --end-day 2024-01-02
+hatch run oonipipeline mkobs --probe-cc US --test-name signal --start-day 2024-01-01 --end-day 2024-01-02 --create-tables
 ```
 
 Monitor the workflow executing by accessing: http://localhost:8233/
@@ -92,7 +96,7 @@ hatch run oonipipeline startworkers
 Then you can trigger the workflow by passing the `--no-start-workers` flag:
 
 ```
-hatch run oonipipeline mkobs --probe-cc US --start-day 2024-01-01 --end-day 2024-01-20 --no-start-workers
+hatch run oonipipeline mkobs --probe-cc US --start-day 2024-01-01 --end-day 2024-01-20 --no-start-workers --create-tables
 ```
 
 #### Superset
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-VERSION = "5.0.0a0"
+VERSION = "5.0.0a1"

oonipipeline/src/oonipipeline/db/connections.py

Lines changed: 31 additions & 24 deletions
@@ -1,5 +1,6 @@
 import csv
 import pickle
+import random
 import time
 
 from collections import defaultdict, namedtuple
@@ -32,7 +33,9 @@ def __init__(
         conn_url,
         row_buffer_size=0,
         max_block_size=1_000_000,
-        dump_failing_rows: Optional[str] = None,
+        max_retries=3,
+        backoff_factor=1,
+        max_backoff=32,
     ):
         from clickhouse_driver import Client
 
@@ -44,7 +47,10 @@ def __init__(
 
         self._column_names = {}
         self._row_buffer = defaultdict(list)
-        self.dump_failing_rows = dump_failing_rows
+
+        self._max_retries = max_retries
+        self._max_backoff = max_backoff
+        self._backoff_factor = backoff_factor
 
     def __enter__(self):
         return self
@@ -57,9 +63,30 @@ def delete_sync(self, table_name: str, where: str):
         self.execute("SET mutations_sync = 1;")
         return self.execute(f"DELETE FROM {table_name} WHERE {where};")
 
-    def execute(self, *args, **kwargs):
+    def _execute(self, *args, **kwargs):
         return self.client.execute(*args, **kwargs)
 
+    def execute(self, query_str, *args, **kwargs):
+        exception_list = []
+        # Exponentially backoff the retries
+        for attempt in range(self._max_retries):
+            try:
+                return self._execute(query_str, *args, **kwargs)
+            except Exception as e:
+                exception_list.append(e)
+                sleep_time = min(self._max_backoff, self._backoff_factor * (2**attempt))
+                log.error(
+                    f"failed to execute {query_str} args[{len(args)}] kwargs[{len(kwargs)}] (attempt {attempt})"
+                )
+                log.error(e)
+                log.error("exception history")
+                for exc in exception_list[:-1]:
+                    log.error(exc)
+                sleep_time += random.uniform(0, sleep_time * 0.1)
+                time.sleep(sleep_time)
+        # Raise the last exception
+        raise exception_list[-1]
+
     def execute_iter(self, *args, **kwargs):
         return self.client.execute_iter(
             *args, **kwargs, settings={"max_block_size": self.max_block_size}
@@ -82,27 +109,7 @@ def write_rows(self, table_name, rows, column_names):
     def flush_rows(self, table_name, rows):
         fields_str = ", ".join(self._column_names[table_name])
         query_str = f"INSERT INTO {table_name} ({fields_str}) VALUES"
-        try:
-            self.execute(query_str, rows)
-        except Exception as exc:
-            log.error(
-                f"Failed to write {len(rows)} rows. Trying to savage what is savageable. ({exc})"
-            )
-            for idx, row in enumerate(rows):
-                try:
-                    self.execute(
-                        query_str,
-                        [row],
-                        types_check=True,
-                        query_id=f"oonidata-savage-{idx}-{time.time()}",
-                    )
-                    time.sleep(0.1)
-                except Exception as exc:
-                    log.error(f"Failed to write {row} ({exc}) {query_str}")
-
-                    if self.dump_failing_rows:
-                        with open(self.dump_failing_rows, "ab") as out_file:
-                            pickle.dump({"query_str": query_str, "row": row}, out_file)
+        self.execute(query_str, rows)
 
     def flush_all_rows(self):
         for table_name, rows in self._row_buffer.items():
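
For reference, the retry loop added to `execute()` above is the standard capped exponential backoff with jitter pattern. Below is a minimal standalone sketch of the same schedule in plain Python (not the `ClickhouseConnection` class itself); with the new defaults of `max_retries=3`, `backoff_factor=1` and `max_backoff=32` the base delays work out to 1s, 2s and 4s, each padded with up to 10% random jitter.

```python
import random
import time


def retry_with_backoff(fn, max_retries=3, backoff_factor=1, max_backoff=32):
    # Call fn(); on failure sleep backoff_factor * 2**attempt seconds (capped
    # at max_backoff, plus up to 10% jitter) and retry, re-raising the last
    # exception once all attempts are exhausted.
    last_exc = None
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            sleep_time = min(max_backoff, backoff_factor * (2**attempt))
            sleep_time += random.uniform(0, sleep_time * 0.1)
            time.sleep(sleep_time)
    raise last_exc


if __name__ == "__main__":
    # Succeeds on the first attempt, so no sleeping happens here.
    print(retry_with_backoff(lambda: "ok"))
```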

oonipipeline/src/oonipipeline/temporal/activities/analysis.py

Lines changed: 109 additions & 122 deletions
@@ -107,132 +107,119 @@ def make_analysis_in_a_day(params: MakeAnalysisParams) -> dict:
 
     tracer = opentelemetry.trace.get_tracer(__name__)
 
-    with opentelemetry.trace.get_current_span():
-        fingerprintdb = FingerprintDB(datadir=data_dir, download=False)
-        body_db = BodyDB(db=ClickhouseConnection(clickhouse))
-        db_writer = ClickhouseConnection(clickhouse, row_buffer_size=10_000)
-        db_lookup = ClickhouseConnection(clickhouse)
-
-        column_names_wa = [f.name for f in dataclasses.fields(WebAnalysis)]
-        column_names_er = [
-            f.name for f in dataclasses.fields(MeasurementExperimentResult)
-        ]
-
-        # TODO(art): this previous range search and deletion makes the idempotence
-        # of the activity not 100% accurate.
-        # We should look into fixing it.
-        prev_range_list = [
-            get_prev_range(
-                db=db_lookup,
-                table_name=WebAnalysis.__table_name__,
-                timestamp=datetime.combine(day, datetime.min.time()),
-                test_name=[],
-                probe_cc=probe_cc,
-                timestamp_column="measurement_start_time",
-            ),
-            get_prev_range(
-                db=db_lookup,
-                table_name=MeasurementExperimentResult.__table_name__,
-                timestamp=datetime.combine(day, datetime.min.time()),
-                test_name=[],
-                probe_cc=probe_cc,
-                timestamp_column="timeofday",
-                probe_cc_column="location_network_cc",
-            ),
-        ]
-
-        log.info(f"loading ground truth DB for {day}")
-        with tracer.start_as_current_span(
-            "MakeObservations:load_ground_truths"
-        ) as span:
-            ground_truth_db_path = (
-                data_dir / "ground_truths" / f"web-{day.strftime('%Y-%m-%d')}.sqlite3"
-            )
-            web_ground_truth_db = WebGroundTruthDB()
-            web_ground_truth_db.build_from_existing(
-                str(ground_truth_db_path.absolute())
-            )
-            log.info(f"loaded ground truth DB for {day}")
-            span.add_event(f"loaded ground truth DB for {day}")
-            span.set_attribute("day", day.strftime("%Y-%m-%d"))
-            span.set_attribute(
-                "ground_truth_row_count", web_ground_truth_db.count_rows()
-            )
+    fingerprintdb = FingerprintDB(datadir=data_dir, download=False)
+    body_db = BodyDB(db=ClickhouseConnection(clickhouse))
+    db_writer = ClickhouseConnection(clickhouse, row_buffer_size=10_000)
+    db_lookup = ClickhouseConnection(clickhouse)
+
+    column_names_wa = [f.name for f in dataclasses.fields(WebAnalysis)]
+    column_names_er = [f.name for f in dataclasses.fields(MeasurementExperimentResult)]
+
+    # TODO(art): this previous range search and deletion makes the idempotence
+    # of the activity not 100% accurate.
+    # We should look into fixing it.
+    prev_range_list = [
+        get_prev_range(
+            db=db_lookup,
+            table_name=WebAnalysis.__table_name__,
+            timestamp=datetime.combine(day, datetime.min.time()),
+            test_name=[],
+            probe_cc=probe_cc,
+            timestamp_column="measurement_start_time",
+        ),
+        get_prev_range(
+            db=db_lookup,
+            table_name=MeasurementExperimentResult.__table_name__,
+            timestamp=datetime.combine(day, datetime.min.time()),
+            test_name=[],
+            probe_cc=probe_cc,
+            timestamp_column="timeofday",
+            probe_cc_column="location_network_cc",
+        ),
+    ]
+
+    log.info(f"loading ground truth DB for {day}")
+    with tracer.start_span("MakeObservations:load_ground_truths") as span:
+        ground_truth_db_path = (
+            data_dir / "ground_truths" / f"web-{day.strftime('%Y-%m-%d')}.sqlite3"
+        )
+        web_ground_truth_db = WebGroundTruthDB()
+        web_ground_truth_db.build_from_existing(str(ground_truth_db_path.absolute()))
+        log.info(f"loaded ground truth DB for {day}")
+        span.add_event(f"loaded ground truth DB for {day}")
+        span.set_attribute("day", day.strftime("%Y-%m-%d"))
+        span.set_attribute("ground_truth_row_count", web_ground_truth_db.count_rows())
+
+    failures = 0
+    no_exp_results = 0
+    observation_count = 0
+    with tracer.start_span("MakeObservations:iter_web_observations") as span:
+        for web_obs in iter_web_observations(
+            db_lookup,
+            measurement_day=day,
+            probe_cc=probe_cc,
+            test_name="web_connectivity",
+        ):
+            try:
+                relevant_gts = web_ground_truth_db.lookup_by_web_obs(web_obs=web_obs)
+            except:
+                log.error(
+                    f"failed to lookup relevant_gts for {web_obs[0].measurement_uid}",
+                    exc_info=True,
+                )
+                failures += 1
+                continue
 
-        failures = 0
-        no_exp_results = 0
-        observation_count = 0
-        with tracer.start_as_current_span(
-            "MakeObservations:iter_web_observations"
-        ) as span:
-            for web_obs in iter_web_observations(
-                db_lookup,
-                measurement_day=day,
-                probe_cc=probe_cc,
-                test_name="web_connectivity",
-            ):
-                try:
-                    relevant_gts = web_ground_truth_db.lookup_by_web_obs(
-                        web_obs=web_obs
-                    )
-                except:
-                    log.error(
-                        f"failed to lookup relevant_gts for {web_obs[0].measurement_uid}",
-                        exc_info=True,
+            try:
+                website_analysis = list(
+                    make_web_analysis(
+                        web_observations=web_obs,
+                        body_db=body_db,
+                        web_ground_truths=relevant_gts,
+                        fingerprintdb=fingerprintdb,
                     )
-                    failures += 1
+                )
+                if len(website_analysis) == 0:
+                    log.info(f"no website analysis for {probe_cc}, {test_name}")
+                    no_exp_results += 1
                     continue
 
-                try:
-                    website_analysis = list(
-                        make_web_analysis(
-                            web_observations=web_obs,
-                            body_db=body_db,
-                            web_ground_truths=relevant_gts,
-                            fingerprintdb=fingerprintdb,
-                        )
-                    )
-                    if len(website_analysis) == 0:
-                        log.info(f"no website analysis for {probe_cc}, {test_name}")
-                        no_exp_results += 1
-                        continue
-
-                    observation_count += 1
-                    table_name, rows = make_db_rows(
-                        dc_list=website_analysis, column_names=column_names_wa
-                    )
-
-                    db_writer.write_rows(
-                        table_name=table_name,
-                        rows=rows,
-                        column_names=column_names_wa,
-                    )
-
-                    website_er = list(make_website_experiment_results(website_analysis))
-                    table_name, rows = make_db_rows(
-                        dc_list=website_er,
-                        column_names=column_names_er,
-                        custom_remap={"loni_list": orjson.dumps},
-                    )
-
-                    db_writer.write_rows(
-                        table_name=table_name,
-                        rows=rows,
-                        column_names=column_names_er,
-                    )
-
-                except:
-                    web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs))
-                    log.error(
-                        f"failed to generate analysis for {web_obs_ids}", exc_info=True
-                    )
-                    failures += 1
-
-            span.set_attribute("total_failure_count", failures)
-            span.set_attribute("total_observation_count", observation_count)
-            span.set_attribute("no_experiment_results_count", no_exp_results)
-            span.set_attribute("day", day.strftime("%Y-%m-%d"))
-            span.set_attribute("probe_cc", probe_cc)
+                observation_count += 1
+                table_name, rows = make_db_rows(
+                    dc_list=website_analysis, column_names=column_names_wa
+                )
+
+                db_writer.write_rows(
+                    table_name=table_name,
+                    rows=rows,
+                    column_names=column_names_wa,
+                )
+
+                website_er = list(make_website_experiment_results(website_analysis))
+                table_name, rows = make_db_rows(
+                    dc_list=website_er,
+                    column_names=column_names_er,
+                    custom_remap={"loni_list": orjson.dumps},
+                )
+
+                db_writer.write_rows(
+                    table_name=table_name,
+                    rows=rows,
+                    column_names=column_names_er,
+                )
+
+            except:
+                web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs))
+                log.error(
+                    f"failed to generate analysis for {web_obs_ids}", exc_info=True
+                )
+                failures += 1
+
+        span.set_attribute("total_failure_count", failures)
+        span.set_attribute("total_observation_count", observation_count)
+        span.set_attribute("no_experiment_results_count", no_exp_results)
+        span.set_attribute("day", day.strftime("%Y-%m-%d"))
+        span.set_attribute("probe_cc", probe_cc)
 
     for prev_range in prev_range_list:
         maybe_delete_prev_range(db=db_lookup, prev_range=prev_range)
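
The recurring change here (and in observations.py below) is replacing `tracer.start_as_current_span(...)` with `tracer.start_span(...)`, which is what fixes the handling of nested OTel spans: `start_as_current_span` activates the span in the current context, so every span started inside the block becomes its child, while `start_span` records and ends a span without making it current. A minimal sketch with the OpenTelemetry Python API/SDK (console exporter purely for illustration; this is not the pipeline's actual tracing setup):

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# start_as_current_span() activates the span in the current context, so any
# span started inside the block automatically becomes its child.
with tracer.start_as_current_span("parent"):
    with tracer.start_span("child") as span:  # recorded as a child of "parent"
        span.set_attribute("example", True)

# start_span() creates (and, via the context manager, ends) a span without
# activating it, so later spans are not implicitly nested underneath it.
with tracer.start_span("standalone") as span:
    span.add_event("work done")
```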

oonipipeline/src/oonipipeline/temporal/activities/observations.py

Lines changed: 2 additions & 6 deletions
@@ -75,17 +75,13 @@ def make_observations_for_file_entry_batch(
 
     total_failure_count = 0
     current_span = trace.get_current_span()
-    with current_span, ClickhouseConnection(
-        clickhouse, row_buffer_size=row_buffer_size
-    ) as db:
+    with ClickhouseConnection(clickhouse, row_buffer_size=row_buffer_size) as db:
         ccs = ccs_set(probe_cc)
         idx = 0
         for bucket_name, s3path, ext, fe_size in file_entry_batch:
             failure_count = 0
             # Nest the traced span within the current span
-            with tracer.start_as_current_span(
-                "MakeObservations:stream_file_entry"
-            ) as span:
+            with tracer.start_span("MakeObservations:stream_file_entry") as span:
                 log.debug(f"processing file s3://{bucket_name}/{s3path}")
                 t = PerfTimer()
                 try:

oonipipeline/src/oonipipeline/temporal/workflows.py

Lines changed: 2 additions & 1 deletion
@@ -59,7 +59,8 @@
 TASK_QUEUE_NAME = "oonipipeline-task-queue"
 OBSERVATION_WORKFLOW_ID = "oonipipeline-observations"
 
-MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT = timedelta(hours=24)
+# TODO(art): come up with a nicer way to nest workflows so we don't need such a high global timeout
+MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT = timedelta(hours=48)
 MAKE_GROUND_TRUTHS_START_TO_CLOSE_TIMEOUT = timedelta(hours=1)
 MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT = timedelta(hours=10)
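
`MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT` bounds a single attempt of the observations activity and is supplied when the workflow schedules it. A hedged sketch of how such a constant is typically wired up with the temporalio Python SDK; the workflow class and activity name below are illustrative, not the pipeline's actual definitions:

```python
from datetime import timedelta

from temporalio import workflow

# Same constant as in the diff above; 48h bounds one attempt of the activity.
MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT = timedelta(hours=48)


@workflow.defn
class ExampleObservationsWorkflow:
    @workflow.run
    async def run(self, params: str) -> None:
        # The timeout is passed per activity invocation; the attempt is timed
        # out if it runs longer than start_to_close_timeout.
        await workflow.execute_activity(
            "make_observations_for_file_entry_batch",  # activity name, illustrative
            params,
            start_to_close_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
        )
```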
