runtests_log_handler.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """
  2. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  3. :copyright: Copyright 2016 by the SaltStack Team, see AUTHORS for more details.
  4. :license: Apache 2.0, see LICENSE for more details.
  5. pytestsalt.salt.log_handlers.pytest_log_handler
  6. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  7. Salt External Logging Handler
  8. """
  9. import errno
  10. import logging
  11. import socket
  12. import threading
  13. from multiprocessing import Queue
  14. import salt.log.setup
  15. import salt.utils.msgpack
  16. from salt.utils.platform import is_darwin
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# NOTE(review): presumably the name under which the Salt loader exposes this
# log handler module -- confirm against the loader's conventions.
__virtualname__ = "runtests_log_handler"
  19. def __virtual__():
  20. if "runtests_log_port" not in __opts__:
  21. return False, "'runtests_log_port' not in options"
  22. return (
  23. False,
  24. "runtests external logging handler is temporarily disabled for Python 3 tests",
  25. )
  26. def setup_handlers():
  27. port = __opts__["runtests_log_port"]
  28. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  29. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  30. try:
  31. sock.connect(("localhost", port))
  32. except OSError as exc:
  33. if exc.errno == errno.ECONNREFUSED:
  34. log.warning("Failed to connect to log server")
  35. return
  36. finally:
  37. try:
  38. sock.shutdown(socket.SHUT_RDWR)
  39. except OSError:
  40. pass
  41. sock.close()
  42. # One million log messages is more than enough to queue.
  43. # Above that value, if `process_queue` can't process fast enough,
  44. # start dropping. This will contain a memory leak in case `process_queue`
  45. # can't process fast enough of in case it can't deliver the log records at all.
  46. if is_darwin():
  47. queue_size = 32767
  48. else:
  49. queue_size = 10000000
  50. queue = Queue(queue_size)
  51. handler = salt.log.setup.QueueHandler(queue)
  52. level = salt.log.setup.LOG_LEVELS[
  53. (__opts__.get("runtests_log_level") or "error").lower()
  54. ]
  55. handler.setLevel(level)
  56. process_queue_thread = threading.Thread(target=process_queue, args=(port, queue))
  57. process_queue_thread.daemon = True
  58. process_queue_thread.start()
  59. return handler
  60. def process_queue(port, queue):
  61. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  62. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  63. try:
  64. sock.connect(("localhost", port))
  65. except OSError as exc:
  66. if exc.errno == errno.ECONNREFUSED:
  67. sock.shutdown(socket.SHUT_RDWR)
  68. sock.close()
  69. log.warning("Failed to connect to log server")
  70. return
  71. while True:
  72. try:
  73. record = queue.get()
  74. if record is None:
  75. # A sentinel to stop processing the queue
  76. break
  77. # Just log everything, filtering will happen on the main process
  78. # logging handlers
  79. sock.sendall(salt.utils.msgpack.dumps(record.__dict__, use_bin_type=True))
  80. except (OSError, EOFError, KeyboardInterrupt, SystemExit):
  81. if hasattr(exc, "errno") and exc.errno != errno.EPIPE:
  82. log.exception(exc)
  83. try:
  84. sock.shutdown(socket.SHUT_RDWR)
  85. sock.close()
  86. except OSError:
  87. pass
  88. break
  89. except Exception as exc: # pylint: disable=broad-except
  90. log.warning(
  91. "An exception occurred in the pytest salt logging " "queue thread: %s",
  92. exc,
  93. exc_info_on_loglevel=logging.DEBUG,
  94. )