Skip to content

Commit fbbb602

Browse files
committed
MSA-11790 : add 2 fonctions obtain_file_lock_exclusif and release_file_lock_exclusif
1 parent 42766d2 commit fbbb602

2 files changed

Lines changed: 430 additions & 22 deletions

File tree

msa_sdk/util.py

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def obtain_file_lock(lock_file_name, mode, process_param, sleep_time=60,
155155

156156
lock_obtained = True
157157
except io.BlockingIOError:
158-
tries += 1
158+
tries += sleep_time
159159
time.sleep(sleep_time)
160160

161161
if not lock_obtained:
@@ -214,7 +214,7 @@ def release_file_lock(lock_file_name, process_param, sleep_time=60,
214214
break
215215

216216
except io.BlockingIOError:
217-
tries += 1
217+
tries += sleep_time
218218
time.sleep(sleep_time)
219219

220220
if tries >= timeout:
@@ -231,6 +231,168 @@ def release_file_lock(lock_file_name, process_param, sleep_time=60,
231231
return r_json
232232

233233

234+
def obtain_file_lock_exclusif(lock_file_name, process_param, mode= 'w+', sleep_time=15, max_try_nb=10):
235+
"""
236+
237+
Lock one file exclusivly (only the subtenant and instance_id who make the lock can unlock the file. If the file is locked by one other subtenant or instance_id, it will retry many time (<max_try_nb) to get the lock.
238+
239+
Parameters
240+
----------
241+
lock_file_name: String
242+
File name
243+
mode: String mode like 'w+'
244+
File mode
245+
process_param: options PROCESSINSTANCEID, TASKID, EXECNUMBER
246+
Process parameters
247+
sleep_time: Integer
248+
Time to wait until next try
249+
max_try_nb: Integer
250+
Max number of try, the timeout will be max_try_nb * sleep_time
251+
252+
Returns
253+
------
254+
Result of the lock
255+
256+
"""
257+
dev_var = Variables()
258+
context = Variables.task_call(dev_var)
259+
lock_file_path = '{}/{}'.format(constants.UBI_JENTREPRISE_DIRECTORY, lock_file_name)
260+
261+
lock_obtained = False
262+
263+
r_json = ''
264+
tries = 1
265+
if not process_param.get('UBIQUBEID') or not process_param.get('SERVICEINSTANCEREFERENCE'):
266+
process_param['UBIQUBEID'] = context['UBIQUBEID']
267+
process_param['SERVICEINSTANCEREFERENCE'] = context['SERVICEINSTANCEREFERENCE']
268+
lock_content = 'Locked by '+process_param['UBIQUBEID'] + ' with serviceinstancereference=' + process_param['SERVICEINSTANCEREFERENCE'] +' on '
269+
270+
#lock_content = 'Locked by with serviceinstancereference='
271+
if not process_param.get('SERVICEINSTANCEID'):
272+
process_param['SERVICEINSTANCEID'] = context['SERVICEINSTANCEID']
273+
if not process_param.get('PROCESSINSTANCEID'):
274+
process_param['PROCESSINSTANCEID'] = context['PROCESSINSTANCEID']
275+
276+
lock_content_lower = lock_content.lower()
277+
while not lock_obtained and tries < max_try_nb:
278+
wait_message = False
279+
try:
280+
file_content = ''
281+
if os.path.exists(lock_file_path):
282+
with open(lock_file_path) as f_file:
283+
file_content = f_file.read()
284+
file_content_lower = file_content.lower()
285+
if lock_content_lower in file_content_lower:
286+
#wait_message = 'Lock file already done by this instance : ' + lock_content
287+
lock_obtained = True
288+
continue
289+
elif "locked by " in file_content_lower:
290+
wait_message = 'Wait, lock file already done by : ' + file_content
291+
raise FileNotFoundError
292+
elif "unlocked" in file_content_lower:
293+
os.remove(lock_file_path)
294+
else:
295+
day = time.strftime("%Y/%m/%d %H:%M:%S")
296+
f_lock_file = open(lock_file_path, mode)
297+
fcntl.flock(f_lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
298+
with open(lock_file_path, 'w+') as f_file:
299+
f_file.write(lock_content + day)
300+
lock_obtained = True
301+
fcntl.flock(f_lock_file, fcntl.LOCK_UN)
302+
continue
303+
304+
except FileNotFoundError:
305+
if wait_message:
306+
if context.get('UBIQUBEID'):
307+
update_asynchronous_task_details(wait_message)
308+
time.sleep(sleep_time)
309+
tries = tries + 1
310+
311+
if not lock_obtained:
312+
nb_sec = max_try_nb * sleep_time
313+
if wait_message:
314+
r_json = MSA_API.process_content(constants.FAILED, 'After waiting '+str(nb_sec)+' secondes, lock could not be obtained on the file '+lock_file_name+' (full_path='+lock_file_path+') : '+ wait_message, process_param, True)
315+
else:
316+
r_json = MSA_API.process_content(constants.FAILED, 'After waiting '+str(nb_sec)+' secondes, lock could not be obtained on the file '+lock_file_name+', full_path='+lock_file_path, process_param, True)
317+
else:
318+
r_json = MSA_API.process_content(constants.ENDED, 'Lock obtained on the file '+lock_file_name+', full_path='+lock_file_path, process_param, True)
319+
return r_json
320+
321+
def release_file_lock_exclusif(lock_file_name, process_param, sleep_time=30, max_try_nb = 10):
322+
"""
323+
324+
Release lock file exclusif (only the subtenant and instance_id who make the lock can unlock.
325+
326+
Parameters
327+
----------
328+
lock_file_name: String
329+
File name
330+
process_param: options PROCESSINSTANCEID, TASKID, EXECNUMBER
331+
Process parameters
332+
sleep_time: Integer
333+
Time to wait until next try
334+
max_try_nb: Integer
335+
Max number of try, the timeout will be max_try_nb * sleep_time
336+
337+
338+
Returns
339+
------
340+
Result of the release
341+
342+
"""
343+
dev_var = Variables()
344+
context = Variables.task_call(dev_var)
345+
346+
lock_file_path = '{}/{}'.format(constants.UBI_JENTREPRISE_DIRECTORY,
347+
lock_file_name)
348+
349+
r_json = ''
350+
tries = 1
351+
if not process_param.get('UBIQUBEID') or not process_param.get('SERVICEINSTANCEREFERENCE'):
352+
process_param['UBIQUBEID'] = context['UBIQUBEID']
353+
process_param['SERVICEINSTANCEREFERENCE'] = context['SERVICEINSTANCEREFERENCE']
354+
lock_content = 'Locked by '+process_param['UBIQUBEID'] + ' with serviceinstancereference=' + process_param['SERVICEINSTANCEREFERENCE'] +' on '
355+
if not process_param.get('SERVICEINSTANCEID'):
356+
process_param['SERVICEINSTANCEID'] = context['SERVICEINSTANCEID']
357+
if not process_param.get('PROCESSINSTANCEID'):
358+
process_param['PROCESSINSTANCEID'] = context['PROCESSINSTANCEID']
359+
360+
release_obtained = False
361+
lock_content_lower = lock_content.lower()
362+
wait_message = 'Wait'
363+
message = 'Lock released on the file {}, full_path={}'.format(lock_file_name, lock_file_path)
364+
365+
while not release_obtained and tries < max_try_nb:
366+
file_content_lower = ''
367+
if os.path.exists(lock_file_path):
368+
with open(lock_file_path) as f_file:
369+
file_content = f_file.read().lower()
370+
if lock_content_lower in file_content.lower():
371+
# Lock file already done by this instance, we can unlock it
372+
os.remove(lock_file_path)
373+
release_obtained = True
374+
continue
375+
elif "locked by" in file_content.lower():
376+
wait_message = 'Wait, lock file already done by : ' + file_content
377+
378+
tries = tries + 1
379+
# if process_param['UBIQUBEID']:
380+
# update_asynchronous_task_details(wait_message)
381+
time.sleep(sleep_time)
382+
383+
else:
384+
message = 'Lock file not exist ({}, full_path={}), it was already unlocked '.format(lock_file_name, lock_file_path)
385+
release_obtained = True
386+
continue
387+
if tries >= max_try_nb:
388+
nb_sec = max_try_nb * sleep_time
389+
r_json = MSA_API.process_content(constants.FAILED, 'After waiting '+str(nb_sec)+' secondes, lock could not be released on the file '+lock_file_name+', full_path='+lock_file_path+ ' : '+wait_message, process_param, True)
390+
else:
391+
r_json = MSA_API.process_content(constants.ENDED, message, process_param, True)
392+
393+
return r_json
394+
395+
234396
def is_overlapping_cidr(cidr1, cidr2):
235397
"""
236398

0 commit comments

Comments
 (0)