forked from castle/castle-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
132 lines (114 loc) · 4.91 KB
/
client.py
File metadata and controls
132 lines (114 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from castle.api_request import APIRequest
from castle.commands.authenticate import CommandsAuthenticate
from castle.commands.filter import CommandsFilter
from castle.commands.log import CommandsLog
from castle.commands.risk import CommandsRisk
from castle.commands.start_impersonation import CommandsStartImpersonation
from castle.commands.end_impersonation import CommandsEndImpersonation
from castle.commands.track import CommandsTrack
from castle.configuration import configuration
from castle.context.prepare import ContextPrepare
from castle.errors import InternalServerError, RequestError, ImpersonationFailed
from castle.failover.prepare_response import FailoverPrepareResponse
from castle.failover.strategy import FailoverStrategy
class Client(object):
@classmethod
def from_request(cls, request, options=None):
if options is None:
options = {}
options = options.copy()
options['context'] = ContextPrepare.call(request, options)
return cls(options)
@staticmethod
def failover_response_or_raise(user_id, exception):
if configuration.failover_strategy == FailoverStrategy.THROW.value:
raise exception
return FailoverPrepareResponse(
user_id, None, exception.__class__.__name__
).call()
def __init__(self, options=None):
if options is None:
options = {}
self.do_not_track = options.get('do_not_track', False)
self.timestamp = options.get('timestamp')
self.context = options.get('context')
self.api = APIRequest()
def _add_timestamp_if_necessary(self, options):
if self.timestamp:
options.setdefault('timestamp', self.timestamp)
def authenticate(self, options):
if self.tracked():
self._add_timestamp_if_necessary(options)
command = CommandsAuthenticate(self.context).call(options)
try:
response = self.api.call(command)
response.update(failover=False, failover_reason=None)
return response
except (RequestError, InternalServerError) as exception:
return Client.failover_response_or_raise(options.get('user_id'), exception)
else:
return FailoverPrepareResponse(
options.get('user_id'),
'allow',
'Castle set to do not track.'
).call()
def filter(self, options):
if self.tracked():
self._add_timestamp_if_necessary(options)
command = CommandsFilter(self.context).call(options)
try:
response = self.api.call(command)
response.update(failover=False, failover_reason=None)
return response
except (RequestError, InternalServerError) as exception:
return Client.failover_response_or_raise(options.get('user').get('id'), exception)
else:
return FailoverPrepareResponse(
options.get('user').get('id'),
'allow',
'Castle set to do not track.'
).call()
def log(self, options):
if not self.tracked():
return None
self._add_timestamp_if_necessary(options)
return self.api.call(CommandsLog(self.context).call(options))
def risk(self, options):
if self.tracked():
self._add_timestamp_if_necessary(options)
command = CommandsRisk(self.context).call(options)
try:
response = self.api.call(command)
response.update(failover=False, failover_reason=None)
return response
except (RequestError, InternalServerError) as exception:
return Client.failover_response_or_raise(options.get('user').get('id'), exception)
else:
return FailoverPrepareResponse(
options.get('user').get('id'),
'allow',
'Castle set to do not track.'
).call()
def start_impersonation(self, options):
self._add_timestamp_if_necessary(options)
response = self.api.call(CommandsStartImpersonation(self.context).call(options))
if not response.get('success'):
raise ImpersonationFailed
return response
def end_impersonation(self, options):
self._add_timestamp_if_necessary(options)
response = self.api.call(CommandsEndImpersonation(self.context).call(options))
if not response.get('success'):
raise ImpersonationFailed
return response
def track(self, options):
if not self.tracked():
return None
self._add_timestamp_if_necessary(options)
return self.api.call(CommandsTrack(self.context).call(options))
def disable_tracking(self):
self.do_not_track = True
def enable_tracking(self):
self.do_not_track = False
def tracked(self):
return not self.do_not_track