-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjuntapdf.py
More file actions
4185 lines (3397 loc) · 159 KB
/
juntapdf.py
File metadata and controls
4185 lines (3397 loc) · 159 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import atexit
import concurrent.futures
import glob
import logging
import os
import re
import subprocess
import shutil
import sys
import json
import tempfile
import threading
import time
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
def aplicar_tema_moderno(root):
"""
Tema personalizado com cores institucionais - profissional e único
"""
try:
# Cores da identidade visual
COR_PRIMARIA = "#004A80" # Azul governo
COR_SECUNDARIA = "#0078D4" # Azul Microsoft
COR_FUNDO = "#F5F5F5" # Cinza muito claro
COR_TEXTO = "#333333" # Cinza escuro
COR_BORDA = "#CCCCCC" # Cinza médio
# Configura estilo manualmente
style = ttk.Style()
# Tema geral
style.configure(".",
background=COR_FUNDO,
foreground=COR_TEXTO,
font=("Segoe UI", 9))
# Frame principal
style.configure("TFrame", background=COR_FUNDO)
# LabelFrame (caixas de grupo)
style.configure("TLabelframe",
background=COR_FUNDO,
borderwidth=1,
relief="solid")
style.configure("TLabelframe.Label",
background=COR_FUNDO,
foreground=COR_PRIMARIA,
font=("Segoe UI", 9, "bold"))
# Botões - DESTAQUE INSTITUCIONAL
style.configure("TButton",
background="#E1E1E1",
foreground=COR_TEXTO,
borderwidth=1,
relief="raised",
padding=(12, 4),
font=("Segoe UI", 9))
style.map("TButton",
background=[('active', '#D0D0D0'),
('pressed', COR_PRIMARIA)],
foreground=[('pressed', 'white')])
# Botão de ação principal
style.configure("Primary.TButton",
background=COR_PRIMARIA,
foreground="white",
font=("Segoe UI", 9, "bold"))
style.map("Primary.TButton",
background=[('active', COR_SECUNDARIA),
('pressed', '#003366')])
# Progressbar
style.configure("Horizontal.TProgressbar",
background=COR_PRIMARIA,
troughcolor=COR_FUNDO,
borderwidth=0)
# Labels
style.configure("TLabel",
background=COR_FUNDO,
foreground=COR_TEXTO,
font=("Segoe UI", 9))
style.configure("Title.TLabel",
background=COR_FUNDO,
foreground=COR_PRIMARIA,
font=("Segoe UI", 11, "bold"))
# Entry/Combobox
style.configure("TEntry",
fieldbackground="white",
borderwidth=1,
relief="solid")
style.configure("TCombobox",
fieldbackground="white")
logging.info("🎨 Tema institucional personalizado aplicado")
except Exception as e:
logging.warning(f"Erro ao aplicar tema personalizado: {e}")
# Não retorna nada - função void
try:
import psutil
PSUtil_AVAILABLE = True
except ImportError:
PSUtil_AVAILABLE = False
logging.warning("psutil não disponível - algumas métricas estarão limitadas")
class SecureLogger:
def __init__(self):
self.sensitive_patterns = [
r'password[=:]\s*\S+',
r'user[=:]\s*\S+',
r'[\w\.-]+@[\w\.-]+\.\w+',
r'senha[=:]\s*\S+',
r'pwd[=:]\s*\S+'
]
def sanitize_log(self, message):
if not isinstance(message, str):
message = str(message)
for pattern in self.sensitive_patterns:
message = re.sub(pattern, '[REDACTED]', message, flags=re.IGNORECASE)
return message
secure_logger = SecureLogger()
def limpar_logs_antigos(dias=30):
try:
log_dir = os.path.join(tempfile.gettempdir(), "JuntaPDF_Logs")
if not os.path.exists(log_dir):
return
agora = time.time()
limite_tempo = agora - (dias * 24 * 60 * 60)
for arquivo in os.listdir(log_dir):
if arquivo.startswith("juntapdf_") and arquivo.endswith(".log"):
caminho_completo = os.path.join(log_dir, arquivo)
if os.path.getmtime(caminho_completo) < limite_tempo:
os.remove(caminho_completo)
logging.debug(f"Log expirado removido: {arquivo}")
except Exception as e:
logging.warning(f"Erro ao limpar logs antigos: {e}")
def setup_log_rotation():
try:
log_dir = os.path.join(tempfile.gettempdir(), "JuntaPDF_Logs")
if not os.path.exists(log_dir):
return
for log_file in glob.glob(os.path.join(log_dir, "juntapdf_*.log")):
try:
if os.path.getsize(log_file) > 10 * 1024 * 1024:
base_name = os.path.basename(log_file)
name_without_ext = os.path.splitext(base_name)[0]
timestamp = time.strftime("%Y%m%d_%H%M%S")
new_name = f"{name_without_ext}_rotated_{timestamp}.log"
new_path = os.path.join(log_dir, new_name)
os.rename(log_file, new_path)
logging.info(f"Log rotacionado: {base_name} -> {new_name}")
except Exception as e:
logging.warning(f"Erro ao rotacionar log {log_file}: {e}")
except Exception as e:
logging.warning(f"Erro no sistema de rotação de logs: {e}")
def setup_logging():
setup_log_rotation()
log_dir = os.path.join(tempfile.gettempdir(), "JuntaPDF_Logs")
os.makedirs(log_dir, exist_ok=True)
limpar_logs_antigos(30)
log_file = os.path.join(log_dir, f"juntapdf_{time.strftime('%Y%m%d')}.log")
class SanitizedFileHandler(logging.FileHandler):
def emit(self, record):
record.msg = secure_logger.sanitize_log(record.msg)
if record.args:
record.args = tuple(secure_logger.sanitize_log(str(arg)) if isinstance(arg, str) else arg
for arg in record.args)
super().emit(record)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
SanitizedFileHandler(log_file, encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
logging.info("=" * 60)
logging.info("JuntaPDF Iniciado")
logging.info(f"Versão Python: {sys.version}")
logging.info(f"Diretório de Log: {log_dir}")
setup_logging()
PDF_LIBS_AVAILABLE = False
PIKEPDF_AVAILABLE = False
DND_AVAILABLE = False
PDFA_AVAILABLE = False
GHOSTSCRIPT_PATH = None
ICC_PROFILE_PATH = None
temp_files_global = []
temp_files_lock = threading.Lock()
MAX_FILE_SIZE = 500 * 1024 * 1024
MAX_TOTAL_PAGES = 10000
MAX_FILES_PER_OPERATION = 100
MAX_FILES_FOR_SPLIT = 10
MAX_PAGES_PER_FILE_FOR_SPLIT = 1000
MAX_TOTAL_OUTPUT_FILES = 500
MAX_PAGES_FOR_SINGLE_FILE_SPLIT = 200
class SecurityError(Exception):
pass
class PDFCorruptionError(Exception):
pass
class SystemOverloadError(Exception):
pass
class PDFProcessingError(Exception):
pass
class PerformanceMonitor:
def __init__(self):
self.operation_times = []
self.memory_usage = []
def check_system_health(self):
try:
import psutil
memory = psutil.virtual_memory()
if memory.percent > 90:
raise SystemOverloadError("Sistema com memória insuficiente")
cpu = psutil.cpu_percent(interval=1)
if cpu > 80:
raise SystemOverloadError("CPU sobrecarregada")
except ImportError:
pass
performance_monitor = PerformanceMonitor()
def create_operation_checkpoint(operation_type, files_processed, current_step, temp_files):
checkpoint = {
'operation_type': operation_type,
'files_processed': list(files_processed),
'current_step': current_step,
'temp_files': list(temp_files),
'timestamp': time.time()
}
checkpoint_file = os.path.join(tempfile.gettempdir(), f"juntapdf_checkpoint_{os.getpid()}.json")
try:
with open(checkpoint_file, 'w', encoding='utf-8') as f:
json.dump(checkpoint, f)
if hasattr(os, 'chmod'):
os.chmod(checkpoint_file, 0o600)
logging.info(f"Checkpoint criado: {checkpoint_file}")
except Exception as e:
logging.warning(f"Erro ao criar checkpoint: {e}")
def cleanup_checkpoint():
checkpoint_file = os.path.join(tempfile.gettempdir(), f"juntapdf_checkpoint_{os.getpid()}.json")
try:
if os.path.exists(checkpoint_file):
os.remove(checkpoint_file)
logging.info("Checkpoint removido")
except Exception as e:
logging.warning(f"Erro ao remover checkpoint: {e}")
class UIThreadDispatcher:
"""Gerenciador seguro de chamadas de interface a partir de threads."""
def __init__(self, root_element):
self.root = root_element
def dispatch(self, func, *args, **kwargs):
"""Envia a função para rodar na thread principal de forma segura."""
def wrapped():
try:
func(*args, **kwargs)
except Exception as e:
print(f"Erro na UI (Dispatch): {e}")
if self.root:
self.root.after(0, wrapped)
# Variável global para o dispatcher (será iniciada na criação da interface)
ui_dispatch = None
def exec_segura(cmd, timeout=60, descricao="Processo", progress_widget=None, cwd=None):
"""
Versão DEFINITIVA: Substitui todas as outras chamadas de subprocesso.
Blindada contra deadlocks, erros de unicode e falhas silenciosas.
"""
import shlex
# Se o comando vier como string única, divide corretamente
if isinstance(cmd, str):
cmd = shlex.split(cmd)
logging.info(f"Executando {descricao}: {cmd}")
try:
# Popen com PIPES e modo binário para evitar travamento de buffer e encoding
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False, # Segurança
cwd=cwd
)
try:
# Comunicação com timeout real
stdout_bytes, stderr_bytes = process.communicate(timeout=timeout)
# Decodificação resiliente (ignora caracteres estranhos do Ghostscript)
stdout = stdout_bytes.decode('utf-8', errors='replace') if stdout_bytes else ""
stderr = stderr_bytes.decode('utf-8', errors='replace') if stderr_bytes else ""
# Retorna objeto compatível com subprocess.CompletedProcess
return subprocess.CompletedProcess(cmd, process.returncode, stdout, stderr)
except subprocess.TimeoutExpired:
process.kill()
stdout_bytes, stderr_bytes = process.communicate() # Limpa buffers
logging.error(f"Timeout em {descricao}")
raise subprocess.TimeoutExpired(cmd, timeout)
except Exception as e:
logging.error(f"Erro crítico em exec_segura ({descricao}): {str(e)}")
# Retorna um objeto de erro falso para não quebrar quem chama
return subprocess.CompletedProcess(cmd, 1, "", f"Erro Python: {str(e)}")
except subprocess.TimeoutExpired:
# Já tratado acima; propaga para camadas superiores
raise
except Exception as e:
logging.error(f"Erro em {descricao}: {e}")
raise
finally:
try:
if progress_widget and hasattr(progress_widget, 'stop'):
progress_widget.stop()
if hasattr(progress_widget, 'config'):
progress_widget.config(mode="determinate")
except Exception:
logging.debug("Progress widget stop falhou (ignorando).")
def validar_pdfa_com_ghostscript_rigoroso(file_path):
"""
Versão unificada, blindada e integrada com exec_segura (FASE 0).
Retorna: (bool_ok, motivo)
"""
if not os.path.exists(file_path):
return False, "Arquivo não encontrado."
# 1. Resolver Ghostscript de forma robusta
gs_cmd = (
GHOSTSCRIPT_PATH
if "GHOSTSCRIPT_PATH" in globals() and GHOSTSCRIPT_PATH
else shutil.which("gswin64c")
or shutil.which("gswin32c")
or "gswin64c"
)
# 2. Comando de validação: PDF/A estrito (policy=1)
cmd = [
gs_cmd,
"-dPDFA",
"-dBATCH",
"-dNOPAUSE",
"-dNOOUTERSAVE",
"-dPDFACompatibilityPolicy=1",
"-sDEVICE=pdfwrite",
"-sOutputFile=" + os.devnull,
file_path
]
try:
# 3. Execução blindada (sem timeout_dinamico!)
result = exec_segura(
cmd,
timeout=60,
descricao="Validação PDF/A Ghostscript",
progress_widget=None
)
stderr = result.stderr or ""
if result.returncode == 0:
return True, "Válido (Ghostscript aceitou)"
# 4. Diagnóstico
motivo = "Falha na validação PDF/A (policy=1)."
if "Error:" in stderr:
for line in stderr.splitlines():
if "Error:" in line:
motivo = line.strip()
break
elif stderr.strip():
motivo = f"Erro GS: {stderr.strip()[:100]}"
return False, motivo
except subprocess.TimeoutExpired:
return False, "Timeout: PDF muito complexo para validação."
except Exception as e:
return False, f"Erro técnico: {str(e)}"
def calcular_tamanho_lote(num_arquivos, tamanho_total_estimate):
"""
Calcula o tamanho ideal do lote baseado no número de arquivos e tamanho total
para evitar sobrecarga de memória
"""
# Limites conservadores para evitar memory overflow
if tamanho_total_estimate > 100 * 1024 * 1024: # > 100MB
return max(1, num_arquivos // 10) # Lotes muito pequenos para grandes arquivos
elif tamanho_total_estimate > 50 * 1024 * 1024: # > 50MB
return max(2, num_arquivos // 5)
elif num_arquivos > 20:
return 5 # Lotes de 5 arquivos para muitas operações
elif num_arquivos > 10:
return 3 # Lotes de 3 arquivos
else:
return num_arquivos # Processa tudo de uma vez se for pouco
def validate_file_security(file_path):
if not os.path.exists(file_path):
raise SecurityError("Arquivo não existe")
file_size = os.path.getsize(file_path)
if file_size > MAX_FILE_SIZE:
raise SecurityError(f"Arquivo muito grande ({file_size/1024/1024:.1f}MB > {MAX_FILE_SIZE/1024/1024}MB)")
filename = os.path.basename(file_path)
dangerous_patterns = [
'..',
'|',
';',
'`',
'\0',
'\r',
'\n'
]
if any(pattern in file_path for pattern in dangerous_patterns):
raise SecurityError("Nome de arquivo contém caracteres perigosos")
try:
absolute_path = os.path.abspath(file_path)
normalized_path = os.path.normpath(file_path)
if '..' in normalized_path or normalized_path != os.path.normpath(absolute_path):
raise SecurityError("Tentativa de path traversal detectada")
except Exception as e:
raise SecurityError(f"Erro ao validar caminho: {e}")
try:
with open(file_path, 'rb') as f:
header = f.read(4)
if header != b'%PDF':
raise SecurityError("Arquivo não é um PDF válido")
f.seek(0)
first_chunk = f.read(4096)
if b'/JavaScript' in first_chunk:
raise SecurityError("PDF contém JavaScript - risco de segurança")
except Exception as e:
raise SecurityError(f"Erro ao verificar arquivo: {e}")
def validate_split_limits(files, split_mode, options=None):
options = options or {}
if len(files) > MAX_FILES_FOR_SPLIT:
raise SystemOverloadError(
f"Máximo de {MAX_FILES_FOR_SPLIT} arquivos para divisão. "
f"Selecionados: {len(files)}"
)
total_input_pages = 0
total_output_files_estimate = 0
for f in files:
try:
reader = safe_pdf_reader(f)
pages = len(reader.pages)
total_input_pages += pages
if pages > MAX_PAGES_PER_FILE_FOR_SPLIT:
raise SystemOverloadError(
f"Arquivo '{os.path.basename(f)}' tem {pages} páginas. "
f"Máximo permitido para divisão: {MAX_PAGES_PER_FILE_FOR_SPLIT}"
)
if split_mode == "all":
total_output_files_estimate += pages
elif split_mode == "extract":
page_ranges = options.get('page_ranges', [])
total_output_files_estimate += 1
elif split_mode == "interval":
interval = options.get('interval', 5)
total_output_files_estimate += (pages + interval - 1) // interval
elif split_mode == "parts":
parts = options.get('parts', 3)
total_output_files_estimate += min(parts, pages)
except Exception as e:
logging.warning(f"Erro ao validar arquivo {f}: {e}")
continue
if total_input_pages > MAX_TOTAL_PAGES:
raise SystemOverloadError(
f"Total de {total_input_pages} páginas excede o limite de {MAX_TOTAL_PAGES}"
)
if total_output_files_estimate > MAX_TOTAL_OUTPUT_FILES:
raise SystemOverloadError(
f"Operação geraria aproximadamente {total_output_files_estimate} arquivos. "
f"Máximo permitido: {MAX_TOTAL_OUTPUT_FILES}"
)
if split_mode == "all" and total_input_pages > MAX_PAGES_FOR_SINGLE_FILE_SPLIT:
raise SystemOverloadError(
f"Divisão página-a-página limitada a {MAX_PAGES_FOR_SINGLE_FILE_SPLIT} páginas. "
f"Total: {total_input_pages}"
)
return total_input_pages, total_output_files_estimate
DND_AVAILABLE = False
try:
from tkinterdnd2 import DND_FILES, TkinterDnD
DND_AVAILABLE = True
logging.info("tkinterdnd2 disponível")
except ImportError:
DND_AVAILABLE = False
logging.warning("tkinterdnd2 não disponível")
try:
from PyPDF2 import PdfMerger, PdfReader, PdfWriter
PDF_LIBS_AVAILABLE = True
logging.info("PyPDF2 disponível")
except ImportError as e:
PDF_LIBS_AVAILABLE = False
logging.error(f"PyPDF2 não disponível: {e}")
try:
import pikepdf
PIKEPDF_AVAILABLE = True
logging.info(f"pikepdf {pikepdf.__version__} disponível")
except ImportError as e:
PIKEPDF_AVAILABLE = False
logging.warning(f"pikepdf não disponível: {e}")
except Exception as e:
PIKEPDF_AVAILABLE = False
logging.warning(f"Erro ao importar pikepdf: {e}")
cancel_operation = False
total_files_merge_var = None
total_pages_merge_var = None
total_size_merge_var = None
total_files_split_var = None
total_pages_split_var = None
total_size_split_var = None
protect_var = None
pdfa_var = None
pdfa_var_split = None
compress_var = None
meta_var = None
split_mode_var = None
split_interval_var = None
split_parts_var = None
status_var = None
compress_level = None
pdf_metadata_cache = {}
def safe_temp_file(prefix="temp", suffix=".pdf"):
import tempfile
temp_file = tempfile.NamedTemporaryFile(
prefix=prefix,
suffix=suffix,
delete=False
)
temp_path = temp_file.name
temp_file.close()
add_temp_file(temp_path)
return temp_path
def add_temp_file(file_path):
with temp_files_lock:
if file_path not in temp_files_global:
temp_files_global.append(file_path)
logging.debug(f"Arquivo temporário registrado: {file_path}")
def remove_temp_file(file_path):
with temp_files_lock:
if file_path in temp_files_global:
temp_files_global.remove(file_path)
logging.debug(f"Arquivo temporário removido: {file_path}")
def cleanup_temp_files():
logging.info("Iniciando limpeza de arquivos temporários")
with temp_files_lock:
files_to_clean = list(temp_files_global)
for temp_file in files_to_clean:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
logging.debug(f"Arquivo temporário removido: {temp_file}")
except Exception as e:
logging.warning(f"Erro ao limpar {temp_file}: {e}")
try:
cleanup_checkpoint()
except Exception as e:
logging.warning(f"Erro ao limpar checkpoint: {e}")
try:
if 'thread_executor' in globals():
thread_executor.shutdown(wait=True, timeout=5)
except TypeError:
try:
if 'thread_executor' in globals():
thread_executor.shutdown(wait=True)
except Exception as e:
logging.warning(f"Falha ao encerrar thread_executor: {e}")
except Exception as e:
logging.warning(f"Erro ao encerrar thread_executor: {e}")
atexit.register(cleanup_temp_files)
logging.info("Cleanup registrado no atexit")
def widget_exists(widget):
try:
return widget is not None and hasattr(widget, "winfo_exists") and widget.winfo_exists()
except Exception:
return False
def safe_widget_config(widget, **kwargs):
try:
if widget is None:
return False
if hasattr(widget, "winfo_exists") and widget.winfo_exists():
try:
widget.config(**kwargs)
return True
except tk.TclError:
return False
except Exception:
return False
return False
def encontrar_ghostscript():
logging.info("Procurando Ghostscript...")
for cmd in ("gswin64c", "gswin32c", "gs"):
caminho = shutil.which(cmd)
if caminho:
logging.info(f"Ghostscript encontrado: {caminho}")
return caminho
possiveis_pastas = [
r"C:\Program Files\gs",
r"C:\Program Files (x86)\gs",
r"C:\Ghostscript",
]
for base in possiveis_pastas:
if not os.path.exists(base):
continue
versoes = glob.glob(os.path.join(base, "gs*", "bin", "gswin64c.exe"))
if not versoes:
versoes = glob.glob(os.path.join(base, "gs*", "bin", "gswin32c.exe"))
if versoes:
versoes.sort(reverse=True)
logging.info(f"Ghostscript encontrado: {versoes[0]}")
return versoes[0]
logging.warning("Ghostscript não encontrado")
return None
def get_app_path():
"""Retorna o diretório base corretamente, seja .py ou .exe"""
if getattr(sys, 'frozen', False):
return os.path.dirname(sys.executable)
else:
return os.path.dirname(os.path.abspath(__file__))
def encontrar_ghostscript_portavel():
"""Busca Ghostscript na pasta portátil e CONFIGURA AS BIBLIOTECAS"""
exe_dir = get_app_path()
# Caminho esperado: pasta_do_script/Ghostscript/bin/gswin64c.exe
caminho_bin = os.path.join(exe_dir, "Ghostscript", "bin", "gswin64c.exe")
if os.path.exists(caminho_bin):
# --- PROTEÇÃO CRÍTICA (GS_LIB) ---
caminho_lib = os.path.join(exe_dir, "Ghostscript", "lib")
if os.path.exists(caminho_lib):
os.environ['GS_LIB'] = caminho_lib
logging.info(f"✅ GS_LIB configurado: {caminho_lib}")
else:
logging.warning("⚠️ Pasta 'lib' não encontrada - GS portátil pode falhar")
# ---------------------------------
logging.info(f"✅ Ghostscript PORTÁTIL encontrado: {caminho_bin}")
return caminho_bin
# Se não achar portátil, usa o do sistema (sua função original)
logging.info("🔍 Ghostscript portátil não encontrado, buscando no sistema...")
return encontrar_ghostscript()
def encontrar_perfil_icc_portavel(gs_exec=None):
"""BUSCA ABSOLUTA BLINDADA - encontra sRGB mesmo sem extensão"""
# 1. Diretório base CORRETO (script ou executável)
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable) # Pasta do .exe
else:
base_dir = os.path.dirname(os.path.abspath(__file__)) # Pasta do .py
logging.info(f"📁 Base directory: {base_dir}")
# 2. Lista de possíveis nomes/locais - AGORA INCLUI SEM EXTENSÃO
candidatos = []
# Na pasta do script - COM E SEM EXTENSÃO
candidatos.extend([
os.path.join(base_dir, "srgb.icc"),
os.path.join(base_dir, "sRGB.icc"),
os.path.join(base_dir, "srgb"), # ← SEM EXTENSÃO
os.path.join(base_dir, "sRGB"), # ← SEM EXTENSÃO
os.path.join(base_dir, "Ghostscript", "Resource", "ColorSpace", "srgb.icc"),
os.path.join(base_dir, "Ghostscript", "Resource", "ColorSpace", "sRGB.icc"),
os.path.join(base_dir, "Ghostscript", "Resource", "ColorSpace", "srgb"), # ← SEM EXTENSÃO
os.path.join(base_dir, "Ghostscript", "Resource", "ColorSpace", "sRGB"), # ← SEM EXTENSÃO
])
# 3. Busca ativa
for caminho in candidatos:
if os.path.exists(caminho):
logging.info(f"✅ PERFIL ICC ENCONTRADO: {caminho}")
return caminho
# 4. Busca inteligente por arquivos que contenham "srgb" no nome (case insensitive)
logging.info("🔍 Buscando arquivos que contenham 'srgb' no nome...")
# Procura em toda a estrutura de pastas
for root_dir, dirs, files in os.walk(base_dir):
for file in files:
if 'srgb' in file.lower(): # Encontra qualquer arquivo com "srgb" no nome
caminho_completo = os.path.join(root_dir, file)
logging.info(f"📍 Arquivo potencial encontrado: {caminho_completo}")
# Verifica se é um arquivo ICC válido (pelo menos 1KB)
try:
if os.path.getsize(caminho_completo) > 1024: # Mínimo 1KB
logging.info(f"✅ PERFIL ICC ENCONTRADO (busca inteligente): {caminho_completo}")
return caminho_completo
except:
continue
# 5. Fallback: busca no sistema (função original)
logging.warning("🔍 ICC não encontrado localmente, buscando no sistema...")
fallback = encontrar_perfil_icc(gs_exec) # Sua função original do sistema
if fallback:
logging.info(f"✅ ICC encontrado no sistema: {fallback}")
return fallback
# 6. Diagnóstico detalhado do fracasso
logging.error("❌ NENHUM perfil ICC encontrado em nenhum local!")
# Lista todos os arquivos na pasta base para debug
try:
logging.info("📋 Arquivos na pasta base:")
for item in os.listdir(base_dir):
item_path = os.path.join(base_dir, item)
if os.path.isfile(item_path):
logging.info(f" 📄 {item} ({os.path.getsize(item_path)} bytes)")
except Exception as e:
logging.warning(f"Erro ao listar arquivos: {e}")
return None
def encontrar_perfil_icc(gs_exec):
if not gs_exec:
return None
gs_dir = os.path.dirname(os.path.dirname(gs_exec))
possible_paths = [
os.path.join(gs_dir, "iccprofiles", "srgb.icc"),
os.path.join(gs_dir, "iccprofiles", "default_rgb.icc"),
os.path.join(gs_dir, "lib", "srgb.icc"),
os.path.join(gs_dir, "Resource", "ColorSpace", "sRGB.icc"),
]
for icc_path in possible_paths:
if os.path.exists(icc_path):
logging.info(f"Perfil ICC encontrado: {icc_path}")
return icc_path
try:
for root_dir, dirs, files in os.walk(gs_dir):
for file in files:
if file.lower() in ("srgb.icc", "default_rgb.icc"):
found_path = os.path.join(root_dir, file)
logging.info(f"Perfil ICC encontrado: {found_path}")
return found_path
except Exception as e:
logging.warning(f"Erro na busca recursiva: {e}")
logging.warning("Perfil ICC não encontrado")
return None
def diagnosticar_compressao(input_path):
"""Diagnóstico para entender por que a compressão não está funcionando"""
try:
import pikepdf
with pikepdf.open(input_path) as pdf:
info = {
'paginas': len(pdf.pages),
'versao_pdf': pdf.pdf_version,
'criptografado': pdf.is_encrypted,
'streams_total': 0,
'imagens': 0
}
# Analisa objetos do PDF de forma segura
for obj in pdf.objects:
try:
if hasattr(obj, 'get'):
# Verifica se é imagem
if '/Subtype' in obj and obj['/Subtype'] == pikepdf.Name('/Image'):
info['imagens'] += 1
# Verifica se tem stream (pode ser comprimido)
if '/Length' in obj:
info['streams_total'] += 1
except Exception:
continue # Ignora objetos problemáticos
return info
except Exception as e:
return {'erro': str(e)}
# Agora chama as funções portáteis (que fazem o fallback se precisar)
GHOSTSCRIPT_PATH = encontrar_ghostscript_portavel()
ICC_PROFILE_PATH = encontrar_perfil_icc_portavel(GHOSTSCRIPT_PATH)
PDFA_AVAILABLE = bool(GHOSTSCRIPT_PATH and ICC_PROFILE_PATH)
def comprimir_pdf(input_path, output_path, nivel="Qualidade Equilibrada", forcar=False):
"""
Sistema de compressão APENAS para PDFs NORMAS (NÃO PDF/A)
"""
# 🎯 REDIRECIONAMENTO CRÍTICO: Se for PDF/A, use a função robusta
if PDFA_AVAILABLE and pdfa_var.get():
logging.info("🔀 Redirecionando compressão para motor PDF/A robusto...")
return converter_para_pdfa(input_path, output_path, nivel)
# MAPEAMENTO para compressão NORMAL (não PDF/A)
config_map = {
"Tamanho Mínimo": "/ebook",
"Qualidade Equilibrada": "/printer",
"Qualidade Máxima": "/prepress"
}
if nivel not in config_map:
nivel = "Qualidade Equilibrada"
gs_setting = config_map[nivel]
if not GHOSTSCRIPT_PATH:
raise PDFProcessingError("Ghostscript não disponível")
# ⚠️ COMANDO SEM PDF/A - apenas compressão normal
gs_args = [
GHOSTSCRIPT_PATH,
"-sDEVICE=pdfwrite",
"-dNOPAUSE",
"-dBATCH",
"-dQUIET",
f"-dPDFSETTINGS={gs_setting}",
f"-sOutputFile={output_path}",
input_path
]
resultado = exec_segura(gs_args, timeout=120, descricao=f"Compressão {nivel}")
if resultado.returncode != 0:
raise PDFProcessingError(f"Ghostscript falhou: {resultado.stderr}")
# Cálculo de estatísticas
tamanho_original = os.path.getsize(input_path)
tamanho_comprimido = os.path.getsize(output_path)
if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
raise PDFProcessingError("Arquivo comprimido não gerado")
reducao_percentual = ((tamanho_original - tamanho_comprimido) / tamanho_original) * 100
logging.info(f"📊 Compressão Normal ({nivel}): {tamanho_original/1024/1024:.2f}MB → {tamanho_comprimido/1024/1024:.2f}MB ({reducao_percentual:+.1f}%)")
return reducao_percentual, f"Compressão {nivel} aplicada"
def deve_comprimir(diagnostico, tamanho_original, nivel):
"""Decide se vale a pena comprimir baseado em múltiplos fatores"""
# Fator 1: Tamanho muito pequeno
if tamanho_original < 50 * 1024: # < 50KB
logging.info("📏 PDF muito pequeno - pulando compressão")
return False
# Fator 2: Já altamente comprimido
if ('taxa_compressao_existente' in diagnostico and
diagnostico['taxa_compressao_existente'] > 0.8):
logging.info("🎯 PDF já bem comprimido - pulando compressão")
return False
# Fator 3: Muitas imagens JPEG (já comprimidas)
if (diagnostico.get('imagens', 0) > 5 and
diagnostico.get('streams_comprimidos', 0) / max(1, diagnostico.get('streams_total', 1)) > 0.7):
logging.info("🖼️ Muitas imagens já comprimidas - risco de perda de qualidade")
return nivel != "Tamanho Mínimo" # Só comprime se for modo agressivo
# Fator 4: PDF criptografado
if diagnostico.get('criptografado', False):
logging.info("🔒 PDF criptografado - pulando compressão")
return False
return True
def comprimir_com_pikepdf(input_path, output_path, nivel="Qualidade Máxima", diagnostico=None):
"""Versão melhorada com decisões inteligentes"""
if not PIKEPDF_AVAILABLE:
raise PDFProcessingError("pikepdf não disponível")
import pikepdf
logging.info(f"Comprimindo com pikepdf inteligente ({nivel}): {os.path.basename(input_path)}")
tamanho_original = os.path.getsize(input_path)
# Configurações adaptativas baseadas no diagnóstico
configuracoes = {
"Qualidade Máxima": {
"compress_streams": True,
"stream_decode_level": pikepdf.StreamDecodeLevel.none,
"object_stream_mode": pikepdf.ObjectStreamMode.generate,
},
"Qualidade Equilibrada": {
"compress_streams": True,
"stream_decode_level": pikepdf.StreamDecodeLevel.generalized,
"object_stream_mode": pikepdf.ObjectStreamMode.generate,
},
"Tamanho Mínimo": {
"compress_streams": True,
"stream_decode_level": pikepdf.StreamDecodeLevel.all,
"object_stream_mode": pikepdf.ObjectStreamMode.generate,
}
}
config = configuracoes.get(nivel, configuracoes["Qualidade Máxima"])
# Otimização: não normaliza conteúdo se já for bem estruturado
if diagnostico and diagnostico.get('versao_pdf', '') >= '1.5':
config['normalize_content'] = False
else:
config['normalize_content'] = True
with pikepdf.open(input_path) as pdf:
pdf.save(output_path, **config)
if not os.path.exists(output_path) or os.path.getsize(output_path) == 0: