123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- """
- :codeauthor: Pedro Algarvio (pedro@algarvio.me)
- :copyright: Copyright 2016 by the SaltStack Team, see AUTHORS for more details.
- :license: Apache 2.0, see LICENSE for more details.
- pytestsalt.salt.log_handlers.pytest_log_handler
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Salt External Logging Handler
- """
- import errno
- import logging
- import socket
- import threading
- from multiprocessing import Queue
- import salt.log.setup
- import salt.utils.msgpack
- from salt.utils.platform import is_darwin
- log = logging.getLogger(__name__)
- __virtualname__ = "runtests_log_handler"
- def __virtual__():
- if "runtests_log_port" not in __opts__:
- return False, "'runtests_log_port' not in options"
- return (
- False,
- "runtests external logging handler is temporarily disabled for Python 3 tests",
- )
- def setup_handlers():
- port = __opts__["runtests_log_port"]
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- sock.connect(("localhost", port))
- except OSError as exc:
- if exc.errno == errno.ECONNREFUSED:
- log.warning("Failed to connect to log server")
- return
- finally:
- try:
- sock.shutdown(socket.SHUT_RDWR)
- except OSError:
- pass
- sock.close()
- # One million log messages is more than enough to queue.
- # Above that value, if `process_queue` can't process fast enough,
- # start dropping. This will contain a memory leak in case `process_queue`
- # can't process fast enough of in case it can't deliver the log records at all.
- if is_darwin():
- queue_size = 32767
- else:
- queue_size = 10000000
- queue = Queue(queue_size)
- handler = salt.log.setup.QueueHandler(queue)
- level = salt.log.setup.LOG_LEVELS[
- (__opts__.get("runtests_log_level") or "error").lower()
- ]
- handler.setLevel(level)
- process_queue_thread = threading.Thread(target=process_queue, args=(port, queue))
- process_queue_thread.daemon = True
- process_queue_thread.start()
- return handler
- def process_queue(port, queue):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- sock.connect(("localhost", port))
- except OSError as exc:
- if exc.errno == errno.ECONNREFUSED:
- sock.shutdown(socket.SHUT_RDWR)
- sock.close()
- log.warning("Failed to connect to log server")
- return
- while True:
- try:
- record = queue.get()
- if record is None:
- # A sentinel to stop processing the queue
- break
- # Just log everything, filtering will happen on the main process
- # logging handlers
- sock.sendall(salt.utils.msgpack.dumps(record.__dict__, use_bin_type=True))
- except (OSError, EOFError, KeyboardInterrupt, SystemExit):
- if hasattr(exc, "errno") and exc.errno != errno.EPIPE:
- log.exception(exc)
- try:
- sock.shutdown(socket.SHUT_RDWR)
- sock.close()
- except OSError:
- pass
- break
- except Exception as exc: # pylint: disable=broad-except
- log.warning(
- "An exception occurred in the pytest salt logging " "queue thread: %s",
- exc,
- exc_info_on_loglevel=logging.DEBUG,
- )
|