123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- # -*- coding: utf-8 -*-
- # Import Python libs
- from __future__ import absolute_import
- import os
- import sys
- import shutil
- import tempfile
- import textwrap
- import copy
- # Import Salt Testing libs
- from tests.support.helpers import with_tempdir
- from tests.support.unit import TestCase
- from tests.support.paths import TMP
- # Import Salt libs
- import salt.loader
- import salt.config
- import salt.utils.files
- import salt.utils.versions
- from salt.state import HighState
- from salt.utils.pydsl import PyDslError
- # Import 3rd-party libs
- from salt.ext import six
- from salt.ext.six.moves import StringIO
# State requisite keyword names.
# NOTE(review): nothing in the visible part of this module references this
# constant -- confirm whether it is still needed.
REQUISITES = [
    'require',
    'require_in',
    'use',
    'use_in',
    'watch',
    'watch_in',
]
class CommonTestCaseBoilerplate(TestCase):
    '''
    Shared fixture for the tests in this module.

    Each test gets a temporary root directory, a minion configuration that
    points at it, and a ``HighState`` instance pushed onto the active stack.
    '''

    def setUp(self):
        '''
        Build the temporary directory layout and minion opts, then create
        and activate ``self.HIGHSTATE``.
        '''
        self.root_dir = tempfile.mkdtemp(dir=TMP)
        self.addCleanup(shutil.rmtree, self.root_dir, ignore_errors=True)
        self.state_tree_dir = os.path.join(self.root_dir, 'state_tree')
        self.cache_dir = os.path.join(self.root_dir, 'cachedir')
        for directory in (self.root_dir, self.state_tree_dir, self.cache_dir):
            if not os.path.isdir(directory):
                os.makedirs(directory)

        opts = salt.config.minion_config(None)
        opts.update({
            'root_dir': self.root_dir,
            'state_events': False,
            'id': 'match',
            'file_client': 'local',
            'file_roots': {'base': [self.state_tree_dir]},
            'cachedir': self.cache_dir,
            'test': False,
        })
        # Grains are loaded last so the loader sees the overrides above.
        opts['grains'] = salt.loader.grains(opts)
        self.config = opts

        self.HIGHSTATE = HighState(opts)
        self.HIGHSTATE.push_active()

    def tearDown(self):
        '''
        Deactivate the ``HighState`` created in ``setUp`` and drop the
        references held on the test instance.
        '''
        try:
            self.HIGHSTATE.pop_active()
        except IndexError:
            # pop_active() had nothing to pop; ignore and carry on.
            pass
        del self.HIGHSTATE
        del self.config

    def state_highstate(self, state, dirpath):
        '''
        Render ``state`` and execute the resulting high data.

        A shallow copy of ``self.config`` is used, with ``file_roots``
        pointed at ``dirpath``. Render errors are pretty-printed together
        with the high data; execution proceeds regardless. The temporary
        ``HighState`` is always popped from the active stack afterwards.
        '''
        opts = copy.copy(self.config)
        opts['file_roots'] = {'base': [dirpath]}
        highstate = HighState(opts)
        highstate.push_active()
        try:
            high, errors = highstate.render_highstate(state)
            if errors:
                import pprint
                pprint.pprint('\n'.join(errors))
                pprint.pprint(high)
            highstate.state.call_high(high)
        finally:
            highstate.pop_active()
class PyDSLRendererTestCase(CommonTestCaseBoilerplate):
    '''
    WARNING: If tests in here are flaky, they may need
    to be moved to their own class. Sharing HighState, especially
    through setUp/tearDown can create dangerous race conditions!
    '''

    def render_sls(self, content, sls='', saltenv='base', **kws):
        '''
        Render ``content`` with the ``pydsl`` renderer of ``self.HIGHSTATE``
        and return whatever the renderer returns.

        Any ``env`` keyword is discarded before the call; the remaining
        keywords are forwarded to the renderer unchanged.
        '''
        # "env" is not supported; Use "saltenv".
        kws.pop('env', None)
        return self.HIGHSTATE.state.rend['pydsl'](
            StringIO(content), saltenv=saltenv, sls=sls, **kws
        )

    def test_state_declarations(self):
        '''
        State ids, state modules, functions and arguments declared through
        the DSL show up in the rendered high data.
        '''
        result = self.render_sls(textwrap.dedent('''
            state('A').cmd.run('ls -la', cwd='/var/tmp')
            state().file.managed('myfile.txt', source='salt://path/to/file')
            state('X').cmd('run', 'echo hello world', cwd='/')
            a_cmd = state('A').cmd
            a_cmd.run(shell='/bin/bash')
            state('A').service.running(name='apache')
        '''))
        self.assertIn('A', result)
        self.assertIn('X', result)

        A_cmd = result['A']['cmd']
        self.assertEqual(A_cmd[0], 'run')
        self.assertEqual(A_cmd[1]['name'], 'ls -la')
        self.assertEqual(A_cmd[2]['cwd'], '/var/tmp')
        self.assertEqual(A_cmd[3]['shell'], '/bin/bash')

        A_service = result['A']['service']
        self.assertEqual(A_service[0], 'running')
        self.assertEqual(A_service[1]['name'], 'apache')

        X_cmd = result['X']['cmd']
        self.assertEqual(X_cmd[0], 'run')
        self.assertEqual(X_cmd[1]['name'], 'echo hello world')
        self.assertEqual(X_cmd[2]['cwd'], '/')

        del result['A']
        del result['X']
        # 2 rather than 1 because pydsl adds an extra no-op state
        # declaration.
        self.assertEqual(len(result), 2)

        # Pick whichever of the two remaining declarations is the
        # anonymous ``file`` state.
        s = next(
            decl['file'] for decl in six.itervalues(result) if 'file' in decl
        )
        self.assertEqual(s[0], 'managed')
        self.assertEqual(s[1]['name'], 'myfile.txt')
        self.assertEqual(s[2]['source'], 'salt://path/to/file')

    def test_requisite_declarations(self):
        '''
        ``require``/``watch``/``require_in``/``watch_in`` calls are recorded
        as requisites on the corresponding state declarations.
        '''
        result = self.render_sls(textwrap.dedent('''
            state('X').cmd.run('echo hello')
            state('A').cmd.run('mkdir tmp', cwd='/var')
            state('B').cmd.run('ls -la', cwd='/var/tmp') \
                        .require(state('X').cmd) \
                        .require(cmd='A') \
                        .watch(service='G')
            state('G').service.running(name='collectd')
            state('G').service.watch_in(state('A').cmd)
            state('H').cmd.require_in(cmd='echo hello')
            state('H').cmd.run('echo world')
        '''))
        # Five declared ids plus the extra no-op declaration pydsl adds.
        # This used to be ``assertTrue(len(result), 6)``, which treats 6 as
        # the failure message and therefore never checked the count.
        self.assertEqual(len(result), 6)
        self.assertTrue(set("X A B G H".split()).issubset(set(result.keys())))

        b = result['B']['cmd']
        self.assertEqual(b[0], 'run')
        self.assertEqual(b[1]['name'], 'ls -la')
        self.assertEqual(b[2]['cwd'], '/var/tmp')
        self.assertEqual(b[3]['require'][0]['cmd'], 'X')
        self.assertEqual(b[4]['require'][0]['cmd'], 'A')
        self.assertEqual(b[5]['watch'][0]['service'], 'G')
        self.assertEqual(result['G']['service'][2]['watch_in'][0]['cmd'], 'A')
        self.assertEqual(
            result['H']['cmd'][1]['require_in'][0]['cmd'], 'echo hello'
        )

    def test_include_extend(self):
        '''
        ``include(...)`` and ``extend(...)`` produce the ``include`` and
        ``extend`` entries of the rendered high data.
        '''
        result = self.render_sls(textwrap.dedent('''
            include(
                'some.sls.file',
                'another.sls.file',
                'more.sls.file',
                delayed=True
            )
            A = state('A').cmd.run('echo hoho', cwd='/')
            state('B').cmd.run('echo hehe', cwd='/')
            extend(
                A,
                state('X').cmd.run(cwd='/a/b/c'),
                state('Y').file('managed', name='a_file.txt'),
                state('Z').service.watch(file='A')
            )
        '''))
        self.assertEqual(len(result), 4)
        self.assertEqual(
            result['include'],
            [{'base': sls} for sls in
             ('some.sls.file', 'another.sls.file', 'more.sls.file')]
        )

        extend = result['extend']
        self.assertEqual(extend['X']['cmd'][0], 'run')
        self.assertEqual(extend['X']['cmd'][1]['cwd'], '/a/b/c')
        self.assertEqual(extend['Y']['file'][0], 'managed')
        self.assertEqual(extend['Y']['file'][1]['name'], 'a_file.txt')
        self.assertEqual(len(extend['Z']['service']), 1)
        self.assertEqual(extend['Z']['service'][0]['watch'][0]['file'], 'A')

        self.assertEqual(result['B']['cmd'][0], 'run')
        # 'A' was passed to extend(), so it must only appear under 'extend'.
        self.assertNotIn('A', result)
        self.assertEqual(extend['A']['cmd'][0], 'run')

    def test_cmd_call(self):
        '''
        ``cmd.call`` invokes a function defined in the sls with the given
        positional and keyword arguments, and a watching ``cmd.wait`` runs.
        '''
        result = self.HIGHSTATE.state.call_template_str(textwrap.dedent('''\
            #!pydsl
            state('A').cmd.run('echo this is state A', cwd='/')
            some_var = 12345
            def do_something(a, b, *args, **kws):
                return dict(result=True, changes={'a': a, 'b': b, 'args': args, 'kws': kws, 'some_var': some_var})
            state('C').cmd.call(do_something, 1, 2, 3, x=1, y=2) \
                          .require(state('A').cmd)
            state('G').cmd.wait('echo this is state G', cwd='/') \
                          .watch(state('C').cmd)
        '''))
        ret = next(result[k] for k in six.iterkeys(result) if 'do_something' in k)
        changes = ret['changes']
        self.assertEqual(
            changes,
            dict(a=1, b=2, args=(3,), kws=dict(x=1, y=2), some_var=12345)
        )
        ret = next(result[k] for k in six.iterkeys(result) if '-G_' in k)
        self.assertEqual(ret['changes']['stdout'], 'this is state G')

    def test_multiple_state_func_in_state_mod(self):
        '''
        Declaring two state functions for the same id and state module
        raises ``PyDslError``.
        '''
        with self.assertRaisesRegex(PyDslError, 'Multiple state functions'):
            self.render_sls(textwrap.dedent('''
                state('A').cmd.run('echo hoho')
                state('A').cmd.wait('echo hehe')
            '''))

    def test_no_state_func_in_state_mod(self):
        '''
        A state module that only carries requisites and no state function
        raises ``PyDslError``.
        '''
        with self.assertRaisesRegex(PyDslError, 'No state function specified'):
            self.render_sls(textwrap.dedent('''
                state('B').cmd.require(cmd='hoho')
            '''))

    def test_load_highstate(self):
        '''
        High data loaded through ``__pydsl__.load_highstate`` is merged with
        declarations made afterwards in the same sls.
        '''
        result = self.render_sls(textwrap.dedent('''
            import salt.utils.yaml
            __pydsl__.load_highstate(salt.utils.yaml.safe_load("""
            A:
              cmd.run:
                - name: echo hello
                - cwd: /
            B:
              pkg:
                - installed
              service:
                - running
                - require:
                  - pkg: B
                - watch:
                  - cmd: A
            """))
            state('A').cmd.run(name='echo hello world')
            '''))
        self.assertEqual(len(result), 3)
        self.assertEqual(result['A']['cmd'][0], 'run')
        self.assertIn({'name': 'echo hello'}, result['A']['cmd'])
        self.assertIn({'cwd': '/'}, result['A']['cmd'])
        self.assertIn({'name': 'echo hello world'}, result['A']['cmd'])
        self.assertEqual(len(result['A']['cmd']), 4)
        self.assertEqual(len(result['B']['pkg']), 1)
        self.assertEqual(result['B']['pkg'][0], 'installed')
        self.assertEqual(result['B']['service'][0], 'running')
        self.assertIn({'require': [{'pkg': 'B'}]}, result['B']['service'])
        self.assertIn({'watch': [{'cmd': 'A'}]}, result['B']['service'])
        self.assertEqual(len(result['B']['service']), 3)

    def test_ordered_states(self):
        '''
        With ``ordered=True`` each state function call gets a ``require``
        on the previously declared one.
        '''
        result = self.render_sls(textwrap.dedent('''
            __pydsl__.set(ordered=True)
            A = state('A')
            state('B').cmd.run('echo bbbb')
            A.cmd.run('echo aaa')
            state('B').cmd.run(cwd='/')
            state('C').cmd.run('echo ccc')
            state('B').file.managed(source='/a/b/c')
        '''))
        self.assertEqual(len(result['B']['cmd']), 3)
        self.assertEqual(result['A']['cmd'][1]['require'][0]['cmd'], 'B')
        self.assertEqual(result['C']['cmd'][1]['require'][0]['cmd'], 'A')
        self.assertEqual(result['B']['file'][1]['require'][0]['cmd'], 'C')

    @with_tempdir()
    def test_pipe_through_stateconf(self, dirpath):
        '''
        pydsl output piped through stateconf keeps the expected execution
        order across three sls files; each state appends one letter to a
        shared output file.
        '''
        output = os.path.join(dirpath, 'output')
        # The sls files embed the path in shell commands; use forward
        # slashes so a Windows path does not introduce backslashes.
        posix_output = output.replace('\\', '/')

        write_to(os.path.join(dirpath, 'xxx.sls'), textwrap.dedent(
            '''#!stateconf -os yaml . jinja
            .X:
              cmd.run:
                - name: echo X >> {0}
                - cwd: /
            .Y:
              cmd.run:
                - name: echo Y >> {0}
                - cwd: /
            .Z:
              cmd.run:
                - name: echo Z >> {0}
                - cwd: /
        '''.format(posix_output)))
        write_to(os.path.join(dirpath, 'yyy.sls'), textwrap.dedent('''\
            #!pydsl|stateconf -ps
            __pydsl__.set(ordered=True)
            state('.D').cmd.run('echo D >> {0}', cwd='/')
            state('.E').cmd.run('echo E >> {0}', cwd='/')
            state('.F').cmd.run('echo F >> {0}', cwd='/')
        '''.format(posix_output)))
        write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
            #!pydsl|stateconf -ps
            include('xxx', 'yyy')
            # make all states in xxx run BEFORE states in this sls.
            extend(state('.start').stateconf.require(stateconf='xxx::goal'))
            # make all states in yyy run AFTER this sls.
            extend(state('.goal').stateconf.require_in(stateconf='yyy::start'))
            __pydsl__.set(ordered=True)
            state('.A').cmd.run('echo A >> {0}', cwd='/')
            state('.B').cmd.run('echo B >> {0}', cwd='/')
            state('.C').cmd.run('echo C >> {0}', cwd='/')
        '''.format(posix_output)))

        self.state_highstate({'base': ['aaa']}, dirpath)
        with salt.utils.files.fopen(output, 'r') as f:
            # Whitespace is stripped so only the order of letters matters.
            self.assertEqual(''.join(f.read().split()), "XYZABCDEF")

    @with_tempdir()
    def test_compile_time_state_execution(self, dirpath):
        '''
        Calling a state object (``A()``) executes its states while the sls
        is being rendered. Skipped when stdin is not a TTY.
        '''
        if not sys.stdin.isatty():
            self.skipTest('Not attached to a TTY')
        # The Windows shell will include any spaces before the redirect
        # in the text that is redirected.
        # For example: echo hello > test.txt will contain "hello "
        write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
            #!pydsl
            __pydsl__.set(ordered=True)
            A = state('A')
            A.cmd.run('echo hehe>{0}/zzz.txt', cwd='/')
            A.file.managed('{0}/yyy.txt', source='salt://zzz.txt')
            A()
            A()
            state().cmd.run('echo hoho>>{0}/yyy.txt', cwd='/')
            A.file.managed('{0}/xxx.txt', source='salt://zzz.txt')
            A()
        '''.format(dirpath.replace('\\', '/'))))
        self.state_highstate({'base': ['aaa']}, dirpath)
        with salt.utils.files.fopen(os.path.join(dirpath, 'yyy.txt'), 'rt') as f:
            self.assertEqual(f.read(), 'hehe' + os.linesep + 'hoho' + os.linesep)
        with salt.utils.files.fopen(os.path.join(dirpath, 'xxx.txt'), 'rt') as f:
            self.assertEqual(f.read(), 'hehe' + os.linesep)

    @with_tempdir()
    def test_nested_high_state_execution(self, dirpath):
        '''
        An sls may trigger ``state.sls`` on another sls while it is being
        rendered. The test has no assertions of its own; it exercises
        ``state_highstate`` on the nested layout.
        '''
        write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
            #!pydsl
            __salt__['state.sls']('bbb')
            state().cmd.run('echo bbbbbb', cwd='/')
        '''))
        write_to(os.path.join(dirpath, 'bbb.sls'), textwrap.dedent(
            '''
            # {{ salt['state.sls']('ccc') }}
            test:
              cmd.run:
                - name: echo bbbbbbb
                - cwd: /
            '''))
        # NOTE(review): this literal starts with a newline, so '#!pydsl' is
        # not the first line of ccc.sls -- confirm that is intended.
        write_to(os.path.join(dirpath, 'ccc.sls'), textwrap.dedent(
            '''
            #!pydsl
            state().cmd.run('echo ccccc', cwd='/')
            '''))
        self.state_highstate({'base': ['aaa']}, dirpath)

    @with_tempdir()
    def test_repeat_includes(self, dirpath):
        '''
        The same sls ('e') can be included from two different sls files,
        both via a common parent and when both are requested directly.
        '''
        write_to(os.path.join(dirpath, 'b.sls'), textwrap.dedent('''\
            #!pydsl
            include('c')
            include('d')
        '''))
        write_to(os.path.join(dirpath, 'c.sls'), textwrap.dedent('''\
            #!pydsl
            modtest = include('e')
            modtest.success
        '''))
        write_to(os.path.join(dirpath, 'd.sls'), textwrap.dedent('''\
            #!pydsl
            modtest = include('e')
            modtest.success
        '''))
        write_to(os.path.join(dirpath, 'e.sls'), textwrap.dedent('''\
            #!pydsl
            success = True
        '''))
        self.state_highstate({'base': ['b']}, dirpath)
        self.state_highstate({'base': ['c', 'd']}, dirpath)
def write_to(fpath, content):
    '''
    Write ``content`` to the file at ``fpath``, opened in ``'w'`` mode
    through ``salt.utils.files.fopen``.
    '''
    with salt.utils.files.fopen(fpath, 'w') as handle:
        handle.write(content)
|