diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index f439f8d6..8a6aa99e 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -42,53 +42,73 @@ def __heartbeat(): + wait = base = 30 + while not __finished.is_set(): try: __protocol.heartbeat() + wait = base # reset to base wait time on success except Exception as exc: logger.error(str(exc)) + wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum - __finished.wait(30) + __finished.wait(wait) def __report(): + wait = base = 0 + while not __finished.is_set(): try: __protocol.report(__queue) # is blocking actually, blocks for max config.QUEUE_TIMEOUT seconds + wait = base except Exception as exc: logger.error(str(exc)) + wait = min(60, wait * 2 or 1) - __finished.wait(0) + __finished.wait(wait) def __report_log(): + wait = base = 0 + while not __finished.is_set(): try: __protocol.report_log(__log_queue) + wait = base except Exception as exc: logger.error(str(exc)) + wait = min(60, wait * 2 or 1) - __finished.wait(0) + __finished.wait(wait) def __send_profile_snapshot(): + wait = base = 0.5 + while not __finished.is_set(): try: __protocol.send_snapshot(__snapshot_queue) + wait = base except Exception as exc: logger.error(str(exc)) + wait = min(60, wait * 2 or 1) - __finished.wait(0.5) + __finished.wait(wait) def __query_profile_command(): + wait = base = config.get_profile_task_interval + while not __finished.is_set(): try: __protocol.query_profile_commands() + wait = base except Exception as exc: logger.error(str(exc)) + wait = min(60, wait * 2 or 1) - __finished.wait(config.get_profile_task_interval) + __finished.wait(wait) def __command_dispatch(): diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 7ad0a8e3..69c0ebb7 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -79,6 +79,7 @@ def heartbeat(self): except grpc.RpcError: self.on_error() + raise def on_error(self): traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None @@ -86,14 +87,20 @@ def on_error(self): self.channel.subscribe(self._cb, try_to_connect=True) def report(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return segment = queue.get(block=block, timeout=timeout) # type: Segment except Empty: return @@ -145,16 +152,23 @@ def generator(): self.traces_reporter.report(generator()) except grpc.RpcError: self.on_error() + raise # reraise so that incremental reconnect wait can process def report_log(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return log_data = queue.get(block=block, timeout=timeout) # type: LogData except Empty: return @@ -169,16 +183,23 @@ def generator(): self.log_reporter.report(generator()) except grpc.RpcError: self.on_error() + raise def send_snapshot(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return snapshot = queue.get(block=block, timeout=timeout) # type: TracingThreadSnapshot except Empty: return @@ -199,3 +220,4 @@ def generator(): self.profile_channel.send(generator()) except grpc.RpcError: self.on_error() + raise diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index 2ca8bac4..943c6f7c 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -44,14 +44,20 @@ def heartbeat(self): self.service_management.send_heart_beat() def report(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return segment = queue.get(block=block, timeout=timeout) # type: Segment except Empty: return @@ -68,14 +74,20 @@ def generator(): pass def report_log(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return log_data = queue.get(block=block, timeout=timeout) # type: LogData except Empty: return diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py index a0b1a4c4..d7ebca13 100644 --- a/skywalking/agent/protocol/kafka.py +++ b/skywalking/agent/protocol/kafka.py @@ -45,14 +45,20 @@ def heartbeat(self): self.service_management.send_heart_beat() def report(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return segment = queue.get(block=block, timeout=timeout) # type: Segment except Empty: return @@ -103,14 +109,20 @@ def generator(): self.traces_reporter.report(generator()) def report_log(self, queue: Queue, block: bool = True): - start = time() + start = None def generator(): + nonlocal start + while True: try: - timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int - if timeout <= 0: - return + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return log_data = queue.get(block=block, timeout=timeout) # type: LogData except Empty: return diff --git a/skywalking/config.py b/skywalking/config.py index 041a6a83..ae3491bf 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -74,7 +74,6 @@ profile_dump_max_stack_depth = int(os.getenv('SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH') or '500') # type: int profile_snapshot_transport_buffer_size = int(os.getenv('SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE') or '50') -# NOTE - Log reporting requires a separate channel, will merge in the future. log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \ os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') == 'True' else False # type: bool log_reporter_max_buffer_size = int(os.getenv('SW_AGENT_LOG_REPORTER_BUFFER_SIZE') or '10000') # type: int