runtests_log_handler.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. :copyright: Copyright 2016 by the SaltStack Team, see AUTHORS for more details.
  5. :license: Apache 2.0, see LICENSE for more details.
  6. pytestsalt.salt.log_handlers.pytest_log_handler
  7. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  8. Salt External Logging Handler
  9. """
  10. # Import python libs
  11. from __future__ import absolute_import
  12. import errno
  13. import logging
  14. import socket
  15. import threading
  16. from multiprocessing import Queue
  17. import salt.log.setup
  18. # Import Salt libs
  19. import salt.utils.msgpack
  20. from salt.ext import six
  21. from salt.utils.platform import is_darwin
  22. log = logging.getLogger(__name__)
  23. __virtualname__ = "runtests_log_handler"
  24. def __virtual__():
  25. if "runtests_log_port" not in __opts__:
  26. return False, "'runtests_log_port' not in options"
  27. if six.PY3:
  28. return (
  29. False,
  30. "runtests external logging handler is temporarily disabled for Python 3 tests",
  31. )
  32. return True
  33. def setup_handlers():
  34. port = __opts__["runtests_log_port"]
  35. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  36. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  37. try:
  38. sock.connect(("localhost", port))
  39. except socket.error as exc:
  40. if exc.errno == errno.ECONNREFUSED:
  41. log.warning("Failed to connect to log server")
  42. return
  43. finally:
  44. try:
  45. sock.shutdown(socket.SHUT_RDWR)
  46. except OSError:
  47. pass
  48. sock.close()
  49. # One million log messages is more than enough to queue.
  50. # Above that value, if `process_queue` can't process fast enough,
  51. # start dropping. This will contain a memory leak in case `process_queue`
  52. # can't process fast enough of in case it can't deliver the log records at all.
  53. if is_darwin():
  54. queue_size = 32767
  55. else:
  56. queue_size = 10000000
  57. queue = Queue(queue_size)
  58. handler = salt.log.setup.QueueHandler(queue)
  59. level = salt.log.setup.LOG_LEVELS[
  60. (__opts__.get("runtests_log_level") or "error").lower()
  61. ]
  62. handler.setLevel(level)
  63. process_queue_thread = threading.Thread(target=process_queue, args=(port, queue))
  64. process_queue_thread.daemon = True
  65. process_queue_thread.start()
  66. return handler
  67. def process_queue(port, queue):
  68. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  69. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  70. try:
  71. sock.connect(("localhost", port))
  72. except socket.error as exc:
  73. if exc.errno == errno.ECONNREFUSED:
  74. sock.shutdown(socket.SHUT_RDWR)
  75. sock.close()
  76. log.warning("Failed to connect to log server")
  77. return
  78. while True:
  79. try:
  80. record = queue.get()
  81. if record is None:
  82. # A sentinel to stop processing the queue
  83. break
  84. # Just log everything, filtering will happen on the main process
  85. # logging handlers
  86. sock.sendall(salt.utils.msgpack.dumps(record.__dict__, encoding="utf-8"))
  87. except (IOError, EOFError, KeyboardInterrupt, SystemExit):
  88. if hasattr(exc, "errno") and exc.errno != errno.EPIPE:
  89. log.exception(exc)
  90. try:
  91. sock.shutdown(socket.SHUT_RDWR)
  92. sock.close()
  93. except (OSError, socket.error):
  94. pass
  95. break
  96. except Exception as exc: # pylint: disable=broad-except
  97. log.warning(
  98. "An exception occurred in the pytest salt logging " "queue thread: %s",
  99. exc,
  100. exc_info_on_loglevel=logging.DEBUG,
  101. )