Skip to content

Commit 48fea4d

Browse files
committed
Remove threading, rely on async
1 parent 44183ac commit 48fea4d

File tree

1 file changed

+30
-79
lines changed

1 file changed

+30
-79
lines changed

logdna/logdna.py

Lines changed: 30 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
import threading
77
import time
88

9-
from concurrent.futures import ThreadPoolExecutor
10-
119
from .configs import defaults
1210
from .utils import sanitize_meta, get_ip, normalize_list_option
1311

@@ -77,18 +75,11 @@ def __init__(self, key, options={}):
7775
"buf_retention_limit", defaults["BUF_RETENTION_LIMIT"]
7876
)
7977

80-
# Set up the Thread Pools
81-
self.worker_thread_pool = ThreadPoolExecutor()
82-
self.request_thread_pool = ThreadPoolExecutor(
83-
max_workers=self.max_concurrent_requests
84-
)
85-
8678
# Set up async client and event loop
8779
self.async_client = None
8880
self.loop = None
8981

9082
self.setLevel(logging.DEBUG)
91-
self._lock = threading.RLock()
9283

9384
self.flusher = None
9485

@@ -123,77 +114,53 @@ def _get_event_loop(self):
123114
return loop
124115

125116
def buffer_log(self, message):
126-
if self.worker_thread_pool:
127-
try:
128-
self.worker_thread_pool.submit(self.buffer_log_sync, message)
129-
except RuntimeError:
130-
self.buffer_log_sync(message)
131-
except Exception as e:
132-
self.internalLogger.debug("Error in calling buffer_log: %s", e)
117+
self.buffer_log_sync(message)
133118

134119
def buffer_log_sync(self, message):
135-
# Attempt to acquire lock to write to buffer
136-
if self._lock.acquire(blocking=True):
137-
try:
138-
msglen = len(message["line"])
139-
if self.buf_size + msglen < self.buf_retention_limit:
140-
self.buf.append(message)
141-
self.buf_size += msglen
142-
else:
143-
self.internalLogger.debug(
144-
"The buffer size exceeded the limit: %s",
145-
self.buf_retention_limit,
146-
)
147-
148-
if self.buf_size >= self.flush_limit:
149-
self.close_flusher()
150-
self.flush()
151-
else:
152-
self.start_flusher()
153-
except Exception as e:
154-
self.internalLogger.exception(f"Error in buffer_log_sync: {e}")
155-
finally:
156-
self._lock.release()
120+
try:
121+
msglen = len(message["line"])
122+
if self.buf_size + msglen < self.buf_retention_limit:
123+
self.buf.append(message)
124+
self.buf_size += msglen
125+
else:
126+
self.internalLogger.debug(
127+
"The buffer size exceeded the limit: %s",
128+
self.buf_retention_limit,
129+
)
130+
131+
if self.buf_size >= self.flush_limit:
132+
self.close_flusher()
133+
self.flush()
134+
else:
135+
self.start_flusher()
136+
except Exception as e:
137+
self.internalLogger.exception(f"Error in buffer_log_sync: {e}")
157138

158139
def flush(self):
159140
self.schedule_flush_sync()
160141

161142
def schedule_flush_sync(self, should_block=False):
162-
if self.request_thread_pool:
163-
try:
164-
self.request_thread_pool.submit(
165-
self.try_lock_and_do_flush_request, should_block
166-
)
167-
except RuntimeError:
168-
self.try_lock_and_do_flush_request(should_block)
169-
except Exception as e:
170-
self.internalLogger.debug(
171-
"Error in calling try_lock_and_do_flush_request: %s", e
172-
)
143+
self.try_lock_and_do_flush_request(should_block)
173144

174145
def try_lock_and_do_flush_request(self, should_block=False):
175146
local_buf = []
176-
if self._lock.acquire(blocking=should_block):
177-
if not self.buf:
178-
self.close_flusher()
179-
self._lock.release()
180-
return
147+
# Simple buffer extraction - no locks needed
148+
if not self.buf:
149+
self.close_flusher()
150+
return
181151

182-
local_buf = self.buf.copy()
183-
self.buf.clear()
184-
self.buf_size = 0
185-
if local_buf:
186-
self.close_flusher()
187-
self._lock.release()
152+
local_buf = self.buf.copy()
153+
self.buf.clear()
154+
self.buf_size = 0
155+
if local_buf:
156+
self.close_flusher()
188157

189158
if local_buf:
190159
# Run the async request in the event loop
191160
loop = self._get_event_loop()
192161
if loop.is_running():
193-
# If we're already in an event loop, create a task
194162
asyncio.create_task(self.try_request(local_buf))
195163
else:
196-
# If no event loop is running, run the coroutine
197164
loop.run_until_complete(self.try_request(local_buf))
198165

199166
async def try_request(self, buf):
@@ -356,28 +323,12 @@ def close(self):
356323
# Close the flusher
357324
self.close_flusher()
358325

359-
# First gracefully shut down any threads that are still attempting
360-
# to add log messages to the buffer. This ensures that we don't lose
361-
# any log messages that are in the process of being added to the
362-
# buffer.
363-
if self.worker_thread_pool:
364-
self.worker_thread_pool.shutdown(wait=True)
365-
self.worker_thread_pool = None
366-
367326
# Manually force a flush of any remaining log messages in the buffer.
368327
# We block here to ensure that the flush completes prior to the
369328
# application exiting and because the probability of this
370329
# introducing a noticeable delay is very low because close() is only
371330
# called when the logger and application are shutting down.
372-
self.schedule_flush_sync(should_block=True)
373-
374-
# Finally, shut down the thread pool that was used to send the log
375-
# messages to the server. We can assume at this point that all log
376-
# messages that were in the buffer prior to the worker threads
377-
# shutting down have been sent to the server.
378-
if self.request_thread_pool:
379-
self.request_thread_pool.shutdown(wait=True)
380-
self.request_thread_pool = None
331+
self.flush()
381332

382333
# Close the async httpx client
383334
if self.async_client:

0 commit comments

Comments
 (0)