-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclasses_wiper.py
More file actions
253 lines (229 loc) · 7.24 KB
/
classes_wiper.py
File metadata and controls
253 lines (229 loc) · 7.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from json import load, dump
from subprocess import Popen, PIPE, STDOUT, STARTUPINFO, STARTF_USESHOWWINDOW
from wmi import WMI
class Config:
'''Handle configuration file in JSON format'''
def __init__(self, path):
'''Read config file'''
self.path = path
self._keys = list()
with self.path.open(encoding='utf-8') as fp:
for key, value in load(fp).items():
self.__dict__[key] = value
self._keys.append(key)
def exists(self, key):
'''Check if key exists'''
return key in self._keys
def save(self, path=None):
'''Save config file'''
if path:
self.path = path
with self.path.open('w', encoding='utf-8') as fp:
dump({key: self.__dict__[key] for key in self._keys}, fp)
class Size(int):
'''Human readable size'''
def __repr__(self):
'''Genereate readable size'''
def _round(*base): # intern function to calculate human readable
for prefix, b in base:
rnd = round(self/b, 2)
if rnd >= 1:
break
if rnd >= 10:
rnd = round(rnd, 1)
if rnd >= 100:
rnd = round(rnd)
return f'{rnd} {prefix}', rnd
if self < 0:
raise ValueError('Size must be positive')
iec, rnd_iec = _round(('PiB', 2**50), ('TiB', 2**40), ('GiB', 2**30), ('MiB', 2**20), ('kiB', 2**10))
si, rnd_si = _round(('PB', 10**15), ('TB', 10**12), ('GB', 10**9), ('MB', 10**6), ('kB', 10**3))
return (f'{iec} / {si}') if rnd_iec or rnd_si else f'{int(self)} B'
def __add__(self, other):
'''Plus'''
return Size(int.__add__(self, other))
class WinPopen(Popen):
'''Popen with startupinfo'''
def __init__(self, cmd):
'''Create process'''
self._cmd = cmd
startupinfo = STARTUPINFO()
startupinfo.dwFlags |= STARTF_USESHOWWINDOW
super().__init__(cmd,
stdout = PIPE,
#stderr = STDOUT,
stderr = PIPE,
encoding = 'utf-8',
errors = 'ignore',
universal_newlines = True,
startupinfo = startupinfo
)
class Drives:
'''Use WMI to get infos about drives'''
DELAY = 1
RETRIES = 10
NTFS_LABEL_CHARS = r'abcdefghijklmnopqrstuvwxyz0123456789!§$%&()@-_#=[]{}€'
FAT_LABEL_CHARS = r'abcdefghijklmnopqrstuvwxyz0123456789!§$%&()@-_#'
def __init__(self):
'''Connect to API'''
self._conn = WMI()
def get_occupied_volumes(self):
'''Get IDs of volumes that are in use'''
return {volume.Name.rstrip('\\') for volume in self._conn.Win32_Volume() if volume.Name != volume.DeviceID}
def get_logical(self):
'''Get logical disks'''
log_disks = dict()
for log_disk in self._conn.Win32_LogicalDisk():
log_disks[log_disk.DeviceID] = dict()
try:
log_disks[log_disk.DeviceID]['VolumeName'] = log_disk.VolumeName
except AttributeError:
log_disks[log_disk.DeviceID]['VolumeName'] = ''
try:
log_disks[log_disk.DeviceID]['FileSystem'] = log_disk.FileSystem
except AttributeError:
log_disks[log_disk.DeviceID]['FileSystem'] = ''
try:
log_disks[log_disk.DeviceID]['Size'] = Size(log_disk.Size)
except TypeError:
log_disks[log_disk.DeviceID]['Size'] = None
return log_disks
def get_physical(self):
'''Get physical disks'''
drives = dict()
for drive in self._conn.Win32_DiskDrive():
drives[drive.DeviceID] = dict()
for attr in (
'Caption',
'MediaType',
'InterfaceType',
'Manufacturer',
'Model',
'Name',
'Size',
):
try:
value = getattr(drive, attr)
if attr == 'Size' and value is not None:
drives[drive.DeviceID][attr] = Size(value)
else:
drives[drive.DeviceID][attr] = value
except (AttributeError, TypeError):
drives[drive.DeviceID][attr] = None
return drives
def get_drive_info(self, device_id):
'''Get info about given drive'''
for drive in self._conn.Win32_DiskDrive():
if drive.DeviceID == device_id:
info = dict()
for attr in (
'Caption',
'Description',
'Size',
'InterfaceType',
'MediaType',
'Manufacturer',
'Model',
'SerialNumber',
'FirmwareRevision'
):
try:
value = getattr(drive, attr)
if attr == 'Size' and value is not None:
info[attr] = Size(value)
else:
info[attr] = value
except (AttributeError, TypeError):
info[attr] = None
return info
def get_parents(self):
'''Return dict LOGICALDRIVE: PHYSICALDRIVE'''
disk2part = {(rel.Antecedent.DeviceID, rel.Dependent.DeviceID)
for rel in self._conn.Win32_DiskDriveToDiskPartition()
}
part2logical = {(rel.Antecedent.DeviceID, rel.Dependent.DeviceID)
for rel in self._conn.Win32_LogicalDiskToPartition()
}
return {logical: disk
for disk, part_disk in disk2part
for part_log, logical in part2logical
if part_disk == part_log
}
def get_parent_of(self, device_id):
'''Get parent of given device'''
if device_id.startswith('\\\\.\\PHYSICALDRIVE'):
return device_id
try:
return self.get_parents()[device_id]
except KeyError:
return
def get_children_of(self, device_id):
'''Return logical drives / partitions of given physical drive'''
part2logical = {rel.Antecedent.DeviceID: rel.Dependent.DeviceID
for rel in self._conn.Win32_LogicalDiskToPartition()
}
return {part2logical[rel.Dependent.DeviceID] for rel in self._conn.Win32_DiskDriveToDiskPartition()
if rel.Antecedent.DeviceID == device_id and rel.Dependent.DeviceID in part2logical
}
def dump(self):
'''Return list of all drives'''
parents = self.get_parents()
log_disks = self.get_logical()
drives = dict()
for device_id, drive_dict in self.get_physical().items():
drive = {'DeviceID': device_id} | drive_dict
drive['Partitions'] = list()
for log_id, disk_id in parents.items():
if disk_id == device_id:
drive['Partitions'].append({'DeviceID': log_id} | log_disks[log_id])
try:
drives[int(device_id.lstrip('\\\\.\\PHYSICALDRIVE'))] = drive
except ValueError:
pass
return [drives[i] for i in sorted(drives.keys())]
def get_system_ids(self):
'''Get ids if system drives'''
ids = set()
for os_drive in self._conn.Win32_OperatingSystem():
if os_drive.SystemDrive:
ids.add(os_drive.SystemDrive)
ids.add(self.get_parent_of(os_drive.SystemDrive))
return ids
def check_fs_label(self, label, fs):
'''Check if string would be a valid file system label'''
if fs == 'ntfs':
max_len = 32
valid_chars = self.NTFS_LABEL_CHARS
elif fs == 'exfat':
max_len = 15
valid_chars = self.FAT_LABEL_CHARS
elif fs == 'fat32':
max_len = 11
valid_chars = self.FAT_LABEL_CHARS
else:
raise ValueError(f'invalid fs: {fs}')
if len(label) > max_len:
raise ValueError(f'label too long ({max_len} max. for {fs}): "{label}", {len(label)} chars')
for char in label:
if char.lower() not in valid_chars:
raise ValueError(f'invalid chaaracter in "{label}": {char}')
return label
def diskpart(self, drive_id, script_path, pt='gpt', fs='ntfs', label='Volume', letter=None, echo=None):
'''Create partition table and partition using diskpart'''
script = f'select disk {drive_id.lstrip("\\\\.\\PHYSICALDRIVE")}\nclean\nconvert {pt}\n'
if fs:
script += f'create partition primary\nformat quick fs={fs} label={label}\n'
if letter:
script += f'assign letter={letter}\n'
script_path.write_text(script)
proc = WinPopen([f'diskpart', '/s', f'{script_path}'])
for line in proc.stdout:
if echo:
echo(line.strip())
try:
script_path.unlink()
except:
pass
return proc