123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- # -*- coding: utf-8 -*-
- """
- :codeauthor: Rahul Handay <rahulha@saltstack.com>
- """
- # Import Python Libs
- from __future__ import absolute_import, print_function, unicode_literals
- # Import Salt Libs
- import salt.states.nftables as nftables
- # Import Salt Testing Libs
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.mock import MagicMock, patch
- from tests.support.unit import TestCase
- class NftablesTestCase(TestCase, LoaderModuleMockMixin):
- """
- Validate the nftables state
- """
- def setup_loader_modules(self):
- return {nftables: {}}
- def test_chain_present(self):
- """
- Test to verify the chain is exist.
- """
- ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
- ret.update(
- {
- "comment": "nftables salt chain is already"
- " exist in filter table for ipv4"
- }
- )
- self.assertDictEqual(nftables.chain_present("salt"), ret)
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.new_chain": mock}):
- ret.update(
- {
- "changes": {"locale": "salt"},
- "comment": "nftables salt chain in filter"
- " table create success for ipv4",
- }
- )
- self.assertDictEqual(nftables.chain_present("salt"), ret)
- ret.update(
- {
- "changes": {},
- "comment": "Failed to create salt chain"
- " in filter table: for ipv4",
- "result": False,
- }
- )
- self.assertDictEqual(nftables.chain_present("salt"), ret)
- def test_chain_absent(self):
- """
- Test to verify the chain is absent.
- """
- ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
- mock = MagicMock(side_effect=[False, True])
- with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
- ret.update(
- {
- "comment": "nftables salt chain is already absent"
- " in filter table for ipv4"
- }
- )
- self.assertDictEqual(nftables.chain_absent("salt"), ret)
- mock = MagicMock(return_value="")
- with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
- ret.update(
- {
- "result": False,
- "comment": "Failed to flush salt chain"
- " in filter table: for ipv4",
- }
- )
- self.assertDictEqual(nftables.chain_absent("salt"), ret)
- def test_append(self):
- """
- Test to append a rule to a chain
- """
- ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
- mock = MagicMock(return_value=[])
- with patch.object(nftables, "_STATE_INTERNAL_KEYWORDS", mock):
- mock = MagicMock(return_value={"result": True, "comment": "", "rule": "a"})
- with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- {"result": False, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.check": mock}):
- ret.update(
- {
- "comment": "nftables rule for salt"
- " already set (a) for ipv4"
- }
- )
- self.assertDictEqual(
- nftables.append("salt", table="", chain=""), ret
- )
- with patch.dict(nftables.__opts__, {"test": True}):
- ret.update(
- {
- "result": None,
- "comment": "nftables rule for salt needs"
- " to be set (a) for ipv4",
- }
- )
- self.assertDictEqual(
- nftables.append("salt", table="", chain=""), ret
- )
- with patch.dict(nftables.__opts__, {"test": False}):
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.append": mock}):
- ret.update(
- {
- "changes": {"locale": "salt"},
- "comment": "Set nftables rule for salt"
- " to: a for ipv4",
- "result": True,
- }
- )
- self.assertDictEqual(
- nftables.append("salt", table="", chain=""), ret
- )
- ret.update(
- {
- "changes": {},
- "comment": "Failed to set nftables"
- " rule for salt.\nAttempted rule was"
- " a for ipv4.\n",
- "result": False,
- }
- )
- self.assertDictEqual(
- nftables.append("salt", table="", chain=""), ret
- )
- def test_insert(self):
- """
- Test to insert a rule into a chain
- """
- ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
- mock = MagicMock(return_value=[])
- with patch.object(nftables, "_STATE_INTERNAL_KEYWORDS", mock):
- mock = MagicMock(return_value={"result": True, "comment": "", "rule": "a"})
- with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- {"result": False, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.check": mock}):
- ret.update(
- {
- "comment": "nftables rule for salt already"
- " set for ipv4 (a)"
- }
- )
- self.assertDictEqual(
- nftables.insert("salt", table="", chain=""), ret
- )
- with patch.dict(nftables.__opts__, {"test": True}):
- ret.update(
- {
- "result": None,
- "comment": "nftables rule for salt"
- " needs to be set for ipv4 (a)",
- }
- )
- self.assertDictEqual(
- nftables.insert("salt", table="", chain=""), ret
- )
- with patch.dict(nftables.__opts__, {"test": False}):
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.insert": mock}):
- ret.update(
- {
- "changes": {"locale": "salt"},
- "comment": "Set nftables rule for"
- " salt to: a for ipv4",
- "result": True,
- }
- )
- self.assertDictEqual(
- nftables.insert(
- "salt", table="", chain="", position=""
- ),
- ret,
- )
- ret.update(
- {
- "changes": {},
- "comment": "Failed to set nftables"
- " rule for salt.\nAttempted rule was"
- " a",
- "result": False,
- }
- )
- self.assertDictEqual(
- nftables.insert(
- "salt", table="", chain="", position=""
- ),
- ret,
- )
- def test_delete(self):
- """
- Test to delete a rule to a chain
- """
- ret = {"name": "salt", "changes": {}, "result": None, "comment": ""}
- mock = MagicMock(return_value=[])
- with patch.object(nftables, "_STATE_INTERNAL_KEYWORDS", mock):
- mock = MagicMock(return_value={"result": True, "comment": "", "rule": "a"})
- with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
- mock = MagicMock(
- side_effect=[
- {"result": False, "comment": ""},
- {"result": True, "comment": ""},
- {"result": True, "comment": ""},
- {"result": True, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.check": mock}):
- ret.update(
- {
- "comment": "nftables rule for salt"
- " already absent for ipv4 (a)",
- "result": True,
- }
- )
- self.assertDictEqual(
- nftables.delete("salt", table="", chain=""), ret
- )
- with patch.dict(nftables.__opts__, {"test": True}):
- ret.update(
- {
- "result": None,
- "comment": "nftables rule for salt needs"
- " to be deleted for ipv4 (a)",
- }
- )
- self.assertDictEqual(
- nftables.delete("salt", table="", chain=""), ret
- )
- with patch.dict(nftables.__opts__, {"test": False}):
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.delete": mock}):
- ret.update(
- {
- "result": True,
- "changes": {"locale": "salt"},
- "comment": "Delete nftables rule" " for salt a",
- }
- )
- self.assertDictEqual(
- nftables.delete(
- "salt", table="", chain="", position=""
- ),
- ret,
- )
- ret.update(
- {
- "result": False,
- "changes": {},
- "comment": "Failed to delete nftables"
- " rule for salt.\nAttempted rule was a",
- }
- )
- self.assertDictEqual(
- nftables.delete(
- "salt", table="", chain="", position=""
- ),
- ret,
- )
- def test_flush(self):
- """
- Test to flush current nftables state
- """
- ret = {"name": "salt", "changes": {}, "result": None, "comment": ""}
- mock = MagicMock(return_value=[])
- with patch.object(nftables, "_STATE_INTERNAL_KEYWORDS", mock):
- mock = MagicMock(
- side_effect=[
- {"result": False, "comment": ""},
- {"result": True, "comment": ""},
- {"result": True, "comment": ""},
- {"result": True, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.check_table": mock}):
- ret.update(
- {
- "comment": "Failed to flush table in family"
- " ipv4, table does not exist.",
- "result": False,
- }
- )
- self.assertDictEqual(nftables.flush("salt", table="", chain=""), ret)
- mock = MagicMock(
- side_effect=[
- {"result": False, "comment": ""},
- {"result": True, "comment": ""},
- {"result": True, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
- ret.update(
- {
- "comment": "Failed to flush chain in table"
- " in family ipv4, chain does not exist."
- }
- )
- self.assertDictEqual(
- nftables.flush("salt", table="", chain=""), ret
- )
- mock = MagicMock(
- side_effect=[
- {"result": True, "comment": ""},
- {"result": False, "comment": ""},
- ]
- )
- with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
- ret.update(
- {
- "changes": {"locale": "salt"},
- "comment": "Flush nftables rules in table"
- " chain ipv4 family",
- "result": True,
- }
- )
- self.assertDictEqual(
- nftables.flush("salt", table="", chain=""), ret
- )
- ret.update(
- {
- "changes": {},
- "comment": "Failed to flush" " nftables rules",
- "result": False,
- }
- )
- self.assertDictEqual(
- nftables.flush("salt", table="", chain=""), ret
- )
|