-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdecorators.py
More file actions
236 lines (204 loc) · 9.82 KB
/
decorators.py
File metadata and controls
236 lines (204 loc) · 9.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import functools
from datetime import datetime, timedelta
from http import HTTPStatus
import time
from dateutil import tz
from codegreen.config import get_configuration, write_config_file
from codegreen.queries import submit_cc_resource_usage, get_prediction, submit_nf_resource_usage, get_data, get_location_prediction
from codegreen.expections import UnauthorizedException, InternalServerErrorException
from codegreen.config import get_api_endpoint, get_api_key
def time_shift(experiment_name: str):
"""Time shift a computation using the experiment configuration defined in the configuration file with the experiment name.
Requires to have run @init_experiment at some point in prior to the computation. The function will automatically pause the computation
until the best time within the specified experiment parameters is reached.
:param experiment_name: _description_
:type experiment_name: str
"""
def actual_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
config = get_configuration(experiment_name)
hard_finish_time =(datetime.now() + timedelta(hours=config['allowed_delay_hours'])).timestamp()
print(config)
try:
sleep_until(
estimated_runtime_hours=config["estimated_runtime_hours"],
estimated_run_time_in_minutes=config["estimated_runtime_minutes"],
percent_renewable=config["percent_renewable"],
hard_finish_time=hard_finish_time,
area_code=config["area_code"],
log_request=config["log_request"],
process_id=config["experiment_hash"],
experiment_name=experiment_name
)
except KeyError as e:
print(
f"Error: Missing key in config dictionary: {str(e)}, Please initialize your configuration with @init_experiment"
)
return func(*args, **kwargs)
return wrapper
return actual_decorator
def sleep_until(
estimated_runtime_hours: int,
estimated_run_time_in_minutes: int,
percent_renewable: int,
hard_finish_time,
area_code:list[str],
log_request:bool,
process_id:str,
experiment_name:str
):
"""_summary_
:param estimated_runtime_hours: Estimated runtime in hours
:type estimated_runtime_hours: int
:param estimated_run_time_in_minutes: Estimated runtime minutes
:type estimated_run_time_in_minutes: int
:param percent_renewable: User required percentage of renewable energy available in the grid at time of computation
:type percent_renewable: int
:param hard_finish_time: Final deadline for the computation/
:type hard_finish_time: _type_
:param area_code: Area code passed as a list of strings.
:type area_code: list[str]
:param log_request: Allow logging the request at the server. This will allow us to compute how much carbon you saved.
:type log_request: bool
:param process_id: A hash identifying the experiment. Usually autogenerated by the init method.
:type process_id: str
:param experiment_name: A name of the experiment used to identify the correct configuration file.
:type experiment_name: str
:raises UnauthorizedException: thrown if the API key is invalid.
:raises InternalServerErrorException: thrown if there is an error at the server.
"""
try:
print(type(hard_finish_time))
r = get_prediction(
estimated_runtime_hours,
estimated_run_time_in_minutes,
percent_renewable,
hard_finish_time,
area_code,
log_request,
process_id,
experiment_name
)
if r.status_code == HTTPStatus.UNAUTHORIZED:
raise UnauthorizedException
if r.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
raise InternalServerErrorException
print(r.request)
js = r.json()
print(js)
sleep_until = js["suggested_start"]
sleep_until = datetime.utcfromtimestamp(sleep_until)
sleep_until = sleep_until.replace(tzinfo=tz.tzutc())
sleep_until = sleep_until.astimezone(tz.tzlocal())
print(sleep_until)
print(f"{js['message']}: {sleep_until}")
print(sleep_until.timestamp())
sleep_time = sleep_until.timestamp() - datetime.now().timestamp()
if sleep_time > 0:
time.sleep(sleep_time)
except ConnectionError:
print("Error contacting API")
except UnauthorizedException:
print("Unauthorized -- Did you forget to submit the correct API key?")
except InternalServerErrorException:
print(
"Internal server error -- It is most likely not your fault. If this problem persisits please file an issue on Github"
)
def upload_nf_report(experiment_name: str):
"""Allow uploading a nextflow report generated when running nextflow
with the -with-trace option.
:param experiment_name: Name of the experiment to identify the config file used for this experiment.
:type experiment_name: str
"""
def actual_decorator(func):
@functools.wraps(func)
def wrapper(*args):
config = get_configuration(experiment_name)
try:
r = submit_nf_resource_usage(config["nexflow_logfile"], process_id=config["experiment_hash"], experiment_name=experiment_name)
except KeyError as e:
print(
f"Error: Missing key in config dictionary: {str(e)}, Please initialize your configuration with @init_experiment"
)
print(r.status_code)
return func(*args)
return wrapper
return actual_decorator
def upload_cc_report(experiment_name: str):
"""Automatically upload the codecarbon report generated by the codecarbon tracker tool.
:param experiment_name: Name of the experiment used to identify the correct configuration file for the experiment.
:type experiment_name: str
"""
def actual_decorator(func):
@functools.wraps(func)
def wrapper(*args):
func(*args)
print("Uploading")
config = get_configuration(experiment_name)
try:
r = submit_cc_resource_usage(
config["codecarbon_logfile"], process_id=config["experiment_hash"], task_name=config['experiment_name'], postal_code=''.join(config["area_code"]), experiment_name=experiment_name
)
except KeyError as e:
print(
f"Error: Missing key in config dictionary: {str(e)}, Please initialize your configuration with @init_experiment"
)
print(r.status_code)
return wrapper
return actual_decorator
def init_experiment(
experiment_name:str,
nextflow_logfile:str=None,
area_code: list[str] = ['DE-9'],
estimated_runtime_hours:int=2,
estimated_runtime_minutes:int=40,
codecarbon_logfile:str='emissions.csv',
percent_renewable:int=30,
allowed_delay_hours:int=24,
log_request:bool=True,
overwrite:bool=False,
):
"""Initialize an experiment with the given parameters. This function will write a configuration file
which can be used to report using the same configuration later on. The experiment_name paramter
identifies the correct file. By default, the codecarbon report is called emissions.csv.
:param experiment_name: Name of the experiment used to identify the correct configuration file.
:type experiment_name: str
:param nextflow_logfile: Name of the nextflow trace file generated when running nextflow with the '-with-trace' option/
:type nextflow_logfile: str
:param area_code: list of area codes with a two letter country code and an optional postal code separated by a dash. Postal codes can be given as 1-5 digit areas. ['CC-PPPPP', 'CC', 'CC-P']
:type area_code: list[str]
:param estimated_runtime_hours: Estimated number of hours of runtime
:type estimated_runtime_hours: int
:param estimated_runtime_minutes: Estimated additional minutes of runtime
:type estimated_runtime_minutes: int
:param codecarbon_logfile: Name of the emission file generated by codecarbon, defaults to 'emissions.csv'
:type codecarbon_logfile: str, optional
:param percent_renewable: User required percentage of renewable energy present at the time of computation in the grid, defaults to 30
:type percent_renewable: int, optional
:param allowed_delay_hours: Allowed delay of the computation in hours. It defaults to one day.
The computation starts at the latest at [current_time + allowed_delay_hours - estimated_runtime_hours-estimated_runtime_minutes]. defaults to 24
:type allowed_delay_hours: int, optional
:param log_request: Allow loggin the request server side. This allows us to calculate the carbon that has been saved by time shifting, defaults to True
:type log_request: bool, optional
:param overwrite: Overwrite the configuration file at the second initialization (when rerunning the code), defaults to False
:type overwrite: bool, optional
"""
def actual_decorator(func):
@functools.wraps(func)
def wrapper(*args):
write_config_file(
experiment_name=experiment_name,
codecarbon_logfile=codecarbon_logfile,
nextflow_logfile=nextflow_logfile,
area_code=area_code,
estimated_runtime_hours=estimated_runtime_hours,
estimated_runtime_minutes=estimated_runtime_minutes,
percent_renewable=percent_renewable,
allowed_delay_hours=allowed_delay_hours,
log_request=log_request,
overwrite=overwrite,
)
return func(*args)
return wrapper
return actual_decorator