# -*- coding: utf-8 -*- # Import python libs from __future__ import absolute_import, print_function, unicode_literals import os import shutil import stat import sys import tempfile # Import salt libs import salt.utils.files import salt.utils.find from salt.ext import six # Import 3rd-party libs from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin # Import Salt Testing libs from tests.support.runtests import RUNTIME_VARS from tests.support.unit import TestCase, skipIf class TestFind(TestCase): def test_parse_interval(self): self.assertRaises(ValueError, salt.utils.find._parse_interval, "w") self.assertRaises(ValueError, salt.utils.find._parse_interval, "1") self.assertRaises(ValueError, salt.utils.find._parse_interval, "1s1w") self.assertRaises(ValueError, salt.utils.find._parse_interval, "1s1s") result, resolution, modifier = salt.utils.find._parse_interval("") self.assertEqual(result, 0) self.assertIs(resolution, None) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1s") self.assertEqual(result, 1.0) self.assertEqual(resolution, 1) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1m") self.assertEqual(result, 60.0) self.assertEqual(resolution, 60) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1h") self.assertEqual(result, 3600.0) self.assertEqual(resolution, 3600) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1d") self.assertEqual(result, 86400.0) self.assertEqual(resolution, 86400) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1w") self.assertEqual(result, 604800.0) self.assertEqual(resolution, 604800) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1w3d6h") self.assertEqual(result, 885600.0) self.assertEqual(resolution, 3600) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1m1s") self.assertEqual(result, 61.0) self.assertEqual(resolution, 1) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("1m2s") self.assertEqual(result, 62.0) self.assertEqual(resolution, 1) self.assertEqual(modifier, "") result, resolution, modifier = salt.utils.find._parse_interval("+1d") self.assertEqual(result, 86400.0) self.assertEqual(resolution, 86400) self.assertEqual(modifier, "+") result, resolution, modifier = salt.utils.find._parse_interval("-1d") self.assertEqual(result, 86400.0) self.assertEqual(resolution, 86400) self.assertEqual(modifier, "-") def test_parse_size(self): self.assertRaises(ValueError, salt.utils.find._parse_size, "") self.assertRaises(ValueError, salt.utils.find._parse_size, "1s1s") min_size, max_size = salt.utils.find._parse_size("1") self.assertEqual(min_size, 1) self.assertEqual(max_size, 1) min_size, max_size = salt.utils.find._parse_size("1b") self.assertEqual(min_size, 1) self.assertEqual(max_size, 1) min_size, max_size = salt.utils.find._parse_size("1k") self.assertEqual(min_size, 1024) self.assertEqual(max_size, 2047) min_size, max_size = salt.utils.find._parse_size("1m") self.assertEqual(min_size, 1048576) self.assertEqual(max_size, 2097151) min_size, max_size = salt.utils.find._parse_size("1g") self.assertEqual(min_size, 1073741824) self.assertEqual(max_size, 2147483647) min_size, max_size = salt.utils.find._parse_size("1t") self.assertEqual(min_size, 1099511627776) self.assertEqual(max_size, 2199023255551) min_size, max_size = salt.utils.find._parse_size("0m") self.assertEqual(min_size, 0) self.assertEqual(max_size, 1048575) min_size, max_size = salt.utils.find._parse_size("-1m") self.assertEqual(min_size, 0) self.assertEqual(max_size, 1048576) min_size, max_size = salt.utils.find._parse_size("+1m") self.assertEqual(min_size, 1048576) self.assertEqual(max_size, sys.maxsize) min_size, max_size = salt.utils.find._parse_size("+1M") self.assertEqual(min_size, 1048576) self.assertEqual(max_size, sys.maxsize) def test_option_requires(self): option = salt.utils.find.Option() self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) def test_name_option_match(self): option = salt.utils.find.NameOption("name", "*.txt") self.assertIs(option.match("", "", ""), None) self.assertIs(option.match("", "hello.txt", "").group(), "hello.txt") self.assertIs(option.match("", "HELLO.TXT", ""), None) def test_iname_option_match(self): option = salt.utils.find.InameOption("name", "*.txt") self.assertIs(option.match("", "", ""), None) self.assertIs(option.match("", "hello.txt", "").group(), "hello.txt") self.assertIs(option.match("", "HELLO.TXT", "").group(), "HELLO.TXT") def test_regex_option_match(self): self.assertRaises(ValueError, salt.utils.find.RegexOption, "name", "(.*}") option = salt.utils.find.RegexOption("name", r".*\.txt") self.assertIs(option.match("", "", ""), None) self.assertIs(option.match("", "hello.txt", "").group(), "hello.txt") self.assertIs(option.match("", "HELLO.TXT", ""), None) def test_iregex_option_match(self): self.assertRaises(ValueError, salt.utils.find.IregexOption, "name", "(.*}") option = salt.utils.find.IregexOption("name", r".*\.txt") self.assertIs(option.match("", "", ""), None) self.assertIs(option.match("", "hello.txt", "").group(), "hello.txt") self.assertIs(option.match("", "HELLO.TXT", "").group(), "HELLO.TXT") def test_type_option_requires(self): self.assertRaises(ValueError, salt.utils.find.TypeOption, "type", "w") option = salt.utils.find.TypeOption("type", "d") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_type_option_match(self): option = salt.utils.find.TypeOption("type", "b") self.assertEqual(option.match("", "", [stat.S_IFREG]), False) option = salt.utils.find.TypeOption("type", "c") self.assertEqual(option.match("", "", [stat.S_IFREG]), False) option = salt.utils.find.TypeOption("type", "d") self.assertEqual(option.match("", "", [stat.S_IFREG]), False) option = salt.utils.find.TypeOption("type", "f") self.assertEqual(option.match("", "", [stat.S_IFREG]), True) option = salt.utils.find.TypeOption("type", "l") self.assertEqual(option.match("", "", [stat.S_IFREG]), False) option = salt.utils.find.TypeOption("type", "p") self.assertEqual(option.match("", "", [stat.S_IFREG]), False) option = salt.utils.find.TypeOption("type", "s") self.assertEqual(option.match("", "", [stat.S_IFREG]), False) option = salt.utils.find.TypeOption("type", "b") self.assertEqual(option.match("", "", [stat.S_IFBLK]), True) option = salt.utils.find.TypeOption("type", "c") self.assertEqual(option.match("", "", [stat.S_IFCHR]), True) option = salt.utils.find.TypeOption("type", "d") self.assertEqual(option.match("", "", [stat.S_IFDIR]), True) option = salt.utils.find.TypeOption("type", "l") self.assertEqual(option.match("", "", [stat.S_IFLNK]), True) option = salt.utils.find.TypeOption("type", "p") self.assertEqual(option.match("", "", [stat.S_IFIFO]), True) option = salt.utils.find.TypeOption("type", "s") self.assertEqual(option.match("", "", [stat.S_IFSOCK]), True) @skipIf(sys.platform.startswith("win"), "pwd not available on Windows") def test_owner_option_requires(self): self.assertRaises(ValueError, salt.utils.find.OwnerOption, "owner", "notexist") option = salt.utils.find.OwnerOption("owner", "root") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) @skipIf(sys.platform.startswith("win"), "pwd not available on Windows") def test_owner_option_match(self): option = salt.utils.find.OwnerOption("owner", "root") self.assertEqual(option.match("", "", [0] * 5), True) option = salt.utils.find.OwnerOption("owner", "500") self.assertEqual(option.match("", "", [500] * 5), True) @skipIf(sys.platform.startswith("win"), "grp not available on Windows") def test_group_option_requires(self): self.assertRaises(ValueError, salt.utils.find.GroupOption, "group", "notexist") if sys.platform.startswith(("darwin", "freebsd", "openbsd")): group_name = "wheel" else: group_name = "root" option = salt.utils.find.GroupOption("group", group_name) self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) @skipIf(sys.platform.startswith("win"), "grp not available on Windows") def test_group_option_match(self): if sys.platform.startswith(("darwin", "freebsd", "openbsd")): group_name = "wheel" else: group_name = "root" option = salt.utils.find.GroupOption("group", group_name) self.assertEqual(option.match("", "", [0] * 6), True) option = salt.utils.find.GroupOption("group", "500") self.assertEqual(option.match("", "", [500] * 6), True) def test_size_option_requires(self): self.assertRaises(ValueError, salt.utils.find.SizeOption, "size", "1s1s") option = salt.utils.find.SizeOption("size", "+1G") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_size_option_match(self): option = salt.utils.find.SizeOption("size", "+1k") self.assertEqual(option.match("", "", [10000] * 7), True) option = salt.utils.find.SizeOption("size", "+1G") self.assertEqual(option.match("", "", [10000] * 7), False) def test_mtime_option_requires(self): self.assertRaises(ValueError, salt.utils.find.MtimeOption, "mtime", "4g") option = salt.utils.find.MtimeOption("mtime", "1d") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_mtime_option_match(self): option = salt.utils.find.MtimeOption("mtime", "-1w") self.assertEqual(option.match("", "", [1] * 9), False) option = salt.utils.find.MtimeOption("mtime", "-1s") self.assertEqual(option.match("", "", [10 ** 10] * 9), True) class TestGrepOption(TestCase): def setUp(self): super(TestGrepOption, self).setUp() self.tmpdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) def tearDown(self): shutil.rmtree(self.tmpdir) super(TestGrepOption, self).tearDown() def test_grep_option_requires(self): self.assertRaises(ValueError, salt.utils.find.GrepOption, "grep", "(foo)|(bar}") option = salt.utils.find.GrepOption("grep", "(foo)|(bar)") find = salt.utils.find self.assertEqual( option.requires(), (find._REQUIRES_CONTENTS | find._REQUIRES_STAT) ) def test_grep_option_match_regular_file(self): hello_file = os.path.join(self.tmpdir, "hello.txt") with salt.utils.files.fopen(hello_file, "w") as fp_: fp_.write(salt.utils.stringutils.to_str("foo")) option = salt.utils.find.GrepOption("grep", "foo") self.assertEqual( option.match(self.tmpdir, "hello.txt", os.stat(hello_file)), hello_file ) option = salt.utils.find.GrepOption("grep", "bar") self.assertEqual( option.match(self.tmpdir, "hello.txt", os.stat(hello_file)), None ) @skipIf(sys.platform.startswith("win"), "No /dev/null on Windows") def test_grep_option_match_dev_null(self): option = salt.utils.find.GrepOption("grep", "foo") self.assertEqual(option.match("dev", "null", os.stat("/dev/null")), None) class TestPrintOption(TestCase): def setUp(self): super(TestPrintOption, self).setUp() self.tmpdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) def tearDown(self): shutil.rmtree(self.tmpdir) super(TestPrintOption, self).tearDown() def test_print_option_defaults(self): option = salt.utils.find.PrintOption("print", "") self.assertEqual(option.need_stat, False) self.assertEqual(option.print_title, False) self.assertEqual(option.fmt, ["path"]) def test_print_option_requires(self): option = salt.utils.find.PrintOption("print", "") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption("print", "name") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption("print", "path") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption("print", "name,path") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption("print", "user") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) option = salt.utils.find.PrintOption("print", "path user") self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_print_option_execute(self): hello_file = os.path.join(self.tmpdir, "hello.txt") with salt.utils.files.fopen(hello_file, "w") as fp_: fp_.write(salt.utils.stringutils.to_str("foo")) option = salt.utils.find.PrintOption("print", "") self.assertEqual(option.execute("", [0] * 9), "") option = salt.utils.find.PrintOption("print", "path") self.assertEqual(option.execute("test_name", [0] * 9), "test_name") option = salt.utils.find.PrintOption("print", "name") self.assertEqual(option.execute("test_name", [0] * 9), "test_name") option = salt.utils.find.PrintOption("print", "size") self.assertEqual(option.execute(hello_file, os.stat(hello_file)), 3) option = salt.utils.find.PrintOption("print", "type") self.assertEqual(option.execute(hello_file, os.stat(hello_file)), "f") option = salt.utils.find.PrintOption("print", "mode") self.assertEqual(option.execute(hello_file, range(10)), 0) option = salt.utils.find.PrintOption("print", "mtime") self.assertEqual(option.execute(hello_file, range(10)), 8) option = salt.utils.find.PrintOption("print", "md5") self.assertEqual( option.execute(hello_file, os.stat(hello_file)), "acbd18db4cc2f85cedef654fccc4a4d8", ) option = salt.utils.find.PrintOption("print", "path name") self.assertEqual( option.execute("test_name", [0] * 9), ["test_name", "test_name"] ) option = salt.utils.find.PrintOption("print", "size name") self.assertEqual(option.execute("test_name", [0] * 9), [0, "test_name"]) @skipIf(sys.platform.startswith("win"), "pwd not available on Windows") def test_print_user(self): option = salt.utils.find.PrintOption("print", "user") self.assertEqual(option.execute("", [0] * 10), "root") option = salt.utils.find.PrintOption("print", "user") self.assertEqual(option.execute("", [2 ** 31] * 10), 2 ** 31) @skipIf(sys.platform.startswith("win"), "grp not available on Windows") def test_print_group(self): option = salt.utils.find.PrintOption("print", "group") if sys.platform.startswith(("darwin", "freebsd", "openbsd")): group_name = "wheel" else: group_name = "root" self.assertEqual(option.execute("", [0] * 10), group_name) # This seems to be not working in Ubuntu 12.04 32 bit # option = salt.utils.find.PrintOption('print', 'group') # self.assertEqual(option.execute('', [2 ** 31] * 10), 2 ** 31) @skipIf(sys.platform.startswith("win"), "no /dev/null on windows") def test_print_md5(self): option = salt.utils.find.PrintOption("print", "md5") self.assertEqual(option.execute("/dev/null", os.stat("/dev/null")), "") class TestFinder(TestCase): def setUp(self): super(TestFinder, self).setUp() self.tmpdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) def tearDown(self): shutil.rmtree(self.tmpdir) super(TestFinder, self).tearDown() @skipIf(sys.platform.startswith("win"), "No /dev/null on Windows") def test_init(self): finder = salt.utils.find.Finder({}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual(finder.criteria, []) finder = salt.utils.find.Finder({"_": None}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual(finder.criteria, []) self.assertRaises(ValueError, salt.utils.find.Finder, {"": None}) self.assertRaises(ValueError, salt.utils.find.Finder, {"name": None}) self.assertRaises(ValueError, salt.utils.find.Finder, {"nonexist": "somevalue"}) finder = salt.utils.find.Finder({"name": "test_name"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], "NameOption" ) finder = salt.utils.find.Finder({"iname": "test_name"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], "InameOption" ) finder = salt.utils.find.Finder({"regex": r".*\.txt"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], "RegexOption" ) finder = salt.utils.find.Finder({"iregex": r".*\.txt"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-14:-2], "IregexOption" ) finder = salt.utils.find.Finder({"type": "d"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], "TypeOption" ) finder = salt.utils.find.Finder({"owner": "root"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], "OwnerOption" ) if sys.platform.startswith(("darwin", "freebsd", "openbsd")): group_name = "wheel" else: group_name = "root" finder = salt.utils.find.Finder({"group": group_name}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], "GroupOption" ) finder = salt.utils.find.Finder({"size": "+1G"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], "SizeOption" ) finder = salt.utils.find.Finder({"mtime": "1d"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], "MtimeOption" ) finder = salt.utils.find.Finder({"grep": "foo"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], "GrepOption" ) finder = salt.utils.find.Finder({"print": "name"}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], "PrintOption" ) self.assertEqual(finder.criteria, []) def test_find(self): hello_file = os.path.join(self.tmpdir, "hello.txt") with salt.utils.files.fopen(hello_file, "w") as fp_: fp_.write(salt.utils.stringutils.to_str("foo")) finder = salt.utils.find.Finder({}) self.assertEqual(list(finder.find(self.tmpdir)), [self.tmpdir, hello_file]) finder = salt.utils.find.Finder({"mindepth": 1}) self.assertEqual(list(finder.find(self.tmpdir)), [hello_file]) finder = salt.utils.find.Finder({"maxdepth": 0}) self.assertEqual(list(finder.find(self.tmpdir)), [self.tmpdir]) finder = salt.utils.find.Finder({"name": "hello.txt"}) self.assertEqual(list(finder.find(self.tmpdir)), [hello_file]) finder = salt.utils.find.Finder({"type": "f", "print": "path"}) self.assertEqual(list(finder.find(self.tmpdir)), [hello_file]) finder = salt.utils.find.Finder({"size": "+1G", "print": "path"}) self.assertEqual(list(finder.find(self.tmpdir)), []) finder = salt.utils.find.Finder({"name": "hello.txt", "print": "path name"}) self.assertEqual(list(finder.find(self.tmpdir)), [[hello_file, "hello.txt"]]) finder = salt.utils.find.Finder({"name": "test_name"}) self.assertEqual(list(finder.find("")), [])