forked from DNS-Africa/EPP-Client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathepp.py
More file actions
executable file
·284 lines (247 loc) · 11.1 KB
/
epp.py
File metadata and controls
executable file
·284 lines (247 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Sample Python EPP client
"""
© Domain Name Services (Pty) Ltd. 2010. All rights reserved.
© DNS Africa Ltd. 2022. All rights reserved.
$Id$
"""
__version__ = "$Id$"
import gettext
import logging
import optparse
import os
from os import isatty
import os.path
import random
import re
import socket
import ssl
import struct
import sys
import time
from lib import colorlogging
# Enable translation
t = gettext.translation('rye', os.path.join(os.path.dirname(__file__), 'locale'), fallback=True)
_ = t.gettext
log = logging.getLogger()
packfmt = "!I"
class EPPTCPTransport:
"""An epp client transport class. This is just the raw TCP IP protocol. The XML data needs to be handled separatly.
The EPP transport protocol is definied at http://tools.ietf.org/html/rfc5734 it looks complicated but is
actually very simple and elegant.
the actual data should be XML data formated according to RFC5730-RFC5733
No validation of any data takes place in this Class
"""
sock = None
_greeting = ""
_isssl = None
def __init__(self, host="127.0.0.1", port=3121, usessl=True, cert=None, nogreeting=False):
"""Connect to the EPP server. Server header in self.header"""
if usessl:
self._isssl = True
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
if cert:
context.load_cert_chain(cert) #'/u/elp/test/epp/dnservices.pem') #certfile="cert.crt", keyfile="cert.key")
self.sock = context.wrap_socket(socket.socket(socket.AF_INET))
# s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
## do not require a certificate from the server
# self.sock = ssl.wrap_socket(s,cert_reqs = ssl.CERT_NONE,certfile = cert, ssl_version=ssl_version_dict[sslversion])
# print("Port %s" % (port))
# conn.connect(('127.0.0.1',8443))
self.sock.connect((host, port))
if not nogreeting:
self._greeting = self.get()
def get(self):
"""Get an EPP response """
header = bytearray()
rest = bytearray() # If we get more bytes than we expected.
while len(header) < 4:
if self._isssl:
data = self.sock.recv(4)
#log.debug("Initial header via TLS. (%s), %s len %s", type(data), data, len(data))
else:
data = self.sock.recv(4, 0x40) # 0x40 is the MSG_WAITALL flag which can only be used on plain sockets
#log.debug("Initial header no encryption. (%s), %s len %s", type(data), data, len(data))
if len(data) == 0:
print("<!-- " + _(
"Did not receive anything from the server or socket timeout. Was the initial login invalid?") + " -->")
sys.exit(1)
header = header + data
if len(header) > 4:
rest = header[4:]
header = header[:4]
bytes1 = struct.unpack(packfmt, header)[0]
log.debug("Initial header no encryption. (%s), %s Body should be %s bytes", type(header), header, bytes1)
data = bytearray() # the buffer
total = rest # Initialize with anything extra read while we were getting the header.
while len(total) < (bytes1 - 4):
bytesleft = (bytes1 - 4) - len(total)
data = self.sock.recv(bytesleft)
length = len(data)
#length = self.sock.recv_into(data, 16384)
#log.debug("Data %s %s", len(data), data)
if length == 0:
print("<!-- " + _(
"Could not receive the rest of the expected message header. Only have {0} bytes out of expected {1}.").format(
len(total), (bytes1 - 4)) + " -->")
sys.exit(1)
total = total + data
return str(total,'utf-8')
def send(self, data: bytes):
"""Send an EPP command """
# Write the header
self.sock.write(struct.pack('=L', socket.htonl(len(data) + 4)))
log.debug('Sending size %s bytes %s' % ((len(data) + 4),struct.pack('=L', socket.htonl(len(data) + 4))))
# Send the data
self.sock.write(data)
log.debug("Sent: %s", data)
def request(self, data):
"""Convenience function. Does a send and then returns the result of a get. Also converts the string __CLTRID__ to a suitable unique clTRID"""
cltrid = "EPPTEST-%s.%s" % (time.time(), random.randint(0, 1000000))
self._lastcltrid = cltrid
data = data.replace('__CLTRID__', cltrid)
self.send(bytes(data, encoding='utf8'))
return self.get()
def getGreeting(self):
"""Returns the EPP servers greeting"""
return str(self._greeting)
def close(self):
"""Close the socket"""
self.sock.close()
def templatefill(template, defines):
"""Fill out the given template with values as requested"""
data = {}
for d in defines:
p = d.find("=")
if p == -1:
print("Unable to interpret definition %s. Aborting!" % (d))
sys.exit(1)
k = d[:p]
v = d[p + 1:]
data[k] = v
return template % data
def send_epp(data):
if options.defs is not None and len(options.defs) > 0:
data = templatefill(data, options.defs)
if options.testing:
print(data)
else:
print(epp.request(data))
print("\n<!-- ================ -->\n")
def eppLogin(username, password, services=['urn:ietf:params:xml:ns:domain-1.0', 'urn:ietf:params:xml:ns:contact-1.0']):
"""Performs an epp login command. Ignore the services parameter for the co.za namespace."""
template = """<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<epp xmlns="urn:ietf:params:xml:ns:epp-1.0">
<command>
<login>
<clID>%(username)s</clID>
<pw>%(password)s</pw>
<options>
<version>1.0</version>
<lang>en</lang>
</options>
<svcs>"""
for svc in services:
template = template + " <objURI>%s</objURI>\n" % (svc)
template = template + """
</svcs>
</login>
<clTRID>__CLTRID__</clTRID>
</command>
</epp>"""
data = {'username': username, 'password': password}
result = str(epp.request(template % data))
if re.search('result code.*1000', result): return True # Good login.
if re.search('result code.*2002', result):
return True # Already logged in.
else:
print(result)
sys.exit(1)
def fileRead(fname):
"""Tries to locate fname and read it in."""
if not os.path.exists(fname):
newfname = os.path.join(os.path.dirname(__file__), fname)
if not os.path.exists(newfname):
print("Unable to locate file %s. Aborting!" % (fname))
sys.exit(1)
else:
fname = newfname
return open(fname).read()
if __name__ == "__main__":
usage = (_("Usage:") + " %prog [<options>] <files...>\n" +
_("Example EPP client. The individual EPP commands should be in files specified on the command line.\n")
+ _("Eg: ./epp.py --host=reg-test.dnservices.co.za login.xml create_host.xml create_domain.xml\n")
+ _("Will replace all occurances of __CLTRID__ with a suitable clTRID value\n"))
if sys.version_info[0] >= 2 and sys.version_info[1] >= 6:
usage = usage + __doc__
# print usage
# sys.exit(1)
parser = optparse.OptionParser(usage)
parser.add_option("--host", "--ip", dest="host", default="127.0.0.1", help=_("Host to connect to [%default] "))
parser.add_option("--port", "-p", dest="port", default="8443", help=_("Port to connect to") + " [%default]")
parser.add_option("--cert", "-c", dest="cert", help=_("SSL certificate to use for authenticated connections"))
parser.add_option("--sslversion", "-s", dest="sslversion", default='TLSv1',
help=_("The ssl version identifier {SSLv2, SSLv3, SSLv23, TLSv1}"))
parser.add_option("--nossl", dest="nossl", action="store_true", default=False, help=_("Do not use SSL"))
parser.add_option("--verbose", "-v", dest="verbose", action="store_true", default=False,
help=_("Show the EPP server greeting and other debug info"))
parser.add_option("--username", "-u", dest="username", help=_(
"Username to login with. If not specified will assume one of the provided files will do the login."))
parser.add_option("--password", dest="password", help=_("Password to login with"))
parser.add_option("--ng", dest="nogreeting", action="store_true", default=False,
help=_("Do not wait for an EPP server greeting"))
parser.add_option("--testing", "-t", dest="testing", action="store_true", default=False,
help=_("Do not connect to the server just output the completed templates"))
parser.add_option("-d", "--define", dest="defs", action="append",
help=_("For scripting, any fields to be replaced (python dictionary subsitution). Eg: -d "
"DOMAIN=test.co.za will replace %(DOMAIN)s with test.co.za"))
(options, args) = parser.parse_args()
if options.verbose:
logging.basicConfig(level=logging.DEBUG)
colorlogging.enableLogging(debug=True, color=True, console=True)
else:
logging.basicConfig(level=logging.INFO)
colorlogging.enableLogging(debug=False, color=True, console=True)
# Args not necessary
# if not args:
# parser.print_help()
# sys.exit(2)
if not options.testing:
try:
if options.cert:
try:
open(options.cert).close()
except IOError as e:
print("Could not read the SSL certificate %s\n%s" % (options.cert, e))
sys.exit(1)
epp = EPPTCPTransport(options.host, int(options.port), usessl=not options.nossl, cert=options.cert,
nogreeting=options.nogreeting)
else:
epp = EPPTCPTransport(options.host, int(options.port), not options.nossl, nogreeting=options.nogreeting)
except ssl.SSLError as e:
print("Could not connect due to an SSL error")
print(e)
sys.exit(1)
if options.verbose:
print("<!-- Greeting:-->")
print(str(epp.getGreeting()))
if options.username is not None:
eppLogin(options.username, options.password)
# Permit the first xml command file from an input pipe - that will permit flexibility of xml command file pre-processing
# Eg. remove comments:
# cat create_domain.xml | sed -e 's/<!--.*-->//g' -e '/<!--/,/-->/d' | epp.py ...
if not isatty(sys.stdin.isatty()):
send_epp(sys.stdin.read())
for fname in args:
try:
send_epp(fileRead(fname))
except IOError:
if not os.path.exists(fname):
print(_("The file %s does not exist.") % fname)
else:
print(_("Unable to read %s.") % fname)
sys.exit(1)
if not options.testing:
epp.close()