runtests_log_handler.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. :copyright: Copyright 2016 by the SaltStack Team, see AUTHORS for more details.
  5. :license: Apache 2.0, see LICENSE for more details.
  6. pytestsalt.salt.log_handlers.pytest_log_handler
  7. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  8. Salt External Logging Handler
  9. '''
  10. # Import python libs
  11. from __future__ import absolute_import
  12. import errno
  13. import socket
  14. import logging
  15. import threading
  16. from multiprocessing import Queue
  17. # Import 3rd-party libs
  18. import msgpack
  19. # Import Salt libs
  20. from salt.ext import six
  21. from salt.utils.platform import is_darwin
  22. import salt.log.setup
  23. log = logging.getLogger(__name__)
  24. __virtualname__ = 'runtests_log_handler'
  25. def __virtual__():
  26. if 'runtests_log_port' not in __opts__:
  27. return False, "'runtests_log_port' not in options"
  28. if six.PY3:
  29. return False, "runtests external logging handler is temporarily disabled for Python 3 tests"
  30. return True
  31. def setup_handlers():
  32. port = __opts__['runtests_log_port']
  33. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  34. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  35. try:
  36. sock.connect(('localhost', port))
  37. except socket.error as exc:
  38. if exc.errno == errno.ECONNREFUSED:
  39. log.warning('Failed to connect to log server')
  40. return
  41. finally:
  42. try:
  43. sock.shutdown(socket.SHUT_RDWR)
  44. except OSError:
  45. pass
  46. sock.close()
  47. # One million log messages is more than enough to queue.
  48. # Above that value, if `process_queue` can't process fast enough,
  49. # start dropping. This will contain a memory leak in case `process_queue`
  50. # can't process fast enough of in case it can't deliver the log records at all.
  51. if is_darwin():
  52. queue_size = 32767
  53. else:
  54. queue_size = 10000000
  55. queue = Queue(queue_size)
  56. handler = salt.log.setup.QueueHandler(queue)
  57. level = salt.log.setup.LOG_LEVELS[(__opts__.get('runtests_log_level') or 'error').lower()]
  58. handler.setLevel(level)
  59. process_queue_thread = threading.Thread(target=process_queue, args=(port, queue))
  60. process_queue_thread.daemon = True
  61. process_queue_thread.start()
  62. return handler
  63. def process_queue(port, queue):
  64. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  65. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  66. try:
  67. sock.connect(('localhost', port))
  68. except socket.error as exc:
  69. if exc.errno == errno.ECONNREFUSED:
  70. sock.shutdown(socket.SHUT_RDWR)
  71. sock.close()
  72. log.warning('Failed to connect to log server')
  73. return
  74. while True:
  75. try:
  76. record = queue.get()
  77. if record is None:
  78. # A sentinel to stop processing the queue
  79. break
  80. # Just log everything, filtering will happen on the main process
  81. # logging handlers
  82. sock.sendall(msgpack.dumps(record.__dict__, encoding='utf-8'))
  83. except (IOError, EOFError, KeyboardInterrupt, SystemExit):
  84. try:
  85. sock.shutdown(socket.SHUT_RDWR)
  86. sock.close()
  87. except (OSError, socket.error):
  88. pass
  89. break
  90. except socket.error as exc:
  91. if exc.errno == errno.EPIPE:
  92. # Broken pipe
  93. try:
  94. sock.shutdown(socket.SHUT_RDWR)
  95. sock.close()
  96. except (OSError, socket.error):
  97. pass
  98. break
  99. log.exception(exc)
  100. except Exception as exc: # pylint: disable=broad-except
  101. log.warning(
  102. 'An exception occurred in the pytest salt logging '
  103. 'queue thread: %s',
  104. exc,
  105. exc_info_on_loglevel=logging.DEBUG
  106. )