diff --git a/smpplib/client.py b/smpplib/client.py index bcd3b91..fe82ad9 100644 --- a/smpplib/client.py +++ b/smpplib/client.py @@ -28,6 +28,7 @@ import socket import struct +from time import sleep from smpplib import consts, exceptions, smpp @@ -61,12 +62,14 @@ class Client(object): vendor = None _socket = None sequence_generator = None - - def __init__(self, host, port, timeout=5, sequence_generator=None, logger_name=None): + + def __init__(self, host, port, timeout=5, sequence_generator=None, max_outstanding_operations=None, logger_name=None): self.host = host self.port = int(port) self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.timeout = timeout + self.outstanding_operations = set() + self.max_outstanding_operations = max_outstanding_operations self.logger = logging.getLogger(logger_name or 'smpp.Client.{}'.format(id(self))) if sequence_generator is None: sequence_generator = SimpleSequenceGenerator() @@ -167,18 +170,41 @@ def unbind(self): except socket.timeout: raise exceptions.ConnectionError() - def send_pdu(self, p): + def manage_outstanding_operations(self, sending, pdu): + if not self.max_outstanding_operations: + return + + if pdu.is_response(): + operation = '{}{}'.format('S' if sending else 'C', pdu.sequence) + try: + self.outstanding_operations.remove(operation) + self.logger.debug('Unregistering outstanding operation {}'.format(operation)) + except KeyError: + self.logger.warning('Failed to unregistered outstanding operation {}'.format(operation)) + else: + operation = '{}{}'.format('C' if sending else 'S', pdu.sequence) + if pdu.sequence in self.outstanding_operations: + self.logger.warning('Outstanding operation {} already registered'.format(operation)) + else: + self.outstanding_operations.add(operation) + self.logger.debug('Registering outstanding operation {}'.format(operation)) + + def send_pdu(self, pdu): """Send PDU to the SMSC""" - if self.state not in consts.COMMAND_STATES[p.command]: + if self.state not in consts.COMMAND_STATES[pdu.command]: raise exceptions.PDUError("Command %s failed: %s" % ( - p.command, + pdu.command, consts.DESCRIPTIONS[consts.SMPP_ESME_RINVBNDSTS], )) - self.logger.debug('Sending %s PDU', p.command) + if self.max_outstanding_operations and pdu.is_request(): + while len(self.outstanding_operations) >= self.max_outstanding_operations: + sleep(.1) + + self.logger.debug('Sending %s PDU', pdu.command) - generated = p.generate() + generated = pdu.generate() self.logger.debug('>>%s (%d bytes)', binascii.b2a_hex(generated), len(generated)) @@ -194,6 +220,8 @@ def send_pdu(self, p): raise exceptions.ConnectionError() sent += sent_last + self.manage_outstanding_operations(sending=True, pdu=pdu) + return True def read_pdu(self): @@ -227,6 +255,8 @@ def read_pdu(self): self.logger.debug('Read %s PDU', pdu.command) + self.manage_outstanding_operations(sending=False, pdu=pdu) + if pdu.is_error(): return pdu