forked from vastsa/FileCodeBox
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.py
More file actions
61 lines (48 loc) · 1.73 KB
/
storage.py
File metadata and controls
61 lines (48 loc) · 1.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import os
import asyncio
from datetime import datetime
from pathlib import Path
from typing import BinaryIO
from fastapi import UploadFile
import settings
class FileSystemStorage:
DATA_ROOT = Path(settings.DATA_ROOT)
STATIC_URL = settings.STATIC_URL
NAME = "filesystem"
async def get_filepath(self, text: str):
return self.DATA_ROOT / text.lstrip(self.STATIC_URL + '/')
async def get_text(self, file: UploadFile, key: str):
ext = file.filename.split('.')[-1]
now = datetime.now()
path = self.DATA_ROOT / f"upload/{now.year}/{now.month}/{now.day}/"
if not path.exists():
path.mkdir(parents=True)
text = f"{self.STATIC_URL}/{(path / f'{key}.{ext}').relative_to(self.DATA_ROOT)}"
return text
@staticmethod
async def get_size(file: UploadFile):
f = file.file
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
return size
@staticmethod
def _save(filepath, file: BinaryIO):
with open(filepath, 'wb') as f:
chunk_size = 256 * 1024
chunk = file.read(chunk_size)
while chunk:
f.write(chunk)
chunk = file.read(chunk_size)
async def save_file(self, file: UploadFile, text: str):
filepath = await self.get_filepath(text)
await asyncio.to_thread(self._save, filepath, file.file)
async def delete_file(self, text: str):
filepath = await self.get_filepath(text)
await asyncio.to_thread(os.remove, filepath)
async def delete_files(self, texts):
tasks = [self.delete_file(text) for text in texts]
await asyncio.gather(*tasks)
STORAGE_ENGINE = {
"filesystem": FileSystemStorage
}