123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- # -*- coding: utf-8 -*-
- """
- :codeauthor: Pedro Algarvio (pedro@algarvio.me)
- :copyright: Copyright 2015 by the SaltStack Team, see AUTHORS for more details.
- :license: Apache 2.0, see LICENSE for more details.
- pytestsalt.engines.pytest_engine
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Simple salt engine which will setup a socket to accept connections allowing us to know
- when a daemon is up and running
- """
- # Import python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import errno
- import logging
- import os
- import socket
- import sys
- import salt.utils.asynchronous
- # Import salt libs
- import salt.utils.event
- # Import 3rd-party libs
- from salt.ext.tornado import gen, ioloop, iostream, netutil
- log = logging.getLogger(__name__)
- __virtualname__ = "salt_runtests"
- def __virtual__():
- if __opts__["__role"] != "master":
- return False
- return "runtests_conn_check_port" in __opts__ # pylint: disable=undefined-variable
- def start():
- pytest_engine = PyTestEngine(__opts__) # pylint: disable=undefined-variable
- pytest_engine.start()
- class PyTestEngine(object):
- def __init__(self, opts):
- self.opts = opts
- self.sock = None
- self.stop_sending_events_file = opts.get("pytest_stop_sending_events_file")
- def start(self):
- self.io_loop = ioloop.IOLoop()
- self.io_loop.make_current()
- self.io_loop.add_callback(self._start)
- self.io_loop.start()
- @gen.coroutine
- def _start(self):
- port = int(self.opts["runtests_conn_check_port"])
- log.info(
- "Starting Pytest Engine(role=%s, id=%s) on port %s",
- self.opts["__role"],
- self.opts["id"],
- port,
- )
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sock.setblocking(0)
- # bind the socket to localhost on the config provided port
- self.sock.bind(("localhost", port))
- # become a server socket
- self.sock.listen(5)
- with salt.utils.asynchronous.current_ioloop(self.io_loop):
- netutil.add_accept_handler(
- self.sock, self.handle_connection,
- )
- if self.opts["__role"] == "master":
- yield self.fire_master_started_event()
- def handle_connection(self, connection, address):
- log.warning(
- "Accepted connection from %s. Role: %s", address, self.opts["__role"]
- )
- # We just need to know that the daemon running the engine is alive...
- try:
- connection.shutdown(socket.SHUT_RDWR) # pylint: disable=no-member
- connection.close()
- except socket.error as exc:
- if not sys.platform.startswith("darwin"):
- raise
- try:
- if exc.errno != errno.ENOTCONN:
- raise
- except AttributeError:
- # This is not macOS !?
- pass
- @gen.coroutine
- def fire_master_started_event(self):
- log.info("Firing salt-%s started event...", self.opts["__role"])
- start_event_tag = "salt/{}/{}/start".format(
- self.opts["__role"], self.opts["id"]
- )
- log.info(
- "Firing salt-%s started event. Tag: %s",
- self.opts["__role"],
- start_event_tag,
- )
- load = {"id": self.opts["id"], "tag": start_event_tag, "data": {}}
- # One minute should be more than enough to fire these events every second in order
- # for pytest-salt to pickup that the master is running
- with salt.utils.event.get_master_event(
- self.opts, self.opts["sock_dir"], listen=False
- ) as event_bus:
- timeout = 30
- while True:
- if self.stop_sending_events_file and not os.path.exists(
- self.stop_sending_events_file
- ):
- log.info(
- 'The stop sending events file "marker" is done. Stop sending events...'
- )
- break
- timeout -= 1
- try:
- event_bus.fire_event(load, start_event_tag, timeout=500)
- if timeout <= 0:
- break
- yield gen.sleep(1)
- except iostream.StreamClosedError:
- break
|