# -*- coding: utf-8 -*-
'''
Integration tests for the request handlers exposed by
``salt.netapi.rest_tornado.saltnado``.

Each test case builds a small tornado application around one handler and
talks to it over HTTP using the helpers inherited from ``SaltnadoTestCase``.
'''

# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import time
import threading

# Import Salt Libs
import salt.utils.json
import salt.utils.stringutils
from salt.netapi.rest_tornado import saltnado
from salt.utils.versions import StrictVersion

# Import Salt Testing Libs
from tests.unit.netapi.test_rest_tornado import SaltnadoTestCase
from tests.support.helpers import flaky
from tests.support.unit import skipIf

# Import 3rd-party libs
from salt.ext import six
from salt.utils.zeromq import zmq, ZMQDefaultLoop as ZMQIOLoop
HAS_ZMQ_IOLOOP = bool(zmq)

# Only look at ``zmq.__version__`` when ``zmq`` is truthy.  The version
# comparison used to live directly in a class decorator, where it was
# evaluated unconditionally and would raise while this module was being
# imported if ``zmq`` was a falsy placeholder, instead of skipping the tests.
_ZMQ_TOO_OLD = (
    not HAS_ZMQ_IOLOOP
    or StrictVersion(zmq.__version__) < StrictVersion('14.0.1')
)


def _drop_proxy_minion(minion_ids):
    '''
    Remove ``proxytest`` from the list ``minion_ids`` in place, if present.

    If --proxy is set, it will cause an extra minion_id to be in the
    response. Since there's not a great way to know if the test runner's
    proxy minion is running, and we're not testing proxy minions here
    anyway, just remove it from the response.
    '''
    try:
        minion_ids.remove('proxytest')
    except ValueError:
        # No proxy minion answered; nothing to strip.
        pass


class _SaltnadoIntegrationTestCase(SaltnadoTestCase):  # pylint: disable=abstract-method
    '''
    Base class for the handler tests below; supplies the configuration
    dictionaries read through ``get_config``.
    '''

    @property
    def opts(self):
        '''
        The ``client_config`` configuration, loaded from scratch
        '''
        return self.get_config('client_config', from_scratch=True)

    @property
    def mod_opts(self):
        '''
        The ``minion`` configuration, loaded from scratch
        '''
        return self.get_config('minion', from_scratch=True)


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
@skipIf(_ZMQ_TOO_OLD, 'PyZMQ must be >= 14.0.1 to run these tests.')
class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.SaltAPIHandler`` mounted at ``/``
    '''

    def get_app(self):
        '''
        Build the tornado application serving ``/`` and keep a reference to
        it on the test case so tests can reach ``self.application``
        '''
        urls = [('/', saltnado.SaltAPIHandler)]

        application = self.build_tornado_app(urls)

        application.event_listener = saltnado.EventListener({}, self.opts)

        self.application = application

        return application

    def test_root(self):
        '''
        Test the root path which returns the list of clients we support
        '''
        response = self.fetch('/',
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 200)
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(sorted(response_obj['clients']),
                         ['local', 'local_async', 'runner', 'runner_async'])
        self.assertEqual(response_obj['return'], 'Welcome')

    def test_post_no_auth(self):
        '''
        Test post with no auth token, should be redirected (302) to /login
        '''
        # deliberately send no auth token header with this request
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json']},
                              follow_redirects=False,
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'], '/login')

    # Local client tests

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_simple_local_post(self):
        '''
        Test a basic API of /
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        # If --proxy is set, it will cause an extra minion_id to be in the
        # response. Since there's not a great way to know if the test
        # runner's proxy minion is running, and we're not testing proxy
        # minions here anyway, just remove it from the response.
        response_obj['return'][0].pop('proxytest', None)
        self.assertEqual(response_obj['return'][0], {'minion': True, 'sub_minion': True})

    def test_simple_local_post_no_tgt(self):
        '''
        POST job with invalid tgt
        '''
        low = [{'client': 'local',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(response_obj['return'],
                         ["No minions matched the target. No command was sent, no jid was assigned."])

    # local client request body test

    @skipIf(True, 'Undetermined race condition in test. Temporarily disabled.')
    def test_simple_local_post_only_dictionary_request(self):
        '''
        Test a basic API of /
        '''
        low = {'client': 'local',
               'tgt': '*',
               'fun': 'test.ping',
               }
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        # If --proxy is set, it will cause an extra minion_id to be in the
        # response. Since there's not a great way to know if the test
        # runner's proxy minion is running, and we're not testing proxy
        # minions here anyway, just remove it from the response.
        response_obj['return'][0].pop('proxytest', None)
        self.assertEqual(response_obj['return'][0], {'minion': True, 'sub_minion': True})

    def test_simple_local_post_invalid_request(self):
        '''
        Test a basic API of /
        '''
        low = ["invalid request"]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 400)

    # local_async tests
    def test_simple_local_async_post(self):
        '''
        POST a single ``local_async`` job and check the returned jid/minions
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )

        response_obj = salt.utils.json.loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        _drop_proxy_minion(ret[0]['minions'])

        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(ret), 1)
        self.assertIn('jid', ret[0])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_multi_local_async_post(self):
        '''
        POST two ``local_async`` jobs in one request and check both returns
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )

        response_obj = salt.utils.json.loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        ret[1]['minions'] = sorted(ret[1]['minions'])
        # Strip the proxy minion from each entry independently, so that a
        # missing id in the first entry does not skip the second one.
        _drop_proxy_minion(ret[0]['minions'])
        _drop_proxy_minion(ret[1]['minions'])

        self.assertEqual(len(ret), 2)
        self.assertIn('jid', ret[0])
        self.assertIn('jid', ret[1])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))
        self.assertEqual(ret[1]['minions'], sorted(['minion', 'sub_minion']))

    def test_multi_local_async_post_multitoken(self):
        '''
        POST three ``local_async`` jobs where the lowstate chunks carry no
        token, a valid token and a bad token respectively
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                'token': self.token['token'],  # send a different (but still valid token)
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                'token': 'BAD_TOKEN',  # send a bad token
                },
               ]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )

        response_obj = salt.utils.json.loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        ret[1]['minions'] = sorted(ret[1]['minions'])
        # Strip the proxy minion from each entry independently, so that a
        # missing id in the first entry does not skip the second one.
        _drop_proxy_minion(ret[0]['minions'])
        _drop_proxy_minion(ret[1]['minions'])

        self.assertEqual(len(ret), 3)  # make sure we got 3 responses
        self.assertIn('jid', ret[0])  # the first 2 are regular returns
        self.assertIn('jid', ret[1])
        self.assertIn('Failed to authenticate', ret[2])  # bad auth
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))
        self.assertEqual(ret[1]['minions'], sorted(['minion', 'sub_minion']))

    def test_simple_local_async_post_no_tgt(self):
        '''
        POST a ``local_async`` job whose target matches no minion
        '''
        low = [{'client': 'local_async',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(response_obj['return'], [{}])

    @skipIf(True, 'Undetermined race condition in test. Temporarily disabled.')
    def test_simple_local_post_only_dictionary_request_with_order_masters(self):
        '''
        Test a basic API of /
        '''
        low = {'client': 'local',
               'tgt': '*',
               'fun': 'test.ping',
               }

        self.application.opts['order_masters'] = True
        self.application.opts['syndic_wait'] = 5

        try:
            response = self.fetch('/',
                                  method='POST',
                                  body=salt.utils.json.dumps(low),
                                  headers={'Content-Type': self.content_type_map['json'],
                                           saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                                  connect_timeout=30,
                                  request_timeout=30,
                                  )
        finally:
            # Put the options back even if the request itself blows up.
            self.application.opts['order_masters'] = []
            self.application.opts['syndic_wait'] = 5

        response_obj = salt.utils.json.loads(response.body)

        # If --proxy is set, it will cause an extra minion_id to be in the
        # response. Since there's not a great way to know if the test runner's
        # proxy minion is running, and we're not testing proxy minions here
        # anyway, just remove it from the response.
        # The payload is ``{'return': [...]}``, so index the key first and
        # the list position second, as every other test in this class does.
        response_obj['return'][0].pop('proxytest', None)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])

    # runner tests
    def test_simple_local_runner_post(self):
        '''
        POST a ``runner`` job running ``manage.up`` and check the minion list
        '''
        low = [{'client': 'runner',
                'fun': 'manage.up',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        _drop_proxy_minion(response_obj['return'][0])
        self.assertEqual(sorted(response_obj['return'][0]), sorted(['minion', 'sub_minion']))

    # runner_async tests
    def test_simple_local_runner_async_post(self):
        '''
        POST a ``runner_async`` job and check that a jid and tag come back
        '''
        low = [{'client': 'runner_async',
                'fun': 'manage.up',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=10,
                              request_timeout=10,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertIn('return', response_obj)
        self.assertEqual(1, len(response_obj['return']))
        self.assertIn('jid', response_obj['return'][0])
        self.assertIn('tag', response_obj['return'][0])


@flaky
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.MinionSaltAPIHandler`` mounted at ``/minions``
    '''

    def get_app(self):
        '''
        Build the tornado application serving ``/minions`` and ``/minions/<mid>``
        '''
        urls = [(r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
                (r"/minions", saltnado.MinionSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'issue #34753')
    def test_get_no_mid(self):
        '''
        GET ``/minions`` without a minion id
        '''
        response = self.fetch('/minions',
                              method='GET',
                              headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              follow_redirects=False,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        # one per minion
        self.assertEqual(len(response_obj['return'][0]), 2)
        # check a single grain
        for minion_id, grains in six.iteritems(response_obj['return'][0]):
            self.assertEqual(minion_id, grains['id'])

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_get(self):
        '''
        GET ``/minions/minion`` for a single minion
        '''
        response = self.fetch('/minions/minion',
                              method='GET',
                              headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              follow_redirects=False,
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        self.assertEqual(len(response_obj['return'][0]), 1)
        # check a single grain
        self.assertEqual(response_obj['return'][0]['minion']['id'], 'minion')

    def test_post(self):
        '''
        POST a job to ``/minions`` without naming a client
        '''
        low = [{'tgt': '*minion',
                'fun': 'test.ping',
                }]
        response = self.fetch('/minions',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )

        response_obj = salt.utils.json.loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])

        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(ret), 1)
        self.assertIn('jid', ret[0])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_post_with_client(self):
        '''
        POST a job to ``/minions`` explicitly naming the ``local_async`` client
        '''
        low = [{'client': 'local_async',
                'tgt': '*minion',
                'fun': 'test.ping',
                }]
        response = self.fetch('/minions',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )

        response_obj = salt.utils.json.loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])

        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(ret), 1)
        self.assertIn('jid', ret[0])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_post_with_incorrect_client(self):
        '''
        The /minions endpoint is asynchronous only, so if you try something else
        make sure you get an error
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/minions',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        self.assertEqual(response.code, 400)


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestJobsSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.JobsSaltAPIHandler`` mounted at ``/jobs``
    '''

    def get_app(self):
        '''
        Build the tornado application serving ``/jobs`` and ``/jobs/<jid>``
        '''
        urls = [(r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
                (r"/jobs", saltnado.JobsSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_get(self):
        '''
        GET ``/jobs`` without a jid, then GET one of the listed jids
        '''
        # test with no JID
        self.http_client.fetch(self.get_url('/jobs'),
                               self.stop,
                               method='GET',
                               headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                               follow_redirects=False,
                               )
        response = self.wait(timeout=30)
        response_obj = salt.utils.json.loads(response.body)['return'][0]
        try:
            for jid, ret in six.iteritems(response_obj):
                self.assertIn('Function', ret)
                self.assertIn('Target', ret)
                self.assertIn('Target-type', ret)
                self.assertIn('User', ret)
                self.assertIn('StartTime', ret)
                self.assertIn('Arguments', ret)
        except AttributeError:
            # Dump the whole payload to help debugging, then re-raise.
            print(salt.utils.json.loads(response.body))
            raise

        # test with a specific JID passed in
        jid = next(six.iterkeys(response_obj))
        self.http_client.fetch(self.get_url('/jobs/{0}'.format(jid)),
                               self.stop,
                               method='GET',
                               headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                               follow_redirects=False,
                               )
        response = self.wait(timeout=30)
        response_obj = salt.utils.json.loads(response.body)['return'][0]
        self.assertIn('Function', response_obj)
        self.assertIn('Target', response_obj)
        self.assertIn('Target-type', response_obj)
        self.assertIn('User', response_obj)
        self.assertIn('StartTime', response_obj)
        self.assertIn('Arguments', response_obj)
        self.assertIn('Result', response_obj)


# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestRunSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Sanity check for ``saltnado.RunSaltAPIHandler`` mounted at ``/run``
    '''

    def get_app(self):
        '''
        Build the tornado application serving ``/run``
        '''
        urls = [("/run", saltnado.RunSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_get(self):
        '''
        POST a ``local`` ``test.ping`` job to ``/run``
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/run',
                              method='POST',
                              body=salt.utils.json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = salt.utils.json.loads(response.body)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestEventsSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.EventsSaltAPIHandler`` mounted at ``/events``
    '''

    def get_app(self):
        '''
        Build the tornado application serving ``/events`` and reset the
        counter of events still to be fired by ``on_event``
        '''
        urls = [(r"/events", saltnado.EventsSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        # store a reference, for magic later!
        self.application = application
        self.events_to_fire = 0
        return application

    def test_get(self):
        '''
        Stream ``/events``; every received chunk is checked by ``on_event``
        '''
        self.events_to_fire = 5
        self.fetch('/events',
                   headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                   streaming_callback=self.on_event,
                   )

    def _stop(self):
        '''
        Argument-less wrapper around ``self.stop`` used as a timeout callback
        '''
        self.stop()

    def on_event(self, event):
        '''
        Streaming callback: fire the next test event while any remain,
        otherwise schedule ``_stop``; then check the format of the chunk
        that was just received
        '''
        if six.PY3:
            event = event.decode('utf-8')
        if self.events_to_fire > 0:
            self.application.event_listener.event.fire_event({
                'foo': 'bar',
                'baz': 'qux',
            }, 'salt/netapi/test')
            self.events_to_fire -= 1
        # once we've fired all the events, lets call it a day
        else:
            # wait so that we can ensure that the next future is ready to go
            # to make sure we don't explode if the next one is ready
            ZMQIOLoop.current().add_timeout(time.time() + 0.5, self._stop)

        event = event.strip()
        # if we got a retry, just continue
        if event != 'retry: 400':
            tag, data = event.splitlines()
            self.assertTrue(tag.startswith('tag: '))
            self.assertTrue(data.startswith('data: '))


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestWebhookSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.WebhookSaltAPIHandler`` mounted at ``/hook``
    '''

    def get_app(self):
        '''
        Build the tornado application serving ``/hook`` and keep a reference
        to it on the test case
        '''
        urls = [(r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
                ]

        application = self.build_tornado_app(urls)

        self.application = application

        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'Skipping until we can devote more resources to debugging this test.')
    def test_post(self):
        '''
        POST to ``/hook`` and check the ``salt/netapi/hook`` event it fires
        '''
        self._future_resolved = threading.Event()
        try:
            def verify_event(future):
                '''
                Notify the threading event that the future is resolved
                '''
                self._future_resolved.set()

            self._finished = False  # TODO: remove after some cleanup of the event listener

            # get an event future
            future = self.application.event_listener.get_event(self,
                                                               tag='salt/netapi/hook',
                                                               callback=verify_event)
            # fire the event
            response = self.fetch('/hook',
                                  method='POST',
                                  body='foo=bar',
                                  headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                                  )
            response_obj = salt.utils.json.loads(response.body)
            self.assertTrue(response_obj['success'])
            resolve_future_timeout = 60
            self._future_resolved.wait(resolve_future_timeout)
            try:
                event = future.result()
            except Exception as exc:  # pylint: disable=broad-except
                self.fail('Failed to resolve future under {} secs: {}'.format(resolve_future_timeout, exc))
            self.assertEqual(event['tag'], 'salt/netapi/hook')
            self.assertIn('headers', event['data'])
            self.assertEqual(
                event['data']['post'],
                {'foo': salt.utils.stringutils.to_bytes('bar')}
            )
        finally:
            self._future_resolved.clear()
            del self._future_resolved