test_nftables.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Rahul Handay <rahulha@saltstack.com>
  4. """
  5. # Import Python Libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. # Import Salt Libs
  8. import salt.states.nftables as nftables
  9. # Import Salt Testing Libs
  10. from tests.support.mixins import LoaderModuleMockMixin
  11. from tests.support.mock import MagicMock, patch
  12. from tests.support.unit import TestCase
  13. class NftablesTestCase(TestCase, LoaderModuleMockMixin):
  14. """
  15. Validate the nftables state
  16. """
  def setup_loader_modules(self):
      """
      Map the nftables state module to an empty dict; no extra module
      globals are supplied by this test case.
      """
      module_globals = {}
      return {nftables: module_globals}
  def test_chain_present(self):
      """
      Exercise ``nftables.chain_present`` for a chain that already
      exists, a chain that is created, and a chain whose creation fails.
      """
      expected = {"name": "salt", "changes": {}, "result": True, "comment": ""}

      # Queued results; presumably one is consumed by each
      # chain_present() call made below.
      check_chain_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )
      new_chain_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )

      with patch.dict(nftables.__salt__, {"nftables.check_chain": check_chain_mock}):
          # check_chain reports success: nothing to change.
          expected["comment"] = (
              "nftables salt chain is already exist in filter table for ipv4"
          )
          self.assertDictEqual(nftables.chain_present("salt"), expected)

          with patch.dict(nftables.__salt__, {"nftables.new_chain": new_chain_mock}):
              # new_chain reports success.
              expected["changes"] = {"locale": "salt"}
              expected["comment"] = (
                  "nftables salt chain in filter table create success for ipv4"
              )
              self.assertDictEqual(nftables.chain_present("salt"), expected)

              # new_chain reports failure.
              expected["changes"] = {}
              expected["comment"] = (
                  "Failed to create salt chain in filter table: for ipv4"
              )
              expected["result"] = False
              self.assertDictEqual(nftables.chain_present("salt"), expected)
  def test_chain_absent(self):
      """
      Exercise ``nftables.chain_absent`` for a chain that is already
      absent and for a chain whose flush fails.
      """
      expected = {"name": "salt", "changes": {}, "result": True, "comment": ""}

      check_chain_mock = MagicMock(side_effect=[False, True])
      flush_mock = MagicMock(return_value="")

      with patch.dict(nftables.__salt__, {"nftables.check_chain": check_chain_mock}):
          expected["comment"] = (
              "nftables salt chain is already absent in filter table for ipv4"
          )
          self.assertDictEqual(nftables.chain_absent("salt"), expected)

          with patch.dict(nftables.__salt__, {"nftables.flush": flush_mock}):
              # flush returns an empty string, which the state is
              # expected to report as a failure.
              expected["result"] = False
              expected["comment"] = (
                  "Failed to flush salt chain in filter table: for ipv4"
              )
              self.assertDictEqual(nftables.chain_absent("salt"), expected)
  def test_append(self):
      """
      Exercise ``nftables.append`` with a rule that is already set, in
      test mode, with a successful append and with a failing append.
      """
      expected = {"name": "salt", "changes": {}, "result": True, "comment": ""}

      keywords_mock = MagicMock(return_value=[])
      build_rule_mock = MagicMock(
          return_value={"result": True, "comment": "", "rule": "a"}
      )
      # Queued results for "nftables.check"; presumably one is consumed
      # by each append() call made below.
      check_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
              {"result": False, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )
      append_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )

      # Patchers are only built here; nothing is patched until they are
      # entered by the ``with`` statements below.
      keywords_patch = patch.object(
          nftables, "_STATE_INTERNAL_KEYWORDS", keywords_mock
      )
      build_rule_patch = patch.dict(
          nftables.__salt__, {"nftables.build_rule": build_rule_mock}
      )
      check_patch = patch.dict(nftables.__salt__, {"nftables.check": check_mock})
      append_patch = patch.dict(nftables.__salt__, {"nftables.append": append_mock})

      with keywords_patch, build_rule_patch, check_patch:
          # Rule already present.
          expected["comment"] = "nftables rule for salt already set (a) for ipv4"
          self.assertDictEqual(
              nftables.append("salt", table="", chain=""), expected
          )

          # Test mode: the rule is only reported as pending.
          with patch.dict(nftables.__opts__, {"test": True}):
              expected["result"] = None
              expected["comment"] = (
                  "nftables rule for salt needs to be set (a) for ipv4"
              )
              self.assertDictEqual(
                  nftables.append("salt", table="", chain=""), expected
              )

          with patch.dict(nftables.__opts__, {"test": False}), append_patch:
              # Append succeeds.
              expected["changes"] = {"locale": "salt"}
              expected["comment"] = "Set nftables rule for salt to: a for ipv4"
              expected["result"] = True
              self.assertDictEqual(
                  nftables.append("salt", table="", chain=""), expected
              )

              # Append fails.
              expected["changes"] = {}
              expected["comment"] = (
                  "Failed to set nftables rule for salt.\n"
                  "Attempted rule was a for ipv4.\n"
              )
              expected["result"] = False
              self.assertDictEqual(
                  nftables.append("salt", table="", chain=""), expected
              )
  def test_insert(self):
      """
      Exercise ``nftables.insert`` with a rule that is already set, in
      test mode, with a successful insert and with a failing insert.
      """
      expected = {"name": "salt", "changes": {}, "result": True, "comment": ""}

      keywords_mock = MagicMock(return_value=[])
      build_rule_mock = MagicMock(
          return_value={"result": True, "comment": "", "rule": "a"}
      )
      # Queued results for "nftables.check"; presumably one is consumed
      # by each insert() call made below.
      check_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
              {"result": False, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )
      insert_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )

      # Patchers are only built here; nothing is patched until they are
      # entered by the ``with`` statements below.
      keywords_patch = patch.object(
          nftables, "_STATE_INTERNAL_KEYWORDS", keywords_mock
      )
      build_rule_patch = patch.dict(
          nftables.__salt__, {"nftables.build_rule": build_rule_mock}
      )
      check_patch = patch.dict(nftables.__salt__, {"nftables.check": check_mock})
      insert_patch = patch.dict(nftables.__salt__, {"nftables.insert": insert_mock})

      with keywords_patch, build_rule_patch, check_patch:
          # Rule already present.
          expected["comment"] = "nftables rule for salt already set for ipv4 (a)"
          self.assertDictEqual(
              nftables.insert("salt", table="", chain=""), expected
          )

          # Test mode: the rule is only reported as pending.
          with patch.dict(nftables.__opts__, {"test": True}):
              expected["result"] = None
              expected["comment"] = (
                  "nftables rule for salt needs to be set for ipv4 (a)"
              )
              self.assertDictEqual(
                  nftables.insert("salt", table="", chain=""), expected
              )

          with patch.dict(nftables.__opts__, {"test": False}), insert_patch:
              # Insert succeeds.
              expected["changes"] = {"locale": "salt"}
              expected["comment"] = "Set nftables rule for salt to: a for ipv4"
              expected["result"] = True
              self.assertDictEqual(
                  nftables.insert("salt", table="", chain="", position=""),
                  expected,
              )

              # Insert fails.
              expected["changes"] = {}
              expected["comment"] = (
                  "Failed to set nftables rule for salt.\n"
                  "Attempted rule was a"
              )
              expected["result"] = False
              self.assertDictEqual(
                  nftables.insert("salt", table="", chain="", position=""),
                  expected,
              )
  def test_delete(self):
      """
      Exercise ``nftables.delete`` with a rule that is already absent, in
      test mode, with a successful delete and with a failing delete.
      """
      expected = {"name": "salt", "changes": {}, "result": None, "comment": ""}

      keywords_mock = MagicMock(return_value=[])
      build_rule_mock = MagicMock(
          return_value={"result": True, "comment": "", "rule": "a"}
      )
      # Queued results for "nftables.check"; presumably one is consumed
      # by each delete() call made below.
      check_mock = MagicMock(
          side_effect=[
              {"result": False, "comment": ""},
              {"result": True, "comment": ""},
              {"result": True, "comment": ""},
              {"result": True, "comment": ""},
          ]
      )
      delete_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )

      # Patchers are only built here; nothing is patched until they are
      # entered by the ``with`` statements below.
      keywords_patch = patch.object(
          nftables, "_STATE_INTERNAL_KEYWORDS", keywords_mock
      )
      build_rule_patch = patch.dict(
          nftables.__salt__, {"nftables.build_rule": build_rule_mock}
      )
      check_patch = patch.dict(nftables.__salt__, {"nftables.check": check_mock})
      delete_patch = patch.dict(nftables.__salt__, {"nftables.delete": delete_mock})

      with keywords_patch, build_rule_patch, check_patch:
          # Rule not present: nothing to delete.
          expected["comment"] = (
              "nftables rule for salt already absent for ipv4 (a)"
          )
          expected["result"] = True
          self.assertDictEqual(
              nftables.delete("salt", table="", chain=""), expected
          )

          # Test mode: the deletion is only reported as pending.
          with patch.dict(nftables.__opts__, {"test": True}):
              expected["result"] = None
              expected["comment"] = (
                  "nftables rule for salt needs to be deleted for ipv4 (a)"
              )
              self.assertDictEqual(
                  nftables.delete("salt", table="", chain=""), expected
              )

          with patch.dict(nftables.__opts__, {"test": False}), delete_patch:
              # Delete succeeds.
              expected["result"] = True
              expected["changes"] = {"locale": "salt"}
              expected["comment"] = "Delete nftables rule for salt a"
              self.assertDictEqual(
                  nftables.delete("salt", table="", chain="", position=""),
                  expected,
              )

              # Delete fails.
              expected["result"] = False
              expected["changes"] = {}
              expected["comment"] = (
                  "Failed to delete nftables rule for salt.\n"
                  "Attempted rule was a"
              )
              self.assertDictEqual(
                  nftables.delete("salt", table="", chain="", position=""),
                  expected,
              )
  def test_flush(self):
      """
      Exercise ``nftables.flush`` with a missing table, a missing chain,
      a successful flush and a failing flush.
      """
      expected = {"name": "salt", "changes": {}, "result": None, "comment": ""}

      keywords_mock = MagicMock(return_value=[])
      # Queued results; presumably each flush() call below consumes one
      # entry from every mock it reaches.
      check_table_mock = MagicMock(
          side_effect=[
              {"result": False, "comment": ""},
              {"result": True, "comment": ""},
              {"result": True, "comment": ""},
              {"result": True, "comment": ""},
          ]
      )
      check_chain_mock = MagicMock(
          side_effect=[
              {"result": False, "comment": ""},
              {"result": True, "comment": ""},
              {"result": True, "comment": ""},
          ]
      )
      flush_mock = MagicMock(
          side_effect=[
              {"result": True, "comment": ""},
              {"result": False, "comment": ""},
          ]
      )

      # Patchers are only built here; nothing is patched until they are
      # entered by the ``with`` statements below.
      keywords_patch = patch.object(
          nftables, "_STATE_INTERNAL_KEYWORDS", keywords_mock
      )
      check_table_patch = patch.dict(
          nftables.__salt__, {"nftables.check_table": check_table_mock}
      )
      check_chain_patch = patch.dict(
          nftables.__salt__, {"nftables.check_chain": check_chain_mock}
      )
      flush_patch = patch.dict(nftables.__salt__, {"nftables.flush": flush_mock})

      with keywords_patch, check_table_patch:
          # Table lookup fails.
          expected["comment"] = (
              "Failed to flush table in family ipv4, table does not exist."
          )
          expected["result"] = False
          self.assertDictEqual(
              nftables.flush("salt", table="", chain=""), expected
          )

          with check_chain_patch:
              # Table lookup succeeds, chain lookup fails.
              expected["comment"] = (
                  "Failed to flush chain in table in family ipv4,"
                  " chain does not exist."
              )
              self.assertDictEqual(
                  nftables.flush("salt", table="", chain=""), expected
              )

              with flush_patch:
                  # Flush succeeds.
                  expected["changes"] = {"locale": "salt"}
                  expected["comment"] = (
                      "Flush nftables rules in table chain ipv4 family"
                  )
                  expected["result"] = True
                  self.assertDictEqual(
                      nftables.flush("salt", table="", chain=""), expected
                  )

                  # Flush fails.
                  expected["changes"] = {}
                  expected["comment"] = "Failed to flush nftables rules"
                  expected["result"] = False
                  self.assertDictEqual(
                      nftables.flush("salt", table="", chain=""), expected
                  )