forked from theskumar/python-dotenv
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
350 lines (272 loc) · 9.81 KB
/
main.py
File metadata and controls
350 lines (272 loc) · 9.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import codecs
import io
import os
import re
import shutil
import sys
from subprocess import Popen
import tempfile
import warnings
from collections import OrderedDict, namedtuple
from contextlib import contextmanager
from .compat import StringIO, PY2, WIN, text_type
from .environ import env
__posix_variable = re.compile(r'\$\{[^\}]*\}')
_binding = re.compile(
r"""
(
\s* # leading whitespace
(?:export{0}+)? # export
( '[^']+' # single-quoted key
| [^=\#\s]+ # or unquoted key
)?
(?:
(?:{0}*={0}*) # equal sign
( '(?:\\'|[^'])*' # single-quoted value
| "(?:\\"|[^"])*" # or double-quoted value
| [^\#\r\n]* # or unquoted value
)
)?
\s* # trailing whitespace
(?:\#[^\r\n]*)? # comment
(?:\r|\n|\r\n)? # newline
)
""".format(r'[^\S\r\n]'),
re.MULTILINE | re.VERBOSE,
)
_escape_sequence = re.compile(r"\\[\\'\"abfnrtv]")
Binding = namedtuple('Binding', 'key value original')
def decode_escapes(string):
def decode_match(match):
return codecs.decode(match.group(0), 'unicode-escape')
return _escape_sequence.sub(decode_match, string)
def is_surrounded_by(string, char):
return (
len(string) > 1
and string[0] == string[-1] == char
)
def parse_binding(string, position):
match = _binding.match(string, position)
(matched, key, value) = match.groups()
if key is None or value is None:
key = None
value = None
else:
value_quoted = is_surrounded_by(value, "'") or is_surrounded_by(value, '"')
if value_quoted:
value = decode_escapes(value[1:-1])
else:
value = value.strip()
return (Binding(key=key, value=value, original=matched), match.end())
def parse_stream(stream):
string = stream.read()
position = 0
length = len(string)
while position < length:
(binding, position) = parse_binding(string, position)
yield binding
class DotEnv():
def __init__(self, dotenv_path, verbose=False):
self.dotenv_path = dotenv_path
self._dict = None
self.verbose = verbose
@contextmanager
def _get_stream(self):
if isinstance(self.dotenv_path, StringIO):
yield self.dotenv_path
elif os.path.isfile(self.dotenv_path):
with io.open(self.dotenv_path) as stream:
yield stream
else:
if self.verbose:
warnings.warn("File doesn't exist {}".format(self.dotenv_path))
yield StringIO('')
def dict(self):
"""Return dotenv as dict"""
if self._dict:
return self._dict
values = OrderedDict(self.parse())
self._dict = resolve_nested_variables(values)
return self._dict
def parse(self):
with self._get_stream() as stream:
for mapping in parse_stream(stream):
if mapping.key is not None and mapping.value is not None:
yield mapping.key, mapping.value
def set_as_environment_variables(self, override=False):
"""
Load the current dotenv as system environemt variable.
"""
for k, v in self.dict().items():
if k in os.environ and not override:
continue
# With Python2 on Windows, force environment variables to str to avoid
# "TypeError: environment can only contain strings" in Python's subprocess.py.
if PY2 and WIN:
if isinstance(k, text_type) or isinstance(v, text_type):
k = k.encode('ascii')
v = v.encode('ascii')
os.environ[k] = v
return True
def get(self, key):
"""
"""
data = self.dict()
if key in data:
return data[key]
if self.verbose:
warnings.warn("key %s not found in %s." % (key, self.dotenv_path))
def get_key(dotenv_path, key_to_get):
"""
Gets the value of a given key from the given .env
If the .env path given doesn't exist, fails
"""
return DotEnv(dotenv_path, verbose=True).get(key_to_get)
@contextmanager
def rewrite(path):
try:
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as dest:
with io.open(path) as source:
yield (source, dest)
except BaseException:
if os.path.isfile(dest.name):
os.unlink(dest.name)
raise
else:
shutil.move(dest.name, path)
def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
"""
Adds or Updates a key/value to the given .env
If the .env path given doesn't exist, fails instead of risking creating
an orphan .env somewhere in the filesystem
"""
value_to_set = value_to_set.strip("'").strip('"')
if not os.path.exists(dotenv_path):
warnings.warn("can't write to %s - it doesn't exist." % dotenv_path)
return None, key_to_set, value_to_set
if " " in value_to_set:
quote_mode = "always"
line_template = '{}="{}"\n' if quote_mode == "always" else '{}={}\n'
line_out = line_template.format(key_to_set, value_to_set)
with rewrite(dotenv_path) as (source, dest):
replaced = False
for mapping in parse_stream(source):
if mapping.key == key_to_set:
dest.write(line_out)
replaced = True
else:
dest.write(mapping.original)
if not replaced:
dest.write(line_out)
return True, key_to_set, value_to_set
def unset_key(dotenv_path, key_to_unset, quote_mode="always"):
"""
Removes a given key from the given .env
If the .env path given doesn't exist, fails
If the given key doesn't exist in the .env, fails
"""
if not os.path.exists(dotenv_path):
warnings.warn("can't delete from %s - it doesn't exist." % dotenv_path)
return None, key_to_unset
removed = False
with rewrite(dotenv_path) as (source, dest):
for mapping in parse_stream(source):
if mapping.key == key_to_unset:
removed = True
else:
dest.write(mapping.original)
if not removed:
warnings.warn("key %s not removed from %s - key doesn't exist." % (key_to_unset, dotenv_path))
return None, key_to_unset
return removed, key_to_unset
def resolve_nested_variables(values):
def _replacement(name):
"""
get appropriate value for a variable name.
first search in environ, if not found,
then look into the dotenv variables
"""
ret = os.getenv(name, new_values.get(name, ""))
return ret
def _re_sub_callback(match_object):
"""
From a match object gets the variable name and returns
the correct replacement
"""
return _replacement(match_object.group()[2:-1])
new_values = {}
for k, v in values.items():
new_values[k] = __posix_variable.sub(_re_sub_callback, v)
return new_values
def _walk_to_root(path):
"""
Yield directories starting from the given directory up to the root
"""
if not os.path.exists(path):
raise IOError('Starting path not found')
if os.path.isfile(path):
path = os.path.dirname(path)
last_dir = None
current_dir = os.path.abspath(path)
while last_dir != current_dir:
yield current_dir
parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
last_dir, current_dir = current_dir, parent_dir
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
"""
Search in increasingly higher folders for the given file
Returns path to the file if found, or an empty string otherwise
"""
if usecwd or '__file__' not in globals():
# should work without __file__, e.g. in REPL or IPython notebook
path = os.getcwd()
else:
# will work for .py files
frame = sys._getframe()
# find first frame that is outside of this file
while frame.f_code.co_filename == __file__:
frame = frame.f_back
frame_filename = frame.f_code.co_filename
path = os.path.dirname(os.path.abspath(frame_filename))
for dirname in _walk_to_root(path):
check_path = os.path.join(dirname, filename)
if os.path.isfile(check_path):
return check_path
if raise_error_if_not_found:
raise IOError('File not found')
return ''
def load_dotenv(dotenv_path=None, stream=None, verbose=False, override=False):
f = dotenv_path or stream or find_dotenv()
DotEnv(f, verbose=verbose).set_as_environment_variables(override=override)
return env
def dotenv_values(dotenv_path=None, stream=None, verbose=False):
f = dotenv_path or stream or find_dotenv()
return DotEnv(f, verbose=verbose).dict()
def run_command(command, env):
"""Run command in sub process.
Runs the command in a sub process with the variables from `env`
added in the current environment variables.
Parameters
----------
command: List[str]
The command and it's parameters
env: Dict
The additional environment variables
Returns
-------
int
The return code of the command
"""
# copy the current environment variables and add the vales from
# `env`
cmd_env = os.environ.copy()
cmd_env.update(env)
p = Popen(command,
universal_newlines=True,
bufsize=0,
shell=False,
env=cmd_env)
_, _ = p.communicate()
return p.returncode