runtests_log_handler.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. :copyright: Copyright 2016 by the SaltStack Team, see AUTHORS for more details.
  5. :license: Apache 2.0, see LICENSE for more details.
  6. pytestsalt.salt.log_handlers.pytest_log_handler
  7. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  8. Salt External Logging Handler
  9. '''
  10. # Import python libs
  11. from __future__ import absolute_import
  12. import errno
  13. import socket
  14. import logging
  15. import threading
  16. from multiprocessing import Queue
  17. # Import 3rd-party libs
  18. import msgpack
  19. # Import Salt libs
  20. from salt.ext import six
  21. from salt.utils.platform import is_darwin
  22. import salt.log.setup
  23. log = logging.getLogger(__name__)
  24. __virtualname__ = 'runtests_log_handler'
  25. def __virtual__():
  26. if 'runtests_log_port' not in __opts__:
  27. return False, "'runtests_log_port' not in options"
  28. if six.PY3:
  29. return False, "runtests external logging handler is temporarily disabled for Python 3 tests"
  30. return True
  31. def setup_handlers():
  32. port = __opts__['runtests_log_port']
  33. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  34. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  35. try:
  36. sock.connect(('localhost', port))
  37. except socket.error as exc:
  38. if exc.errno == errno.ECONNREFUSED:
  39. log.warning('Failed to connect to log server')
  40. return
  41. finally:
  42. try:
  43. sock.shutdown(socket.SHUT_RDWR)
  44. except OSError:
  45. pass
  46. sock.close()
  47. if is_darwin():
  48. queue_size = 32767
  49. else:
  50. queue_size = 10000000
  51. queue = Queue(queue_size)
  52. handler = salt.log.setup.QueueHandler(queue)
  53. level = salt.log.setup.LOG_LEVELS[(__opts__.get('runtests_log_level') or 'error').lower()]
  54. handler.setLevel(level)
  55. process_queue_thread = threading.Thread(target=process_queue, args=(port, queue))
  56. process_queue_thread.daemon = True
  57. process_queue_thread.start()
  58. return handler
  59. def process_queue(port, queue):
  60. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  61. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  62. try:
  63. sock.connect(('localhost', port))
  64. except socket.error as exc:
  65. if exc.errno == errno.ECONNREFUSED:
  66. sock.shutdown(socket.SHUT_RDWR)
  67. sock.close()
  68. log.warning('Failed to connect to log server')
  69. return
  70. while True:
  71. try:
  72. record = queue.get()
  73. if record is None:
  74. # A sentinel to stop processing the queue
  75. break
  76. # Just log everything, filtering will happen on the main process
  77. # logging handlers
  78. sock.sendall(msgpack.dumps(record.__dict__, encoding='utf-8'))
  79. except (IOError, EOFError, KeyboardInterrupt, SystemExit):
  80. sock.shutdown(socket.SHUT_RDWR)
  81. sock.close()
  82. break
  83. except socket.error as exc:
  84. if exc.errno == errno.EPIPE:
  85. # Broken pipe
  86. sock.shutdown(socket.SHUT_RDWR)
  87. sock.close()
  88. break
  89. log.exception(exc)
  90. except Exception as exc: # pylint: disable=broad-except
  91. log.warning(
  92. 'An exception occurred in the pytest salt logging '
  93. 'queue thread: %s',
  94. exc,
  95. exc_info_on_loglevel=logging.DEBUG
  96. )