-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcompress.py
More file actions
155 lines (146 loc) · 6.32 KB
/
compress.py
File metadata and controls
155 lines (146 loc) · 6.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import zlib
import random
import time
import gzip
import lzma
import hashlib
import os
import tempfile
import magic
def gztoxztemp(args):
    """Log that a file is being processed and return a small random integer.

    This function does not read, convert or delete anything: it writes one
    log line, pauses for 0.2 seconds and returns a pseudo-random value.
    NOTE(review): looks like a placeholder/dry-run counterpart of ``gztoxz``
    (same argument shape) -- confirm against the caller.

    Args:
        args: Two-item sequence ``(file_, logFile)``; ``file_`` is the path
            mentioned in the log line and ``logFile`` is the path of the log
            file handed to ``Log``.

    Returns:
        int: ``int(random.uniform(1, 3))``.
    """
    file_, logFile = args
    Log("File " + file_ + " is being processed...\n", logFile)
    time.sleep(0.2)
    return int(random.uniform(1, 3))
def gztoxz(args):
    """Re-compress a ``.gz`` file as ``.xz`` and delete the source once verified.

    The target name is ``file_`` with its last three characters replaced by
    ``".xz"``. If the target already exists it is only compared with the
    source; otherwise the source is uncompressed into a temporary file, the
    temporary file is xz-compressed into the target, and the two are compared.
    The source is removed only when both ``md5sum`` digests are non-empty and
    equal.

    Args:
        args: Two-item sequence ``(file_, logFile)``: path of the file to
            convert and path of the log file handed to ``Log``.

    Returns:
        int: Size in bytes of the ``.xz`` file, or 0 when uncompressing or
        compressing failed.
    """
    file_, logFile = args
    time.sleep(random.uniform(1, 3))
    Log("File " + file_ + " is being processed...\n", logFile)
    # Strips the trailing three characters (the ".gz" suffix).
    # NOTE(review): a path that does not end in ".gz" is truncated blindly,
    # and a path ending in ".xz" maps onto itself -- confirm callers only
    # pass ".gz" files.
    targetFileName = file_[:-3] + ".xz"

    if os.path.isfile(targetFileName):
        Log("File " + targetFileName + " exists.\n", logFile)
        source_digest = md5sum(file_, logFile)
        # An empty digest signals a read failure; never delete on that basis.
        if source_digest and source_digest == md5sum(targetFileName, logFile):
            Log("File " + file_ + " and " + targetFileName + " match.\n", logFile)
            os.remove(file_)
        # NOTE(review): the original indentation was lost; the size is
        # reported here whether or not the digests matched -- confirm.
        return os.stat(targetFileName).st_size

    # The context manager guarantees the temporary file is released even if
    # one of the steps below raises.
    with tempfile.NamedTemporaryFile() as tempFile:
        if not uncompress(file_, tempFile.name, logFile):
            Log("Error during uncompression of file " + file_ + "\n", logFile)
            return 0
        if not compress('xz', tempFile.name, targetFileName, logFile):
            return 0
        source_digest = md5sum(file_, logFile)
        if source_digest and source_digest == md5sum(targetFileName, logFile):
            Log("File " + file_ + " and " + targetFileName + " match\n", logFile)
            os.remove(file_)
        else:
            Log("File " + file_ + " and " + targetFileName + " do NOT match\n", logFile)
        return os.stat(targetFileName).st_size
def compress(algo, file_, targetFileName, logFile):
    """Compress ``file_`` into ``targetFileName`` using xz or gzip.

    When ``targetFileName`` already exists and ends with ``".xz"`` nothing is
    written: the target and the source are compared with ``md5sum`` and the
    outcome is logged. Otherwise the source is streamed through ``lzma.open``
    (``algo == 'xz'``) or ``gzip.open`` (any other ``algo``).

    Args:
        algo: ``'xz'`` selects xz; every other value selects gzip.
        file_: Path of the uncompressed source file.
        targetFileName: Path of the compressed file to create.
        logFile: Path of the log file handed to ``Log``.

    Returns:
        bool: ``False`` when an ``IOError`` was logged while reading or
        writing, ``True`` otherwise (including the case where an existing
        target was left untouched, whether or not it matched).
    """
    chunk_size = 64 * 1024
    # NOTE(review): libmagic bindings are expected to report failure (e.g. a
    # target that does not exist yet) as None -- treated as "unknown" here.
    fmagic = filemagic(targetFileName) or ""
    time.sleep(random.uniform(1, 3))
    Log("File " + file_ + " is being processed...\n", logFile)

    # The directory that must exist is the one being written to. A bare
    # filename has an empty dirname, which os.makedirs would reject.
    targetFolder = os.path.dirname(targetFileName)
    if targetFolder and not os.path.exists(targetFolder):
        Log("Directory " + targetFolder + " is being created...\n", logFile)
        os.makedirs(targetFolder, exist_ok=True)

    try:
        if os.path.isfile(targetFileName) and targetFileName.endswith(".xz"):
            Log("Target file " + targetFileName + " exists.\n", logFile)
            if md5sum(targetFileName, logFile) == md5sum(file_, logFile):
                if 'gzip' in fmagic:
                    # Named ".xz" but detected as gzip.
                    Log("File " + targetFileName + " is actually a gzip compressed file.\n", logFile)
                Log("File " + file_ + " and " + targetFileName + " are the same.\n", logFile)
            else:
                Log("Target file " + targetFileName + " and source file " + file_ + " are NOT the same.\n", logFile)
            return True

        Log("File " + targetFileName + " is being written...\n", logFile)
        with open(file_, "rb") as sf:
            if algo == 'xz':
                Log("Compressing using XZ and writing to " + targetFileName + "\n", logFile)
                try:
                    with lzma.open(targetFileName, 'wb') as cf:
                        # Stream in chunks so large files are never held
                        # in memory as a whole.
                        for chunk in iter(lambda: sf.read(chunk_size), b""):
                            cf.write(chunk)
                except IOError as err:
                    Log("IO Error in file " + targetFileName + ", Error - " + str(err) + "\n", logFile)
                    return False
            else:
                with gzip.open(targetFileName, 'wb') as cf:
                    for chunk in iter(lambda: sf.read(chunk_size), b""):
                        cf.write(chunk)
    except IOError as err:
        Log("Compress - IO Error in file " + file_ + ", Error - " + str(err) + "\n", logFile)
        return False
    return True
def uncompress(file_, targetFileName, logFile):
    """Write the uncompressed content of ``file_`` to ``targetFileName``.

    The reader is chosen from the ``filemagic`` result and the file suffix:
    a result containing ``'plain'`` is copied as-is, ``'gzip'`` or a ``.gz``
    suffix is read with ``gzip.open``, ``'xz'`` or a ``.xz`` suffix with
    ``lzma.open``.

    Args:
        file_: Path of the file to read.
        targetFileName: Path that receives the uncompressed bytes.
        logFile: Path of the log file handed to ``Log``.

    Returns:
        bool: ``True`` when the content was written; ``False`` when ``file_``
        is not a regular file, its type is not recognised, or reading/writing
        raised one of the handled errors (which is logged).
    """
    chunk_size = 64 * 1024
    # NOTE(review): libmagic bindings are expected to report failure as
    # None -- treated as "unknown" so the substring tests below stay valid.
    fmagic = filemagic(file_) or ""
    time.sleep(random.uniform(1, 3))

    if not os.path.isfile(file_):
        Log("uncompress - " + file_ + " is not a regular file\n", logFile)
        return False

    try:
        Log("File " + targetFileName + " is being written...\n", logFile)
        if 'plain' in fmagic:
            Log("File " + file_ + " is actually an ASCII Text file.\n", logFile)
            opener = open
        elif 'gzip' in fmagic or file_.endswith(".gz"):
            Log("Uncompressing " + file_ + " using gzip and writing to " + targetFileName + " has magic - " + fmagic + " \n", logFile)
            opener = gzip.open
        elif 'xz' in fmagic or file_.endswith(".xz"):
            if not file_.endswith(".xz"):
                Log("File " + file_ + " is actually a XZ compressed file.\n", logFile)
            opener = lzma.open
        else:
            # Previously this path fell through to an unbound variable.
            Log("uncompress - unsupported type for file " + file_ + ", magic - " + fmagic + "\n", logFile)
            return False

        with opener(file_, "rb") as src, open(targetFileName, 'wb') as tf:
            # Stream in chunks so large files are never held in memory.
            for chunk in iter(lambda: src.read(chunk_size), b""):
                tf.write(chunk)
        return True
    except (zlib.error, lzma.LZMAError, EOFError, OSError) as err:
        print(err)
        Log("uncompress - IO Error in file " + file_ + ", Error - " + str(err) + "\n", logFile)
        return False
def md5sum(file_, logFile):
    """Return the MD5 hex digest of the (uncompressed) content of ``file_``.

    The reader is chosen from the ``filemagic`` result and the file suffix:
    ``'gzip'`` or a ``.gz`` suffix is read through ``gzip.open``, ``'xz'`` or
    a ``.xz`` suffix through ``lzma.open``; a result containing ``'plain'``
    and every other type are hashed byte-for-byte.

    Args:
        file_: Path of the file to hash.
        logFile: Path of the log file handed to ``Log``.

    Returns:
        str: Hex digest of the content, or ``""`` when reading failed (the
        failure is logged). Callers must not treat two empty results as a
        match.
    """
    # NOTE(review): libmagic bindings are expected to report failure as
    # None -- treated as "unknown" so the substring tests below stay valid.
    fmagic = filemagic(file_) or ""
    if 'plain' in fmagic:
        opener = open
    elif 'gzip' in fmagic or file_.endswith(".gz"):
        opener = gzip.open
    elif 'xz' in fmagic or file_.endswith(".xz"):
        opener = lzma.open
    else:
        opener = open

    m = hashlib.md5()
    try:
        with opener(file_, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                m.update(chunk)
    except lzma.LZMAError as err:
        Log("LZMA Error in " + file_ + ", Error - " + str(err) + "\n", logFile)
        return ""
    except zlib.error as err:
        # Corrupt gzip payloads surface as zlib.error, not OSError.
        Log("md5sum - zlib Error in " + file_ + ", Error - " + str(err) + "\n", logFile)
        return ""
    except IOError as err:
        Log("md5sum - IO Error in " + file_ + ", Error - " + str(err) + "\n", logFile)
        return ""
    except EOFError as err:
        Log("md5sum - EOF Error in " + file_ + ", Error - " + str(err) + "\n", logFile)
        return ""
    # Only a fully read file yields a digest; a partial digest could make
    # two damaged files look identical.
    return m.hexdigest()
def filemagic(file_):
    """Return what libmagic reports for ``file_`` with the ``MAGIC_MIME`` flag.

    Callers in this module test the result for the substrings ``'plain'``,
    ``'gzip'`` and ``'xz'``.

    Args:
        file_: Path of the file to inspect.

    Returns:
        Whatever ``ms.file(file_)`` returns for the path.
        NOTE(review): presumably a MIME string, or None when libmagic cannot
        inspect the path -- confirm against the installed binding.
    """
    ms = magic.open(magic.MAGIC_MIME)
    try:
        ms.load()
        return ms.file(file_)
    finally:
        # Release the libmagic cookie; a new one is opened on every call.
        ms.close()
def Log(message, logFile):
    """Append ``message`` verbatim to the file at ``logFile``.

    The file is opened in append mode for each call and closed again before
    returning; no newline or timestamp is added.

    Args:
        message: Text to append.
        logFile: Path of the log file.
    """
    log_handle = open(logFile, mode="a")
    with log_handle:
        log_handle.write(message)