Source code for pyrcareworld.utils.locker
import os
import time
if os.name == "nt":
    import errno
    import msvcrt

    def lock(fp):
        """Block until an exclusive lock on the first byte of ``fp`` is held.

        ``msvcrt.locking`` with ``LK_LOCK`` only retries for a limited time
        before raising ``OSError`` (``EDEADLOCK``).  To match the POSIX
        branch, which waits for as long as it takes, that specific failure
        is retried; any other ``OSError`` is propagated unchanged.

        Args:
            fp: An open file object exposing ``seek()`` and ``fileno()``.

        Raises:
            OSError: If locking fails for a reason other than contention.
        """
        while True:
            # The locked region starts at the current file position, so
            # rewind to make every caller lock the same byte.
            fp.seek(0)
            try:
                msvcrt.locking(fp.fileno(), msvcrt.LK_LOCK, 1)
            except OSError as exc:
                if exc.errno != errno.EDEADLOCK:
                    raise
                # Still contended after the built-in retries: try again.
            else:
                return

    def unlock(fp):
        """Release the lock on the first byte of ``fp`` taken by ``lock``.

        Args:
            fp: The open file object that was passed to ``lock``.
        """
        # Must address the same region that ``lock`` locked.
        fp.seek(0)
        msvcrt.locking(fp.fileno(), msvcrt.LK_UNLCK, 1)

else:
    import fcntl

    def lock(fp):
        """Block until an exclusive ``flock`` lock on ``fp`` is held.

        Args:
            fp: An open file object exposing ``fileno()``.
        """
        fcntl.flock(fp.fileno(), fcntl.LOCK_EX)

    def unlock(fp):
        """Release the ``flock`` lock held on ``fp``.

        Args:
            fp: The open file object that was passed to ``lock``.
        """
        fcntl.flock(fp.fileno(), fcntl.LOCK_UN)
[docs]
class Locker:
    """Context manager that serialises work through a named lock file.

    The lock file lives at ``~/.rfuniverse/<lck_name>.lck``.  Entering the
    context opens that file, acquires the lock via the module-level
    ``lock()`` and yields the open file object; leaving it calls
    ``unlock()`` and closes the file.
    """

    def __init__(self, lck_name: str) -> None:
        """Resolve the lock-file path and make sure the file exists.

        Args:
            lck_name: Base name of the lock; ``.lck`` is appended.
        """
        self.lck_path = os.path.join(
            os.path.expanduser("~"), ".rfuniverse", lck_name + ".lck"
        )
        # check if the lock file exists
        if not os.path.exists(self.lck_path):
            os.makedirs(os.path.dirname(self.lck_path), exist_ok=True)
            # Append mode creates the file when missing but never truncates
            # it, so losing a creation race to another process cannot wipe
            # anything that process has already written.
            with open(self.lck_path, "a"):
                pass
        # Open handle while the lock is held; None until first acquired.
        self.fp = None

    def __enter__(self):
        """Open the lock file, acquire the lock and return the file object.

        Raises:
            OSError: If the file cannot be opened or the lock cannot be
                taken.  The file handle is closed before re-raising.
        """
        handle = open(self.lck_path, "r+")  # read-write, no truncation
        try:
            lock(handle)
        except BaseException:
            # Do not leak the descriptor when acquisition fails or is
            # interrupted (e.g. KeyboardInterrupt while waiting).
            handle.close()
            raise
        self.fp = handle
        return handle

    def __exit__(self, _type, value, tb):
        """Release the lock and close the file; exceptions are not suppressed."""
        try:
            unlock(self.fp)
        finally:
            self.fp.close()
            # NOTE(review): the reason for this pause is not visible here --
            # presumably it gives waiting processes a window to take the
            # lock; confirm before changing or removing it.
            time.sleep(0.1)