forked from seung-lab/python-task-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueueables.py
More file actions
42 lines (34 loc) · 1.08 KB
/
queueables.py
File metadata and controls
42 lines (34 loc) · 1.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from functools import partial
import orjson
from .queueablefns import totask as qtotask, FunctionTask, FunctionTaskLite, tofunc
from .registered_task import totask as rtotask, RegisteredTask
def totask(task):
if isinstance(task, (FunctionTask, RegisteredTask)):
return task
if type(task) is bytes:
task = task.decode('utf8')
if isinstance(task, str):
task = orjson.loads(task)
ident = -1
if isinstance(task, dict):
ident = task.get('id', -1)
if 'payload' in task:
task = task['payload']
if isinstance(task, FunctionTaskLite):
return FunctionTask(*task)
elif isinstance(task, list):
task[3] = ident
return FunctionTask(*task)
elif isinstance(task, dict):
return rtotask(task, ident)
elif isinstance(task, partial) or callable(task):
return qtotask(task, ident)
raise ValueError("Unable to convert {} into a task object.".format(task))
def totaskid(taskid):
if hasattr(taskid, 'id'):
return taskid.id
elif 'id' in taskid:
return taskid['id']
elif isinstance(taskid, (list, tuple)):
return taskid[3]
return taskid