test_nftables.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
  4. """
  5. # Import Python Libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import pytest
  8. # Import Salt Libs
  9. import salt.modules.nftables as nftables
  10. import salt.utils.files
  11. from salt.exceptions import CommandExecutionError
  12. # Import Salt Testing Libs
  13. from tests.support.mixins import LoaderModuleMockMixin
  14. from tests.support.mock import MagicMock, mock_open, patch
  15. from tests.support.unit import TestCase
  16. class NftablesTestCase(TestCase, LoaderModuleMockMixin):
  17. """
  18. Test cases for salt.modules.nftables
  19. """
  20. def setup_loader_modules(self):
  21. return {nftables: {}}
  22. # 'version' function tests: 1
  23. def test_version(self):
  24. """
  25. Test if it return version from nftables --version
  26. """
  27. mock = MagicMock(return_value="nf_tables 0.3-1")
  28. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  29. self.assertEqual(nftables.version(), "0.3-1")
  30. # 'build_rule' function tests: 1
  31. def test_build_rule(self):
  32. """
  33. Test if it build a well-formatted nftables rule based on kwargs.
  34. """
  35. self.assertEqual(
  36. nftables.build_rule(full="True"),
  37. {"result": False, "rule": "", "comment": "Table needs to be specified"},
  38. )
  39. self.assertEqual(
  40. nftables.build_rule(table="filter", full="True"),
  41. {"result": False, "rule": "", "comment": "Chain needs to be specified"},
  42. )
  43. self.assertEqual(
  44. nftables.build_rule(table="filter", chain="input", full="True"),
  45. {"result": False, "rule": "", "comment": "Command needs to be specified"},
  46. )
  47. self.assertEqual(
  48. nftables.build_rule(
  49. table="filter",
  50. chain="input",
  51. command="insert",
  52. position="3",
  53. full="True",
  54. ),
  55. {
  56. "result": True,
  57. "rule": "nft insert rule ip filter input position 3 ",
  58. "comment": "Successfully built rule",
  59. },
  60. )
  61. self.assertEqual(
  62. nftables.build_rule(
  63. table="filter", chain="input", command="insert", full="True"
  64. ),
  65. {
  66. "result": True,
  67. "rule": "nft insert rule ip filter input ",
  68. "comment": "Successfully built rule",
  69. },
  70. )
  71. self.assertEqual(
  72. nftables.build_rule(
  73. table="filter", chain="input", command="halt", full="True"
  74. ),
  75. {
  76. "result": True,
  77. "rule": "nft halt rule ip filter input ",
  78. "comment": "Successfully built rule",
  79. },
  80. )
  81. self.assertEqual(
  82. nftables.build_rule(), {"result": True, "rule": "", "comment": ""}
  83. )
  84. # 'get_saved_rules' function tests: 1
  85. def test_get_saved_rules(self):
  86. """
  87. Test if it return a data structure of the rules in the conf file
  88. """
  89. with patch.dict(nftables.__grains__, {"os_family": "Debian"}):
  90. with patch.object(salt.utils.files, "fopen", MagicMock(mock_open())):
  91. self.assertListEqual(nftables.get_saved_rules(), [])
  92. # 'list_tables' function tests: 1
  93. def test_list_tables(self):
  94. """
  95. Test if it return a data structure of the current, in-memory tables
  96. """
  97. list_tables = [{"family": "inet", "name": "filter", "handle": 2}]
  98. list_tables_mock = MagicMock(return_value=list_tables)
  99. with patch.object(nftables, "list_tables", list_tables_mock):
  100. self.assertListEqual(nftables.list_tables(), list_tables)
  101. list_tables_mock = MagicMock(return_value=[])
  102. with patch.object(nftables, "list_tables", list_tables_mock):
  103. self.assertListEqual(nftables.list_tables(), [])
  104. # 'get_rules' function tests: 1
  105. def test_get_rules(self):
  106. """
  107. Test if it return a data structure of the current, in-memory rules
  108. """
  109. list_tables_mock = MagicMock(
  110. return_value=[{"family": "inet", "name": "filter", "handle": 2}]
  111. )
  112. list_rules_return = """table inet filter {
  113. chain input {
  114. type filter hook input priority 0; policy accept;
  115. }
  116. chain forward {
  117. type filter hook forward priority 0; policy accept;
  118. }
  119. chain output {
  120. type filter hook output priority 0; policy accept;
  121. }
  122. }"""
  123. list_rules_mock = MagicMock(return_value=list_rules_return)
  124. expected = [list_rules_return]
  125. with patch.object(nftables, "list_tables", list_tables_mock):
  126. with patch.dict(nftables.__salt__, {"cmd.run": list_rules_mock}):
  127. self.assertListEqual(nftables.get_rules(), expected)
  128. list_tables_mock = MagicMock(return_value=[])
  129. with patch.object(nftables, "list_tables", list_tables_mock):
  130. self.assertListEqual(nftables.get_rules(), [])
  131. # 'save' function tests: 1
  132. def test_save(self):
  133. """
  134. Test if it save the current in-memory rules to disk
  135. """
  136. with patch.dict(nftables.__grains__, {"os_family": "Debian"}):
  137. mock = MagicMock(return_value=False)
  138. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  139. with patch.object(salt.utils.files, "fopen", MagicMock(mock_open())):
  140. self.assertEqual(nftables.save(), "#! nft -f\n\n")
  141. with patch.object(
  142. salt.utils.files, "fopen", MagicMock(side_effect=IOError)
  143. ):
  144. self.assertRaises(CommandExecutionError, nftables.save)
  145. # 'get_rule_handle' function tests: 1
  146. def test_get_rule_handle(self):
  147. """
  148. Test if it get the handle for a particular rule
  149. """
  150. self.assertEqual(
  151. nftables.get_rule_handle(),
  152. {"result": False, "comment": "Chain needs to be specified"},
  153. )
  154. self.assertEqual(
  155. nftables.get_rule_handle(chain="input"),
  156. {"result": False, "comment": "Rule needs to be specified"},
  157. )
  158. _ru = "input tcp dport 22 log accept"
  159. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  160. mock = MagicMock(return_value="")
  161. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  162. self.assertEqual(nftables.get_rule_handle(chain="input", rule=_ru), ret)
  163. ret = {
  164. "result": False,
  165. "comment": "Chain input in table filter in family ipv4 does not exist",
  166. }
  167. mock = MagicMock(return_value="table ip filter")
  168. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  169. self.assertEqual(nftables.get_rule_handle(chain="input", rule=_ru), ret)
  170. ret = {
  171. "result": False,
  172. "comment": (
  173. "Rule input tcp dport 22 log accept chain"
  174. " input in table filter in family ipv4 does not exist"
  175. ),
  176. }
  177. ret1 = {
  178. "result": False,
  179. "comment": "Could not find rule input tcp dport 22 log accept",
  180. }
  181. with patch.object(
  182. nftables,
  183. "check_table",
  184. MagicMock(return_value={"result": True, "comment": ""}),
  185. ):
  186. with patch.object(
  187. nftables,
  188. "check_chain",
  189. MagicMock(return_value={"result": True, "comment": ""}),
  190. ):
  191. _ret1 = {
  192. "result": False,
  193. "comment": (
  194. "Rule input tcp dport 22 log accept"
  195. " chain input in table filter in"
  196. " family ipv4 does not exist"
  197. ),
  198. }
  199. _ret2 = {"result": True, "comment": ""}
  200. with patch.object(
  201. nftables, "check", MagicMock(side_effect=[_ret1, _ret2])
  202. ):
  203. self.assertEqual(
  204. nftables.get_rule_handle(chain="input", rule=_ru), ret
  205. )
  206. _ru = "input tcp dport 22 log accept"
  207. mock = MagicMock(return_value="")
  208. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  209. self.assertEqual(
  210. nftables.get_rule_handle(chain="input", rule=_ru), ret1
  211. )
  212. # 'check' function tests: 1
  213. def test_check(self):
  214. """
  215. Test if it check for the existence of a rule in the table and chain
  216. """
  217. self.assertEqual(
  218. nftables.check(),
  219. {"result": False, "comment": "Chain needs to be specified"},
  220. )
  221. self.assertEqual(
  222. nftables.check(chain="input"),
  223. {"result": False, "comment": "Rule needs to be specified"},
  224. )
  225. _ru = "tcp dport 22 log accept"
  226. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  227. mock = MagicMock(return_value="")
  228. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  229. self.assertEqual(nftables.check(chain="input", rule=_ru), ret)
  230. mock = MagicMock(return_value="table ip filter")
  231. ret = {
  232. "result": False,
  233. "comment": "Chain input in table filter in family ipv4 does not exist",
  234. }
  235. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  236. self.assertEqual(nftables.check(chain="input", rule=_ru), ret)
  237. mock = MagicMock(return_value="table ip filter chain input {{")
  238. ret = {
  239. "result": False,
  240. "comment": "Rule tcp dport 22 log accept in chain input in table filter in family ipv4 does not exist",
  241. }
  242. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  243. self.assertEqual(nftables.check(chain="input", rule=_ru), ret)
  244. r_val = "table ip filter chain input {{ input tcp dport 22 log accept #"
  245. mock = MagicMock(return_value=r_val)
  246. ret = {
  247. "result": True,
  248. "comment": "Rule tcp dport 22 log accept in chain input in table filter in family ipv4 exists",
  249. }
  250. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  251. self.assertEqual(nftables.check(chain="input", rule=_ru), ret)
  252. # 'check_chain' function tests: 1
  253. def test_check_chain(self):
  254. """
  255. Test if it check for the existence of a chain in the table
  256. """
  257. self.assertEqual(
  258. nftables.check_chain(),
  259. {"result": False, "comment": "Chain needs to be specified"},
  260. )
  261. mock = MagicMock(return_value="")
  262. ret = {
  263. "comment": "Chain input in table filter in family ipv4 does not exist",
  264. "result": False,
  265. }
  266. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  267. self.assertEqual(nftables.check_chain(chain="input"), ret)
  268. mock = MagicMock(return_value="chain input {{")
  269. ret = {
  270. "comment": "Chain input in table filter in family ipv4 exists",
  271. "result": True,
  272. }
  273. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  274. self.assertEqual(nftables.check_chain(chain="input"), ret)
  275. # 'check_table' function tests: 1
  276. def test_check_table(self):
  277. """
  278. Test if it check for the existence of a table
  279. """
  280. self.assertEqual(
  281. nftables.check_table(),
  282. {"result": False, "comment": "Table needs to be specified"},
  283. )
  284. mock = MagicMock(return_value="")
  285. ret = {"comment": "Table nat in family ipv4 does not exist", "result": False}
  286. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  287. self.assertEqual(nftables.check_table(table="nat"), ret)
  288. mock = MagicMock(return_value="table ip nat")
  289. ret = {"comment": "Table nat in family ipv4 exists", "result": True}
  290. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  291. self.assertEqual(nftables.check_table(table="nat"), ret)
  292. # 'new_table' function tests: 1
  293. def test_new_table(self):
  294. """
  295. Test if it create new custom table.
  296. """
  297. self.assertEqual(
  298. nftables.new_table(table=None),
  299. {"result": False, "comment": "Table needs to be specified"},
  300. )
  301. mock = MagicMock(return_value="")
  302. ret = {"comment": "Table nat in family ipv4 created", "result": True}
  303. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  304. self.assertEqual(nftables.new_table(table="nat"), ret)
  305. mock = MagicMock(return_value="table ip nat")
  306. ret = {"comment": "Table nat in family ipv4 exists", "result": True}
  307. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  308. self.assertEqual(nftables.new_table(table="nat"), ret)
  309. # 'delete_table' function tests: 1
  310. def test_delete_table(self):
  311. """
  312. Test if it delete custom table.
  313. """
  314. self.assertEqual(
  315. nftables.delete_table(table=None),
  316. {"result": False, "comment": "Table needs to be specified"},
  317. )
  318. mock_ret = {
  319. "result": False,
  320. "comment": "Table nat in family ipv4 does not exist",
  321. }
  322. with patch(
  323. "salt.modules.nftables.check_table", MagicMock(return_value=mock_ret)
  324. ):
  325. ret = nftables.delete_table(table="nat")
  326. self.assertEqual(
  327. ret,
  328. {"result": False, "comment": "Table nat in family ipv4 does not exist"},
  329. )
  330. mock = MagicMock(return_value="table ip nat")
  331. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  332. "salt.modules.nftables.check_table",
  333. MagicMock(return_value={"result": True, "comment": ""}),
  334. ):
  335. self.assertEqual(
  336. nftables.delete_table(table="nat"),
  337. {
  338. "comment": "Table nat in family ipv4 could not be deleted",
  339. "result": False,
  340. },
  341. )
  342. mock = MagicMock(return_value="")
  343. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  344. "salt.modules.nftables.check_table",
  345. MagicMock(return_value={"result": True, "comment": ""}),
  346. ):
  347. self.assertEqual(
  348. nftables.delete_table(table="nat"),
  349. {"comment": "Table nat in family ipv4 deleted", "result": True},
  350. )
  351. # 'new_chain' function tests: 2
  352. def test_new_chain(self):
  353. """
  354. Test if it create new chain to the specified table.
  355. """
  356. self.assertEqual(
  357. nftables.new_chain(),
  358. {"result": False, "comment": "Chain needs to be specified"},
  359. )
  360. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  361. mock = MagicMock(return_value="")
  362. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  363. self.assertEqual(nftables.new_chain(chain="input"), ret)
  364. ret = {
  365. "result": False,
  366. "comment": "Chain input in table filter in family ipv4 already exists",
  367. }
  368. mock = MagicMock(return_value="table ip filter chain input {{")
  369. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  370. self.assertEqual(nftables.new_chain(chain="input"), ret)
  371. def test_new_chain_variable(self):
  372. """
  373. Test if it create new chain to the specified table.
  374. """
  375. mock = MagicMock(return_value="")
  376. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  377. "salt.modules.nftables.check_chain",
  378. MagicMock(return_value={"result": False, "comment": ""}),
  379. ), patch(
  380. "salt.modules.nftables.check_table",
  381. MagicMock(return_value={"result": True, "comment": ""}),
  382. ):
  383. self.assertEqual(
  384. nftables.new_chain(chain="input", table_type="filter"),
  385. {
  386. "result": False,
  387. "comment": "Table_type, hook, and priority required.",
  388. },
  389. )
  390. self.assertTrue(
  391. nftables.new_chain(
  392. chain="input", table_type="filter", hook="input", priority=0
  393. )
  394. )
  395. # 'delete_chain' function tests: 1
  396. def test_delete_chain(self):
  397. """
  398. Test if it delete the chain from the specified table.
  399. """
  400. self.assertEqual(
  401. nftables.delete_chain(),
  402. {"result": False, "comment": "Chain needs to be specified"},
  403. )
  404. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  405. mock = MagicMock(return_value="")
  406. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  407. self.assertEqual(nftables.delete_chain(chain="input"), ret)
  408. ret = {
  409. "result": False,
  410. "comment": "Chain input in table filter in family ipv4 could not be deleted",
  411. }
  412. mock = MagicMock(return_value="table ip filter")
  413. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  414. "salt.modules.nftables.check_table",
  415. MagicMock(return_value={"result": True, "comment": ""}),
  416. ), patch(
  417. "salt.modules.nftables.check_chain",
  418. MagicMock(return_value={"result": True, "comment": ""}),
  419. ):
  420. self.assertEqual(nftables.delete_chain(chain="input"), ret)
  421. ret = {
  422. "result": True,
  423. "comment": "Chain input in table filter in family ipv4 deleted",
  424. }
  425. mock = MagicMock(return_value="")
  426. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  427. "salt.modules.nftables.check_table",
  428. MagicMock(return_value={"result": True, "comment": ""}),
  429. ), patch(
  430. "salt.modules.nftables.check_chain",
  431. MagicMock(return_value={"result": True, "comment": ""}),
  432. ):
  433. self.assertEqual(nftables.delete_chain(chain="input"), ret)
  434. def test_delete_chain_variables(self):
  435. """
  436. Test if it delete the chain from the specified table.
  437. """
  438. mock = MagicMock(return_value="")
  439. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  440. "salt.modules.nftables.check_chain",
  441. MagicMock(return_value={"result": True, "comment": ""}),
  442. ), patch(
  443. "salt.modules.nftables.check_table",
  444. MagicMock(return_value={"result": True, "comment": ""}),
  445. ):
  446. _expected = {
  447. "comment": "Chain input in table filter in family ipv4 deleted",
  448. "result": True,
  449. }
  450. self.assertEqual(nftables.delete_chain(chain="input"), _expected)
  451. # 'append' function tests: 2
  452. def test_append(self):
  453. """
  454. Test if it append a rule to the specified table & chain.
  455. """
  456. self.assertEqual(
  457. nftables.append(),
  458. {"result": False, "comment": "Chain needs to be specified"},
  459. )
  460. self.assertEqual(
  461. nftables.append(chain="input"),
  462. {"result": False, "comment": "Rule needs to be specified"},
  463. )
  464. _ru = "input tcp dport 22 log accept"
  465. ret = {"comment": "Table filter in family ipv4 does not exist", "result": False}
  466. mock = MagicMock(return_value="")
  467. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  468. self.assertEqual(nftables.append(chain="input", rule=_ru), ret)
  469. ret = {
  470. "comment": "Chain input in table filter in family ipv4 does not exist",
  471. "result": False,
  472. }
  473. mock = MagicMock(return_value="table ip filter")
  474. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  475. self.assertEqual(nftables.append(chain="input", rule=_ru), ret)
  476. r_val = "table ip filter chain input {{ input tcp dport 22 log accept #"
  477. mock = MagicMock(return_value=r_val)
  478. _expected = {
  479. "comment": "Rule input tcp dport 22 log accept chain input in table filter in family ipv4 already exists",
  480. "result": False,
  481. }
  482. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  483. self.assertEqual(nftables.append(chain="input", rule=_ru), _expected)
  484. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  485. def test_append_rule(self):
  486. """
  487. Test if it append a rule to the specified table & chain.
  488. """
  489. _ru = "input tcp dport 22 log accept"
  490. mock = MagicMock(side_effect=["1", ""])
  491. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  492. "salt.modules.nftables.check",
  493. MagicMock(return_value={"result": False, "comment": ""}),
  494. ), patch(
  495. "salt.modules.nftables.check_chain",
  496. MagicMock(return_value={"result": True, "comment": ""}),
  497. ), patch(
  498. "salt.modules.nftables.check_table",
  499. MagicMock(return_value={"result": True, "comment": ""}),
  500. ):
  501. _expected = {
  502. "comment": 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(
  503. _ru
  504. ),
  505. "result": False,
  506. }
  507. self.assertEqual(nftables.append(chain="input", rule=_ru), _expected)
  508. _expected = {
  509. "comment": 'Added rule "{0}" chain input in table filter in family ipv4.'.format(
  510. _ru
  511. ),
  512. "result": True,
  513. }
  514. self.assertEqual(nftables.append(chain="input", rule=_ru), _expected)
  515. # 'insert' function tests: 2
  516. def test_insert(self):
  517. """
  518. Test if it insert a rule into the specified table & chain,
  519. at the specified position.
  520. """
  521. self.assertEqual(
  522. nftables.insert(),
  523. {"result": False, "comment": "Chain needs to be specified"},
  524. )
  525. self.assertEqual(
  526. nftables.insert(chain="input"),
  527. {"result": False, "comment": "Rule needs to be specified"},
  528. )
  529. _ru = "input tcp dport 22 log accept"
  530. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  531. mock = MagicMock(return_value="")
  532. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  533. self.assertEqual(nftables.insert(chain="input", rule=_ru), ret)
  534. ret = {
  535. "result": False,
  536. "comment": "Chain input in table filter in family ipv4 does not exist",
  537. }
  538. mock = MagicMock(return_value="table ip filter")
  539. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  540. self.assertEqual(nftables.insert(chain="input", rule=_ru), ret)
  541. r_val = "table ip filter chain input {{ input tcp dport 22 log accept #"
  542. mock = MagicMock(return_value=r_val)
  543. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  544. res = nftables.insert(chain="input", rule=_ru)
  545. import logging
  546. log = logging.getLogger(__name__)
  547. log.debug("=== res %s ===", res)
  548. self.assertTrue(nftables.insert(chain="input", rule=_ru))
  549. def test_insert_rule(self):
  550. """
  551. Test if it insert a rule into the specified table & chain,
  552. at the specified position.
  553. """
  554. _ru = "input tcp dport 22 log accept"
  555. mock = MagicMock(side_effect=["1", ""])
  556. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  557. "salt.modules.nftables.check",
  558. MagicMock(return_value={"result": False, "comment": ""}),
  559. ), patch(
  560. "salt.modules.nftables.check_chain",
  561. MagicMock(return_value={"result": True, "comment": ""}),
  562. ), patch(
  563. "salt.modules.nftables.check_table",
  564. MagicMock(return_value={"result": True, "comment": ""}),
  565. ):
  566. _expected = {
  567. "result": False,
  568. "comment": 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(
  569. _ru
  570. ),
  571. }
  572. self.assertEqual(nftables.insert(chain="input", rule=_ru), _expected)
  573. _expected = {
  574. "result": True,
  575. "comment": 'Added rule "{0}" chain input in table filter in family ipv4.'.format(
  576. _ru
  577. ),
  578. }
  579. self.assertEqual(nftables.insert(chain="input", rule=_ru), _expected)
  580. # 'delete' function tests: 2
  581. def test_delete(self):
  582. """
  583. Test if it delete a rule from the specified table & chain,
  584. specifying either the rule in its entirety, or
  585. the rule's position in the chain.
  586. """
  587. _ru = "input tcp dport 22 log accept"
  588. ret = {
  589. "result": False,
  590. "comment": "Only specify a position or a rule, not both",
  591. }
  592. self.assertEqual(
  593. nftables.delete(table="filter", chain="input", position="3", rule=_ru), ret
  594. )
  595. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  596. mock = MagicMock(return_value="")
  597. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  598. self.assertEqual(
  599. nftables.delete(table="filter", chain="input", rule=_ru), ret
  600. )
  601. ret = {
  602. "result": False,
  603. "comment": "Chain input in table filter in family ipv4 does not exist",
  604. }
  605. mock = MagicMock(return_value="table ip filter")
  606. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  607. self.assertEqual(
  608. nftables.delete(table="filter", chain="input", rule=_ru), ret
  609. )
  610. mock = MagicMock(return_value="table ip filter chain input {{")
  611. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  612. self.assertTrue(nftables.delete(table="filter", chain="input", rule=_ru))
  613. def test_delete_rule(self):
  614. """
  615. Test if it delete a rule from the specified table & chain,
  616. specifying either the rule in its entirety, or
  617. the rule's position in the chain.
  618. """
  619. mock = MagicMock(side_effect=["1", ""])
  620. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  621. "salt.modules.nftables.check",
  622. MagicMock(return_value={"result": True, "comment": ""}),
  623. ), patch(
  624. "salt.modules.nftables.check_chain",
  625. MagicMock(return_value={"result": True, "comment": ""}),
  626. ), patch(
  627. "salt.modules.nftables.check_table",
  628. MagicMock(return_value={"result": True, "comment": ""}),
  629. ):
  630. _expected = {
  631. "result": False,
  632. "comment": 'Failed to delete rule "None" in chain input table filter in family ipv4',
  633. }
  634. self.assertEqual(
  635. nftables.delete(table="filter", chain="input", position="3"), _expected
  636. )
  637. _expected = {
  638. "result": True,
  639. "comment": 'Deleted rule "None" in chain input in table filter in family ipv4.',
  640. }
  641. self.assertEqual(
  642. nftables.delete(table="filter", chain="input", position="3"), _expected
  643. )
  644. # 'flush' function tests: 2
  645. def test_flush(self):
  646. """
  647. Test if it flush the chain in the specified table, flush all chains
  648. in the specified table if chain is not specified.
  649. """
  650. ret = {"result": False, "comment": "Table filter in family ipv4 does not exist"}
  651. mock = MagicMock(return_value="")
  652. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  653. self.assertEqual(nftables.flush(table="filter", chain="input"), ret)
  654. ret = {
  655. "result": False,
  656. "comment": "Chain input in table filter in family ipv4 does not exist",
  657. }
  658. mock = MagicMock(return_value="table ip filter")
  659. with patch.dict(nftables.__salt__, {"cmd.run": mock}):
  660. self.assertEqual(nftables.flush(table="filter", chain="input"), ret)
  661. def test_flush_chain(self):
  662. """
  663. Test if it flush the chain in the specified table, flush all chains
  664. in the specified table if chain is not specified.
  665. """
  666. mock = MagicMock(side_effect=["1", ""])
  667. with patch.dict(nftables.__salt__, {"cmd.run": mock}), patch(
  668. "salt.modules.nftables.check_chain",
  669. MagicMock(return_value={"result": True, "comment": ""}),
  670. ), patch(
  671. "salt.modules.nftables.check_table",
  672. MagicMock(return_value={"result": True, "comment": ""}),
  673. ):
  674. _expected = {
  675. "result": False,
  676. "comment": "Failed to flush rules from chain input in table filter in family ipv4.",
  677. }
  678. self.assertEqual(nftables.flush(table="filter", chain="input"), _expected)
  679. _expected = {
  680. "result": True,
  681. "comment": "Flushed rules from chain input in table filter in family ipv4.",
  682. }
  683. self.assertEqual(nftables.flush(table="filter", chain="input"), _expected)