|
6 | 6 | import threading |
7 | 7 | import time |
8 | 8 |
|
9 | | -from concurrent.futures import ThreadPoolExecutor |
10 | | - |
11 | 9 | from .configs import defaults |
12 | 10 | from .utils import sanitize_meta, get_ip, normalize_list_option |
13 | 11 |
|
@@ -77,18 +75,11 @@ def __init__(self, key, options={}): |
77 | 75 | "buf_retention_limit", defaults["BUF_RETENTION_LIMIT"] |
78 | 76 | ) |
79 | 77 |
|
80 | | - # Set up the Thread Pools |
81 | | - self.worker_thread_pool = ThreadPoolExecutor() |
82 | | - self.request_thread_pool = ThreadPoolExecutor( |
83 | | - max_workers=self.max_concurrent_requests |
84 | | - ) |
85 | | - |
86 | 78 | # Set up async client and event loop |
87 | 79 | self.async_client = None |
88 | 80 | self.loop = None |
89 | 81 |
|
90 | 82 | self.setLevel(logging.DEBUG) |
91 | | - self._lock = threading.RLock() |
92 | 83 |
|
93 | 84 | self.flusher = None |
94 | 85 |
|
@@ -123,77 +114,53 @@ def _get_event_loop(self): |
123 | 114 | return loop |
124 | 115 |
|
125 | 116 | def buffer_log(self, message): |
126 | | - if self.worker_thread_pool: |
127 | | - try: |
128 | | - self.worker_thread_pool.submit(self.buffer_log_sync, message) |
129 | | - except RuntimeError: |
130 | | - self.buffer_log_sync(message) |
131 | | - except Exception as e: |
132 | | - self.internalLogger.debug("Error in calling buffer_log: %s", e) |
| 117 | + self.buffer_log_sync(message) |
133 | 118 |
|
134 | 119 | def buffer_log_sync(self, message): |
135 | | - # Attempt to acquire lock to write to buffer |
136 | | - if self._lock.acquire(blocking=True): |
137 | | - try: |
138 | | - msglen = len(message["line"]) |
139 | | - if self.buf_size + msglen < self.buf_retention_limit: |
140 | | - self.buf.append(message) |
141 | | - self.buf_size += msglen |
142 | | - else: |
143 | | - self.internalLogger.debug( |
144 | | - "The buffer size exceeded the limit: %s", |
145 | | - self.buf_retention_limit, |
146 | | - ) |
147 | | - |
148 | | - if self.buf_size >= self.flush_limit: |
149 | | - self.close_flusher() |
150 | | - self.flush() |
151 | | - else: |
152 | | - self.start_flusher() |
153 | | - except Exception as e: |
154 | | - self.internalLogger.exception(f"Error in buffer_log_sync: {e}") |
155 | | - finally: |
156 | | - self._lock.release() |
| 120 | + try: |
| 121 | + msglen = len(message["line"]) |
| 122 | + if self.buf_size + msglen < self.buf_retention_limit: |
| 123 | + self.buf.append(message) |
| 124 | + self.buf_size += msglen |
| 125 | + else: |
| 126 | + self.internalLogger.debug( |
| 127 | + "The buffer size exceeded the limit: %s", |
| 128 | + self.buf_retention_limit, |
| 129 | + ) |
| 130 | + |
| 131 | + if self.buf_size >= self.flush_limit: |
| 132 | + self.close_flusher() |
| 133 | + self.flush() |
| 134 | + else: |
| 135 | + self.start_flusher() |
| 136 | + except Exception as e: |
| 137 | + self.internalLogger.exception(f"Error in buffer_log_sync: {e}") |
157 | 138 |
|
158 | 139 | def flush(self): |
159 | 140 | self.schedule_flush_sync() |
160 | 141 |
|
161 | 142 | def schedule_flush_sync(self, should_block=False): |
162 | | - if self.request_thread_pool: |
163 | | - try: |
164 | | - self.request_thread_pool.submit( |
165 | | - self.try_lock_and_do_flush_request, should_block |
166 | | - ) |
167 | | - except RuntimeError: |
168 | | - self.try_lock_and_do_flush_request(should_block) |
169 | | - except Exception as e: |
170 | | - self.internalLogger.debug( |
171 | | - "Error in calling try_lock_and_do_flush_request: %s", e |
172 | | - ) |
| 143 | + self.try_lock_and_do_flush_request(should_block) |
173 | 144 |
|
174 | 145 | def try_lock_and_do_flush_request(self, should_block=False): |
175 | 146 | local_buf = [] |
176 | | - if self._lock.acquire(blocking=should_block): |
177 | | - if not self.buf: |
178 | | - self.close_flusher() |
179 | | - self._lock.release() |
180 | | - return |
| 147 | + # Simple buffer extraction - no locks needed |
| 148 | + if not self.buf: |
| 149 | + self.close_flusher() |
| 150 | + return |
181 | 151 |
|
182 | | - local_buf = self.buf.copy() |
183 | | - self.buf.clear() |
184 | | - self.buf_size = 0 |
185 | | - if local_buf: |
186 | | - self.close_flusher() |
187 | | - self._lock.release() |
| 152 | + local_buf = self.buf.copy() |
| 153 | + self.buf.clear() |
| 154 | + self.buf_size = 0 |
| 155 | + if local_buf: |
| 156 | + self.close_flusher() |
188 | 157 |
|
189 | 158 | if local_buf: |
190 | 159 | # Run the async request in the event loop |
191 | 160 | loop = self._get_event_loop() |
192 | 161 | if loop.is_running(): |
193 | | - # If we're already in an event loop, create a task |
194 | 162 | asyncio.create_task(self.try_request(local_buf)) |
195 | 163 | else: |
196 | | - # If no event loop is running, run the coroutine |
197 | 164 | loop.run_until_complete(self.try_request(local_buf)) |
198 | 165 |
|
199 | 166 | async def try_request(self, buf): |
@@ -356,28 +323,12 @@ def close(self): |
356 | 323 | # Close the flusher |
357 | 324 | self.close_flusher() |
358 | 325 |
|
359 | | - # First gracefully shut down any threads that are still attempting |
360 | | - # to add log messages to the buffer. This ensures that we don't lose |
361 | | - # any log messages that are in the process of being added to the |
362 | | - # buffer. |
363 | | - if self.worker_thread_pool: |
364 | | - self.worker_thread_pool.shutdown(wait=True) |
365 | | - self.worker_thread_pool = None |
366 | | - |
367 | 326 | # Manually force a flush of any remaining log messages in the buffer. |
368 | 327 | # We block here to ensure that the flush completes prior to the |
369 | 328 | # application exiting and because the probability of this |
370 | 329 | # introducing a noticeable delay is very low because close() is only |
371 | 330 | # called when the logger and application are shutting down. |
372 | | - self.schedule_flush_sync(should_block=True) |
373 | | - |
374 | | - # Finally, shut down the thread pool that was used to send the log |
375 | | - # messages to the server. We can assume at this point that all log |
376 | | - # messages that were in the buffer prior to the worker threads |
377 | | - # shutting down have been sent to the server. |
378 | | - if self.request_thread_pool: |
379 | | - self.request_thread_pool.shutdown(wait=True) |
380 | | - self.request_thread_pool = None |
| 331 | + self.flush() |
381 | 332 |
|
382 | 333 | # Close the async httpx client |
383 | 334 | if self.async_client: |
|
0 commit comments