forked from seung-lab/python-task-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaws_queue_api.py
More file actions
130 lines (101 loc) · 3.22 KB
/
aws_queue_api.py
File metadata and controls
130 lines (101 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import json
import logging
import re

import boto3
import botocore

from .secrets import aws_credentials
class AWSTaskQueueAPI(object):
    """Thin wrapper around a boto3 SQS client bound to a single queue.

    Tasks are JSON-serialized into the message body on insert and decoded
    back into dicts on receive, with the SQS receipt handle stored under
    the 'id' key so the task can later be deleted.
    """

    def __init__(self, qurl, region_name=None):
        """
        qurl: either a queue name (e.g. 'pull_queue') or a url
          like https://sqs.us-east-1.amazonaws.com/DIGITS/wms-pull-queue
        region_name: region passed to the SQS client. When qurl is a url
          of the form above, the region embedded in the url takes
          precedence over this argument.
        """
        # The dots are escaped so that only a real 'sqs.<region>.amazonaws'
        # hostname is treated as a url; anything else is a queue name.
        matches = re.search(r'sqs\.([\w-]+)\.amazonaws', qurl)

        if matches is not None:
            region_name, = matches.groups()
            self._qurl = qurl
        else:
            self._qurl = None

        credentials = aws_credentials()

        self._sqs = boto3.client(
            'sqs',
            region_name=region_name,
            aws_secret_access_key=credentials['AWS_SECRET_ACCESS_KEY'],
            aws_access_key_id=credentials['AWS_ACCESS_KEY_ID'],
        )

        # A bare queue name has to be resolved to its url via the service.
        if self._qurl is None:
            self._qurl = self._sqs.get_queue_url(QueueName=qurl)["QueueUrl"]

    @property
    def enqueued(self):
        """Approximate total of visible plus not-visible messages."""
        status = self.status()
        return (
            int(status['ApproximateNumberOfMessages'])
            + int(status['ApproximateNumberOfMessagesNotVisible'])
        )

    def status(self):
        """Return the queue's 'Attributes' mapping for the two message counts."""
        resp = self._sqs.get_queue_attributes(
            QueueUrl=self._qurl,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible',
            ],
        )
        return resp['Attributes']

    def insert(self, task):
        """Send `task` (JSON-serialized) to the queue and return its MessageId."""
        resp = self._sqs.send_message(
            QueueUrl=self._qurl,
            DelaySeconds=0,
            MessageBody=json.dumps(task),
        )
        return resp['MessageId']

    def renew_lease(self, seconds):
        """Not supported by this backend; always raises NotImplementedError."""
        raise NotImplementedError()

    def cancel_lease(self, rhandle):
        """Not supported by this backend; always raises NotImplementedError."""
        raise NotImplementedError()

    def _request(self, num_tasks, visibility_timeout):
        """Receive up to `num_tasks` messages and decode them into task dicts.

        Each returned task is the JSON-decoded message body with the
        message's receipt handle stored under 'id'. Returns an empty list
        when the response contains no messages.
        """
        resp = self._sqs.receive_message(
            QueueUrl=self._qurl,
            AttributeNames=[
                'SentTimestamp'
            ],
            MaxNumberOfMessages=num_tasks,
            MessageAttributeNames=[
                'All'
            ],
            VisibilityTimeout=visibility_timeout,
            WaitTimeSeconds=0,
        )

        if 'Messages' not in resp:
            return []

        tasks = []
        for msg in resp['Messages']:
            task = json.loads(msg['Body'])
            # The receipt handle is what delete() needs later on.
            task['id'] = msg['ReceiptHandle']
            tasks.append(task)
        return tasks

    def lease(self, seconds, numTasks=1, groupByTag=False, tag=''):
        """Fetch at most one task, hidden from other readers for `seconds`.

        `groupByTag` and `tag` are accepted but not used by this backend.

        Raises:
          ValueError: if more than one task is requested.
        """
        if numTasks > 1:
            raise ValueError("This library (not boto/SQS) only supports fetching one task at a time. Requested: {}.".format(numTasks))
        return self._request(numTasks, seconds)

    def acknowledge(self, task):
        """Alias for delete()."""
        return self.delete(task)

    def delete(self, task):
        """Delete a message from the queue on a best-effort basis.

        task: a receipt handle string, an object exposing the handle as
          `_id`, or a mapping holding it under 'id' (as produced by
          _request).

        Client errors reported by the service are logged and otherwise
        ignored, so a failed delete never raises to the caller.
        """
        # isinstance (rather than an exact type comparison) so that str
        # subclasses are also treated as raw receipt handles.
        if isinstance(task, str):
            rhandle = task
        else:
            try:
                rhandle = task._id
            except AttributeError:
                rhandle = task['id']

        try:
            self._sqs.delete_message(
                QueueUrl=self._qurl,
                ReceiptHandle=rhandle,
            )
        except botocore.exceptions.ClientError as err:
            # Deliberately non-fatal, but leave a trace instead of
            # silently discarding the failure.
            logging.getLogger(__name__).warning(
                "Failed to delete message from %s: %s", self._qurl, err
            )

    def purge(self):
        """Drain the queue by receiving and deleting until it reports empty.

        Returns self so calls can be chained.
        """
        # The service-side purge_queue call would be more efficient, but
        # it was observed to freeze, hence the manual drain below.
        while self.enqueued:
            # visibility_timeout must be > 0 for delete to work
            tasks = self._request(num_tasks=10, visibility_timeout=10)
            for task in tasks:
                self.delete(task)
        return self

    def list(self):
        """Return up to 10 tasks, requested with a visibility timeout of 0."""
        return self._request(num_tasks=10, visibility_timeout=0)