forked from aws/aws-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathassumerole.py
More file actions
88 lines (71 loc) · 3.1 KB
/
assumerole.py
File metadata and controls
88 lines (71 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import os
import json
import logging
from botocore.exceptions import ProfileNotFound
LOG = logging.getLogger(__name__)
def register_assume_role_provider(event_handlers):
event_handlers.register('session-initialized',
inject_assume_role_provider_cache,
unique_id='inject_assume_role_cred_provider_cache')
def inject_assume_role_provider_cache(session, **kwargs):
try:
cred_chain = session.get_component('credential_provider')
except ProfileNotFound:
# If a user has provided a profile that does not exist,
# trying to retrieve components/config on the session
# will raise ProfileNotFound. Sometimes this is invalid:
#
# "ec2 describe-instances --profile unknown"
#
# and sometimes this is perfectly valid:
#
# "configure set region us-west-2 --profile brand-new-profile"
#
# Because we can't know (and don't want to know) whether
# the customer is trying to do something valid, we just
# immediately return. If it's invalid something else
# up the stack will raise ProfileNotFound, otherwise
# the configure (and other) commands will work as expected.
LOG.debug("ProfileNotFound caught when trying to inject "
"assume-role cred provider cache. Not configuring "
"JSONFileCache for assume-role.")
return
provider = cred_chain.get_provider('assume-role')
provider.cache = JSONFileCache()
class JSONFileCache(object):
"""JSON file cache.
This provides a dict like interface that stores JSON serializable
objects.
The objects are serialized to JSON and stored in a file. These
values can be retrieved at a later time.
"""
CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'cli', 'cache'))
def __init__(self, working_dir=CACHE_DIR):
self._working_dir = working_dir
def __contains__(self, cache_key):
actual_key = self._convert_cache_key(cache_key)
return os.path.isfile(actual_key)
def __getitem__(self, cache_key):
"""Retrieve value from a cache key."""
actual_key = self._convert_cache_key(cache_key)
try:
with open(actual_key) as f:
return json.load(f)
except (OSError, ValueError, IOError):
raise KeyError(cache_key)
def __setitem__(self, cache_key, value):
full_key = self._convert_cache_key(cache_key)
try:
file_content = json.dumps(value)
except (TypeError, ValueError):
raise ValueError("Value cannot be cached, must be "
"JSON serializable: %s" % value)
if not os.path.isdir(self._working_dir):
os.makedirs(self._working_dir)
with os.fdopen(os.open(full_key,
os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
f.truncate()
f.write(file_content)
def _convert_cache_key(self, cache_key):
full_path = os.path.join(self._working_dir, cache_key + '.json')
return full_path