forked from theskumar/python-dotenv
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
285 lines (226 loc) · 9.06 KB
/
main.py
File metadata and controls
285 lines (226 loc) · 9.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import inspect
import io
import os
import re
import shutil
import sys
import tempfile
from typing import (Dict, Iterator, List, Match, Optional, # noqa
Pattern, Union, TYPE_CHECKING, Text, IO, Tuple)
import warnings
from collections import OrderedDict
from contextlib import contextmanager
from .compat import StringIO, PY2, to_env
from .parser import parse_stream
# Aliases used only inside the `# type:` comments of this module. They are
# defined under TYPE_CHECKING, so nothing in this block is bound at runtime.
if TYPE_CHECKING:  # pragma: no cover
    # Path-like argument type: os.PathLike where the checked Python version
    # is 3.6 or newer, plain text otherwise.
    if sys.version_info >= (3, 6):
        _PathLike = os.PathLike
    else:
        _PathLike = Text

    # In-memory stream type accepted in place of a path; subscripted with
    # Text when checking against Python 2.
    if sys.version_info >= (3, 0):
        _StringIO = StringIO
    else:
        _StringIO = StringIO[Text]

# Matches a `${...}` placeholder: a literal "${", any run of characters other
# than "}", then the closing "}".
__posix_variable = re.compile(r'\$\{[^\}]*\}')  # type: Pattern[Text]
class DotEnv():
    """
    Lazy view over the key/value pairs of a dotenv source.

    The source is either a filesystem path or an in-memory StringIO. It is
    parsed on the first call to `dict()` and the resolved result is cached
    on the instance.
    """

    def __init__(self, dotenv_path, verbose=False, encoding=None):
        # type: (Union[Text, _PathLike, _StringIO], bool, Union[None, Text]) -> None
        """
        Store the configuration; no file is opened here.

        dotenv_path: path of the dotenv file, or a StringIO holding its content.
        verbose: when true, emit warnings for a missing file or missing key.
        encoding: passed to io.open when the source is a file.
        """
        self.dotenv_path = dotenv_path  # type: Union[Text,_PathLike, _StringIO]
        # None means "not parsed yet"; an empty dict is a valid cached result.
        self._dict = None  # type: Optional[Dict[Text, Text]]
        self.verbose = verbose  # type: bool
        self.encoding = encoding  # type: Union[None, Text]

    @contextmanager
    def _get_stream(self):
        # type: () -> Iterator[IO[Text]]
        """
        Yield a text stream for the configured source.

        A StringIO source is yielded as-is (and not closed here), a file is
        opened for the duration of the context, and anything else yields an
        empty stream, with a warning when verbose is set.
        """
        if isinstance(self.dotenv_path, StringIO):
            yield self.dotenv_path
        elif os.path.isfile(self.dotenv_path):
            with io.open(self.dotenv_path, encoding=self.encoding) as stream:
                yield stream
        else:
            if self.verbose:
                warnings.warn("File doesn't exist {}".format(self.dotenv_path))  # type: ignore
            yield StringIO('')

    def dict(self):
        # type: () -> Dict[Text, Text]
        """Return dotenv as dict"""
        # Compare against None rather than relying on truthiness: an empty
        # result is still a result, and re-parsing it on every call would
        # repeat the I/O (and the "file doesn't exist" warning).
        if self._dict is not None:
            return self._dict

        values = OrderedDict(self.parse())
        self._dict = resolve_nested_variables(values)
        return self._dict

    def parse(self):
        # type: () -> Iterator[Tuple[Text, Text]]
        """
        Yield (key, value) pairs from the source.

        Bindings whose key or value is None are skipped.
        """
        with self._get_stream() as stream:
            for mapping in parse_stream(stream):
                if mapping.key is not None and mapping.value is not None:
                    yield mapping.key, mapping.value

    def set_as_environment_variables(self, override=False):
        # type: (bool) -> bool
        """
        Load the current dotenv as system environment variables.

        Variables already present in os.environ are left untouched unless
        `override` is true. Always returns True.
        """
        for k, v in self.dict().items():
            if k in os.environ and not override:
                continue
            os.environ[to_env(k)] = to_env(v)

        return True

    def get(self, key):
        # type: (Text) -> Optional[Text]
        """
        Return the value stored for `key`, or None when the key is absent.

        A warning is emitted for a missing key when verbose is set.
        """
        data = self.dict()

        if key in data:
            return data[key]

        if self.verbose:
            warnings.warn("key %s not found in %s." % (key, self.dotenv_path))  # type: ignore

        return None
def get_key(dotenv_path, key_to_get):
    # type: (Union[Text, _PathLike], Text) -> Optional[Text]
    """
    Look up a single key in the given .env

    Returns the value bound to the key, or None when there is none. The
    lookup runs in verbose mode, so a missing file or a missing key is
    reported through a warning.
    """
    env = DotEnv(dotenv_path, verbose=True)
    return env.get(key_to_get)
@contextmanager
def rewrite(path):
    # type: (_PathLike) -> Iterator[Tuple[IO[Text], IO[Text]]]
    """
    Context manager for rewriting the file at `path` through a temp file.

    Yields a `(source, dest)` pair: `source` is `path` opened for reading and
    `dest` is a named temporary file opened for writing. When the managed
    block finishes normally, the temporary file is moved over `path`. When it
    raises, the temporary file is deleted, `path` is left as it was, and the
    exception propagates.
    """
    # Create the temporary file before entering the try block: if creation
    # itself fails, the cleanup below must not run, because `dest` would be
    # unbound and the resulting NameError would hide the real error.
    dest = tempfile.NamedTemporaryFile(mode="w+", delete=False)
    try:
        with dest:
            with io.open(path) as source:
                yield (source, dest)  # type: ignore
    except BaseException:
        # Both files are closed by now; discard the partial output.
        if os.path.isfile(dest.name):
            os.unlink(dest.name)
        raise
    else:
        shutil.move(dest.name, path)
def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
    # type: (_PathLike, Text, Text, Text) -> Tuple[Optional[bool], Text, Text]
    """
    Adds or Updates a key/value to the given .env

    If the .env path given doesn't exist, fails instead of risking creating
    an orphan .env somewhere in the filesystem

    Surrounding single and double quotes are stripped from the value first.
    The value is written double-quoted when `quote_mode` is "always" or when
    it contains a space, and unquoted otherwise.

    Returns `(True, key, value)` on success, or `(None, key, value)` after a
    warning when the file doesn't exist; `value` is the stripped value.
    """
    value_to_set = value_to_set.strip("'").strip('"')
    if not os.path.exists(dotenv_path):
        warnings.warn("can't write to %s - it doesn't exist." % dotenv_path)  # type: ignore
        return None, key_to_set, value_to_set

    # A value containing a space is always quoted, whatever the caller asked.
    if " " in value_to_set:
        quote_mode = "always"

    line_template = '{}="{}"\n' if quote_mode == "always" else '{}={}\n'
    line_out = line_template.format(key_to_set, value_to_set)

    with rewrite(dotenv_path) as (source, dest):
        replaced = False
        # Tracks whether the most recently copied chunk lacked a trailing
        # newline, so an appended binding is never glued onto the last line.
        missing_newline = False
        for mapping in parse_stream(source):
            if mapping.key == key_to_set:
                dest.write(line_out)
                replaced = True
            else:
                dest.write(mapping.original)
                missing_newline = not mapping.original.endswith("\n")
        if not replaced:
            if missing_newline:
                dest.write("\n")
            dest.write(line_out)

    return True, key_to_set, value_to_set
def unset_key(dotenv_path, key_to_unset, quote_mode="always"):
    # type: (_PathLike, Text, Text) -> Tuple[Optional[bool], Text]
    """
    Drops every binding of the given key from the given .env

    Returns `(True, key)` when at least one binding was dropped.
    Warns and returns `(None, key)` when the .env path doesn't exist or when
    the key isn't present in it.
    """
    if not os.path.exists(dotenv_path):
        warnings.warn("can't delete from %s - it doesn't exist." % dotenv_path)  # type: ignore
        return None, key_to_unset

    found = False
    with rewrite(dotenv_path) as (source, dest):
        for binding in parse_stream(source):
            if binding.key != key_to_unset:
                # Unrelated content is copied through untouched.
                dest.write(binding.original)
                continue
            found = True

    if found:
        return found, key_to_unset

    warnings.warn("key %s not removed from %s - key doesn't exist." % (key_to_unset, dotenv_path))  # type: ignore
    return None, key_to_unset
def resolve_nested_variables(values):
    # type: (Dict[Text, Text]) -> Dict[Text, Text]
    """
    Expand `${NAME}` references inside the given values.

    Each reference is replaced by the process environment's value for NAME
    when there is one, otherwise by the already-expanded dotenv value for
    NAME, otherwise by an empty string. Values are processed in iteration
    order, so a reference can only see dotenv entries that come before it.
    """
    resolved = {}  # filled incrementally; read by the callback below

    def _expand(match):
        # type: (Match[Text]) -> Text
        """
        Return the replacement text for one matched `${NAME}` reference.
        """
        # Strip the leading "${" and the trailing "}".
        name = match.group()[2:-1]
        fallback = resolved.get(name, "")
        return os.getenv(name, fallback)

    for key, raw in values.items():
        resolved[key] = __posix_variable.sub(_expand, raw)

    return resolved
def _walk_to_root(path):
    # type: (Text) -> Iterator[Text]
    """
    Generate absolute directory paths from the given location up to the root.

    When `path` is a file, the walk begins at its containing directory.
    Raises IOError (on first iteration) when `path` doesn't exist.
    """
    if not os.path.exists(path):
        raise IOError('Starting path not found')

    start = os.path.dirname(path) if os.path.isfile(path) else path
    current = os.path.abspath(start)

    while True:
        yield current
        parent = os.path.abspath(os.path.join(current, os.path.pardir))
        if parent == current:
            # The root is its own parent: nothing further up.
            return
        current = parent
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
    # type: (Text, bool, bool) -> Text
    """
    Search in increasingly higher folders for the given file

    The search starts in the current working directory when `usecwd` is true
    or when the `__main__` module has no `__file__`; otherwise it starts in
    the directory of the first calling frame whose file is not this module.

    Returns path to the file if found, or an empty string otherwise

    Raises IOError when the file is not found and `raise_error_if_not_found`
    is true.
    """

    def _is_interactive():
        """Decide whether this is running in a REPL or IPython notebook
        """
        # The check is simply whether the __main__ module lacks a __file__.
        main = __import__('__main__', None, None, fromlist=['__file__'])
        return not hasattr(main, '__file__')

    if usecwd or _is_interactive():
        # should work without __file__, e.g. in REPL or IPython notebook
        path = os.getcwd()
    else:
        # will work for .py files
        frames = inspect.getouterframes(inspect.currentframe())
        # find first frame that is outside of this file
        if PY2 and not __file__.endswith('.py'):
            # in Python2 __file__ extension could be .pyc or .pyo (this doesn't account
            # for edge case of Python compiled for non-standard extension)
            current_file = __file__.rsplit('.', 1)[0] + '.py'
        else:
            current_file = __file__

        # The loop variable is deliberately used after the loop: on `break`
        # it is the first frame from another file; if no frame differs it is
        # simply the last frame examined.
        for frame in frames:
            if frame.filename != current_file:
                break

        path = os.path.dirname(os.path.abspath(frame.filename))

    # Return the first match, i.e. the one closest to the starting directory.
    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.isfile(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
def load_dotenv(dotenv_path=None, stream=None, verbose=False, override=False, **kwargs):
    # type: (Union[Text, _PathLike, None], Optional[_StringIO], bool, bool, Union[None, Text]) -> bool
    """
    Load the variables of a dotenv source into the process environment.

    The source is `dotenv_path` when given, else `stream`, else whatever
    `find_dotenv()` locates. Existing environment variables are kept unless
    `override` is true. Extra keyword arguments are forwarded to `DotEnv`.
    """
    source = dotenv_path
    if not source:
        source = stream
    if not source:
        source = find_dotenv()

    env = DotEnv(source, verbose=verbose, **kwargs)
    return env.set_as_environment_variables(override=override)
def dotenv_values(dotenv_path=None, stream=None, verbose=False, **kwargs):
    # type: (Union[Text, _PathLike, None], Optional[_StringIO], bool, Union[None, Text]) -> Dict[Text, Text]
    """
    Parse a dotenv source and return its variables as a dict.

    The source is `dotenv_path` when given, else `stream`, else whatever
    `find_dotenv()` locates. The process environment is not modified. Extra
    keyword arguments are forwarded to `DotEnv`.
    """
    source = dotenv_path
    if not source:
        source = stream
    if not source:
        source = find_dotenv()

    env = DotEnv(source, verbose=verbose, **kwargs)
    return env.dict()