|
# -*- coding: utf-8 -*-

# Import python libs
from __future__ import absolute_import, unicode_literals, print_function
import os
import sys
import shutil
import tempfile
import stat

# Import Salt Testing libs
from tests.support.unit import skipIf, TestCase
from tests.support.paths import TMP

# Import salt libs
import salt.utils.files
import salt.utils.find
import salt.utils.stringutils

# Import 3rd-party libs
from salt.ext.six.moves import range  # pylint: disable=import-error,redefined-builtin
from salt.ext import six
class TestFind(TestCase):
    '''
    Tests for the interval/size parsers and the matching options provided by
    salt.utils.find.
    '''

    @staticmethod
    def _root_group_name():
        '''
        Return the group name used by the group tests on this platform:
        'wheel' on darwin/freebsd/openbsd, 'root' everywhere else.
        '''
        if sys.platform.startswith(('darwin', 'freebsd', 'openbsd')):
            return 'wheel'
        return 'root'

    def test_parse_interval(self):
        '''
        _parse_interval raises ValueError for malformed input and otherwise
        returns a (result, resolution, modifier) triple.
        '''
        self.assertRaises(ValueError, salt.utils.find._parse_interval, 'w')
        self.assertRaises(ValueError, salt.utils.find._parse_interval, '1')
        self.assertRaises(ValueError, salt.utils.find._parse_interval, '1s1w')
        self.assertRaises(ValueError, salt.utils.find._parse_interval, '1s1s')

        # The empty interval is the only case that yields no resolution.
        result, resolution, modifier = salt.utils.find._parse_interval('')
        self.assertEqual(result, 0)
        self.assertIsNone(resolution)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1s')
        self.assertEqual(result, 1.0)
        self.assertEqual(resolution, 1)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1m')
        self.assertEqual(result, 60.0)
        self.assertEqual(resolution, 60)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1h')
        self.assertEqual(result, 3600.0)
        self.assertEqual(resolution, 3600)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1d')
        self.assertEqual(result, 86400.0)
        self.assertEqual(resolution, 86400)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1w')
        self.assertEqual(result, 604800.0)
        self.assertEqual(resolution, 604800)
        self.assertEqual(modifier, '')

        # Combined units: the expected resolution is that of the smallest
        # unit present in the interval.
        result, resolution, modifier = salt.utils.find._parse_interval('1w3d6h')
        self.assertEqual(result, 885600.0)
        self.assertEqual(resolution, 3600)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1m1s')
        self.assertEqual(result, 61.0)
        self.assertEqual(resolution, 1)
        self.assertEqual(modifier, '')

        result, resolution, modifier = salt.utils.find._parse_interval('1m2s')
        self.assertEqual(result, 62.0)
        self.assertEqual(resolution, 1)
        self.assertEqual(modifier, '')

        # A leading sign is returned as the modifier; the result is unsigned.
        result, resolution, modifier = salt.utils.find._parse_interval('+1d')
        self.assertEqual(result, 86400.0)
        self.assertEqual(resolution, 86400)
        self.assertEqual(modifier, '+')

        result, resolution, modifier = salt.utils.find._parse_interval('-1d')
        self.assertEqual(result, 86400.0)
        self.assertEqual(resolution, 86400)
        self.assertEqual(modifier, '-')

    def test_parse_size(self):
        '''
        _parse_size raises ValueError for malformed input and otherwise
        returns a (min_size, max_size) pair.
        '''
        self.assertRaises(ValueError, salt.utils.find._parse_size, '')
        self.assertRaises(ValueError, salt.utils.find._parse_size, '1s1s')

        min_size, max_size = salt.utils.find._parse_size('1')
        self.assertEqual(min_size, 1)
        self.assertEqual(max_size, 1)

        min_size, max_size = salt.utils.find._parse_size('1b')
        self.assertEqual(min_size, 1)
        self.assertEqual(max_size, 1)

        min_size, max_size = salt.utils.find._parse_size('1k')
        self.assertEqual(min_size, 1024)
        self.assertEqual(max_size, 2047)

        min_size, max_size = salt.utils.find._parse_size('1m')
        self.assertEqual(min_size, 1048576)
        self.assertEqual(max_size, 2097151)

        min_size, max_size = salt.utils.find._parse_size('1g')
        self.assertEqual(min_size, 1073741824)
        self.assertEqual(max_size, 2147483647)

        min_size, max_size = salt.utils.find._parse_size('1t')
        self.assertEqual(min_size, 1099511627776)
        self.assertEqual(max_size, 2199023255551)

        min_size, max_size = salt.utils.find._parse_size('0m')
        self.assertEqual(min_size, 0)
        self.assertEqual(max_size, 1048575)

        # '-' bounds the range from above, '+' from below.
        min_size, max_size = salt.utils.find._parse_size('-1m')
        self.assertEqual(min_size, 0)
        self.assertEqual(max_size, 1048576)

        min_size, max_size = salt.utils.find._parse_size('+1m')
        self.assertEqual(min_size, 1048576)
        self.assertEqual(max_size, sys.maxsize)

        # The unit suffix is accepted in upper case as well.
        min_size, max_size = salt.utils.find._parse_size('+1M')
        self.assertEqual(min_size, 1048576)
        self.assertEqual(max_size, sys.maxsize)

    def test_option_requires(self):
        '''
        The base Option only requires the path.
        '''
        option = salt.utils.find.Option()
        self.assertEqual(option.requires(), salt.utils.find._REQUIRES_PATH)

    def test_name_option_match(self):
        '''
        NameOption matches the glob against the file name, case-sensitively.
        '''
        option = salt.utils.find.NameOption('name', '*.txt')
        self.assertIsNone(option.match('', '', ''))
        # Compare matched text by value; string identity is an interpreter
        # implementation detail.
        self.assertEqual(option.match('', 'hello.txt', '').group(), 'hello.txt')
        self.assertIsNone(option.match('', 'HELLO.TXT', ''))

    def test_iname_option_match(self):
        '''
        InameOption matches the glob against the file name, ignoring case.
        '''
        option = salt.utils.find.InameOption('name', '*.txt')
        self.assertIsNone(option.match('', '', ''))
        self.assertEqual(option.match('', 'hello.txt', '').group(), 'hello.txt')
        self.assertEqual(option.match('', 'HELLO.TXT', '').group(), 'HELLO.TXT')

    def test_regex_option_match(self):
        '''
        RegexOption rejects an invalid pattern and matches case-sensitively.
        '''
        self.assertRaises(
            ValueError, salt.utils.find.RegexOption, 'name', '(.*}'
        )
        option = salt.utils.find.RegexOption('name', r'.*\.txt')
        self.assertIsNone(option.match('', '', ''))
        self.assertEqual(option.match('', 'hello.txt', '').group(), 'hello.txt')
        self.assertIsNone(option.match('', 'HELLO.TXT', ''))

    def test_iregex_option_match(self):
        '''
        IregexOption rejects an invalid pattern and matches ignoring case.
        '''
        self.assertRaises(
            ValueError, salt.utils.find.IregexOption, 'name', '(.*}'
        )
        option = salt.utils.find.IregexOption('name', r'.*\.txt')
        self.assertIsNone(option.match('', '', ''))
        self.assertEqual(option.match('', 'hello.txt', '').group(), 'hello.txt')
        self.assertEqual(option.match('', 'HELLO.TXT', '').group(), 'HELLO.TXT')

    def test_type_option_requires(self):
        '''
        TypeOption rejects an unknown type letter and requires stat data.
        '''
        self.assertRaises(ValueError, salt.utils.find.TypeOption, 'type', 'w')
        option = salt.utils.find.TypeOption('type', 'd')
        self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT)

    def test_type_option_match(self):
        '''
        TypeOption matches only the file type named by its letter.
        '''
        # Against a regular-file mode only the 'f' type matches.
        option = salt.utils.find.TypeOption('type', 'b')
        self.assertFalse(option.match('', '', [stat.S_IFREG]))
        option = salt.utils.find.TypeOption('type', 'c')
        self.assertFalse(option.match('', '', [stat.S_IFREG]))
        option = salt.utils.find.TypeOption('type', 'd')
        self.assertFalse(option.match('', '', [stat.S_IFREG]))
        option = salt.utils.find.TypeOption('type', 'f')
        self.assertTrue(option.match('', '', [stat.S_IFREG]))
        option = salt.utils.find.TypeOption('type', 'l')
        self.assertFalse(option.match('', '', [stat.S_IFREG]))
        option = salt.utils.find.TypeOption('type', 'p')
        self.assertFalse(option.match('', '', [stat.S_IFREG]))
        option = salt.utils.find.TypeOption('type', 's')
        self.assertFalse(option.match('', '', [stat.S_IFREG]))

        # Every other type letter matches its own mode constant.
        option = salt.utils.find.TypeOption('type', 'b')
        self.assertTrue(option.match('', '', [stat.S_IFBLK]))
        option = salt.utils.find.TypeOption('type', 'c')
        self.assertTrue(option.match('', '', [stat.S_IFCHR]))
        option = salt.utils.find.TypeOption('type', 'd')
        self.assertTrue(option.match('', '', [stat.S_IFDIR]))
        option = salt.utils.find.TypeOption('type', 'l')
        self.assertTrue(option.match('', '', [stat.S_IFLNK]))
        option = salt.utils.find.TypeOption('type', 'p')
        self.assertTrue(option.match('', '', [stat.S_IFIFO]))
        option = salt.utils.find.TypeOption('type', 's')
        self.assertTrue(option.match('', '', [stat.S_IFSOCK]))

    @skipIf(sys.platform.startswith('win'), 'pwd not available on Windows')
    def test_owner_option_requires(self):
        '''
        OwnerOption rejects an unknown owner and requires stat data.
        '''
        self.assertRaises(
            ValueError, salt.utils.find.OwnerOption, 'owner', 'notexist'
        )
        option = salt.utils.find.OwnerOption('owner', 'root')
        self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT)

    @skipIf(sys.platform.startswith('win'), 'pwd not available on Windows')
    def test_owner_option_match(self):
        '''
        OwnerOption accepts a user name or a numeric id given as a string.
        '''
        # Fake stat sequences in which every field holds the same id.
        option = salt.utils.find.OwnerOption('owner', 'root')
        self.assertTrue(option.match('', '', [0] * 5))
        option = salt.utils.find.OwnerOption('owner', '500')
        self.assertTrue(option.match('', '', [500] * 5))

    @skipIf(sys.platform.startswith('win'), 'grp not available on Windows')
    def test_group_option_requires(self):
        '''
        GroupOption rejects an unknown group and requires stat data.
        '''
        self.assertRaises(
            ValueError, salt.utils.find.GroupOption, 'group', 'notexist'
        )
        option = salt.utils.find.GroupOption('group', self._root_group_name())
        self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT)

    @skipIf(sys.platform.startswith('win'), 'grp not available on Windows')
    def test_group_option_match(self):
        '''
        GroupOption accepts a group name or a numeric id given as a string.
        '''
        # Fake stat sequences in which every field holds the same id.
        option = salt.utils.find.GroupOption('group', self._root_group_name())
        self.assertTrue(option.match('', '', [0] * 6))
        option = salt.utils.find.GroupOption('group', '500')
        self.assertTrue(option.match('', '', [500] * 6))

    def test_size_option_requires(self):
        '''
        SizeOption rejects a malformed size and requires stat data.
        '''
        self.assertRaises(
            ValueError, salt.utils.find.SizeOption, 'size', '1s1s'
        )
        option = salt.utils.find.SizeOption('size', '+1G')
        self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT)

    def test_size_option_match(self):
        '''
        SizeOption matches a 10000-valued stat sequence for '+1k' but not
        for '+1G'.
        '''
        option = salt.utils.find.SizeOption('size', '+1k')
        self.assertTrue(option.match('', '', [10000] * 7))
        option = salt.utils.find.SizeOption('size', '+1G')
        self.assertFalse(option.match('', '', [10000] * 7))

    def test_mtime_option_requires(self):
        '''
        MtimeOption rejects a malformed interval and requires stat data.
        '''
        self.assertRaises(
            ValueError, salt.utils.find.MtimeOption, 'mtime', '4g'
        )
        option = salt.utils.find.MtimeOption('mtime', '1d')
        self.assertEqual(option.requires(), salt.utils.find._REQUIRES_STAT)

    def test_mtime_option_match(self):
        '''
        MtimeOption with a '-' modifier rejects a very small timestamp and
        accepts a very large one.
        '''
        option = salt.utils.find.MtimeOption('mtime', '-1w')
        self.assertFalse(option.match('', '', [1] * 9))
        option = salt.utils.find.MtimeOption('mtime', '-1s')
        self.assertTrue(option.match('', '', [10 ** 10] * 9))
class TestGrepOption(TestCase):
    '''
    Tests for salt.utils.find.GrepOption, run against a scratch directory
    created under TMP for each test.
    '''

    def setUp(self):
        '''
        Create the per-test scratch directory.
        '''
        super(TestGrepOption, self).setUp()
        self.tmpdir = tempfile.mkdtemp(dir=TMP)

    def tearDown(self):
        '''
        Remove the per-test scratch directory.
        '''
        shutil.rmtree(self.tmpdir)
        super(TestGrepOption, self).tearDown()

    def test_grep_option_requires(self):
        '''
        An invalid pattern raises ValueError; a valid one requires both the
        file contents and stat data.
        '''
        with self.assertRaises(ValueError):
            salt.utils.find.GrepOption('grep', '(foo)|(bar}')

        find = salt.utils.find
        needed = find._REQUIRES_CONTENTS | find._REQUIRES_STAT
        grep = find.GrepOption('grep', '(foo)|(bar)')
        self.assertEqual(grep.requires(), needed)

    def test_grep_option_match_regular_file(self):
        '''
        match() returns the full path when the pattern occurs in the file
        and None when it does not.
        '''
        target = os.path.join(self.tmpdir, 'hello.txt')
        with salt.utils.files.fopen(target, 'w') as handle:
            handle.write(salt.utils.stringutils.to_str('foo'))

        # (pattern, expected return value of match())
        for pattern, expected in (('foo', target), ('bar', None)):
            grep = salt.utils.find.GrepOption('grep', pattern)
            found = grep.match(self.tmpdir, 'hello.txt', os.stat(target))
            self.assertEqual(found, expected)

    @skipIf(sys.platform.startswith('win'), 'No /dev/null on Windows')
    def test_grep_option_match_dev_null(self):
        '''
        match() returns None when handed the stat result of /dev/null.
        '''
        grep = salt.utils.find.GrepOption('grep', 'foo')
        dev_null_stat = os.stat('/dev/null')
        self.assertEqual(grep.match('dev', 'null', dev_null_stat), None)
class TestPrintOption(TestCase):
    '''
    Tests for salt.utils.find.PrintOption, run against a scratch directory
    created under TMP for each test.
    '''

    @staticmethod
    def _printer(fmt):
        '''
        Build a PrintOption for the given format string.
        '''
        return salt.utils.find.PrintOption('print', fmt)

    def setUp(self):
        '''
        Create the per-test scratch directory.
        '''
        super(TestPrintOption, self).setUp()
        self.tmpdir = tempfile.mkdtemp(dir=TMP)

    def tearDown(self):
        '''
        Remove the per-test scratch directory.
        '''
        shutil.rmtree(self.tmpdir)
        super(TestPrintOption, self).tearDown()

    def test_print_option_defaults(self):
        '''
        An empty format string yields the default attribute values.
        '''
        defaults = self._printer('')
        expected_attrs = (
            ('need_stat', False),
            ('print_title', False),
            ('fmt', ['path']),
        )
        for attr, expected in expected_attrs:
            self.assertEqual(getattr(defaults, attr), expected)

    def test_print_option_requires(self):
        '''
        Path-only formats require just the path; 'user' requires stat data.
        '''
        path_only = salt.utils.find._REQUIRES_PATH
        needs_stat = salt.utils.find._REQUIRES_STAT
        # (format string, expected requirement flag)
        expectations = (
            ('', path_only),
            ('name', path_only),
            ('path', path_only),
            ('name,path', path_only),
            ('user', needs_stat),
            ('path user', needs_stat),
        )
        for fmt, expected in expectations:
            self.assertEqual(self._printer(fmt).requires(), expected)

    def test_print_option_execute(self):
        '''
        execute() returns a single value for a one-field format and a list
        of values for a multi-field format.
        '''
        target = os.path.join(self.tmpdir, 'hello.txt')
        with salt.utils.files.fopen(target, 'w') as handle:
            handle.write(salt.utils.stringutils.to_str('foo'))

        # Path-derived fields, fed a dummy all-zero stat sequence.
        self.assertEqual(self._printer('').execute('', [0] * 9), '')
        self.assertEqual(
            self._printer('path').execute('test_name', [0] * 9), 'test_name'
        )
        self.assertEqual(
            self._printer('name').execute('test_name', [0] * 9), 'test_name'
        )

        # Fields taken from the real stat result of the three-byte file.
        self.assertEqual(
            self._printer('size').execute(target, os.stat(target)), 3
        )
        self.assertEqual(
            self._printer('type').execute(target, os.stat(target)), 'f'
        )

        # Fields checked against a fake stat sequence 0..9.
        self.assertEqual(self._printer('mode').execute(target, range(10)), 0)
        self.assertEqual(self._printer('mtime').execute(target, range(10)), 8)

        # md5 of the file contents ('foo').
        self.assertEqual(
            self._printer('md5').execute(target, os.stat(target)),
            'acbd18db4cc2f85cedef654fccc4a4d8'
        )

        # Multi-field formats produce a list, in format order.
        self.assertEqual(
            self._printer('path name').execute('test_name', [0] * 9),
            ['test_name', 'test_name']
        )
        self.assertEqual(
            self._printer('size name').execute('test_name', [0] * 9),
            [0, 'test_name']
        )

    @skipIf(sys.platform.startswith('win'), "pwd not available on Windows")
    def test_print_user(self):
        '''
        The 'user' field is 'root' for id 0 and the raw number for 2 ** 31.
        '''
        self.assertEqual(self._printer('user').execute('', [0] * 10), 'root')
        huge_id = 2 ** 31
        self.assertEqual(
            self._printer('user').execute('', [huge_id] * 10), 2 ** 31
        )

    @skipIf(sys.platform.startswith('win'), "grp not available on Windows")
    def test_print_group(self):
        '''
        The 'group' field for id 0 is the platform's expected group name.
        '''
        group_printer = self._printer('group')
        on_bsd_like = sys.platform.startswith(('darwin', 'freebsd', 'openbsd'))
        group_name = 'wheel' if on_bsd_like else 'root'
        self.assertEqual(group_printer.execute('', [0] * 10), group_name)

        # This seems to be not working in Ubuntu 12.04 32 bit
        #option = salt.utils.find.PrintOption('print', 'group')
        #self.assertEqual(option.execute('', [2 ** 31] * 10), 2 ** 31)

    @skipIf(sys.platform.startswith('win'), "no /dev/null on windows")
    def test_print_md5(self):
        '''
        The 'md5' field for /dev/null is the empty string.
        '''
        md5_printer = self._printer('md5')
        self.assertEqual(
            md5_printer.execute('/dev/null', os.stat('/dev/null')), ''
        )
class TestFinder(TestCase):
    '''
    Tests for salt.utils.find.Finder: option parsing in the constructor and
    traversal of a scratch directory created under TMP for each test.
    '''

    def setUp(self):
        '''
        Create the per-test scratch directory.
        '''
        super(TestFinder, self).setUp()
        self.tmpdir = tempfile.mkdtemp(dir=TMP)

    def tearDown(self):
        '''
        Remove the per-test scratch directory.
        '''
        shutil.rmtree(self.tmpdir)
        super(TestFinder, self).tearDown()

    def _assert_finder(self, options, criterion_cls=None):
        '''
        Build a Finder from ``options`` and check what it parsed.

        The first action must always be a PrintOption. When
        ``criterion_cls`` is given, the first criterion must be an instance
        of it; otherwise the criteria list must be empty.
        '''
        finder = salt.utils.find.Finder(options)
        # Check the real type rather than slicing the class repr with
        # hand-counted offsets.
        self.assertIsInstance(finder.actions[0], salt.utils.find.PrintOption)
        if criterion_cls is None:
            self.assertEqual(finder.criteria, [])
        else:
            self.assertIsInstance(finder.criteria[0], criterion_cls)

    # The owner/group options exercised here rely on pwd/grp, which is why
    # the sibling owner/group tests are skipped on Windows as well.
    @skipIf(sys.platform.startswith('win'), 'pwd and grp not available on Windows')
    def test_init(self):
        '''
        Finder always carries a PrintOption action, rejects invalid options
        with ValueError, and turns each known option key into the matching
        criterion class.
        '''
        find = salt.utils.find

        # No options, or only a '_'-prefixed key: print action, no criteria.
        self._assert_finder({})
        self._assert_finder({'_': None})

        # Empty key, missing value and unknown key are all rejected.
        self.assertRaises(ValueError, find.Finder, {'': None})
        self.assertRaises(ValueError, find.Finder, {'name': None})
        self.assertRaises(
            ValueError, find.Finder, {'nonexist': 'somevalue'}
        )

        self._assert_finder({'name': 'test_name'}, find.NameOption)
        self._assert_finder({'iname': 'test_name'}, find.InameOption)
        self._assert_finder({'regex': r'.*\.txt'}, find.RegexOption)
        self._assert_finder({'iregex': r'.*\.txt'}, find.IregexOption)
        self._assert_finder({'type': 'd'}, find.TypeOption)
        self._assert_finder({'owner': 'root'}, find.OwnerOption)

        if sys.platform.startswith(('darwin', 'freebsd', 'openbsd')):
            group_name = 'wheel'
        else:
            group_name = 'root'
        self._assert_finder({'group': group_name}, find.GroupOption)

        self._assert_finder({'size': '+1G'}, find.SizeOption)
        self._assert_finder({'mtime': '1d'}, find.MtimeOption)
        self._assert_finder({'grep': 'foo'}, find.GrepOption)

        # An explicit print option is an action only; it adds no criteria.
        self._assert_finder({'print': 'name'})

    def test_find(self):
        '''
        find() yields the entries under a directory that satisfy the
        configured options, formatted by the print option.
        '''
        hello_file = os.path.join(self.tmpdir, 'hello.txt')
        with salt.utils.files.fopen(hello_file, 'w') as fp_:
            fp_.write(salt.utils.stringutils.to_str('foo'))

        # Without options the directory itself and the file are returned.
        finder = salt.utils.find.Finder({})
        self.assertEqual(
            list(finder.find(self.tmpdir)), [self.tmpdir, hello_file]
        )

        # mindepth=1 drops the starting directory; maxdepth=0 keeps only it.
        finder = salt.utils.find.Finder({'mindepth': 1})
        self.assertEqual(list(finder.find(self.tmpdir)), [hello_file])

        finder = salt.utils.find.Finder({'maxdepth': 0})
        self.assertEqual(list(finder.find(self.tmpdir)), [self.tmpdir])

        finder = salt.utils.find.Finder({'name': 'hello.txt'})
        self.assertEqual(list(finder.find(self.tmpdir)), [hello_file])

        finder = salt.utils.find.Finder({'type': 'f', 'print': 'path'})
        self.assertEqual(list(finder.find(self.tmpdir)), [hello_file])

        # The three-byte file is far below the '+1G' size threshold.
        finder = salt.utils.find.Finder({'size': '+1G', 'print': 'path'})
        self.assertEqual(list(finder.find(self.tmpdir)), [])

        # A multi-field print format yields one list per matched entry.
        finder = salt.utils.find.Finder(
            {'name': 'hello.txt', 'print': 'path name'}
        )
        self.assertEqual(
            list(finder.find(self.tmpdir)), [[hello_file, 'hello.txt']]
        )

        # Searching an empty path yields nothing.
        finder = salt.utils.find.Finder({'name': 'test_name'})
        self.assertEqual(list(finder.find('')), [])
|