# -*- coding: utf-8 -*- # Import python libs from __future__ import absolute_import, unicode_literals, print_function import os import sys import shutil import tempfile import stat # Import Salt Testing libs from tests.support.runtests import RUNTIME_VARS from tests.support.unit import skipIf, TestCase # Import salt libs import salt.utils.files import salt.utils.find # Import 3rd-party libs from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin from salt.ext import six class TestFind(TestCase): def test_parse_interval(self): self.assertRaises(ValueError, salt.utils.find._parse_interval, 'w') self.assertRaises(ValueError, salt.utils.find._parse_interval, '1') self.assertRaises(ValueError, salt.utils.find._parse_interval, '1s1w') self.assertRaises(ValueError, salt.utils.find._parse_interval, '1s1s') result, resolution, modifier = salt.utils.find._parse_interval('') self.assertEqual(result, 0) self.assertIs(resolution, None) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1s') self.assertEqual(result, 1.0) self.assertEqual(resolution, 1) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1m') self.assertEqual(result, 60.0) self.assertEqual(resolution, 60) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1h') self.assertEqual(result, 3600.0) self.assertEqual(resolution, 3600) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1d') self.assertEqual(result, 86400.0) self.assertEqual(resolution, 86400) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1w') self.assertEqual(result, 604800.0) self.assertEqual(resolution, 604800) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1w3d6h') self.assertEqual(result, 885600.0) self.assertEqual(resolution, 3600) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1m1s') self.assertEqual(result, 61.0) self.assertEqual(resolution, 1) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('1m2s') self.assertEqual(result, 62.0) self.assertEqual(resolution, 1) self.assertEqual(modifier, '') result, resolution, modifier = salt.utils.find._parse_interval('+1d') self.assertEqual(result, 86400.0) self.assertEqual(resolution, 86400) self.assertEqual(modifier, '+') result, resolution, modifier = salt.utils.find._parse_interval('-1d') self.assertEqual(result, 86400.0) self.assertEqual(resolution, 86400) self.assertEqual(modifier, '-') def test_parse_size(self): self.assertRaises(ValueError, salt.utils.find._parse_size, '') self.assertRaises(ValueError, salt.utils.find._parse_size, '1s1s') min_size, max_size = salt.utils.find._parse_size('1') self.assertEqual(min_size, 1) self.assertEqual(max_size, 1) min_size, max_size = salt.utils.find._parse_size('1b') self.assertEqual(min_size, 1) self.assertEqual(max_size, 1) min_size, max_size = salt.utils.find._parse_size('1k') self.assertEqual(min_size, 1024) self.assertEqual(max_size, 2047) min_size, max_size = salt.utils.find._parse_size('1m') self.assertEqual(min_size, 1048576) self.assertEqual(max_size, 2097151) min_size, max_size = salt.utils.find._parse_size('1g') self.assertEqual(min_size, 1073741824) self.assertEqual(max_size, 2147483647) min_size, max_size = salt.utils.find._parse_size('1t') self.assertEqual(min_size, 1099511627776) self.assertEqual(max_size, 2199023255551) min_size, max_size = salt.utils.find._parse_size('0m') self.assertEqual(min_size, 0) self.assertEqual(max_size, 1048575) min_size, max_size = salt.utils.find._parse_size('-1m') self.assertEqual(min_size, 0) self.assertEqual(max_size, 1048576) min_size, max_size = salt.utils.find._parse_size('+1m') self.assertEqual(min_size, 1048576) self.assertEqual(max_size, sys.maxsize) min_size, max_size = salt.utils.find._parse_size('+1M') self.assertEqual(min_size, 1048576) self.assertEqual(max_size, sys.maxsize) def test_option_requires(self): option = salt.utils.find.Option() self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) def test_name_option_match(self): option = salt.utils.find.NameOption('name', '*.txt') self.assertIs(option.match('', '', ''), None) self.assertIs(option.match('', 'hello.txt', '').group(), 'hello.txt') self.assertIs(option.match('', 'HELLO.TXT', ''), None) def test_iname_option_match(self): option = salt.utils.find.InameOption('name', '*.txt') self.assertIs(option.match('', '', ''), None) self.assertIs(option.match('', 'hello.txt', '').group(), 'hello.txt') self.assertIs(option.match('', 'HELLO.TXT', '').group(), 'HELLO.TXT') def test_regex_option_match(self): self.assertRaises( ValueError, salt.utils.find.RegexOption, 'name', '(.*}' ) option = salt.utils.find.RegexOption('name', r'.*\.txt') self.assertIs(option.match('', '', ''), None) self.assertIs(option.match('', 'hello.txt', '').group(), 'hello.txt') self.assertIs(option.match('', 'HELLO.TXT', ''), None) def test_iregex_option_match(self): self.assertRaises( ValueError, salt.utils.find.IregexOption, 'name', '(.*}' ) option = salt.utils.find.IregexOption('name', r'.*\.txt') self.assertIs(option.match('', '', ''), None) self.assertIs(option.match('', 'hello.txt', '').group(), 'hello.txt') self.assertIs(option.match('', 'HELLO.TXT', '').group(), 'HELLO.TXT') def test_type_option_requires(self): self.assertRaises(ValueError, salt.utils.find.TypeOption, 'type', 'w') option = salt.utils.find.TypeOption('type', 'd') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_type_option_match(self): option = salt.utils.find.TypeOption('type', 'b') self.assertEqual(option.match('', '', [stat.S_IFREG]), False) option = salt.utils.find.TypeOption('type', 'c') self.assertEqual(option.match('', '', [stat.S_IFREG]), False) option = salt.utils.find.TypeOption('type', 'd') self.assertEqual(option.match('', '', [stat.S_IFREG]), False) option = salt.utils.find.TypeOption('type', 'f') self.assertEqual(option.match('', '', [stat.S_IFREG]), True) option = salt.utils.find.TypeOption('type', 'l') self.assertEqual(option.match('', '', [stat.S_IFREG]), False) option = salt.utils.find.TypeOption('type', 'p') self.assertEqual(option.match('', '', [stat.S_IFREG]), False) option = salt.utils.find.TypeOption('type', 's') self.assertEqual(option.match('', '', [stat.S_IFREG]), False) option = salt.utils.find.TypeOption('type', 'b') self.assertEqual(option.match('', '', [stat.S_IFBLK]), True) option = salt.utils.find.TypeOption('type', 'c') self.assertEqual(option.match('', '', [stat.S_IFCHR]), True) option = salt.utils.find.TypeOption('type', 'd') self.assertEqual(option.match('', '', [stat.S_IFDIR]), True) option = salt.utils.find.TypeOption('type', 'l') self.assertEqual(option.match('', '', [stat.S_IFLNK]), True) option = salt.utils.find.TypeOption('type', 'p') self.assertEqual(option.match('', '', [stat.S_IFIFO]), True) option = salt.utils.find.TypeOption('type', 's') self.assertEqual(option.match('', '', [stat.S_IFSOCK]), True) @skipIf(sys.platform.startswith('win'), 'pwd not available on Windows') def test_owner_option_requires(self): self.assertRaises( ValueError, salt.utils.find.OwnerOption, 'owner', 'notexist' ) option = salt.utils.find.OwnerOption('owner', 'root') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) @skipIf(sys.platform.startswith('win'), 'pwd not available on Windows') def test_owner_option_match(self): option = salt.utils.find.OwnerOption('owner', 'root') self.assertEqual(option.match('', '', [0] * 5), True) option = salt.utils.find.OwnerOption('owner', '500') self.assertEqual(option.match('', '', [500] * 5), True) @skipIf(sys.platform.startswith('win'), 'grp not available on Windows') def test_group_option_requires(self): self.assertRaises( ValueError, salt.utils.find.GroupOption, 'group', 'notexist' ) if sys.platform.startswith(('darwin', 'freebsd', 'openbsd')): group_name = 'wheel' else: group_name = 'root' option = salt.utils.find.GroupOption('group', group_name) self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) @skipIf(sys.platform.startswith('win'), 'grp not available on Windows') def test_group_option_match(self): if sys.platform.startswith(('darwin', 'freebsd', 'openbsd')): group_name = 'wheel' else: group_name = 'root' option = salt.utils.find.GroupOption('group', group_name) self.assertEqual(option.match('', '', [0] * 6), True) option = salt.utils.find.GroupOption('group', '500') self.assertEqual(option.match('', '', [500] * 6), True) def test_size_option_requires(self): self.assertRaises( ValueError, salt.utils.find.SizeOption, 'size', '1s1s' ) option = salt.utils.find.SizeOption('size', '+1G') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_size_option_match(self): option = salt.utils.find.SizeOption('size', '+1k') self.assertEqual(option.match('', '', [10000] * 7), True) option = salt.utils.find.SizeOption('size', '+1G') self.assertEqual(option.match('', '', [10000] * 7), False) def test_mtime_option_requires(self): self.assertRaises( ValueError, salt.utils.find.MtimeOption, 'mtime', '4g' ) option = salt.utils.find.MtimeOption('mtime', '1d') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_mtime_option_match(self): option = salt.utils.find.MtimeOption('mtime', '-1w') self.assertEqual(option.match('', '', [1] * 9), False) option = salt.utils.find.MtimeOption('mtime', '-1s') self.assertEqual(option.match('', '', [10 ** 10] * 9), True) class TestGrepOption(TestCase): def setUp(self): super(TestGrepOption, self).setUp() self.tmpdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) def tearDown(self): shutil.rmtree(self.tmpdir) super(TestGrepOption, self).tearDown() def test_grep_option_requires(self): self.assertRaises( ValueError, salt.utils.find.GrepOption, 'grep', '(foo)|(bar}' ) option = salt.utils.find.GrepOption('grep', '(foo)|(bar)') find = salt.utils.find self.assertEqual( option.requires(), (find._REQUIRES_CONTENTS | find._REQUIRES_STAT) ) def test_grep_option_match_regular_file(self): hello_file = os.path.join(self.tmpdir, 'hello.txt') with salt.utils.files.fopen(hello_file, 'w') as fp_: fp_.write(salt.utils.stringutils.to_str('foo')) option = salt.utils.find.GrepOption('grep', 'foo') self.assertEqual( option.match(self.tmpdir, 'hello.txt', os.stat(hello_file)), hello_file ) option = salt.utils.find.GrepOption('grep', 'bar') self.assertEqual( option.match(self.tmpdir, 'hello.txt', os.stat(hello_file)), None ) @skipIf(sys.platform.startswith('win'), 'No /dev/null on Windows') def test_grep_option_match_dev_null(self): option = salt.utils.find.GrepOption('grep', 'foo') self.assertEqual( option.match('dev', 'null', os.stat('/dev/null')), None ) class TestPrintOption(TestCase): def setUp(self): super(TestPrintOption, self).setUp() self.tmpdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) def tearDown(self): shutil.rmtree(self.tmpdir) super(TestPrintOption, self).tearDown() def test_print_option_defaults(self): option = salt.utils.find.PrintOption('print', '') self.assertEqual(option.need_stat, False) self.assertEqual(option.print_title, False) self.assertEqual(option.fmt, ['path']) def test_print_option_requires(self): option = salt.utils.find.PrintOption('print', '') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption('print', 'name') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption('print', 'path') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption('print', 'name,path') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH) option = salt.utils.find.PrintOption('print', 'user') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) option = salt.utils.find.PrintOption('print', 'path user') self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT) def test_print_option_execute(self): hello_file = os.path.join(self.tmpdir, 'hello.txt') with salt.utils.files.fopen(hello_file, 'w') as fp_: fp_.write(salt.utils.stringutils.to_str('foo')) option = salt.utils.find.PrintOption('print', '') self.assertEqual(option.execute('', [0] * 9), '') option = salt.utils.find.PrintOption('print', 'path') self.assertEqual(option.execute('test_name', [0] * 9), 'test_name') option = salt.utils.find.PrintOption('print', 'name') self.assertEqual(option.execute('test_name', [0] * 9), 'test_name') option = salt.utils.find.PrintOption('print', 'size') self.assertEqual(option.execute(hello_file, os.stat(hello_file)), 3) option = salt.utils.find.PrintOption('print', 'type') self.assertEqual(option.execute(hello_file, os.stat(hello_file)), 'f') option = salt.utils.find.PrintOption('print', 'mode') self.assertEqual(option.execute(hello_file, range(10)), 0) option = salt.utils.find.PrintOption('print', 'mtime') self.assertEqual(option.execute(hello_file, range(10)), 8) option = salt.utils.find.PrintOption('print', 'md5') self.assertEqual( option.execute(hello_file, os.stat(hello_file)), 'acbd18db4cc2f85cedef654fccc4a4d8' ) option = salt.utils.find.PrintOption('print', 'path name') self.assertEqual( option.execute('test_name', [0] * 9), ['test_name', 'test_name'] ) option = salt.utils.find.PrintOption('print', 'size name') self.assertEqual( option.execute('test_name', [0] * 9), [0, 'test_name'] ) @skipIf(sys.platform.startswith('win'), "pwd not available on Windows") def test_print_user(self): option = salt.utils.find.PrintOption('print', 'user') self.assertEqual(option.execute('', [0] * 10), 'root') option = salt.utils.find.PrintOption('print', 'user') self.assertEqual(option.execute('', [2 ** 31] * 10), 2 ** 31) @skipIf(sys.platform.startswith('win'), "grp not available on Windows") def test_print_group(self): option = salt.utils.find.PrintOption('print', 'group') if sys.platform.startswith(('darwin', 'freebsd', 'openbsd')): group_name = 'wheel' else: group_name = 'root' self.assertEqual(option.execute('', [0] * 10), group_name) # This seems to be not working in Ubuntu 12.04 32 bit #option = salt.utils.find.PrintOption('print', 'group') #self.assertEqual(option.execute('', [2 ** 31] * 10), 2 ** 31) @skipIf(sys.platform.startswith('win'), "no /dev/null on windows") def test_print_md5(self): option = salt.utils.find.PrintOption('print', 'md5') self.assertEqual(option.execute('/dev/null', os.stat('/dev/null')), '') class TestFinder(TestCase): def setUp(self): super(TestFinder, self).setUp() self.tmpdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) def tearDown(self): shutil.rmtree(self.tmpdir) super(TestFinder, self).tearDown() @skipIf(sys.platform.startswith('win'), 'No /dev/null on Windows') def test_init(self): finder = salt.utils.find.Finder({}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual(finder.criteria, []) finder = salt.utils.find.Finder({'_': None}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual(finder.criteria, []) self.assertRaises(ValueError, salt.utils.find.Finder, {'': None}) self.assertRaises(ValueError, salt.utils.find.Finder, {'name': None}) self.assertRaises( ValueError, salt.utils.find.Finder, {'nonexist': 'somevalue'} ) finder = salt.utils.find.Finder({'name': 'test_name'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], 'NameOption' ) finder = salt.utils.find.Finder({'iname': 'test_name'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], 'InameOption' ) finder = salt.utils.find.Finder({'regex': r'.*\.txt'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], 'RegexOption' ) finder = salt.utils.find.Finder({'iregex': r'.*\.txt'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-14:-2], 'IregexOption' ) finder = salt.utils.find.Finder({'type': 'd'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], 'TypeOption' ) finder = salt.utils.find.Finder({'owner': 'root'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], 'OwnerOption' ) if sys.platform.startswith(('darwin', 'freebsd', 'openbsd')): group_name = 'wheel' else: group_name = 'root' finder = salt.utils.find.Finder({'group': group_name}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], 'GroupOption' ) finder = salt.utils.find.Finder({'size': '+1G'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], 'SizeOption' ) finder = salt.utils.find.Finder({'mtime': '1d'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-13:-2], 'MtimeOption' ) finder = salt.utils.find.Finder({'grep': 'foo'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual( six.text_type(finder.criteria[0].__class__)[-12:-2], 'GrepOption' ) finder = salt.utils.find.Finder({'print': 'name'}) self.assertEqual( six.text_type(finder.actions[0].__class__)[-13:-2], 'PrintOption' ) self.assertEqual(finder.criteria, []) def test_find(self): hello_file = os.path.join(self.tmpdir, 'hello.txt') with salt.utils.files.fopen(hello_file, 'w') as fp_: fp_.write(salt.utils.stringutils.to_str('foo')) finder = salt.utils.find.Finder({}) self.assertEqual(list(finder.find(self.tmpdir)), [self.tmpdir, hello_file]) finder = salt.utils.find.Finder({'mindepth': 1}) self.assertEqual(list(finder.find(self.tmpdir)), [hello_file]) finder = salt.utils.find.Finder({'maxdepth': 0}) self.assertEqual(list(finder.find(self.tmpdir)), [self.tmpdir]) finder = salt.utils.find.Finder({'name': 'hello.txt'}) self.assertEqual(list(finder.find(self.tmpdir)), [hello_file]) finder = salt.utils.find.Finder({'type': 'f', 'print': 'path'}) self.assertEqual(list(finder.find(self.tmpdir)), [hello_file]) finder = salt.utils.find.Finder({'size': '+1G', 'print': 'path'}) self.assertEqual(list(finder.find(self.tmpdir)), []) finder = salt.utils.find.Finder( {'name': 'hello.txt', 'print': 'path name'} ) self.assertEqual( list(finder.find(self.tmpdir)), [[hello_file, 'hello.txt']] ) finder = salt.utils.find.Finder({'name': 'test_name'}) self.assertEqual(list(finder.find('')), [])