-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSSHSession.py
More file actions
107 lines (89 loc) · 3.89 KB
/
SSHSession.py
File metadata and controls
107 lines (89 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import paramiko
import os
import posixpath
from stat import S_ISDIR
class SSHSession:
def __init__(self, host, port, username, password, key_path=None, passphrase=None):
self.username = username
self.password = password
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey = None
if key_path:
try:
pkey = paramiko.RSAKey.from_private_key_file(key_path, password=passphrase)
except paramiko.ssh_exception.PasswordRequiredException:
raise ValueError("Private key requires a passphrase")
except paramiko.ssh_exception.SSHException:
raise ValueError("Unsupported key type or invalid key file")
self.client.connect(hostname=host, port=port, username=username, password=password, pkey=pkey)
self.sftp = self.client.open_sftp()
def sftp_walk(self, remotepath):
"""Generator to walk through remote directories recursively"""
files = []
folders = []
for f in self.sftp.listdir_attr(remotepath):
if S_ISDIR(f.st_mode):
folders.append(f.filename)
else:
files.append(f.filename)
if files:
yield remotepath, files
for folder in folders:
new_path = posixpath.join(remotepath, folder)
for x in self.sftp_walk(new_path):
yield x
def put(self, localfile, remotefile):
"""Upload a single file"""
self.sftp.put(localfile, remotefile)
def put_all(self, localpath, remotepath):
"""Upload a directory recursively"""
for root, dirs, files in os.walk(localpath):
rel_root = os.path.relpath(root, localpath)
remote_root = posixpath.join(remotepath, rel_root) if rel_root != "." else remotepath
try:
self.sftp.mkdir(remote_root)
except IOError:
pass # directory may already exist
for file in files:
local_file = os.path.join(root, file)
remote_file = posixpath.join(remote_root, file)
self.put(local_file, remote_file)
def get(self, remotefile, localfile):
"""Download a single file"""
self.sftp.get(remotefile, localfile)
def get_all(self, remotepath, localpath):
"""Download a remote directory recursively"""
os.makedirs(localpath, exist_ok=True)
for remote_dir, files in self.sftp_walk(remotepath):
rel_dir = posixpath.relpath(remote_dir, remotepath)
local_dir = os.path.join(localpath, rel_dir) if rel_dir != "." else localpath
try:
os.makedirs(local_dir, exist_ok=True)
except Exception as e:
print("Error:", e)
continue
for file in files:
remote_file = posixpath.join(remote_dir, file)
local_file = os.path.join(local_dir, file)
try:
self.sftp.get(remote_file, local_file)
except FileNotFoundError:
print(f"Remote file not found: {remote_file}")
except Exception as e:
print(f"Failed to download {remote_file}: {e}")
def executeCommand(self, command, sudo=False):
"""Execute a command on the remote server"""
feed_password = False
if sudo and self.username != "root":
command = f"sudo -S -p '' {command}"
feed_password = self.password is not None and len(self.password) > 0
stdin, stdout, stderr = self.client.exec_command(command)
if feed_password:
stdin.write(self.password + "\n")
stdin.flush()
return stdout.readlines()
def close(self):
if self.client is not None:
self.client.close()
self.client = None