Skip to content

Commit 0d11236

Browse files
committed
re #BITBUCKET-61 merging in 0.8.5 release
Branch: master
2 parents 2a36245 + cc1fdee commit 0d11236

4 files changed

Lines changed: 206 additions & 36 deletions

File tree

HISTORY.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,21 @@
44
TaskGraph Release History
55
=========================
66

7+
0.8.5 (2019-09-11)
8+
------------------
9+
* Dropped support for Python 2.7.
10+
* Fixed an issue where paths in ``ignore_paths`` were not getting ignored in
11+
the case of ``copy_duplicate_artifact=True``.
12+
* Fixed an issue where the "percent completed" in the logging monitor would
13+
sometimes exceed 100%. This occurred when a duplicate task was added to
14+
the TaskGraph object.
15+
* Fixed an issue where a relative path set as a target path would always cause
16+
TaskGraph to raise an exception after the task was complete.
17+
* Fixed an issue where kwargs that were unhashable were not considered when
18+
determining if a Task should be re-run.
19+
* Fixed an issue where files with almost identical modified times and sizes
20+
could hash equal even when the filenames were different.
21+
722
0.8.4 (2019-05-23)
823
------------------
924
* Fixed an exception that occurred when two tasks were constructed that

bitbucket-pipelines.yml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
pipelines:
22
default:
33
- parallel:
4-
- step:
5-
name: Tests on python2.7
6-
image: python:2.7-stretch
7-
caches:
8-
- pip
9-
script:
10-
- pip install tox
11-
- tox -e py27-base,py27-psutil
124
- step:
135
name: Tests on python3.6
146
image: python:3.6-stretch

taskgraph/Task.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ def __init__(
146146

147147
self._taskgraph_started_event = threading.Event()
148148

149+
# this variable is used to print accurate representation of how many
150+
# tasks have been completed in the logging output.
151+
self._added_task_count = 0
152+
149153
# use this to keep track of all the tasks added to the graph by their
150154
# task hashes. Used to determine if an identical task has been added
151155
# to the taskgraph during `add_task`
@@ -526,6 +530,7 @@ def add_task(
526530
raise ValueError(
527531
"The task graph is closed and cannot accept more "
528532
"tasks.")
533+
self._added_task_count += 1
529534
if args is None:
530535
args = []
531536
if kwargs is None:
@@ -661,20 +666,19 @@ def _execution_monitor(self):
661666
task_name, time.time() - task_time)
662667
for task_name, task_time in self._active_task_list])
663668

664-
total_tasks = len(self._task_hash_map)
665669
completed_tasks = len(self._completed_task_names)
666670
percent_complete = 0.0
667-
if total_tasks > 0:
671+
if self._added_task_count > 0:
668672
percent_complete = 100.0 * (
669-
float(completed_tasks) / total_tasks)
673+
float(completed_tasks) / self._added_task_count)
670674

671675
LOGGER.info(
672676
"\n\ttaskgraph execution status: tasks added: %d \n"
673677
"\ttasks complete: %d (%.1f%%) \n"
674678
"\ttasks waiting for a free worker: %d (qsize: %d)\n"
675-
"\ttasks executing (%d): graph is %s\n%s", total_tasks,
676-
completed_tasks, percent_complete, self._task_waiting_count,
677-
queue_length, active_task_count,
679+
"\ttasks executing (%d): graph is %s\n%s",
680+
self._added_task_count, completed_tasks, percent_complete,
681+
self._task_waiting_count, queue_length, active_task_count,
678682
'closed' if self._closed else 'open',
679683
active_task_message)
680684

@@ -926,13 +930,13 @@ def __init__(
926930
try:
927931
scrubbed_value = _scrub_task_args(arg, self._target_path_list)
928932
_ = pickle.dumps(scrubbed_value)
929-
kwargs_clean[arg] = scrubbed_value
933+
kwargs_clean[key] = scrubbed_value
930934
except TypeError:
931935
LOGGER.warning(
932-
"could not pickle kw argument %s (%s). "
936+
"could not pickle kw argument %s (%s) scrubbed to %s. "
933937
"Skipping argument which means it will not be considered "
934938
"when calculating whether inputs have been changed "
935-
"on a successive run.", key, arg)
939+
"on a successive run.", key, arg, scrubbed_value)
936940

937941
self._reexecution_info = {
938942
'func_name': self._func.__name__,
@@ -1125,7 +1129,8 @@ def is_precalculated(self):
11251129
other_arguments = list(_filter_non_files(
11261130
[self._reexecution_info['args_clean'],
11271131
self._reexecution_info['kwargs_clean']],
1128-
self._target_path_list+self._ignore_path_list,
1132+
self._target_path_list,
1133+
self._ignore_path_list,
11291134
self._ignore_directories))
11301135

11311136
LOGGER.debug("file_stat_list: %s", file_stat_list)
@@ -1174,17 +1179,22 @@ def is_precalculated(self):
11741179
'Path not found: %s' % path)
11751180
continue
11761181
if hash_algorithm == 'sizetimestamp':
1177-
size, modified_time = [
1178-
float(x) for x in hash_string.split(':')]
1182+
size, modified_time, actual_path = [
1183+
x for x in hash_string.split('::')]
1184+
if actual_path != path:
1185+
mismatched_target_file_list.append(
1186+
"Path names don't match\n"
1187+
"cached: (%s)\nactual (%s)" % (path, actual_path))
11791188
target_modified_time = os.path.getmtime(path)
1180-
if not math.isclose(modified_time, target_modified_time):
1189+
if not math.isclose(
1190+
float(modified_time), target_modified_time):
11811191
mismatched_target_file_list.append(
11821192
"Modified times don't match "
11831193
"cached: (%f) actual: (%f)" % (
1184-
modified_time, target_modified_time))
1194+
float(modified_time), target_modified_time))
11851195
continue
11861196
target_size = os.path.getsize(path)
1187-
if size != target_size:
1197+
if float(size) != target_size:
11881198
mismatched_target_file_list.append(
11891199
"File sizes don't match "
11901200
"cached: (%s) actual: (%s)" % (
@@ -1305,7 +1315,7 @@ def _get_file_stats(
13051315

13061316

13071317
def _filter_non_files(
1308-
base_value, keep_list, keep_directories):
1318+
base_value, keep_list, ignore_list, keep_directories):
13091319
"""Remove any values that are files not in ignore list or directories.
13101320
13111321
Parameters:
@@ -1314,6 +1324,7 @@ def _filter_non_files(
13141324
contains filepaths in any nested structure.
13151325
keep_list (list): any paths found in this list are not filtered.
13161326
All paths in this list should be "os.path.norm"ed.
1327+
ignore_list (list): any paths found in this list are filtered.
13171328
keep_directories (boolean): If True directories are not filtered
13181329
out.
13191330
@@ -1325,7 +1336,7 @@ def _filter_non_files(
13251336
if isinstance(base_value, _VALID_PATH_TYPES):
13261337
try:
13271338
norm_path = _normalize_path(base_value)
1328-
if (norm_path in keep_list or (
1339+
if norm_path not in ignore_list and (norm_path in keep_list or (
13291340
os.path.isdir(norm_path) and keep_directories) or
13301341
not os.path.isfile(norm_path)):
13311342
yield norm_path
@@ -1341,12 +1352,12 @@ def _filter_non_files(
13411352
for key in base_value.keys():
13421353
value = base_value[key]
13431354
for filter_value in _filter_non_files(
1344-
value, keep_list, keep_directories):
1355+
value, keep_list, ignore_list, keep_directories):
13451356
yield (value, filter_value)
13461357
elif isinstance(base_value, (list, set, tuple)):
13471358
for value in base_value:
13481359
for filter_value in _filter_non_files(
1349-
value, keep_list, keep_directories):
1360+
value, keep_list, ignore_list, keep_directories):
13501361
yield filter_value
13511362
else:
13521363
yield base_value
@@ -1432,8 +1443,9 @@ def _hash_file(file_path, hash_algorithm, buf_size=2**20):
14321443
"""
14331444
if hash_algorithm == 'sizetimestamp':
14341445
norm_path = _normalize_path(file_path)
1435-
return '%d:%f' % (
1436-
os.path.getsize(norm_path), os.path.getmtime(norm_path))
1446+
return '%d::%f::%s' % (
1447+
os.path.getsize(norm_path), os.path.getmtime(norm_path),
1448+
norm_path)
14371449
hash_func = hashlib.new(hash_algorithm)
14381450
with open(file_path, 'rb') as f:
14391451
binary_data = f.read(buf_size)
@@ -1445,12 +1457,13 @@ def _hash_file(file_path, hash_algorithm, buf_size=2**20):
14451457

14461458
def _normalize_path(path):
14471459
"""Convert `path` into normalized, normcase, absolute filepath."""
1448-
norm_path = os.path.normpath(os.path.normcase(path))
1460+
norm_path = os.path.normpath(path)
14491461
try:
1450-
return os.path.abspath(norm_path)
1462+
abs_path = os.path.abspath(norm_path)
14511463
except TypeError:
14521464
# this occurs when encountering VERY long strings that might be
14531465
# interpreted as paths
14541466
LOGGER.warn(
14551467
"failed to abspath %s so returning normalized path instead")
1456-
return norm_path
1468+
abs_path = norm_path
1469+
return os.path.normcase(abs_path)

tests/test_task.py

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,18 @@
1212
import logging.handlers
1313
import multiprocessing
1414
import mock
15+
import importlib
1516

1617
import taskgraph
1718

1819
LOGGER = logging.getLogger(__name__)
1920

2021
N_TEARDOWN_RETRIES = 5
2122

22-
# Python 3 relocated the reload function to imp.
23-
if 'reload' not in __builtins__:
24-
from imp import reload
23+
24+
def _noop_function(**kwargs):
25+
"""Does nothing except allow kwargs to be passed."""
26+
pass
2527

2628

2729
def _long_running_function(delay):
@@ -167,7 +169,7 @@ def test_version_not_loaded(self):
167169
with self.assertRaises(RuntimeError):
168170
# RuntimeError is a side effect of `import taskgraph`, so we
169171
# reload it to retrigger the metadata load.
170-
taskgraph = reload(taskgraph)
172+
taskgraph = importlib.reload(taskgraph)
171173

172174
def test_single_task(self):
173175
"""TaskGraph: Test a single task."""
@@ -1197,6 +1199,154 @@ def test_duplicate_but_different_target(self):
11971199
contents = target_file.read()
11981200
self.assertEqual(contents, test_string)
11991201

1202+
def test_modifying_functions_with_copy(self):
1203+
"""TaskGraph: test with copy artifacts and ignore inputs."""
1204+
n_runs_a = 0
1205+
n_runs_b = 0
1206+
1207+
a_path = os.path.join(self.workspace_dir, 'a.txt')
1208+
b_path = os.path.join(self.workspace_dir, 'b.txt')
1209+
volatile_path = os.path.join(self.workspace_dir, 'volatile.txt')
1210+
d_path = os.path.join(self.workspace_dir, 'd.txt')
1211+
1212+
b_path_suffix = os.path.join(self.workspace_dir, 'b_suffix.txt')
1213+
volatile_path_suffix = os.path.join(self.workspace_dir, 'volatile_suffix.txt')
1214+
d_path_suffix = os.path.join(self.workspace_dir, 'd_suffix.txt')
1215+
1216+
with open(a_path, 'w') as a_file:
1217+
a_file.write('a file')
1218+
1219+
def run_a_batch(a_path, b_path, volatile_path, d_path):
1220+
def _a(a_path, target_path):
1221+
nonlocal n_runs_a
1222+
n_runs_a += 1
1223+
if not os.path.exists(a_path):
1224+
raise RuntimeError("a_path doesn't exist")
1225+
with open(target_path, 'w') as target_file:
1226+
target_file.write('_a result')
1227+
1228+
def _b(b_path, volatile_path, target_path):
1229+
nonlocal n_runs_b
1230+
n_runs_b += 1
1231+
if not os.path.exists(a_path):
1232+
raise RuntimeError("a_path path doesn't exist")
1233+
with open(volatile_path, 'w') as volitile_file:
1234+
volitile_file.write('_b volatile')
1235+
with open(target_path, 'w') as target_file:
1236+
target_file.write('_b result')
1237+
1238+
task_graph = taskgraph.TaskGraph(self.workspace_dir, -1, 0)
1239+
task_a = task_graph.add_task(
1240+
func=_a,
1241+
args=(a_path, b_path),
1242+
target_path_list=[b_path],
1243+
hash_algorithm='md5',
1244+
copy_duplicate_artifact=True,
1245+
task_name='_a task')
1246+
_ = task_graph.add_task(
1247+
func=_b,
1248+
args=(b_path, volatile_path, d_path),
1249+
target_path_list=[d_path],
1250+
ignore_path_list=[volatile_path],
1251+
hash_algorithm='md5',
1252+
copy_duplicate_artifact=True,
1253+
dependent_task_list=[task_a],
1254+
task_name='_b task')
1255+
task_graph.join()
1256+
task_graph.close()
1257+
del task_graph
1258+
1259+
run_a_batch(a_path, b_path, volatile_path, d_path)
1260+
run_a_batch(a_path, b_path, volatile_path, d_path)
1261+
run_a_batch(a_path, b_path_suffix, volatile_path_suffix, d_path_suffix)
1262+
1263+
self.assertTrue(n_runs_a == 1)
1264+
self.assertTrue(n_runs_b == 1)
1265+
1266+
def test_expected_path_list(self):
1267+
"""TaskGraph: test expected path list matches actual path list."""
1268+
def _create_file(target_path, content):
1269+
with open(target_path, 'w') as target_file:
1270+
target_file.write(content)
1271+
1272+
task_graph = taskgraph.TaskGraph(self.workspace_dir, -1, 0)
1273+
# note it is important this is a relative path that does not
1274+
# contain the drive letter on Windows.
1275+
absolute_target_file_path = os.path.join(
1276+
self.workspace_dir, 'a.txt')
1277+
relative_path = os.path.relpath(absolute_target_file_path)
1278+
1279+
_ = task_graph.add_task(
1280+
func=_create_file,
1281+
args=(relative_path, 'test value'),
1282+
target_path_list=[relative_path],
1283+
task_name='create file')
1284+
1285+
task_graph.close()
1286+
task_graph.join()
1287+
del task_graph
1288+
1289+
self.assertTrue('Ran without crashing!')
1290+
1291+
def test_kwargs_hashed(self):
1292+
"""TaskGraph: ensure kwargs are considered in determining id hash."""
1293+
task_graph = taskgraph.TaskGraph(self.workspace_dir, -1, 0)
1294+
1295+
task_a = task_graph.add_task(
1296+
func=_noop_function,
1297+
kwargs={
1298+
'content': ['this value: a']},
1299+
task_name='noop a')
1300+
1301+
task_b = task_graph.add_task(
1302+
func=_noop_function,
1303+
kwargs={
1304+
'content': ['this value b']},
1305+
task_name='noop b')
1306+
1307+
task_graph.close()
1308+
task_graph.join()
1309+
del task_graph
1310+
1311+
self.assertNotEqual(
1312+
task_a._task_id_hash, task_b._task_id_hash,
1313+
"task ids should be different since the kwargs are different")
1314+
1315+
def test_same_timestamp_and_value(self):
1316+
"""TaskGraph: ensure identical files but filename are noticed."""
1317+
task_graph = taskgraph.TaskGraph(self.workspace_dir, -1, 0)
1318+
1319+
file_a_path = os.path.join(self.workspace_dir, 'file_a.txt')
1320+
file_b_path = os.path.join(self.workspace_dir, 'file_b.txt')
1321+
1322+
with open(file_a_path, 'w') as file_a:
1323+
file_a.write('a')
1324+
with open(file_b_path, 'w') as file_b:
1325+
file_b.write('a')
1326+
1327+
os.utime(file_a_path, (0, 0))
1328+
os.utime(file_b_path, (0, 0))
1329+
1330+
task_a = task_graph.add_task(
1331+
func=_noop_function,
1332+
kwargs={
1333+
'path': file_a_path},
1334+
task_name='noop a')
1335+
1336+
task_b = task_graph.add_task(
1337+
func=_noop_function,
1338+
kwargs={
1339+
'path': file_b_path},
1340+
task_name='noop b')
1341+
1342+
task_graph.close()
1343+
task_graph.join()
1344+
del task_graph
1345+
1346+
self.assertNotEqual(
1347+
task_a._task_id_hash, task_b._task_id_hash,
1348+
"task ids should be different since the filenames are different")
1349+
12001350

12011351
def Fail(n_tries, result_path):
12021352
"""Create a function that fails after `n_tries`."""

0 commit comments

Comments
 (0)