Skip to content

Commit 95bad36

Browse files
authored
Merge pull request #40 from AlpineNow/tjb2
Update wait_until_finished to properly detect final workflow status.
2 parents 91d4019 + f2774e4 commit 95bad36

File tree

3 files changed

+39
-32
lines changed

3 files changed

+39
-32
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ Running a workflow and downloading the results::
3434
>>> import alpine as AlpineAPI
3535
>>> session = AlpineAPI.APIClient(host, port, username, password)
3636
>>> process_id = session.workfile.process.run(workfile_id)
37-
>>> session.workfile.process.wait_until_finished(process_id)
37+
>>> session.workfile.process.wait_until_finished(workfile_id, process_id)
3838
>>> results = session.workfile.process.download(workfile_id, process_id)
3939

alpine/workfile.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,8 @@ def download_results(self, workflow_id, process_id):
433433
"""
434434
Downloads a workflow run result.
435435
436-
:param str workflow_id: ID of the workflow.
437-
:param ste process_id: ID of a particular workflow run.
436+
:param int workflow_id: ID of the workflow.
437+
:param str process_id: ID of a particular workflow run.
438438
:return: JSON object of workflow results.
439439
:rtype: dict
440440
:exception WorkspaceNotFoundException: The workspace does not exist.
@@ -488,13 +488,14 @@ def stop(self, process_id):
488488
raise StopFlowFailureException("Stopping the workflow failed with status {0}: {1}"
489489
.format(response.status_code, response.reason))
490490

491-
def wait_until_finished(self, process_id, verbose=False, query_time=10, timeout=3600):
491+
def wait_until_finished(self, workflow_id, process_id, verbose=False, query_time=10, timeout=3600):
492492
"""
493493
Waits for a running workflow to finish.
494494
495-
:param str process_id: Process ID of the workflow.
495+
:param int workflow_id: ID of a particular workflow run.
496+
:param str process_id: ID of a particular workflow run.
496497
:param bool verbose: Optionally print approximate run time.
497-
:param float query_time: Number of seconds between status queries.
498+
:param float query_time: Number of seconds between status queries. Minimum of 1 second.
498499
:param float timeout: Amount of time in seconds to wait for workflow to finish. Will stop if exceeded.
499500
:return: Workflow run status.
500501
:rtype: str
@@ -503,17 +504,20 @@ def wait_until_finished(self, process_id, verbose=False, query_time=10, timeout=
503504
504505
Example::
505506
506-
>>> session.workfile.process.wait_until_finished(process_id = process_id)
507+
>>> session.workfile.process.wait_until_finished(workflow_id = workflow_id, process_id = process_id)
507508
508509
"""
510+
query_time = max(1, query_time)
509511

510512
start = time.time()
513+
511514
self.logger.debug("Waiting for process ID: <{0}> to complete...".format(process_id))
512-
wait_count = 0
513515

516+
# workflow_status = self.query_status2(workflow_id, process_id)
514517
workflow_status = self.query_status(process_id)
518+
515519
while workflow_status == "WORKING": # loop while waiting for workflow to complete
516-
wait_count += 1
520+
517521
wait_total = time.time() - start
518522

519523
if wait_total >= timeout:
@@ -523,11 +527,18 @@ def wait_until_finished(self, process_id, verbose=False, query_time=10, timeout=
523527
" It now has status '{2}'.".format(process_id, timeout, stop_status))
524528

525529
if verbose:
526-
print("\rWorkflow in progress for ~{0:.2f} seconds.".format(wait_total)),
530+
print("\rWorkflow in progress for ~{0:.1f} seconds.".format(wait_total)),
527531

528532
time.sleep(query_time)
529533

530-
if workflow_status == "FAILED":
531-
raise RunFlowFailureException("The workflow with process ID: <{0}> failed.".format(process_id))
532534
workflow_status = self.query_status(process_id)
533-
return workflow_status
535+
536+
if verbose:
537+
print("")
538+
539+
# If workflow is finished - find the final status, otherwise return status in progress.
540+
time.sleep(1)
541+
final_results = self.download_results(workflow_id, process_id)
542+
status = self.get_metadata(final_results)['status']
543+
544+
return status

doc/JupyterNotebookExamples/Introduction.ipynb

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"# Introduction\n",
88
"\n",
99
"Let's start with a example of an Alpine API session.\n",
10+
"\n",
1011
"1. Initialize a session.\n",
1112
"1. Take a tour some commands.\n",
1213
"1. Run a workflow and download the results."
@@ -55,13 +56,14 @@
5556
"You'll also need your Alpine username and password.\n",
5657
"\n",
5758
"I've stored my connection information in a configuration file named `alpine_login.conf` that looks something like this:\n",
58-
"\n",
59+
"```JSON\n",
5960
" {\n",
6061
" \"host\": \"AlpineHost\",\n",
6162
" \"port\": \"PortNum\",\n",
6263
" \"username\": \"fakename\",\n",
6364
" \"password\": \"12345\"\n",
64-
" }"
65+
" }\n",
66+
"```"
6567
]
6668
},
6769
{
@@ -180,7 +182,7 @@
180182
" u'organization_uuid': None,\n",
181183
" u'users_license_limit_reached': False,\n",
182184
" u'vendor': u'alpine',\n",
183-
" u'version': u'6.3.0.0.5302-3f9996dea\\n',\n",
185+
" u'version': u'6.3.0.0.5313-80ba3a0fc\\n',\n",
184186
" u'workflow_enabled': True}\n"
185187
]
186188
}
@@ -200,7 +202,7 @@
200202
"name": "stdout",
201203
"output_type": "stream",
202204
"text": [
203-
"u'6.3.0.0.5302-3f9996dea'\n"
205+
"u'6.3.0.0.5313-80ba3a0fc'\n"
204206
]
205207
}
206208
],
@@ -337,15 +339,15 @@
337339
" u'roles': [u'admin'],\n",
338340
" u'subscribed_to_emails': False,\n",
339341
" u'tags': [],\n",
340-
" u'title': u'Assistant Regional Manager',\n",
342+
" u'title': u'Assistant to the Regional Manager',\n",
341343
" u'user_type': u'analytics_developer',\n",
342344
" u'username': u'tjbay',\n",
343345
" u'using_default_image': True}\n"
344346
]
345347
}
346348
],
347349
"source": [
348-
"pprint(session.user.update(user_id, title = \"Assistant Regional Manager\"))"
350+
"pprint(session.user.update(user_id, title = \"Assistant to the Regional Manager\"))"
349351
]
350352
},
351353
{
@@ -389,25 +391,18 @@
389391
"name": "stdout",
390392
"output_type": "stream",
391393
"text": [
392-
"Workflow in progress for ~45.72 seconds. "
394+
"Workflow in progress for ~40.3 seconds. \n"
393395
]
394396
},
395397
{
396398
"data": {
397399
"text/plain": [
398-
"'FINISHED'"
400+
"u'SUCCESS'"
399401
]
400402
},
401403
"execution_count": 13,
402404
"metadata": {},
403405
"output_type": "execute_result"
404-
},
405-
{
406-
"name": "stdout",
407-
"output_type": "stream",
408-
"text": [
409-
"\n"
410-
]
411406
}
412407
],
413408
"source": [
@@ -416,7 +411,8 @@
416411
"\n",
417412
"process_id = session.workfile.process.run(workflow_id)\n",
418413
"\n",
419-
"session.workfile.process.wait_until_finished(process_id = process_id,\n",
414+
"session.workfile.process.wait_until_finished(workflow_id = workflow_id,\n",
415+
" process_id = process_id,\n",
420416
" verbose = True,\n",
421417
" query_time = 5)"
422418
]
@@ -439,12 +435,12 @@
439435
"name": "stdout",
440436
"output_type": "stream",
441437
"text": [
442-
"{u'flowMetaInfo': {u'endTime': u'2017-03-07T15:27:27.740-0800',\n",
438+
"{u'flowMetaInfo': {u'endTime': u'2017-03-14T16:57:24.306-0700',\n",
443439
" u'executeUser': u'7',\n",
444440
" u'noOfError': 0,\n",
445441
" u'noOfNodesProcessed': 3,\n",
446-
" u'processId': u'3f5e7d3c-462e-462b-81e1-1eef81c63b2e',\n",
447-
" u'startTime': u'2017-03-07T15:26:40.830-0800',\n",
442+
" u'processId': u'dad3bfc0-43b1-439f-ab16-f9f761621fbd',\n",
443+
" u'startTime': u'2017-03-14T16:56:37.641-0700',\n",
448444
" u'status': u'SUCCESS',\n",
449445
" u'workflowId': u'1269',\n",
450446
" u'workflowName': u'Data ETL'},\n",

0 commit comments

Comments
 (0)