test_ipset.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
  4. '''
  5. # Import Python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. # Import Salt Testing Libs
  8. from tests.support.mixins import LoaderModuleMockMixin
  9. from tests.support.unit import TestCase
  10. from tests.support.mock import (
  11. MagicMock,
  12. call,
  13. patch)
  14. # Import Salt Libs
  15. import salt.states.ipset as ipset
  16. class IpsetSetPresentTestCase(TestCase, LoaderModuleMockMixin):
  17. '''
  18. Test cases for salt.states.ipset.present
  19. '''
  20. fake_name = 'fake_ipset'
  21. fake_set_type = {'bitmap': '192.168.0.3'}
  22. def setup_loader_modules(self):
  23. return {ipset: {}}
  24. def _runner(self, expected_ret, test=False, check_set=False, new_set=None,
  25. new_set_assertion=True):
  26. mock_check_set = MagicMock(return_value=check_set)
  27. mock_new_set = MagicMock() if new_set is None else MagicMock(return_value=new_set)
  28. with patch.dict(ipset.__salt__, {'ipset.check_set': mock_check_set,
  29. 'ipset.new_set': mock_new_set}):
  30. with patch.dict(ipset.__opts__, {'test': test}):
  31. actual_ret = ipset.set_present(self.fake_name, self.fake_set_type)
  32. mock_check_set.assert_called_once_with(self.fake_name)
  33. if new_set_assertion:
  34. mock_new_set.assert_called_once_with(self.fake_name, self.fake_set_type, 'ipv4')
  35. else:
  36. self.assertTrue(mock_new_set.call_count == 0)
  37. self.assertDictEqual(actual_ret, expected_ret)
  38. def test_already_exists(self):
  39. '''
  40. Test to verify the chain exists when it already exists.
  41. '''
  42. ret = {'name': self.fake_name,
  43. 'result': True,
  44. 'comment': 'ipset set {0} already exists for ipv4'.format(self.fake_name),
  45. 'changes': {}}
  46. self._runner(ret, check_set=True, new_set_assertion=False)
  47. def test_needs_update_test_mode(self):
  48. '''
  49. Test to verify that detects need for update but doesn't apply when in test mode.
  50. '''
  51. ret = {'name': self.fake_name,
  52. 'result': None,
  53. 'comment': 'ipset set {0} would be added for ipv4'.format(self.fake_name),
  54. 'changes': {}}
  55. self._runner(ret, test=True, new_set_assertion=False)
  56. def test_creates_set(self):
  57. ret = {'name': self.fake_name,
  58. 'result': True,
  59. 'comment': 'ipset set {0} created successfully for ipv4'.format(self.fake_name),
  60. 'changes': {'locale': self.fake_name}}
  61. self._runner(ret, new_set=True)
  62. def test_create_fails(self):
  63. ret = {'name': self.fake_name,
  64. 'result': False,
  65. 'comment': 'Failed to create set {0} for ipv4: '.format(self.fake_name),
  66. 'changes': {}}
  67. self._runner(ret, new_set='')
  68. class IpsetSetAbsentTestCase(TestCase, LoaderModuleMockMixin):
  69. '''
  70. Test cases for salt.states.ipset.present
  71. '''
  72. fake_name = 'fake_ipset'
  73. fake_set_type = {'bitmap': '192.168.0.3'}
  74. def setup_loader_modules(self):
  75. return {ipset: {}}
  76. def _runner(self, expected_ret, test=False, check_set=True, delete_set='',
  77. flush_assertion=False, delete_set_assertion=False):
  78. mock_check_set = MagicMock(return_value=check_set)
  79. mock_flush = MagicMock()
  80. mock_delete_set = MagicMock() if delete_set is None else MagicMock(return_value=delete_set)
  81. with patch.dict(ipset.__opts__, {'test': test}):
  82. with patch.dict(ipset.__salt__, {'ipset.check_set': mock_check_set,
  83. 'ipset.flush': mock_flush,
  84. 'ipset.delete_set': mock_delete_set}):
  85. actual_ret = ipset.set_absent(self.fake_name)
  86. mock_check_set.assert_called_once_with(self.fake_name, 'ipv4')
  87. if flush_assertion:
  88. mock_flush.assert_called_once_with(self.fake_name, 'ipv4')
  89. else:
  90. self.assertTrue(mock_flush.call_count == 0)
  91. if delete_set_assertion:
  92. mock_delete_set.assert_called_once_with(self.fake_name, 'ipv4')
  93. else:
  94. self.assertTrue(mock_delete_set.call_count == 0)
  95. self.assertDictEqual(actual_ret, expected_ret)
  96. def test_already_absent(self):
  97. ret = {'name': self.fake_name,
  98. 'result': True,
  99. 'comment': 'ipset set {0} for ipv4 is already absent'.format(self.fake_name),
  100. 'changes': {}}
  101. self._runner(ret, check_set=False, delete_set=None)
  102. def test_remove_test_mode(self):
  103. ret = {'name': self.fake_name,
  104. 'result': None,
  105. 'comment': 'ipset set {0} for ipv4 would be removed'.format(self.fake_name),
  106. 'changes': {}}
  107. self._runner(ret, test=True, delete_set=None)
  108. def test_remove_fails(self):
  109. ret = {'name': self.fake_name,
  110. 'result': False,
  111. 'comment': 'Failed to delete set {0} for ipv4: '.format(self.fake_name),
  112. 'changes': {}}
  113. self._runner(ret, flush_assertion=True, delete_set_assertion=True)
  114. def test_remove_success(self):
  115. ret = {'name': self.fake_name,
  116. 'result': True,
  117. 'comment': 'ipset set {0} deleted successfully for family ipv4'.format(self.fake_name),
  118. 'changes': {'locale': 'fake_ipset'}}
  119. self._runner(ret, delete_set=True, flush_assertion=True, delete_set_assertion=True)
  120. class IpsetPresentTestCase(TestCase, LoaderModuleMockMixin):
  121. '''
  122. Test cases for salt.states.ipset.present
  123. '''
  124. fake_name = 'fake_ipset'
  125. fake_entries = ['192.168.0.3', '192.168.1.3']
  126. def setup_loader_modules(self):
  127. return {ipset: {}}
  128. def _runner(self, expected_ret, test=False, check=False, add=False,
  129. add_assertion=False):
  130. mock_check = MagicMock(return_value=check)
  131. mock_add = MagicMock(return_value=add)
  132. with patch.dict(ipset.__opts__, {'test': test}):
  133. with patch.dict(ipset.__salt__, {'ipset.check': mock_check,
  134. 'ipset.add': mock_add}):
  135. actual_ret = ipset.present(self.fake_name, self.fake_entries, set_name=self.fake_name)
  136. mock_check.assert_has_calls([call(self.fake_name, e, 'ipv4') for e in self.fake_entries], any_order=True)
  137. if add_assertion:
  138. expected_calls = [call(self.fake_name, e, 'ipv4', set_name=self.fake_name) for e in self.fake_entries]
  139. if add is not True:
  140. # if the add fails, then it will only get called once.
  141. expected_calls = expected_calls[:1]
  142. mock_add.assert_has_calls(expected_calls, any_order=True)
  143. else:
  144. self.assertTrue(mock_add.call_count == 0)
  145. self.assertDictEqual(actual_ret, expected_ret)
  146. def test_entries_already_present(self):
  147. ret = {'name': self.fake_name,
  148. 'result': True,
  149. 'comment': 'entry for 192.168.0.3 already in set {0} for ipv4\n'
  150. 'entry for 192.168.1.3 already in set {0} for ipv4\n'
  151. ''.format(self.fake_name),
  152. 'changes': {}}
  153. self._runner(ret, check=True)
  154. def test_in_test_mode(self):
  155. ret = {'name': self.fake_name,
  156. 'result': None,
  157. 'comment': 'entry 192.168.0.3 would be added to set {0} for family ipv4\n'
  158. 'entry 192.168.1.3 would be added to set {0} for family ipv4\n'
  159. ''.format(self.fake_name),
  160. 'changes': {}}
  161. self._runner(ret, test=True)
  162. def test_add_fails(self):
  163. ret = {'name': self.fake_name,
  164. 'result': False,
  165. 'comment': 'Failed to add to entry 192.168.1.3 to set {0} for family ipv4.\n'
  166. 'Error'.format(self.fake_name),
  167. 'changes': {}}
  168. self._runner(ret, add='Error', add_assertion=True)
  169. def test_success(self):
  170. ret = {'name': self.fake_name,
  171. 'result': True,
  172. 'comment': 'entry 192.168.0.3 added to set {0} for family ipv4\n'
  173. 'entry 192.168.1.3 added to set {0} for family ipv4\n'
  174. ''.format(self.fake_name),
  175. 'changes': {'locale': 'fake_ipset'}}
  176. self._runner(ret, add='worked', add_assertion=True)
  177. def test_missing_entry(self):
  178. ret = {'name': self.fake_name,
  179. 'result': False,
  180. 'comment': 'ipset entry must be specified',
  181. 'changes': {}}
  182. mock = MagicMock(return_value=True)
  183. with patch.dict(ipset.__salt__, {'ipset.check': mock}):
  184. self.assertDictEqual(ipset.present(self.fake_name), ret)
  185. class IpsetAbsentTestCase(TestCase, LoaderModuleMockMixin):
  186. '''
  187. Test cases for salt.states.ipset.present
  188. '''
  189. fake_name = 'fake_ipset'
  190. fake_entries = ['192.168.0.3', '192.168.1.3']
  191. def setup_loader_modules(self):
  192. return {ipset: {}}
  193. def _runner(self, expected_ret, test=False, check=False, delete=False,
  194. delete_assertion=False):
  195. mock_check = MagicMock(return_value=check)
  196. mock_delete = MagicMock(return_value=delete)
  197. with patch.dict(ipset.__opts__, {'test': test}):
  198. with patch.dict(ipset.__salt__, {'ipset.check': mock_check,
  199. 'ipset.delete': mock_delete}):
  200. actual_ret = ipset.absent(self.fake_name, self.fake_entries, set_name=self.fake_name)
  201. mock_check.assert_has_calls([call(self.fake_name, e, 'ipv4') for e in self.fake_entries], any_order=True)
  202. if delete_assertion:
  203. expected_calls = [call(self.fake_name, e, 'ipv4', set_name=self.fake_name) for e in self.fake_entries]
  204. if delete is not True:
  205. expected_calls = expected_calls[:1]
  206. mock_delete.assert_has_calls(expected_calls, any_order=True)
  207. else:
  208. self.assertTrue(mock_delete.call_count == 0)
  209. self.assertDictEqual(actual_ret, expected_ret)
  210. def test_already_absent(self):
  211. ret = {'name': self.fake_name,
  212. 'result': True,
  213. 'comment': 'ipset entry for 192.168.0.3 not present in set {0} for ipv4\n'
  214. 'ipset entry for 192.168.1.3 not present in set {0} for ipv4\n'
  215. ''.format(self.fake_name),
  216. 'changes': {}}
  217. self._runner(ret)
  218. def test_in_test_mode(self):
  219. ret = {'name': self.fake_name,
  220. 'result': None,
  221. 'comment': 'ipset entry 192.168.0.3 would be removed from set {0} for ipv4\n'
  222. 'ipset entry 192.168.1.3 would be removed from set {0} for ipv4\n'
  223. ''.format(self.fake_name),
  224. 'changes': {}}
  225. self._runner(ret, test=True, check=True)
  226. def test_del_fails(self):
  227. ret = {'name': self.fake_name,
  228. 'result': False,
  229. 'comment': 'Failed to delete ipset entry from set {0} for ipv4. Attempted entry was 192.168.1.3.\n'
  230. 'Error\n'.format(self.fake_name),
  231. 'changes': {}}
  232. self._runner(ret, check=True, delete='Error', delete_assertion=True)
  233. def test_success(self):
  234. ret = {'name': self.fake_name,
  235. 'result': True,
  236. 'comment': 'ipset entry 192.168.0.3 removed from set {0} for ipv4\n'
  237. 'ipset entry 192.168.1.3 removed from set {0} for ipv4\n'
  238. ''.format(self.fake_name),
  239. 'changes': {'locale': 'fake_ipset'}}
  240. self._runner(ret, check=True, delete='worked', delete_assertion=True)
  241. def test_absent(self):
  242. ret = {'name': self.fake_name,
  243. 'result': False,
  244. 'comment': 'ipset entry must be specified',
  245. 'changes': {}}
  246. mock = MagicMock(return_value=True)
  247. with patch.dict(ipset.__salt__, {'ipset.check': mock}):
  248. self.assertDictEqual(ipset.absent(self.fake_name), ret)
  249. class IpsetFlushTestCase(TestCase, LoaderModuleMockMixin):
  250. '''
  251. Test cases for salt.states.ipset.present
  252. '''
  253. fake_name = 'fake_ipset'
  254. def setup_loader_modules(self):
  255. return {ipset: {}}
  256. def _runner(self, expected_ret, test=False, check_set=True, flush=True,
  257. flush_assertion=True):
  258. mock_check_set = MagicMock(return_value=check_set)
  259. mock_flush = MagicMock(return_value=flush)
  260. with patch.dict(ipset.__opts__, {'test': test}):
  261. with patch.dict(ipset.__salt__, {'ipset.check_set': mock_check_set,
  262. 'ipset.flush': mock_flush}):
  263. actual_ret = ipset.flush(self.fake_name)
  264. mock_check_set.assert_called_once_with(self.fake_name)
  265. if flush_assertion:
  266. mock_flush.assert_called_once_with(self.fake_name, 'ipv4')
  267. else:
  268. self.assertTrue(mock_flush.call_count == 0)
  269. self.assertDictEqual(actual_ret, expected_ret)
  270. def test_no_set(self):
  271. ret = {'name': self.fake_name,
  272. 'result': False,
  273. 'comment': 'ipset set {0} does not exist for ipv4'.format(self.fake_name),
  274. 'changes': {}}
  275. self._runner(ret, check_set=False, flush_assertion=False)
  276. def test_in_test_mode(self):
  277. ret = {'name': self.fake_name,
  278. 'result': None,
  279. 'comment': 'ipset entries in set {0} for ipv4 would be flushed'.format(self.fake_name),
  280. 'changes': {}}
  281. self._runner(ret, test=True, flush_assertion=False)
  282. def test_flush_fails(self):
  283. ret = {'name': self.fake_name,
  284. 'result': False,
  285. 'comment': 'Failed to flush ipset entries from set {0} for ipv4'.format(self.fake_name),
  286. 'changes': {}}
  287. self._runner(ret, flush=False)
  288. def test_success(self):
  289. ret = {'name': self.fake_name,
  290. 'result': True,
  291. 'comment': 'Flushed ipset entries from set {0} for ipv4'.format(self.fake_name),
  292. 'changes': {'locale': 'fake_ipset'}}
  293. self._runner(ret)