123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import
- import copy
- import os
- import shutil
- import sys
- import tempfile
- import textwrap
- import salt.config
- import salt.loader
- import salt.utils.files
- import salt.utils.versions
- from salt.ext import six
- from salt.ext.six.moves import StringIO
- from salt.state import HighState
- from salt.utils.pydsl import PyDslError
- from tests.support.helpers import slowTest, with_tempdir
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import TestCase
# Names of the requisite keywords (plain and "_in" forms). Not referenced
# anywhere else in the visible part of this module.
REQUISITES = ["require", "require_in", "use", "use_in", "watch", "watch_in"]
class CommonTestCaseBoilerplate(TestCase):
    """
    Shared fixture for tests that need an active, local-file-client HighState.

    ``setUp`` creates a throw-away directory tree under ``RUNTIME_VARS.TMP``,
    builds a minion config that points at it, and pushes a ``HighState``
    built from that config onto the active stack. ``tearDown`` pops it again.
    """

    def setUp(self):
        """
        Create the temporary state tree / cache dir, the minion config and
        the active ``HighState`` used by the tests.
        """
        self.root_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, self.root_dir, ignore_errors=True)
        self.state_tree_dir = os.path.join(self.root_dir, "state_tree")
        self.cache_dir = os.path.join(self.root_dir, "cachedir")
        # root_dir was created by mkdtemp just above, so it already exists
        # and is empty: the two sub-directories can be created directly
        # without any existence checks.
        os.makedirs(self.state_tree_dir)
        os.makedirs(self.cache_dir)

        self.config = salt.config.minion_config(None)
        self.config["root_dir"] = self.root_dir
        self.config["state_events"] = False
        self.config["id"] = "match"
        self.config["file_client"] = "local"
        self.config["file_roots"] = dict(base=[self.state_tree_dir])
        self.config["cachedir"] = self.cache_dir
        self.config["test"] = False
        # Grains are loaded last so the loader sees the options set above.
        self.config["grains"] = salt.loader.grains(self.config)

        self.HIGHSTATE = HighState(self.config)
        self.HIGHSTATE.push_active()

    def tearDown(self):
        """
        Pop the ``HighState`` pushed by ``setUp`` and drop the references
        held on the test instance.
        """
        try:
            self.HIGHSTATE.pop_active()
        except IndexError:
            # Best effort: the active stack may already be empty.
            pass
        del self.config
        del self.HIGHSTATE

    def state_highstate(self, state, dirpath):
        """
        Render and execute a highstate using ``dirpath`` as the ``base``
        file root.

        A separate ``HighState`` is built from a shallow copy of the test
        config, pushed as the active one for the duration of the call and
        always popped afterwards.

        :param state: passed unchanged to ``HighState.render_highstate``
            (the tests in this module pass a ``{"base": [sls, ...]}`` dict).
        :param dirpath: directory used as the only ``base`` file root.
        :return: whatever ``HighState.state.call_high`` returns for the
            rendered high data.
        """
        opts = copy.copy(self.config)
        opts["file_roots"] = dict(base=[dirpath])
        highstate = HighState(opts)
        highstate.push_active()
        try:
            high, errors = highstate.render_highstate(state)
            if errors:
                # Render errors are only reported, not raised: execution
                # still proceeds with whatever high data was produced.
                import pprint

                pprint.pprint("\n".join(errors))
                pprint.pprint(high)
            return highstate.state.call_high(high)
        finally:
            highstate.pop_active()
class PyDSLRendererTestCase(CommonTestCaseBoilerplate):
    """
    WARNING: If tests in here are flaky, they may need
    to be moved to their own class. Sharing HighState, especially
    through setUp/tearDown can create dangerous race conditions!
    """

    def render_sls(self, content, sls="", saltenv="base", **kws):
        """
        Render ``content`` with the ``pydsl`` renderer of the active
        HighState and return the renderer's result.

        :param content: pydsl source text; it is wrapped in a ``StringIO``.
        :param sls: forwarded to the renderer as ``sls``.
        :param saltenv: forwarded to the renderer as ``saltenv``.
        :param kws: extra keyword arguments forwarded to the renderer.
            An ``env`` key, if present, is discarded.
        """
        # "env" is not supported; Use "saltenv".
        kws.pop("env", None)
        return self.HIGHSTATE.state.rend["pydsl"](
            StringIO(content), saltenv=saltenv, sls=sls, **kws
        )

    @slowTest
    def test_state_declarations(self):
        """
        State IDs, state functions and their arguments declared through the
        DSL end up in the rendered data, including for an anonymous state.
        """
        result = self.render_sls(
            textwrap.dedent(
                """
                state('A').cmd.run('ls -la', cwd='/var/tmp')
                state().file.managed('myfile.txt', source='salt://path/to/file')
                state('X').cmd('run', 'echo hello world', cwd='/')
                a_cmd = state('A').cmd
                a_cmd.run(shell='/bin/bash')
                state('A').service.running(name='apache')
                """
            )
        )
        self.assertIn("A", result)
        self.assertIn("X", result)

        A_cmd = result["A"]["cmd"]
        self.assertEqual(A_cmd[0], "run")
        self.assertEqual(A_cmd[1]["name"], "ls -la")
        self.assertEqual(A_cmd[2]["cwd"], "/var/tmp")
        self.assertEqual(A_cmd[3]["shell"], "/bin/bash")

        A_service = result["A"]["service"]
        self.assertEqual(A_service[0], "running")
        self.assertEqual(A_service[1]["name"], "apache")

        X_cmd = result["X"]["cmd"]
        self.assertEqual(X_cmd[0], "run")
        self.assertEqual(X_cmd[1]["name"], "echo hello world")
        self.assertEqual(X_cmd[2]["cwd"], "/")

        del result["A"]
        del result["X"]
        # 2 rather than 1 because pydsl adds an extra no-op state
        # declaration.
        self.assertEqual(len(result), 2)

        # The anonymous state has a generated ID, so locate it by looking
        # for the remaining declaration that carries a "file" entry.
        s = next(
            decl["file"] for decl in six.itervalues(result) if "file" in decl
        )
        self.assertEqual(s[0], "managed")
        self.assertEqual(s[1]["name"], "myfile.txt")
        self.assertEqual(s[2]["source"], "salt://path/to/file")

    @slowTest
    def test_requisite_declarations(self):
        """
        ``require``/``watch`` and their ``*_in`` forms are recorded on the
        declaring state, whether given a state function object or keywords.
        """
        result = self.render_sls(
            textwrap.dedent(
                """
                state('X').cmd.run('echo hello')
                state('A').cmd.run('mkdir tmp', cwd='/var')
                state('B').cmd.run('ls -la', cwd='/var/tmp') \
                    .require(state('X').cmd) \
                    .require(cmd='A') \
                    .watch(service='G')
                state('G').service.running(name='collectd')
                state('G').service.watch_in(state('A').cmd)
                state('H').cmd.require_in(cmd='echo hello')
                state('H').cmd.run('echo world')
                """
            )
        )
        # X, A, B, G, H plus the extra no-op declaration pydsl adds.
        # (Previously written as assertTrue(len(result), 6), which treats 6
        # as the failure message and never checks the count.)
        self.assertEqual(len(result), 6)
        self.assertTrue(set("X A B G H".split()).issubset(set(result.keys())))

        b = result["B"]["cmd"]
        self.assertEqual(b[0], "run")
        self.assertEqual(b[1]["name"], "ls -la")
        self.assertEqual(b[2]["cwd"], "/var/tmp")
        self.assertEqual(b[3]["require"][0]["cmd"], "X")
        self.assertEqual(b[4]["require"][0]["cmd"], "A")
        self.assertEqual(b[5]["watch"][0]["service"], "G")
        self.assertEqual(result["G"]["service"][2]["watch_in"][0]["cmd"], "A")
        self.assertEqual(result["H"]["cmd"][1]["require_in"][0]["cmd"], "echo hello")

    @slowTest
    def test_include_extend(self):
        """
        ``include(...)`` and ``extend(...)`` produce the ``include`` and
        ``extend`` entries, and an extended state is removed from the
        top-level declarations.
        """
        result = self.render_sls(
            textwrap.dedent(
                """
                include(
                    'some.sls.file',
                    'another.sls.file',
                    'more.sls.file',
                    delayed=True
                )
                A = state('A').cmd.run('echo hoho', cwd='/')
                state('B').cmd.run('echo hehe', cwd='/')
                extend(
                    A,
                    state('X').cmd.run(cwd='/a/b/c'),
                    state('Y').file('managed', name='a_file.txt'),
                    state('Z').service.watch(file='A')
                )
                """
            )
        )
        self.assertEqual(len(result), 4)
        self.assertEqual(
            result["include"],
            [
                {"base": sls}
                for sls in ("some.sls.file", "another.sls.file", "more.sls.file")
            ],
        )

        extend = result["extend"]
        self.assertEqual(extend["X"]["cmd"][0], "run")
        self.assertEqual(extend["X"]["cmd"][1]["cwd"], "/a/b/c")
        self.assertEqual(extend["Y"]["file"][0], "managed")
        self.assertEqual(extend["Y"]["file"][1]["name"], "a_file.txt")
        self.assertEqual(len(extend["Z"]["service"]), 1)
        self.assertEqual(extend["Z"]["service"][0]["watch"][0]["file"], "A")

        self.assertEqual(result["B"]["cmd"][0], "run")
        self.assertNotIn("A", result)
        self.assertEqual(extend["A"]["cmd"][0], "run")

    @slowTest
    def test_cmd_call(self):
        """
        ``cmd.call`` invokes a function defined in the sls with the given
        positional/keyword arguments, and a state watching it runs.
        """
        result = self.HIGHSTATE.state.call_template_str(
            textwrap.dedent(
                """\
                #!pydsl
                state('A').cmd.run('echo this is state A', cwd='/')
                some_var = 12345
                def do_something(a, b, *args, **kws):
                    return dict(result=True, changes={'a': a, 'b': b, 'args': args, 'kws': kws, 'some_var': some_var})
                state('C').cmd.call(do_something, 1, 2, 3, x=1, y=2) \
                    .require(state('A').cmd)
                state('G').cmd.wait('echo this is state G', cwd='/') \
                    .watch(state('C').cmd)
                """
            )
        )
        ret = next(result[k] for k in six.iterkeys(result) if "do_something" in k)
        changes = ret["changes"]
        self.assertEqual(
            changes, dict(a=1, b=2, args=(3,), kws=dict(x=1, y=2), some_var=12345)
        )

        ret = next(result[k] for k in six.iterkeys(result) if "-G_" in k)
        self.assertEqual(ret["changes"]["stdout"], "this is state G")

    @slowTest
    def test_multiple_state_func_in_state_mod(self):
        """
        Declaring two state functions for the same state module of one
        state ID raises ``PyDslError``.
        """
        with self.assertRaisesRegex(PyDslError, "Multiple state functions"):
            self.render_sls(
                textwrap.dedent(
                    """
                    state('A').cmd.run('echo hoho')
                    state('A').cmd.wait('echo hehe')
                    """
                )
            )

    @slowTest
    def test_no_state_func_in_state_mod(self):
        """
        A state module that only has requisites and no state function
        raises ``PyDslError``.
        """
        with self.assertRaisesRegex(PyDslError, "No state function specified"):
            self.render_sls(
                textwrap.dedent(
                    """
                    state('B').cmd.require(cmd='hoho')
                    """
                )
            )

    @slowTest
    def test_load_highstate(self):
        """
        ``__pydsl__.load_highstate`` imports existing high data, which later
        DSL statements can then add to.
        """
        result = self.render_sls(
            textwrap.dedent(
                '''
                import salt.utils.yaml
                __pydsl__.load_highstate(salt.utils.yaml.safe_load("""
                A:
                  cmd.run:
                    - name: echo hello
                    - cwd: /
                B:
                  pkg:
                    - installed
                  service:
                    - running
                    - require:
                      - pkg: B
                    - watch:
                      - cmd: A
                """))
                state('A').cmd.run(name='echo hello world')
                '''
            )
        )
        self.assertEqual(len(result), 3)

        self.assertEqual(result["A"]["cmd"][0], "run")
        self.assertIn({"name": "echo hello"}, result["A"]["cmd"])
        self.assertIn({"cwd": "/"}, result["A"]["cmd"])
        self.assertIn({"name": "echo hello world"}, result["A"]["cmd"])
        self.assertEqual(len(result["A"]["cmd"]), 4)

        self.assertEqual(len(result["B"]["pkg"]), 1)
        self.assertEqual(result["B"]["pkg"][0], "installed")

        self.assertEqual(result["B"]["service"][0], "running")
        self.assertIn({"require": [{"pkg": "B"}]}, result["B"]["service"])
        self.assertIn({"watch": [{"cmd": "A"}]}, result["B"]["service"])
        self.assertEqual(len(result["B"]["service"]), 3)

    @slowTest
    def test_ordered_states(self):
        """
        With ``__pydsl__.set(ordered=True)`` each state function gets a
        ``require`` on the one declared just before it.
        """
        result = self.render_sls(
            textwrap.dedent(
                """
                __pydsl__.set(ordered=True)
                A = state('A')
                state('B').cmd.run('echo bbbb')
                A.cmd.run('echo aaa')
                state('B').cmd.run(cwd='/')
                state('C').cmd.run('echo ccc')
                state('B').file.managed(source='/a/b/c')
                """
            )
        )
        self.assertEqual(len(result["B"]["cmd"]), 3)
        self.assertEqual(result["A"]["cmd"][1]["require"][0]["cmd"], "B")
        self.assertEqual(result["C"]["cmd"][1]["require"][0]["cmd"], "A")
        self.assertEqual(result["B"]["file"][1]["require"][0]["cmd"], "C")

    @with_tempdir()
    @slowTest
    def test_pipe_through_stateconf(self, dirpath):
        """
        pydsl output piped through ``stateconf`` honours cross-sls ordering:
        states of ``xxx`` run first, then ``aaa``, then ``yyy``.
        """
        output = os.path.join(dirpath, "output")
        # Forward slashes so the path can be embedded in shell commands
        # regardless of platform.
        output_path = output.replace("\\", "/")
        write_to(
            os.path.join(dirpath, "xxx.sls"),
            textwrap.dedent(
                """#!stateconf -os yaml . jinja
                .X:
                  cmd.run:
                    - name: echo X >> {0}
                    - cwd: /
                .Y:
                  cmd.run:
                    - name: echo Y >> {0}
                    - cwd: /
                .Z:
                  cmd.run:
                    - name: echo Z >> {0}
                    - cwd: /
                """.format(
                    output_path
                )
            ),
        )
        write_to(
            os.path.join(dirpath, "yyy.sls"),
            textwrap.dedent(
                """\
                #!pydsl|stateconf -ps
                __pydsl__.set(ordered=True)
                state('.D').cmd.run('echo D >> {0}', cwd='/')
                state('.E').cmd.run('echo E >> {0}', cwd='/')
                state('.F').cmd.run('echo F >> {0}', cwd='/')
                """.format(
                    output_path
                )
            ),
        )
        write_to(
            os.path.join(dirpath, "aaa.sls"),
            textwrap.dedent(
                """\
                #!pydsl|stateconf -ps
                include('xxx', 'yyy')
                # make all states in xxx run BEFORE states in this sls.
                extend(state('.start').stateconf.require(stateconf='xxx::goal'))
                # make all states in yyy run AFTER this sls.
                extend(state('.goal').stateconf.require_in(stateconf='yyy::start'))
                __pydsl__.set(ordered=True)
                state('.A').cmd.run('echo A >> {0}', cwd='/')
                state('.B').cmd.run('echo B >> {0}', cwd='/')
                state('.C').cmd.run('echo C >> {0}', cwd='/')
                """.format(
                    output_path
                )
            ),
        )
        self.state_highstate({"base": ["aaa"]}, dirpath)
        with salt.utils.files.fopen(output, "r") as f:
            self.assertEqual("".join(f.read().split()), "XYZABCDEF")

    @with_tempdir()
    @slowTest
    def test_compile_time_state_execution(self, dirpath):
        """
        Calling a state object (``A()``) while the sls is being rendered
        executes its states, so files they create are usable as sources
        by later statements.
        """
        if not sys.stdin.isatty():
            self.skipTest("Not attached to a TTY")
        # The Windows shell will include any spaces before the redirect
        # in the text that is redirected.
        # For example: echo hello > test.txt will contain "hello "
        target_dir = dirpath.replace("\\", "/")
        write_to(
            os.path.join(dirpath, "aaa.sls"),
            textwrap.dedent(
                """\
                #!pydsl
                __pydsl__.set(ordered=True)
                A = state('A')
                A.cmd.run('echo hehe>{0}/zzz.txt', cwd='/')
                A.file.managed('{0}/yyy.txt', source='salt://zzz.txt')
                A()
                A()
                state().cmd.run('echo hoho>>{0}/yyy.txt', cwd='/')
                A.file.managed('{0}/xxx.txt', source='salt://zzz.txt')
                A()
                """.format(
                    target_dir
                )
            ),
        )
        self.state_highstate({"base": ["aaa"]}, dirpath)
        with salt.utils.files.fopen(os.path.join(dirpath, "yyy.txt"), "rt") as f:
            self.assertEqual(f.read(), "hehe" + os.linesep + "hoho" + os.linesep)
        with salt.utils.files.fopen(os.path.join(dirpath, "xxx.txt"), "rt") as f:
            self.assertEqual(f.read(), "hehe" + os.linesep)

    @with_tempdir()
    @slowTest
    def test_nested_high_state_execution(self, dirpath):
        """
        Run a highstate whose sls files call ``state.sls`` on each other
        while rendering. The test makes no assertions; it only requires
        that the run completes without raising.
        """
        write_to(
            os.path.join(dirpath, "aaa.sls"),
            textwrap.dedent(
                """\
                #!pydsl
                __salt__['state.sls']('bbb')
                state().cmd.run('echo bbbbbb', cwd='/')
                """
            ),
        )
        write_to(
            os.path.join(dirpath, "bbb.sls"),
            textwrap.dedent(
                """
                # {{ salt['state.sls']('ccc') }}
                test:
                  cmd.run:
                    - name: echo bbbbbbb
                    - cwd: /
                """
            ),
        )
        # NOTE(review): this literal starts with a newline, so "#!pydsl" is
        # the second line of ccc.sls rather than the first - confirm that
        # this is intended.
        write_to(
            os.path.join(dirpath, "ccc.sls"),
            textwrap.dedent(
                """
                #!pydsl
                state().cmd.run('echo ccccc', cwd='/')
                """
            ),
        )
        self.state_highstate({"base": ["aaa"]}, dirpath)

    @with_tempdir()
    @slowTest
    def test_repeat_includes(self, dirpath):
        """
        Including the same pydsl sls (``e``) from two different sls files
        works both through a common parent and when both are listed
        directly. The test makes no assertions beyond not raising.
        """
        write_to(
            os.path.join(dirpath, "b.sls"),
            textwrap.dedent(
                """\
                #!pydsl
                include('c')
                include('d')
                """
            ),
        )
        write_to(
            os.path.join(dirpath, "c.sls"),
            textwrap.dedent(
                """\
                #!pydsl
                modtest = include('e')
                modtest.success
                """
            ),
        )
        write_to(
            os.path.join(dirpath, "d.sls"),
            textwrap.dedent(
                """\
                #!pydsl
                modtest = include('e')
                modtest.success
                """
            ),
        )
        write_to(
            os.path.join(dirpath, "e.sls"),
            textwrap.dedent(
                """\
                #!pydsl
                success = True
                """
            ),
        )
        self.state_highstate({"base": ["b"]}, dirpath)
        self.state_highstate({"base": ["c", "d"]}, dirpath)
def write_to(fpath, content):
    """
    Write ``content`` to the file at ``fpath``.

    The file is opened in ``"w"`` mode through ``salt.utils.files.fopen``
    and closed again when the ``with`` block exits.
    """
    with salt.utils.files.fopen(fpath, "w") as sls_file:
        sls_file.write(content)
|