1
0

runtests_engine.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. :copyright: Copyright 2015 by the SaltStack Team, see AUTHORS for more details.
  5. :license: Apache 2.0, see LICENSE for more details.
  6. pytestsalt.engines.pytest_engine
  7. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  8. Simple salt engine which will setup a socket to accept connections allowing us to know
  9. when a daemon is up and running
  10. """
  11. # Import python libs
  12. from __future__ import absolute_import, print_function, unicode_literals
  13. import errno
  14. import logging
  15. import os
  16. import socket
  17. import sys
  18. import salt.utils.asynchronous
  19. # Import salt libs
  20. import salt.utils.event
  21. # Import 3rd-party libs
  22. from salt.ext.tornado import gen, ioloop, iostream, netutil
  23. log = logging.getLogger(__name__)
  24. __virtualname__ = "salt_runtests"
  25. def __virtual__():
  26. if __opts__["__role"] != "master":
  27. return False
  28. return "runtests_conn_check_port" in __opts__ # pylint: disable=undefined-variable
  29. def start():
  30. pytest_engine = PyTestEngine(__opts__) # pylint: disable=undefined-variable
  31. pytest_engine.start()
  32. class PyTestEngine(object):
  33. def __init__(self, opts):
  34. self.opts = opts
  35. self.sock = None
  36. self.stop_sending_events_file = opts.get("pytest_stop_sending_events_file")
  37. def start(self):
  38. self.io_loop = ioloop.IOLoop()
  39. self.io_loop.make_current()
  40. self.io_loop.add_callback(self._start)
  41. self.io_loop.start()
  42. @gen.coroutine
  43. def _start(self):
  44. port = int(self.opts["runtests_conn_check_port"])
  45. log.info(
  46. "Starting Pytest Engine(role=%s, id=%s) on port %s",
  47. self.opts["__role"],
  48. self.opts["id"],
  49. port,
  50. )
  51. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  52. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  53. self.sock.setblocking(0)
  54. # bind the socket to localhost on the config provided port
  55. self.sock.bind(("localhost", port))
  56. # become a server socket
  57. self.sock.listen(5)
  58. with salt.utils.asynchronous.current_ioloop(self.io_loop):
  59. netutil.add_accept_handler(
  60. self.sock, self.handle_connection,
  61. )
  62. if self.opts["__role"] == "master":
  63. yield self.fire_master_started_event()
  64. def handle_connection(self, connection, address):
  65. log.warning(
  66. "Accepted connection from %s. Role: %s", address, self.opts["__role"]
  67. )
  68. # We just need to know that the daemon running the engine is alive...
  69. try:
  70. connection.shutdown(socket.SHUT_RDWR) # pylint: disable=no-member
  71. connection.close()
  72. except socket.error as exc:
  73. if not sys.platform.startswith("darwin"):
  74. raise
  75. try:
  76. if exc.errno != errno.ENOTCONN:
  77. raise
  78. except AttributeError:
  79. # This is not macOS !?
  80. pass
  81. @gen.coroutine
  82. def fire_master_started_event(self):
  83. log.info("Firing salt-%s started event...", self.opts["__role"])
  84. start_event_tag = "salt/{}/{}/start".format(
  85. self.opts["__role"], self.opts["id"]
  86. )
  87. log.info(
  88. "Firing salt-%s started event. Tag: %s",
  89. self.opts["__role"],
  90. start_event_tag,
  91. )
  92. load = {"id": self.opts["id"], "tag": start_event_tag, "data": {}}
  93. # One minute should be more than enough to fire these events every second in order
  94. # for pytest-salt to pickup that the master is running
  95. with salt.utils.event.get_master_event(
  96. self.opts, self.opts["sock_dir"], listen=False
  97. ) as event_bus:
  98. timeout = 30
  99. while True:
  100. if self.stop_sending_events_file and not os.path.exists(
  101. self.stop_sending_events_file
  102. ):
  103. log.info(
  104. 'The stop sending events file "marker" is done. Stop sending events...'
  105. )
  106. break
  107. timeout -= 1
  108. try:
  109. event_bus.fire_event(load, start_event_tag, timeout=500)
  110. if timeout <= 0:
  111. break
  112. yield gen.sleep(1)
  113. except iostream.StreamClosedError:
  114. break