1
0

test_inotify.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # coding: utf-8
  2. # Python libs
  3. from __future__ import absolute_import
  4. import os
  5. import shutil
  6. import tempfile
  7. # Salt libs
  8. import salt.utils.files
  9. from salt.beacons import inotify
  10. # Salt testing libs
  11. from tests.support.unit import skipIf, TestCase
  12. from tests.support.mixins import LoaderModuleMockMixin
  13. # Third-party libs
  14. try:
  15. import pyinotify # pylint: disable=unused-import
  16. HAS_PYINOTIFY = True
  17. except ImportError:
  18. HAS_PYINOTIFY = False
  19. import logging
  20. log = logging.getLogger(__name__)
  21. @skipIf(not HAS_PYINOTIFY, 'pyinotify is not available')
  22. class INotifyBeaconTestCase(TestCase, LoaderModuleMockMixin):
  23. '''
  24. Test case for salt.beacons.inotify
  25. '''
  26. def setup_loader_modules(self):
  27. return {inotify: {}}
  28. def setUp(self):
  29. self.tmpdir = tempfile.mkdtemp()
  30. def tearDown(self):
  31. shutil.rmtree(self.tmpdir, ignore_errors=True)
  32. def test_empty_config(self):
  33. config = [{}]
  34. ret = inotify.beacon(config)
  35. self.assertEqual(ret, [])
  36. def test_file_open(self):
  37. path = os.path.realpath(__file__)
  38. config = [{'files': {path: {'mask': ['open']}}}]
  39. ret = inotify.validate(config)
  40. self.assertEqual(ret, (True, 'Valid beacon configuration'))
  41. ret = inotify.beacon(config)
  42. self.assertEqual(ret, [])
  43. with salt.utils.files.fopen(path, 'r') as f:
  44. pass
  45. ret = inotify.beacon(config)
  46. self.assertEqual(len(ret), 1)
  47. self.assertEqual(ret[0]['path'], path)
  48. self.assertEqual(ret[0]['change'], 'IN_OPEN')
  49. def test_dir_no_auto_add(self):
  50. config = [{'files': {self.tmpdir: {'mask': ['create']}}}]
  51. ret = inotify.validate(config)
  52. self.assertEqual(ret, (True, 'Valid beacon configuration'))
  53. ret = inotify.beacon(config)
  54. self.assertEqual(ret, [])
  55. fp = os.path.join(self.tmpdir, 'tmpfile')
  56. with salt.utils.files.fopen(fp, 'w') as f:
  57. pass
  58. ret = inotify.beacon(config)
  59. self.assertEqual(len(ret), 1)
  60. self.assertEqual(ret[0]['path'], fp)
  61. self.assertEqual(ret[0]['change'], 'IN_CREATE')
  62. with salt.utils.files.fopen(fp, 'r') as f:
  63. pass
  64. ret = inotify.beacon(config)
  65. self.assertEqual(ret, [])
  66. def test_dir_auto_add(self):
  67. config = [{'files': {self.tmpdir: {'mask': ['create', 'open'], 'auto_add': True}}}]
  68. ret = inotify.validate(config)
  69. self.assertEqual(ret, (True, 'Valid beacon configuration'))
  70. ret = inotify.beacon(config)
  71. self.assertEqual(ret, [])
  72. fp = os.path.join(self.tmpdir, 'tmpfile')
  73. with salt.utils.files.fopen(fp, 'w') as f:
  74. pass
  75. ret = inotify.beacon(config)
  76. self.assertEqual(len(ret), 2)
  77. self.assertEqual(ret[0]['path'], fp)
  78. self.assertEqual(ret[0]['change'], 'IN_CREATE')
  79. self.assertEqual(ret[1]['path'], fp)
  80. self.assertEqual(ret[1]['change'], 'IN_OPEN')
  81. with salt.utils.files.fopen(fp, 'r') as f:
  82. pass
  83. ret = inotify.beacon(config)
  84. self.assertEqual(len(ret), 1)
  85. self.assertEqual(ret[0]['path'], fp)
  86. self.assertEqual(ret[0]['change'], 'IN_OPEN')
  87. def test_dir_recurse(self):
  88. dp1 = os.path.join(self.tmpdir, 'subdir1')
  89. os.mkdir(dp1)
  90. dp2 = os.path.join(dp1, 'subdir2')
  91. os.mkdir(dp2)
  92. fp = os.path.join(dp2, 'tmpfile')
  93. with salt.utils.files.fopen(fp, 'w') as f:
  94. pass
  95. config = [{'files': {self.tmpdir: {'mask': ['open'], 'recurse': True}}}]
  96. ret = inotify.validate(config)
  97. self.assertEqual(ret, (True, 'Valid beacon configuration'))
  98. ret = inotify.beacon(config)
  99. self.assertEqual(ret, [])
  100. with salt.utils.files.fopen(fp) as f:
  101. pass
  102. ret = inotify.beacon(config)
  103. self.assertEqual(len(ret), 3)
  104. self.assertEqual(ret[0]['path'], dp1)
  105. self.assertEqual(ret[0]['change'], 'IN_OPEN|IN_ISDIR')
  106. self.assertEqual(ret[1]['path'], dp2)
  107. self.assertEqual(ret[1]['change'], 'IN_OPEN|IN_ISDIR')
  108. self.assertEqual(ret[2]['path'], fp)
  109. self.assertEqual(ret[2]['change'], 'IN_OPEN')
  110. def test_dir_recurse_auto_add(self):
  111. dp1 = os.path.join(self.tmpdir, 'subdir1')
  112. os.mkdir(dp1)
  113. config = [{'files': {self.tmpdir: {'mask': ['create', 'delete'],
  114. 'recurse': True,
  115. 'auto_add': True}}}]
  116. ret = inotify.validate(config)
  117. self.assertEqual(ret, (True, 'Valid beacon configuration'))
  118. ret = inotify.beacon(config)
  119. self.assertEqual(ret, [])
  120. dp2 = os.path.join(dp1, 'subdir2')
  121. os.mkdir(dp2)
  122. ret = inotify.beacon(config)
  123. self.assertEqual(len(ret), 1)
  124. self.assertEqual(ret[0]['path'], dp2)
  125. self.assertEqual(ret[0]['change'], 'IN_CREATE|IN_ISDIR')
  126. fp = os.path.join(dp2, 'tmpfile')
  127. with salt.utils.files.fopen(fp, 'w') as f:
  128. pass
  129. ret = inotify.beacon(config)
  130. self.assertEqual(len(ret), 1)
  131. self.assertEqual(ret[0]['path'], fp)
  132. self.assertEqual(ret[0]['change'], 'IN_CREATE')
  133. os.remove(fp)
  134. ret = inotify.beacon(config)
  135. self.assertEqual(len(ret), 1)
  136. self.assertEqual(ret[0]['path'], fp)
  137. self.assertEqual(ret[0]['change'], 'IN_DELETE')
  138. def test_multi_files_exclude(self):
  139. dp1 = os.path.join(self.tmpdir, 'subdir1')
  140. dp2 = os.path.join(self.tmpdir, 'subdir2')
  141. os.mkdir(dp1)
  142. os.mkdir(dp2)
  143. _exclude1 = '{0}/subdir1/*tmpfile*$'.format(self.tmpdir)
  144. _exclude2 = '{0}/subdir2/*filetmp*$'.format(self.tmpdir)
  145. config = [{'files': {dp1: {'mask': ['create', 'delete'],
  146. 'recurse': True,
  147. 'exclude': [{_exclude1: {'regex': True}}],
  148. 'auto_add': True}}},
  149. {'files': {dp2: {'mask': ['create', 'delete'],
  150. 'recurse': True,
  151. 'exclude': [{_exclude2: {'regex': True}}],
  152. 'auto_add': True}}}]
  153. ret = inotify.validate(config)
  154. self.assertEqual(ret, (True, 'Valid beacon configuration'))
  155. fp = os.path.join(dp1, 'tmpfile')
  156. with salt.utils.files.fopen(fp, 'w') as f:
  157. pass
  158. ret = inotify.beacon(config)
  159. self.assertEqual(len(ret), 0)
  160. os.remove(fp)
  161. ret = inotify.beacon(config)
  162. self.assertEqual(len(ret), 0)
  163. fp = os.path.join(dp2, 'tmpfile')
  164. with salt.utils.files.fopen(fp, 'w') as f:
  165. pass
  166. ret = inotify.beacon(config)
  167. self.assertEqual(len(ret), 1)
  168. self.assertEqual(ret[0]['path'], fp)
  169. self.assertEqual(ret[0]['change'], 'IN_CREATE')
  170. os.remove(fp)
  171. ret = inotify.beacon(config)
  172. self.assertEqual(len(ret), 1)
  173. self.assertEqual(ret[0]['path'], fp)
  174. self.assertEqual(ret[0]['change'], 'IN_DELETE')