@@ -146,6 +146,10 @@ def __init__(
146146
147147 self ._taskgraph_started_event = threading .Event ()
148148
149+ # this variable is used to print accurate representation of how many
150+ # tasks have been completed in the logging output.
151+ self ._added_task_count = 0
152+
149153 # use this to keep track of all the tasks added to the graph by their
150154 # task hashes. Used to determine if an identical task has been added
151155 # to the taskgraph during `add_task`
@@ -526,6 +530,7 @@ def add_task(
526530 raise ValueError (
527531 "The task graph is closed and cannot accept more "
528532 "tasks." )
533+ self ._added_task_count += 1
529534 if args is None :
530535 args = []
531536 if kwargs is None :
@@ -661,20 +666,19 @@ def _execution_monitor(self):
661666 task_name , time .time () - task_time )
662667 for task_name , task_time in self ._active_task_list ])
663668
664- total_tasks = len (self ._task_hash_map )
665669 completed_tasks = len (self ._completed_task_names )
666670 percent_complete = 0.0
667- if total_tasks > 0 :
671+ if self . _added_task_count > 0 :
668672 percent_complete = 100.0 * (
669- float (completed_tasks ) / total_tasks )
673+ float (completed_tasks ) / self . _added_task_count )
670674
671675 LOGGER .info (
672676 "\n \t taskgraph execution status: tasks added: %d \n "
673677 "\t tasks complete: %d (%.1f%%) \n "
674678 "\t tasks waiting for a free worker: %d (qsize: %d)\n "
675- "\t tasks executing (%d): graph is %s\n %s" , total_tasks ,
676- completed_tasks , percent_complete , self . _task_waiting_count ,
677- queue_length , active_task_count ,
679+ "\t tasks executing (%d): graph is %s\n %s" ,
680+ self . _added_task_count , completed_tasks , percent_complete ,
681+ self . _task_waiting_count , queue_length , active_task_count ,
678682 'closed' if self ._closed else 'open' ,
679683 active_task_message )
680684
@@ -926,13 +930,13 @@ def __init__(
926930 try :
927931 scrubbed_value = _scrub_task_args (arg , self ._target_path_list )
928932 _ = pickle .dumps (scrubbed_value )
929- kwargs_clean [arg ] = scrubbed_value
933+ kwargs_clean [key ] = scrubbed_value
930934 except TypeError :
931935 LOGGER .warning (
932- "could not pickle kw argument %s (%s). "
936+ "could not pickle kw argument %s (%s) scrubbed to %s . "
933937 "Skipping argument which means it will not be considered "
934938 "when calculating whether inputs have been changed "
935- "on a successive run." , key , arg )
939+ "on a successive run." , key , arg , scrubbed_value )
936940
937941 self ._reexecution_info = {
938942 'func_name' : self ._func .__name__ ,
@@ -1125,7 +1129,8 @@ def is_precalculated(self):
11251129 other_arguments = list (_filter_non_files (
11261130 [self ._reexecution_info ['args_clean' ],
11271131 self ._reexecution_info ['kwargs_clean' ]],
1128- self ._target_path_list + self ._ignore_path_list ,
1132+ self ._target_path_list ,
1133+ self ._ignore_path_list ,
11291134 self ._ignore_directories ))
11301135
11311136 LOGGER .debug ("file_stat_list: %s" , file_stat_list )
@@ -1174,17 +1179,22 @@ def is_precalculated(self):
11741179 'Path not found: %s' % path )
11751180 continue
11761181 if hash_algorithm == 'sizetimestamp' :
1177- size , modified_time = [
1178- float (x ) for x in hash_string .split (':' )]
1182+ size , modified_time , actual_path = [
1183+ x for x in hash_string .split ('::' )]
1184+ if actual_path != path :
1185+ mismatched_target_file_list .append (
1186+ "Path names don't match\n "
1187+ "cached: (%s)\n actual (%s)" % (path , actual_path ))
11791188 target_modified_time = os .path .getmtime (path )
1180- if not math .isclose (modified_time , target_modified_time ):
1189+ if not math .isclose (
1190+ float (modified_time ), target_modified_time ):
11811191 mismatched_target_file_list .append (
11821192 "Modified times don't match "
11831193 "cached: (%f) actual: (%f)" % (
1184- modified_time , target_modified_time ))
1194+ float ( modified_time ) , target_modified_time ))
11851195 continue
11861196 target_size = os .path .getsize (path )
1187- if size != target_size :
1197+ if float ( size ) != target_size :
11881198 mismatched_target_file_list .append (
11891199 "File sizes don't match "
11901200 "cached: (%s) actual: (%s)" % (
@@ -1305,7 +1315,7 @@ def _get_file_stats(
13051315
13061316
13071317def _filter_non_files (
1308- base_value , keep_list , keep_directories ):
1318+ base_value , keep_list , ignore_list , keep_directories ):
13091319 """Remove any values that are files not in ignore list or directories.
13101320
13111321 Parameters:
@@ -1314,6 +1324,7 @@ def _filter_non_files(
13141324 contains filepaths in any nested structure.
13151325 keep_list (list): any paths found in this list are not filtered.
13161326 All paths in this list should be "os.path.norm"ed.
1327+ ignore_list (list): any paths found in this list are filtered.
13171328 keep_directories (boolean): If True directories are not filtered
13181329 out.
13191330
@@ -1325,7 +1336,7 @@ def _filter_non_files(
13251336 if isinstance (base_value , _VALID_PATH_TYPES ):
13261337 try :
13271338 norm_path = _normalize_path (base_value )
1328- if (norm_path in keep_list or (
1339+ if norm_path not in ignore_list and (norm_path in keep_list or (
13291340 os .path .isdir (norm_path ) and keep_directories ) or
13301341 not os .path .isfile (norm_path )):
13311342 yield norm_path
@@ -1341,12 +1352,12 @@ def _filter_non_files(
13411352 for key in base_value .keys ():
13421353 value = base_value [key ]
13431354 for filter_value in _filter_non_files (
1344- value , keep_list , keep_directories ):
1355+ value , keep_list , ignore_list , keep_directories ):
13451356 yield (value , filter_value )
13461357 elif isinstance (base_value , (list , set , tuple )):
13471358 for value in base_value :
13481359 for filter_value in _filter_non_files (
1349- value , keep_list , keep_directories ):
1360+ value , keep_list , ignore_list , keep_directories ):
13501361 yield filter_value
13511362 else :
13521363 yield base_value
@@ -1432,8 +1443,9 @@ def _hash_file(file_path, hash_algorithm, buf_size=2**20):
14321443 """
14331444 if hash_algorithm == 'sizetimestamp' :
14341445 norm_path = _normalize_path (file_path )
1435- return '%d:%f' % (
1436- os .path .getsize (norm_path ), os .path .getmtime (norm_path ))
1446+ return '%d::%f::%s' % (
1447+ os .path .getsize (norm_path ), os .path .getmtime (norm_path ),
1448+ norm_path )
14371449 hash_func = hashlib .new (hash_algorithm )
14381450 with open (file_path , 'rb' ) as f :
14391451 binary_data = f .read (buf_size )
@@ -1445,12 +1457,13 @@ def _hash_file(file_path, hash_algorithm, buf_size=2**20):
14451457
14461458def _normalize_path (path ):
14471459 """Convert `path` into normalized, normcase, absolute filepath."""
1448- norm_path = os .path .normpath (os . path . normcase ( path ) )
1460+ norm_path = os .path .normpath (path )
14491461 try :
1450- return os .path .abspath (norm_path )
1462+ abs_path = os .path .abspath (norm_path )
14511463 except TypeError :
14521464 # this occurs when encountering VERY long strings that might be
14531465 # interpreted as paths
14541466 LOGGER .warn (
14551467 "failed to abspath %s so returning normalized path instead" )
1456- return norm_path
1468+ abs_path = norm_path
1469+ return os .path .normcase (abs_path )
0 commit comments