1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368 |
- # -*- coding: utf-8 -*-
- # TODO: Update skipped tests to expect dictionary results from the execution
- # module functions.
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import os.path
- import random
- import string
- import sys
- # pylint: disable=3rd-party-module-not-gated
- import pkg_resources
- from pkg_resources import DistributionNotFound
- import pytest
- # Import Salt libs
- import salt.config
- import salt.loader
- import salt.modules.boto_vpc as boto_vpc
- from salt.exceptions import CommandExecutionError, SaltInvocationError
- # Import 3rd-party libs
- from salt.ext import six
- # pylint: disable=import-error
- from salt.ext.six.moves import range # pylint: disable=redefined-builtin
- from salt.modules.boto_vpc import _maybe_set_name_tag, _maybe_set_tags
- from salt.utils.versions import LooseVersion
- # Import Salt Testing libs
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.mock import MagicMock, patch
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import TestCase, skipIf
- # pylint: enable=3rd-party-module-not-gated
- # pylint: disable=no-name-in-module,unused-import
- try:
- import boto
- boto.ENDPOINTS_PATH = os.path.join(
- RUNTIME_VARS.TESTS_DIR, "unit/files/endpoints.json"
- )
- import boto3
- from boto.exception import BotoServerError
- HAS_BOTO = True
- except ImportError:
- HAS_BOTO = False
- try:
- import moto
- from moto import mock_ec2_deprecated
- HAS_MOTO = True
- except ImportError:
- HAS_MOTO = False
- def mock_ec2_deprecated(self):
- """
- if the mock_ec2_deprecated function is not available due to import failure
- this replaces the decorated function with stub_function.
- Allows boto_vpc unit tests to use the @mock_ec2_deprecated decorator
- without a "NameError: name 'mock_ec2_deprecated' is not defined" error.
- """
- def stub_function(self):
- pass
- return stub_function
- # pylint: enable=import-error,no-name-in-module,unused-import
- # the boto_vpc module relies on the connect_to_region() method
- # which was added in boto 2.8.0
- # https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12
- required_boto_version = "2.8.0"
- required_moto_version = "1.0.0"
- region = "us-east-1"
- access_key = "GKTADJGHEIQSXMKKRBJ08H"
- secret_key = "askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs"
- conn_parameters = {
- "region": region,
- "key": access_key,
- "keyid": secret_key,
- "profile": {},
- }
- cidr_block = "10.0.0.0/24"
- dhcp_options_parameters = {
- "domain_name": "example.com",
- "domain_name_servers": ["1.2.3.4"],
- "ntp_servers": ["5.6.7.8"],
- "netbios_name_servers": ["10.0.0.1"],
- "netbios_node_type": 2,
- }
- network_acl_entry_parameters = ("fake", 100, -1, "allow", cidr_block)
- dhcp_options_parameters.update(conn_parameters)
- def _has_required_boto():
- """
- Returns True/False boolean depending on if Boto is installed and correct
- version.
- """
- if not HAS_BOTO:
- return False
- elif LooseVersion(boto.__version__) < LooseVersion(required_boto_version):
- return False
- else:
- return True
- def _get_boto_version():
- """
- Returns the boto version
- """
- if not HAS_BOTO:
- return False
- return LooseVersion(boto.__version__)
- def _get_moto_version():
- """
- Returns the moto version
- """
- try:
- return LooseVersion(six.text_type(moto.__version__))
- except AttributeError:
- try:
- return LooseVersion(pkg_resources.get_distribution("moto").version)
- except DistributionNotFound:
- return False
- def _has_required_moto():
- """
- Returns True/False boolean depending on if Moto is installed and correct
- version.
- """
- if not HAS_MOTO:
- return False
- else:
- if _get_moto_version() < LooseVersion(required_moto_version):
- return False
- return True
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version() if HAS_BOTO else "None"
- ),
- )
- @skipIf(
- _has_required_moto() is False,
- "The moto version must be >= to version {}. Installed: {}".format(
- required_moto_version, _get_moto_version() if HAS_MOTO else "None"
- ),
- )
- class BotoVpcTestCaseBase(TestCase, LoaderModuleMockMixin):
- conn3 = None
- def setup_loader_modules(self):
- self.opts = opts = salt.config.DEFAULT_MINION_OPTS.copy()
- utils = salt.loader.utils(
- opts, whitelist=["boto", "boto3", "args", "systemd", "path", "platform"]
- )
- return {boto_vpc: {"__utils__": utils}}
- # Set up MagicMock to replace the boto3 session
- def setUp(self):
- super(BotoVpcTestCaseBase, self).setUp()
- boto_vpc.__init__(self.opts)
- delattr(self, "opts")
- # connections keep getting cached from prior tests, can't find the
- # correct context object to clear it. So randomize the cache key, to prevent any
- # cache hits
- conn_parameters["key"] = "".join(
- random.choice(string.ascii_lowercase + string.digits) for _ in range(50)
- )
- self.patcher = patch("boto3.session.Session")
- self.addCleanup(self.patcher.stop)
- self.addCleanup(delattr, self, "patcher")
- mock_session = self.patcher.start()
- session_instance = mock_session.return_value
- self.conn3 = MagicMock()
- self.addCleanup(delattr, self, "conn3")
- session_instance.client.return_value = self.conn3
- class BotoVpcTestCaseMixin(object):
- conn = None
- def _create_vpc(self, name=None, tags=None):
- """
- Helper function to create a test vpc
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- vpc = self.conn.create_vpc(cidr_block)
- _maybe_set_name_tag(name, vpc)
- _maybe_set_tags(tags, vpc)
- return vpc
- def _create_subnet(
- self,
- vpc_id,
- cidr_block="10.0.0.0/25",
- name=None,
- tags=None,
- availability_zone=None,
- ):
- """
- Helper function to create a test subnet
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- subnet = self.conn.create_subnet(
- vpc_id, cidr_block, availability_zone=availability_zone
- )
- _maybe_set_name_tag(name, subnet)
- _maybe_set_tags(tags, subnet)
- return subnet
- def _create_internet_gateway(self, vpc_id, name=None, tags=None):
- """
- Helper function to create a test internet gateway
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- igw = self.conn.create_internet_gateway()
- _maybe_set_name_tag(name, igw)
- _maybe_set_tags(tags, igw)
- return igw
- def _create_customer_gateway(self, vpc_id, name=None, tags=None):
- """
- Helper function to create a test customer gateway
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- gw = self.conn.create_customer_gateway(vpc_id)
- _maybe_set_name_tag(name, gw)
- _maybe_set_tags(tags, gw)
- return gw
- def _create_dhcp_options(
- self,
- domain_name="example.com",
- domain_name_servers=None,
- ntp_servers=None,
- netbios_name_servers=None,
- netbios_node_type=2,
- ):
- """
- Helper function to create test dchp options
- """
- if not netbios_name_servers:
- netbios_name_servers = ["10.0.0.1"]
- if not ntp_servers:
- ntp_servers = ["5.6.7.8"]
- if not domain_name_servers:
- domain_name_servers = ["1.2.3.4"]
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- return self.conn.create_dhcp_options(
- domain_name=domain_name,
- domain_name_servers=domain_name_servers,
- ntp_servers=ntp_servers,
- netbios_name_servers=netbios_name_servers,
- netbios_node_type=netbios_node_type,
- )
- def _create_network_acl(self, vpc_id):
- """
- Helper function to create test network acl
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- return self.conn.create_network_acl(vpc_id)
- def _create_network_acl_entry(
- self,
- network_acl_id,
- rule_number,
- protocol,
- rule_action,
- cidr_block,
- egress=None,
- icmp_code=None,
- icmp_type=None,
- port_range_from=None,
- port_range_to=None,
- ):
- """
- Helper function to create test network acl entry
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- return self.conn.create_network_acl_entry(
- network_acl_id,
- rule_number,
- protocol,
- rule_action,
- cidr_block,
- egress=egress,
- icmp_code=icmp_code,
- icmp_type=icmp_type,
- port_range_from=port_range_from,
- port_range_to=port_range_to,
- )
- def _create_route_table(self, vpc_id, name=None, tags=None):
- """
- Helper function to create a test route table
- """
- if not self.conn:
- self.conn = boto.vpc.connect_to_region(region)
- rtbl = self.conn.create_route_table(vpc_id)
- _maybe_set_name_tag(name, rtbl)
- _maybe_set_tags(tags, rtbl)
- return rtbl
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- """
- TestCase for salt.modules.boto_vpc module
- """
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_id_and_a_vpc_exists_the_vpc_exists_method_returns_true(
- self,
- ):
- """
- Tests checking vpc existence via id when the vpc already exists
- """
- vpc = self._create_vpc()
- vpc_exists_result = boto_vpc.exists(vpc_id=vpc.id, **conn_parameters)
- self.assertTrue(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_id_and_a_vpc_does_not_exist_the_vpc_exists_method_returns_false(
- self,
- ):
- """
- Tests checking vpc existence via id when the vpc does not exist
- """
- self._create_vpc() # Created to ensure that the filters are applied correctly
- vpc_exists_result = boto_vpc.exists(vpc_id="fake", **conn_parameters)
- self.assertFalse(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_name_and_a_vpc_exists_the_vpc_exists_method_returns_true(
- self,
- ):
- """
- Tests checking vpc existence via name when vpc exists
- """
- self._create_vpc(name="test")
- vpc_exists_result = boto_vpc.exists(name="test", **conn_parameters)
- self.assertTrue(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_name_and_a_vpc_does_not_exist_the_vpc_exists_method_returns_false(
- self,
- ):
- """
- Tests checking vpc existence via name when vpc does not exist
- """
- self._create_vpc() # Created to ensure that the filters are applied correctly
- vpc_exists_result = boto_vpc.exists(name="test", **conn_parameters)
- self.assertFalse(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_tags_and_a_vpc_exists_the_vpc_exists_method_returns_true(
- self,
- ):
- """
- Tests checking vpc existence via tag when vpc exists
- """
- self._create_vpc(tags={"test": "testvalue"})
- vpc_exists_result = boto_vpc.exists(
- tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertTrue(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_tags_and_a_vpc_does_not_exist_the_vpc_exists_method_returns_false(
- self,
- ):
- """
- Tests checking vpc existence via tag when vpc does not exist
- """
- self._create_vpc() # Created to ensure that the filters are applied correctly
- vpc_exists_result = boto_vpc.exists(
- tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertFalse(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_cidr_and_a_vpc_exists_the_vpc_exists_method_returns_true(
- self,
- ):
- """
- Tests checking vpc existence via cidr when vpc exists
- """
- self._create_vpc()
- vpc_exists_result = boto_vpc.exists(cidr="10.0.0.0/24", **conn_parameters)
- self.assertTrue(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_by_cidr_and_a_vpc_does_not_exist_the_vpc_exists_method_returns_false(
- self,
- ):
- """
- Tests checking vpc existence via cidr when vpc does not exist
- """
- self._create_vpc() # Created to ensure that the filters are applied correctly
- vpc_exists_result = boto_vpc.exists(cidr="10.10.10.10/24", **conn_parameters)
- self.assertFalse(vpc_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_vpc_exists_but_providing_no_filters_the_vpc_exists_method_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests checking vpc existence when no filters are provided
- """
- with self.assertRaisesRegex(
- SaltInvocationError,
- "At least one of the following "
- "must be provided: vpc_id, vpc_name, "
- "cidr or tags.",
- ):
- boto_vpc.exists(**conn_parameters)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_filtering_by_name(self):
- """
- Tests getting vpc id when filtering by name
- """
- vpc = self._create_vpc(name="test")
- get_id_result = boto_vpc.get_id(name="test", **conn_parameters)
- self.assertEqual(vpc.id, get_id_result["id"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_filtering_by_invalid_name(self):
- """
- Tests getting vpc id when filtering by invalid name
- """
- self._create_vpc(name="test")
- get_id_result = boto_vpc.get_id(name="test_fake", **conn_parameters)
- self.assertEqual(get_id_result["id"], None)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_filtering_by_cidr(self):
- """
- Tests getting vpc id when filtering by cidr
- """
- vpc = self._create_vpc()
- get_id_result = boto_vpc.get_id(cidr="10.0.0.0/24", **conn_parameters)
- self.assertEqual(vpc.id, get_id_result["id"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_filtering_by_invalid_cidr(self):
- """
- Tests getting vpc id when filtering by invalid cidr
- """
- self._create_vpc()
- get_id_result = boto_vpc.get_id(cidr="10.10.10.10/24", **conn_parameters)
- self.assertEqual(get_id_result["id"], None)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_filtering_by_tags(self):
- """
- Tests getting vpc id when filtering by tags
- """
- vpc = self._create_vpc(tags={"test": "testvalue"})
- get_id_result = boto_vpc.get_id(tags={"test": "testvalue"}, **conn_parameters)
- self.assertEqual(vpc.id, get_id_result["id"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_filtering_by_invalid_tags(self):
- """
- Tests getting vpc id when filtering by invalid tags
- """
- self._create_vpc(tags={"test": "testvalue"})
- get_id_result = boto_vpc.get_id(
- tags={"test": "fake-testvalue"}, **conn_parameters
- )
- self.assertEqual(get_id_result["id"], None)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_not_providing_filters_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests getting vpc id but providing no filters
- """
- with self.assertRaisesRegex(
- SaltInvocationError,
- "At least one of the following must be provided: vpc_id, vpc_name, cidr or tags.",
- ):
- boto_vpc.get_id(**conn_parameters)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_vpc_id_method_when_more_than_one_vpc_is_matched_raises_a_salt_command_execution_error(
- self,
- ):
- """
- Tests getting vpc id but providing no filters
- """
- vpc1 = self._create_vpc(name="vpc-test1")
- vpc2 = self._create_vpc(name="vpc-test2")
- with self.assertRaisesRegex(
- CommandExecutionError, "Found more than one VPC matching the criteria."
- ):
- boto_vpc.get_id(cidr="10.0.0.0/24", **conn_parameters)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_vpc_succeeds_the_create_vpc_method_returns_true(self):
- """
- tests True VPC created.
- """
- vpc_creation_result = boto_vpc.create(cidr_block, **conn_parameters)
- self.assertTrue(vpc_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_vpc_and_specifying_a_vpc_name_succeeds_the_create_vpc_method_returns_true(
- self,
- ):
- """
- tests True VPC created.
- """
- vpc_creation_result = boto_vpc.create(
- cidr_block, vpc_name="test", **conn_parameters
- )
- self.assertTrue(vpc_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_vpc_and_specifying_tags_succeeds_the_create_vpc_method_returns_true(
- self,
- ):
- """
- tests True VPC created.
- """
- vpc_creation_result = boto_vpc.create(
- cidr_block, tags={"test": "value"}, **conn_parameters
- )
- self.assertTrue(vpc_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_creating_a_vpc_fails_the_create_vpc_method_returns_false(self):
- """
- tests False VPC not created.
- """
- with patch(
- "moto.ec2.models.VPCBackend.create_vpc",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- vpc_creation_result = boto_vpc.create(cidr_block, **conn_parameters)
- self.assertFalse(vpc_creation_result["created"])
- self.assertTrue("error" in vpc_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_deleting_an_existing_vpc_the_delete_vpc_method_returns_true(
- self,
- ):
- """
- Tests deleting an existing vpc
- """
- vpc = self._create_vpc()
- vpc_deletion_result = boto_vpc.delete(vpc.id, **conn_parameters)
- self.assertTrue(vpc_deletion_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_deleting_a_non_existent_vpc_the_delete_vpc_method_returns_false(
- self,
- ):
- """
- Tests deleting a non-existent vpc
- """
- delete_vpc_result = boto_vpc.delete("1234", **conn_parameters)
- self.assertFalse(delete_vpc_result["deleted"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_describing_vpc_by_id_it_returns_the_dict_of_properties_returns_true(
- self,
- ):
- """
- Tests describing parameters via vpc id if vpc exist
- """
- # With moto 0.4.25 through 0.4.30, is_default is set to True.
- # 0.4.24 and older and 0.4.31 and newer, is_default is False
- if LooseVersion("0.4.25") <= _get_moto_version() < LooseVersion("0.4.31"):
- is_default = True
- else:
- is_default = False
- vpc = self._create_vpc(name="test", tags={"test": "testvalue"})
- describe_vpc = boto_vpc.describe(vpc_id=vpc.id, **conn_parameters)
- vpc_properties = dict(
- id=vpc.id,
- cidr_block=six.text_type(cidr_block),
- is_default=is_default,
- state="available",
- tags={"Name": "test", "test": "testvalue"},
- dhcp_options_id="dopt-7a8b9c2d",
- region="us-east-1",
- instance_tenancy="default",
- )
- self.assertEqual(describe_vpc, {"vpc": vpc_properties})
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_describing_vpc_by_id_it_returns_the_dict_of_properties_returns_false(
- self,
- ):
- """
- Tests describing parameters via vpc id if vpc does not exist
- """
- vpc = self._create_vpc(name="test", tags={"test": "testvalue"})
- describe_vpc = boto_vpc.describe(vpc_id="vpc-fake", **conn_parameters)
- self.assertFalse(describe_vpc["vpc"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_describing_vpc_by_id_on_connection_error_it_returns_error(self):
- """
- Tests describing parameters failure
- """
- vpc = self._create_vpc(name="test", tags={"test": "testvalue"})
- with patch(
- "moto.ec2.models.VPCBackend.get_all_vpcs",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- describe_result = boto_vpc.describe(vpc_id=vpc.id, **conn_parameters)
- self.assertTrue("error" in describe_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_describing_vpc_but_providing_no_vpc_id_the_describe_method_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests describing vpc without vpc id
- """
- with self.assertRaisesRegex(
- SaltInvocationError, "A valid vpc id or name needs to be specified."
- ):
- boto_vpc.describe(vpc_id=None, **conn_parameters)
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- @skipIf(
- _has_required_moto() is False,
- "The moto version must be >= to version {0}".format(required_moto_version),
- )
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcSubnetsTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_subnet_association_single_subnet(self):
- """
- tests that given multiple subnet ids in the same VPC that the VPC ID is
- returned. The test is valuable because it uses a string as an argument
- to subnets as opposed to a list.
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- subnet_association = boto_vpc.get_subnet_association(
- subnets=subnet.id, **conn_parameters
- )
- self.assertEqual(vpc.id, subnet_association["vpc_id"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_subnet_association_multiple_subnets_same_vpc(self):
- """
- tests that given multiple subnet ids in the same VPC that the VPC ID is
- returned.
- """
- vpc = self._create_vpc()
- subnet_a = self._create_subnet(vpc.id, "10.0.0.0/25")
- subnet_b = self._create_subnet(vpc.id, "10.0.0.128/25")
- subnet_association = boto_vpc.get_subnet_association(
- [subnet_a.id, subnet_b.id], **conn_parameters
- )
- self.assertEqual(vpc.id, subnet_association["vpc_id"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_get_subnet_association_multiple_subnets_different_vpc(self):
- """
- tests that given multiple subnet ids in different VPCs that False is
- returned.
- """
- vpc_a = self._create_vpc()
- vpc_b = self.conn.create_vpc(cidr_block)
- subnet_a = self._create_subnet(vpc_a.id, "10.0.0.0/24")
- subnet_b = self._create_subnet(vpc_b.id, "10.0.0.0/24")
- subnet_association = boto_vpc.get_subnet_association(
- [subnet_a.id, subnet_b.id], **conn_parameters
- )
- self.assertEqual(set(subnet_association["vpc_ids"]), set([vpc_a.id, vpc_b.id]))
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_subnet_succeeds_the_create_subnet_method_returns_true(
- self,
- ):
- """
- Tests creating a subnet successfully
- """
- vpc = self._create_vpc()
- subnet_creation_result = boto_vpc.create_subnet(
- vpc.id, "10.0.0.0/24", **conn_parameters
- )
- self.assertTrue(subnet_creation_result["created"])
- self.assertTrue("id" in subnet_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_subnet_and_specifying_a_name_succeeds_the_create_subnet_method_returns_true(
- self,
- ):
- """
- Tests creating a subnet successfully when specifying a name
- """
- vpc = self._create_vpc()
- subnet_creation_result = boto_vpc.create_subnet(
- vpc.id, "10.0.0.0/24", subnet_name="test", **conn_parameters
- )
- self.assertTrue(subnet_creation_result["created"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_subnet_and_specifying_tags_succeeds_the_create_subnet_method_returns_true(
- self,
- ):
- """
- Tests creating a subnet successfully when specifying a tag
- """
- vpc = self._create_vpc()
- subnet_creation_result = boto_vpc.create_subnet(
- vpc.id, "10.0.0.0/24", tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertTrue(subnet_creation_result["created"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_creating_a_subnet_fails_the_create_subnet_method_returns_error(
- self,
- ):
- """
- Tests creating a subnet failure
- """
- vpc = self._create_vpc()
- with patch(
- "moto.ec2.models.SubnetBackend.create_subnet",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- subnet_creation_result = boto_vpc.create_subnet(
- vpc.id, "10.0.0.0/24", **conn_parameters
- )
- self.assertTrue("error" in subnet_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_deleting_an_existing_subnet_the_delete_subnet_method_returns_true(
- self,
- ):
- """
- Tests deleting an existing subnet
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- subnet_deletion_result = boto_vpc.delete_subnet(
- subnet_id=subnet.id, **conn_parameters
- )
- self.assertTrue(subnet_deletion_result["deleted"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_deleting_a_non_existent_subnet_the_delete_vpc_method_returns_false(
- self,
- ):
- """
- Tests deleting a subnet that doesn't exist
- """
- delete_subnet_result = boto_vpc.delete_subnet(
- subnet_id="1234", **conn_parameters
- )
- self.assertTrue("error" in delete_subnet_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_subnet_exists_by_id_the_subnet_exists_method_returns_true(
- self,
- ):
- """
- Tests checking if a subnet exists when it does exist
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- subnet_exists_result = boto_vpc.subnet_exists(
- subnet_id=subnet.id, **conn_parameters
- )
- self.assertTrue(subnet_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_a_subnet_does_not_exist_the_subnet_exists_method_returns_false(
- self,
- ):
- """
- Tests checking if a subnet exists which doesn't exist
- """
- subnet_exists_result = boto_vpc.subnet_exists("fake", **conn_parameters)
- self.assertFalse(subnet_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_subnet_exists_by_name_the_subnet_exists_method_returns_true(
- self,
- ):
- """
- Tests checking subnet existence by name
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id, name="test")
- subnet_exists_result = boto_vpc.subnet_exists(name="test", **conn_parameters)
- self.assertTrue(subnet_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_subnet_exists_by_name_the_subnet_does_not_exist_the_subnet_method_returns_false(
- self,
- ):
- """
- Tests checking subnet existence by name when it doesn't exist
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id)
- subnet_exists_result = boto_vpc.subnet_exists(name="test", **conn_parameters)
- self.assertFalse(subnet_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_subnet_exists_by_tags_the_subnet_exists_method_returns_true(
- self,
- ):
- """
- Tests checking subnet existence by tag
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id, tags={"test": "testvalue"})
- subnet_exists_result = boto_vpc.subnet_exists(
- tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertTrue(subnet_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_checking_if_a_subnet_exists_by_tags_the_subnet_does_not_exist_the_subnet_method_returns_false(
- self,
- ):
- """
- Tests checking subnet existence by tag when subnet doesn't exist
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id)
- subnet_exists_result = boto_vpc.subnet_exists(
- tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertFalse(subnet_exists_result["exists"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_checking_if_a_subnet_exists_but_providing_no_filters_the_subnet_exists_method_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests checking subnet existence without any filters
- """
- with self.assertRaisesRegex(
- SaltInvocationError,
- "At least one of the following must be specified: "
- "subnet id, cidr, subnet_name, tags, or zones.",
- ):
- boto_vpc.subnet_exists(**conn_parameters)
- @skipIf(True, "Skip these tests while investigating failures")
- @mock_ec2_deprecated
- def test_that_describe_subnet_by_id_for_existing_subnet_returns_correct_data(self):
- """
- Tests describing a subnet by id.
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- describe_subnet_results = boto_vpc.describe_subnet(
- region=region, key=secret_key, keyid=access_key, subnet_id=subnet.id
- )
- self.assertEqual(
- set(describe_subnet_results["subnet"].keys()),
- set(["id", "cidr_block", "availability_zone", "tags"]),
- )
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_describe_subnet_by_id_for_non_existent_subnet_returns_none(self):
- """
- Tests describing a non-existent subnet by id.
- """
- self._create_vpc()
- describe_subnet_results = boto_vpc.describe_subnet(
- region=region, key=secret_key, keyid=access_key, subnet_id="subnet-a1b2c3"
- )
- self.assertEqual(describe_subnet_results["subnet"], None)
- @skipIf(True, "Skip these tests while investigating failures")
- @mock_ec2_deprecated
- def test_that_describe_subnet_by_name_for_existing_subnet_returns_correct_data(
- self,
- ):
- """
- Tests describing a subnet by name.
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id, name="test")
- describe_subnet_results = boto_vpc.describe_subnet(
- region=region, key=secret_key, keyid=access_key, subnet_name="test"
- )
- self.assertEqual(
- set(describe_subnet_results["subnet"].keys()),
- set(["id", "cidr_block", "availability_zone", "tags"]),
- )
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_describe_subnet_by_name_for_non_existent_subnet_returns_none(self):
- """
- Tests describing a non-existent subnet by id.
- """
- self._create_vpc()
- describe_subnet_results = boto_vpc.describe_subnet(
- region=region, key=secret_key, keyid=access_key, subnet_name="test"
- )
- self.assertEqual(describe_subnet_results["subnet"], None)
- @skipIf(True, "Skip these tests while investigating failures")
- @mock_ec2_deprecated
- def test_that_describe_subnets_by_id_for_existing_subnet_returns_correct_data(self):
- """
- Tests describing multiple subnets by id.
- """
- vpc = self._create_vpc()
- subnet1 = self._create_subnet(vpc.id)
- subnet2 = self._create_subnet(vpc.id)
- describe_subnet_results = boto_vpc.describe_subnets(
- region=region,
- key=secret_key,
- keyid=access_key,
- subnet_ids=[subnet1.id, subnet2.id],
- )
- self.assertEqual(len(describe_subnet_results["subnets"]), 2)
- self.assertEqual(
- set(describe_subnet_results["subnets"][0].keys()),
- set(["id", "cidr_block", "availability_zone", "tags"]),
- )
- @skipIf(True, "Skip these tests while investigating failures")
- @mock_ec2_deprecated
- def test_that_describe_subnets_by_name_for_existing_subnets_returns_correct_data(
- self,
- ):
- """
- Tests describing multiple subnets by id.
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id, name="subnet1")
- self._create_subnet(vpc.id, name="subnet2")
- describe_subnet_results = boto_vpc.describe_subnets(
- region=region,
- key=secret_key,
- keyid=access_key,
- subnet_names=["subnet1", "subnet2"],
- )
- self.assertEqual(len(describe_subnet_results["subnets"]), 2)
- self.assertEqual(
- set(describe_subnet_results["subnets"][0].keys()),
- set(["id", "cidr_block", "availability_zone", "tags"]),
- )
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_create_subnet_passes_availability_zone(self):
- """
- Tests that the availability_zone kwarg is passed on to _create_resource
- """
- vpc = self._create_vpc()
- self._create_subnet(vpc.id, name="subnet1", availability_zone="us-east-1a")
- describe_subnet_results = boto_vpc.describe_subnets(
- region=region, key=secret_key, keyid=access_key, subnet_names=["subnet1"]
- )
- self.assertEqual(
- describe_subnet_results["subnets"][0]["availability_zone"], "us-east-1a"
- )
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcInternetGatewayTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_internet_gateway_the_create_internet_gateway_method_returns_true(
- self,
- ):
- """
- Tests creating an internet gateway successfully (with no vpc id or name)
- """
- igw_creation_result = boto_vpc.create_internet_gateway(
- region=region, key=secret_key, keyid=access_key
- )
- self.assertTrue(igw_creation_result.get("created"))
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_internet_gateway_with_non_existent_vpc_the_create_internet_gateway_method_returns_an_error(
- self,
- ):
- """
- Tests that creating an internet gateway for a non-existent VPC fails.
- """
- igw_creation_result = boto_vpc.create_internet_gateway(
- region=region, key=secret_key, keyid=access_key, vpc_name="non-existent-vpc"
- )
- self.assertTrue("error" in igw_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_internet_gateway_with_vpc_name_specified_the_create_internet_gateway_method_returns_true(
- self,
- ):
- """
- Tests creating an internet gateway with vpc name specified.
- """
- self._create_vpc(name="test-vpc")
- igw_creation_result = boto_vpc.create_internet_gateway(
- region=region, key=secret_key, keyid=access_key, vpc_name="test-vpc"
- )
- self.assertTrue(igw_creation_result.get("created"))
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_internet_gateway_with_vpc_id_specified_the_create_internet_gateway_method_returns_true(
- self,
- ):
- """
- Tests creating an internet gateway with vpc name specified.
- """
- vpc = self._create_vpc()
- igw_creation_result = boto_vpc.create_internet_gateway(
- region=region, key=secret_key, keyid=access_key, vpc_id=vpc.id
- )
- self.assertTrue(igw_creation_result.get("created"))
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcNatGatewayTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_nat_gateway_the_create_nat_gateway_method_returns_true(
- self,
- ):
- """
- Tests creating an nat gateway successfully (with subnet_id specified)
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(
- vpc.id, name="subnet1", availability_zone="us-east-1a"
- )
- ngw_creation_result = boto_vpc.create_nat_gateway(
- subnet_id=subnet.id, region=region, key=secret_key, keyid=access_key
- )
- self.assertTrue(ngw_creation_result.get("created"))
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_nat_gateway_with_non_existent_subnet_the_create_nat_gateway_method_returns_an_error(
- self,
- ):
- """
- Tests that creating an nat gateway for a non-existent subnet fails.
- """
- ngw_creation_result = boto_vpc.create_nat_gateway(
- region=region,
- key=secret_key,
- keyid=access_key,
- subnet_name="non-existent-subnet",
- )
- self.assertTrue("error" in ngw_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_an_nat_gateway_with_subnet_name_specified_the_create_nat_gateway_method_returns_true(
- self,
- ):
- """
- Tests creating an nat gateway with subnet name specified.
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(
- vpc.id, name="test-subnet", availability_zone="us-east-1a"
- )
- ngw_creation_result = boto_vpc.create_nat_gateway(
- region=region, key=secret_key, keyid=access_key, subnet_name="test-subnet"
- )
- self.assertTrue(ngw_creation_result.get("created"))
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- class BotoVpcCustomerGatewayTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_customer_gateway_the_create_customer_gateway_method_returns_true(
- self,
- ):
- """
- Tests creating an internet gateway successfully (with no vpc id or name)
- """
- gw_creation_result = boto_vpc.create_customer_gateway(
- "ipsec.1", "10.1.1.1", None
- )
- self.assertTrue(gw_creation_result.get("created"))
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_checking_if_a_subnet_exists_by_id_the_subnet_exists_method_returns_true(
- self,
- ):
- """
- Tests checking if a subnet exists when it does exist
- """
- gw_creation_result = boto_vpc.create_customer_gateway(
- "ipsec.1", "10.1.1.1", None
- )
- gw_exists_result = boto_vpc.customer_gateway_exists(
- customer_gateway_id=gw_creation_result["id"]
- )
- self.assertTrue(gw_exists_result["exists"])
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_a_subnet_does_not_exist_the_subnet_exists_method_returns_false(
- self,
- ):
- """
- Tests checking if a subnet exists which doesn't exist
- """
- gw_exists_result = boto_vpc.customer_gateway_exists("fake")
- self.assertFalse(gw_exists_result["exists"])
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- @skipIf(
- _has_required_moto() is False,
- "The moto version must be >= to version {0}".format(required_moto_version),
- )
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcDHCPOptionsTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_dhcp_options_succeeds_the_create_dhcp_options_method_returns_true(
- self,
- ):
- """
- Tests creating dhcp options successfully
- """
- dhcp_options_creation_result = boto_vpc.create_dhcp_options(
- **dhcp_options_parameters
- )
- self.assertTrue(dhcp_options_creation_result["created"])
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_dhcp_options_and_specifying_a_name_succeeds_the_create_dhcp_options_method_returns_true(
- self,
- ):
- """
- Tests creating dchp options with name successfully
- """
- dhcp_options_creation_result = boto_vpc.create_dhcp_options(
- dhcp_options_name="test", **dhcp_options_parameters
- )
- self.assertTrue(dhcp_options_creation_result["created"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_dhcp_options_and_specifying_tags_succeeds_the_create_dhcp_options_method_returns_true(
- self,
- ):
- """
- Tests creating dchp options with tag successfully
- """
- dhcp_options_creation_result = boto_vpc.create_dhcp_options(
- tags={"test": "testvalue"}, **dhcp_options_parameters
- )
- self.assertTrue(dhcp_options_creation_result["created"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_creating_dhcp_options_fails_the_create_dhcp_options_method_returns_error(
- self,
- ):
- """
- Tests creating dhcp options failure
- """
- with patch(
- "moto.ec2.models.DHCPOptionsSetBackend.create_dhcp_options",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- r = dhcp_options_creation_result = boto_vpc.create_dhcp_options(
- **dhcp_options_parameters
- )
- self.assertTrue("error" in r)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_associating_an_existing_dhcp_options_set_to_an_existing_vpc_the_associate_dhcp_options_method_returns_true(
- self,
- ):
- """
- Tests associating existing dchp options successfully
- """
- vpc = self._create_vpc()
- dhcp_options = self._create_dhcp_options()
- dhcp_options_association_result = boto_vpc.associate_dhcp_options_to_vpc(
- dhcp_options.id, vpc.id, **conn_parameters
- )
- self.assertTrue(dhcp_options_association_result["associated"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_associating_a_non_existent_dhcp_options_set_to_an_existing_vpc_the_associate_dhcp_options_method_returns_error(
- self,
- ):
- """
- Tests associating non-existanct dhcp options successfully
- """
- vpc = self._create_vpc()
- dhcp_options_association_result = boto_vpc.associate_dhcp_options_to_vpc(
- "fake", vpc.id, **conn_parameters
- )
- self.assertTrue("error" in dhcp_options_association_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_associating_an_existing_dhcp_options_set_to_a_non_existent_vpc_the_associate_dhcp_options_method_returns_false(
- self,
- ):
- """
- Tests associating existing dhcp options to non-existence vpc
- """
- dhcp_options = self._create_dhcp_options()
- dhcp_options_association_result = boto_vpc.associate_dhcp_options_to_vpc(
- dhcp_options.id, "fake", **conn_parameters
- )
- self.assertTrue("error" in dhcp_options_association_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_dhcp_options_set_to_an_existing_vpc_succeeds_the_associate_new_dhcp_options_method_returns_true(
- self,
- ):
- """
- Tests creation/association of dchp options to an existing vpc successfully
- """
- vpc = self._create_vpc()
- dhcp_creation_result = boto_vpc.create_dhcp_options(
- vpc_id=vpc.id, **dhcp_options_parameters
- )
- self.assertTrue(dhcp_creation_result["created"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_creating_and_associating_dhcp_options_set_to_an_existing_vpc_fails_creating_the_dhcp_options_the_associate_new_dhcp_options_method_raises_exception(
- self,
- ):
- """
- Tests creation failure during creation/association of dchp options to an existing vpc
- """
- vpc = self._create_vpc()
- with patch(
- "moto.ec2.models.DHCPOptionsSetBackend.create_dhcp_options",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- r = boto_vpc.associate_new_dhcp_options_to_vpc(
- vpc.id, **dhcp_options_parameters
- )
- self.assertTrue("error" in r)
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_creating_and_associating_dhcp_options_set_to_an_existing_vpc_fails_associating_the_dhcp_options_the_associate_new_dhcp_options_method_raises_exception(
- self,
- ):
- """
- Tests association failure during creation/association of dchp options to existing vpc
- """
- vpc = self._create_vpc()
- with patch(
- "moto.ec2.models.DHCPOptionsSetBackend.associate_dhcp_options",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- r = boto_vpc.associate_new_dhcp_options_to_vpc(
- vpc.id, **dhcp_options_parameters
- )
- self.assertTrue("error" in r)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_dhcp_options_set_to_a_non_existent_vpc_the_dhcp_options_the_associate_new_dhcp_options_method_returns_false(
- self,
- ):
- """
- Tests creation/association of dhcp options to non-existent vpc
- """
- r = boto_vpc.create_dhcp_options(vpc_name="fake", **dhcp_options_parameters)
- self.assertTrue("error" in r)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_dhcp_options_exists_the_dhcp_options_exists_method_returns_true(
- self,
- ):
- """
- Tests existence of dhcp options successfully
- """
- dhcp_options = self._create_dhcp_options()
- dhcp_options_exists_result = boto_vpc.dhcp_options_exists(
- dhcp_options.id, **conn_parameters
- )
- self.assertTrue(dhcp_options_exists_result["exists"])
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_dhcp_options_do_not_exist_the_dhcp_options_exists_method_returns_false(
- self,
- ):
- """
- Tests existence of dhcp options failure
- """
- r = boto_vpc.dhcp_options_exists("fake", **conn_parameters)
- self.assertFalse(r["exists"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_checking_if_dhcp_options_exists_but_providing_no_filters_the_dhcp_options_exists_method_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests checking dhcp option existence with no filters
- """
- with self.assertRaisesRegex(
- SaltInvocationError,
- "At least one of the following must be provided: id, name, or tags.",
- ):
- boto_vpc.dhcp_options_exists(**conn_parameters)
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcNetworkACLTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_network_acl_for_an_existing_vpc_the_create_network_acl_method_returns_true(
- self,
- ):
- """
- Tests creation of network acl with existing vpc
- """
- vpc = self._create_vpc()
- network_acl_creation_result = boto_vpc.create_network_acl(
- vpc.id, **conn_parameters
- )
- self.assertTrue(network_acl_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_network_acl_for_an_existing_vpc_and_specifying_a_name_the_create_network_acl_method_returns_true(
- self,
- ):
- """
- Tests creation of network acl via name with an existing vpc
- """
- vpc = self._create_vpc()
- network_acl_creation_result = boto_vpc.create_network_acl(
- vpc.id, network_acl_name="test", **conn_parameters
- )
- self.assertTrue(network_acl_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_network_acl_for_an_existing_vpc_and_specifying_tags_the_create_network_acl_method_returns_true(
- self,
- ):
- """
- Tests creation of network acl via tags with an existing vpc
- """
- vpc = self._create_vpc()
- network_acl_creation_result = boto_vpc.create_network_acl(
- vpc.id, tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertTrue(network_acl_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_network_acl_for_a_non_existent_vpc_the_create_network_acl_method_returns_an_error(
- self,
- ):
- """
- Tests creation of network acl with a non-existent vpc
- """
- network_acl_creation_result = boto_vpc.create_network_acl(
- "fake", **conn_parameters
- )
- self.assertTrue("error" in network_acl_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_network_acl_fails_the_create_network_acl_method_returns_false(
- self,
- ):
- """
- Tests creation of network acl failure
- """
- vpc = self._create_vpc()
- with patch(
- "moto.ec2.models.NetworkACLBackend.create_network_acl",
- side_effect=BotoServerError(400, "Mocked error"),
- ):
- network_acl_creation_result = boto_vpc.create_network_acl(
- vpc.id, **conn_parameters
- )
- self.assertFalse(network_acl_creation_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_deleting_an_existing_network_acl_the_delete_network_acl_method_returns_true(
- self,
- ):
- """
- Tests deletion of existing network acl successfully
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- network_acl_deletion_result = boto_vpc.delete_network_acl(
- network_acl.id, **conn_parameters
- )
- self.assertTrue(network_acl_deletion_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_deleting_a_non_existent_network_acl_the_delete_network_acl_method_returns_an_error(
- self,
- ):
- """
- Tests deleting a non-existent network acl
- """
- network_acl_deletion_result = boto_vpc.delete_network_acl(
- "fake", **conn_parameters
- )
- self.assertTrue("error" in network_acl_deletion_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_a_network_acl_exists_the_network_acl_exists_method_returns_true(
- self,
- ):
- """
- Tests existence of network acl
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- network_acl_deletion_result = boto_vpc.network_acl_exists(
- network_acl.id, **conn_parameters
- )
- self.assertTrue(network_acl_deletion_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_a_network_acl_does_not_exist_the_network_acl_exists_method_returns_false(
- self,
- ):
- """
- Tests checking network acl does not exist
- """
- network_acl_deletion_result = boto_vpc.network_acl_exists(
- "fake", **conn_parameters
- )
- self.assertFalse(network_acl_deletion_result["exists"])
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_checking_if_network_acl_exists_but_providing_no_filters_the_network_acl_exists_method_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests checking existence of network acl with no filters
- """
- with self.assertRaisesRegex(
- SaltInvocationError,
- "At least one of the following must be provided: id, name, or tags.",
- ):
- boto_vpc.dhcp_options_exists(**conn_parameters)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_network_acl_entry_successfully_the_create_network_acl_entry_method_returns_true(
- self,
- ):
- """
- Tests creating network acl successfully
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- network_acl_entry_creation_result = boto_vpc.create_network_acl_entry(
- network_acl.id, *network_acl_entry_parameters, **conn_parameters
- )
- self.assertTrue(network_acl_entry_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_network_acl_entry_for_a_non_existent_network_acl_the_create_network_acl_entry_method_returns_false(
- self,
- ):
- """
- Tests creating network acl entry for non-existent network acl
- """
- network_acl_entry_creation_result = boto_vpc.create_network_acl_entry(
- *network_acl_entry_parameters, **conn_parameters
- )
- self.assertFalse(network_acl_entry_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_replacing_a_network_acl_entry_successfully_the_replace_network_acl_entry_method_returns_true(
- self,
- ):
- """
- Tests replacing network acl entry successfully
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- self._create_network_acl_entry(network_acl.id, *network_acl_entry_parameters)
- network_acl_entry_creation_result = boto_vpc.replace_network_acl_entry(
- network_acl.id, *network_acl_entry_parameters, **conn_parameters
- )
- self.assertTrue(network_acl_entry_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_replacing_a_network_acl_entry_for_a_non_existent_network_acl_the_replace_network_acl_entry_method_returns_false(
- self,
- ):
- """
- Tests replacing a network acl entry for a non-existent network acl
- """
- network_acl_entry_creation_result = boto_vpc.create_network_acl_entry(
- *network_acl_entry_parameters, **conn_parameters
- )
- self.assertFalse(network_acl_entry_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_deleting_an_existing_network_acl_entry_the_delete_network_acl_entry_method_returns_true(
- self,
- ):
- """
- Tests deleting existing network acl entry successfully
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- network_acl_entry = self._create_network_acl_entry(
- network_acl.id, *network_acl_entry_parameters
- )
- network_acl_entry_deletion_result = boto_vpc.delete_network_acl_entry(
- network_acl_entry.id, 100, **conn_parameters
- )
- self.assertTrue(network_acl_entry_deletion_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_deleting_a_non_existent_network_acl_entry_the_delete_network_acl_entry_method_returns_false(
- self,
- ):
- """
- Tests deleting a non-existent network acl entry
- """
- network_acl_entry_deletion_result = boto_vpc.delete_network_acl_entry(
- "fake", 100, **conn_parameters
- )
- self.assertFalse(network_acl_entry_deletion_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_associating_an_existing_network_acl_to_an_existing_subnet_the_associate_network_acl_method_returns_true(
- self,
- ):
- """
- Tests association of existing network acl to existing subnet successfully
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- subnet = self._create_subnet(vpc.id)
- network_acl_association_result = boto_vpc.associate_network_acl_to_subnet(
- network_acl.id, subnet.id, **conn_parameters
- )
- self.assertTrue(network_acl_association_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_associating_a_non_existent_network_acl_to_an_existing_subnet_the_associate_network_acl_method_returns_an_error(
- self,
- ):
- """
- Tests associating a non-existent network acl to existing subnet failure
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- network_acl_association_result = boto_vpc.associate_network_acl_to_subnet(
- "fake", subnet.id, **conn_parameters
- )
- self.assertTrue("error" in network_acl_association_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_associating_an_existing_network_acl_to_a_non_existent_subnet_the_associate_network_acl_method_returns_false(
- self,
- ):
- """
- Tests associating an existing network acl to a non-existent subnet
- """
- vpc = self._create_vpc()
- network_acl = self._create_network_acl(vpc.id)
- network_acl_association_result = boto_vpc.associate_network_acl_to_subnet(
- network_acl.id, "fake", **conn_parameters
- )
- self.assertFalse(network_acl_association_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_and_associating_a_network_acl_to_a_subnet_succeeds_the_associate_new_network_acl_to_subnet_method_returns_true(
- self,
- ):
- """
- Tests creating/associating a network acl to a subnet to a new network
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- network_acl_creation_and_association_result = boto_vpc.associate_new_network_acl_to_subnet(
- vpc.id, subnet.id, **conn_parameters
- )
- self.assertTrue(network_acl_creation_and_association_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_and_associating_a_network_acl_to_a_subnet_and_specifying_a_name_succeeds_the_associate_new_network_acl_to_subnet_method_returns_true(
- self,
- ):
- """
- Tests creation/association of a network acl to subnet via name successfully
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- network_acl_creation_and_association_result = boto_vpc.associate_new_network_acl_to_subnet(
- vpc.id, subnet.id, network_acl_name="test", **conn_parameters
- )
- self.assertTrue(network_acl_creation_and_association_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_and_associating_a_network_acl_to_a_subnet_and_specifying_tags_succeeds_the_associate_new_network_acl_to_subnet_method_returns_true(
- self,
- ):
- """
- Tests creating/association of a network acl to a subnet via tag successfully
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- network_acl_creation_and_association_result = boto_vpc.associate_new_network_acl_to_subnet(
- vpc.id, subnet.id, tags={"test": "testvalue"}, **conn_parameters
- )
- self.assertTrue(network_acl_creation_and_association_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_and_associating_a_network_acl_to_a_non_existent_subnet_the_associate_new_network_acl_to_subnet_method_returns_false(
- self,
- ):
- """
- Tests creation/association of a network acl to a non-existent vpc
- """
- vpc = self._create_vpc()
- network_acl_creation_and_association_result = boto_vpc.associate_new_network_acl_to_subnet(
- vpc.id, "fake", **conn_parameters
- )
- self.assertFalse(network_acl_creation_and_association_result)
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_that_when_creating_a_network_acl_to_a_non_existent_vpc_the_associate_new_network_acl_to_subnet_method_returns_an_error(
- self,
- ):
- """
- Tests creation/association of network acl to a non-existent subnet
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- network_acl_creation_result = boto_vpc.create_network_acl(
- vpc_name="fake", subnet_id=subnet.id, **conn_parameters
- )
- self.assertTrue("error" in network_acl_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_disassociating_network_acl_succeeds_the_disassociate_network_acl_method_should_return_true(
- self,
- ):
- """
- Tests disassociation of network acl success
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- dhcp_disassociate_result = boto_vpc.disassociate_network_acl(
- subnet.id, vpc_id=vpc.id, **conn_parameters
- )
- self.assertTrue(dhcp_disassociate_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_disassociating_network_acl_for_a_non_existent_vpc_the_disassociate_network_acl_method_should_return_false(
- self,
- ):
- """
- Tests disassociation of network acl from non-existent vpc
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- dhcp_disassociate_result = boto_vpc.disassociate_network_acl(
- subnet.id, vpc_id="fake", **conn_parameters
- )
- self.assertFalse(dhcp_disassociate_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_disassociating_network_acl_for_a_non_existent_subnet_the_disassociate_network_acl_method_should_return_false(
- self,
- ):
- """
- Tests disassociation of network acl from non-existent subnet
- """
- vpc = self._create_vpc()
- dhcp_disassociate_result = boto_vpc.disassociate_network_acl(
- "fake", vpc_id=vpc.id, **conn_parameters
- )
- self.assertFalse(dhcp_disassociate_result)
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- class BotoVpcRouteTablesTestCase(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_route_table_succeeds_the_create_route_table_method_returns_true(
- self,
- ):
- """
- Tests creating route table successfully
- """
- vpc = self._create_vpc()
- route_table_creation_result = boto_vpc.create_route_table(
- vpc.id, **conn_parameters
- )
- self.assertTrue(route_table_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_route_table_on_a_non_existent_vpc_the_create_route_table_method_returns_false(
- self,
- ):
- """
- Tests creating route table on a non-existent vpc
- """
- route_table_creation_result = boto_vpc.create_route_table(
- "fake", **conn_parameters
- )
- self.assertTrue(route_table_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_deleting_a_route_table_succeeds_the_delete_route_table_method_returns_true(
- self,
- ):
- """
- Tests deleting route table successfully
- """
- vpc = self._create_vpc()
- route_table = self._create_route_table(vpc.id)
- route_table_deletion_result = boto_vpc.delete_route_table(
- route_table.id, **conn_parameters
- )
- self.assertTrue(route_table_deletion_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_deleting_a_non_existent_route_table_the_delete_route_table_method_returns_false(
- self,
- ):
- """
- Tests deleting non-existent route table
- """
- route_table_deletion_result = boto_vpc.delete_route_table(
- "fake", **conn_parameters
- )
- self.assertFalse(route_table_deletion_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_route_table_exists_the_route_table_exists_method_returns_true(
- self,
- ):
- """
- Tests existence of route table success
- """
- vpc = self._create_vpc()
- route_table = self._create_route_table(vpc.id)
- route_table_existence_result = boto_vpc.route_table_exists(
- route_table.id, **conn_parameters
- )
- self.assertTrue(route_table_existence_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_route_table_does_not_exist_the_route_table_exists_method_returns_false(
- self,
- ):
- """
- Tests existence of route table failure
- """
- route_table_existence_result = boto_vpc.route_table_exists(
- "fake", **conn_parameters
- )
- self.assertFalse(route_table_existence_result)
- @mock_ec2_deprecated
- @skipIf(True, "Disabled pending https://github.com/spulec/moto/issues/493")
- def test_that_when_checking_if_a_route_table_exists_but_providing_no_filters_the_route_table_exists_method_raises_a_salt_invocation_error(
- self,
- ):
- """
- Tests checking route table without filters
- """
- with self.assertRaisesRegex(
- SaltInvocationError,
- "At least one of the following must be provided: id, name, or tags.",
- ):
- boto_vpc.dhcp_options_exists(**conn_parameters)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_associating_a_route_table_succeeds_the_associate_route_table_method_should_return_the_association_id(
- self,
- ):
- """
- Tests associating route table successfully
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- route_table = self._create_route_table(vpc.id)
- association_id = boto_vpc.associate_route_table(
- route_table.id, subnet.id, **conn_parameters
- )
- self.assertTrue(association_id)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_associating_a_route_table_with_a_non_existent_route_table_the_associate_route_table_method_should_return_false(
- self,
- ):
- """
- Tests associating of route table to non-existent route table
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- association_id = boto_vpc.associate_route_table(
- "fake", subnet.id, **conn_parameters
- )
- self.assertFalse(association_id)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_associating_a_route_table_with_a_non_existent_subnet_the_associate_route_table_method_should_return_false(
- self,
- ):
- """
- Tests associating of route table with non-existent subnet
- """
- vpc = self._create_vpc()
- route_table = self._create_route_table(vpc.id)
- association_id = boto_vpc.associate_route_table(
- route_table.id, "fake", **conn_parameters
- )
- self.assertFalse(association_id)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_disassociating_a_route_table_succeeds_the_disassociate_route_table_method_should_return_true(
- self,
- ):
- """
- Tests disassociation of a route
- """
- vpc = self._create_vpc()
- subnet = self._create_subnet(vpc.id)
- route_table = self._create_route_table(vpc.id)
- association_id = self._associate_route_table(route_table.id, subnet.id)
- dhcp_disassociate_result = boto_vpc.disassociate_route_table(
- association_id, **conn_parameters
- )
- self.assertTrue(dhcp_disassociate_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_route_succeeds_the_create_route_method_should_return_true(
- self,
- ):
- """
- Tests successful creation of a route
- """
- vpc = self._create_vpc()
- route_table = self._create_route_table(vpc.id)
- route_creation_result = boto_vpc.create_route(
- route_table.id, cidr_block, **conn_parameters
- )
- self.assertTrue(route_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_creating_a_route_with_a_non_existent_route_table_the_create_route_method_should_return_false(
- self,
- ):
- """
- Tests creation of route on non-existent route table
- """
- route_creation_result = boto_vpc.create_route(
- "fake", cidr_block, **conn_parameters
- )
- self.assertFalse(route_creation_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_deleting_a_route_succeeds_the_delete_route_method_should_return_true(
- self,
- ):
- """
- Tests deleting route from route table
- """
- vpc = self._create_vpc()
- route_table = self._create_route_table(vpc.id)
- route_deletion_result = boto_vpc.delete_route(
- route_table.id, cidr_block, **conn_parameters
- )
- self.assertTrue(route_deletion_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_deleting_a_route_with_a_non_existent_route_table_the_delete_route_method_should_return_false(
- self,
- ):
- """
- Tests deleting route from a non-existent route table
- """
- route_deletion_result = boto_vpc.delete_route(
- "fake", cidr_block, **conn_parameters
- )
- self.assertFalse(route_deletion_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_replacing_a_route_succeeds_the_replace_route_method_should_return_true(
- self,
- ):
- """
- Tests replacing route successfully
- """
- vpc = self._create_vpc()
- route_table = self._create_route_table(vpc.id)
- route_replacing_result = boto_vpc.replace_route(
- route_table.id, cidr_block, **conn_parameters
- )
- self.assertTrue(route_replacing_result)
- @mock_ec2_deprecated
- @skipIf(True, "Moto has not implemented this feature. Skipping for now.")
- def test_that_when_replacing_a_route_with_a_non_existent_route_table_the_replace_route_method_should_return_false(
- self,
- ):
- """
- Tests replacing a route when the route table doesn't exist
- """
- route_replacing_result = boto_vpc.replace_route(
- "fake", cidr_block, **conn_parameters
- )
- self.assertFalse(route_replacing_result)
- @skipIf(HAS_BOTO is False, "The boto module must be installed.")
- @skipIf(HAS_MOTO is False, "The moto module must be installed.")
- @skipIf(
- _has_required_boto() is False,
- "The boto module must be greater than"
- " or equal to version {}. Installed: {}".format(
- required_boto_version, _get_boto_version()
- ),
- )
- @skipIf(
- _has_required_moto() is False,
- "The moto version must be >= to version {0}".format(required_moto_version),
- )
- @skipIf(
- sys.version_info > (3, 6),
- "Disabled for 3.7+ pending https://github.com/spulec/moto/issues/1706.",
- )
- class BotoVpcPeeringConnectionsTest(BotoVpcTestCaseBase, BotoVpcTestCaseMixin):
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_request_vpc_peering_connection(self):
- """
- Run with 2 vpc ids and returns a message
- """
- my_vpc = self._create_vpc()
- other_vpc = self._create_vpc()
- self.assertTrue(
- "msg"
- in boto_vpc.request_vpc_peering_connection(
- name="my_peering",
- requester_vpc_id=my_vpc.id,
- peer_vpc_id=other_vpc.id,
- **conn_parameters
- )
- )
- @mock_ec2_deprecated
- @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
- def test_raises_error_if_both_vpc_name_and_vpc_id_are_specified(self):
- """
- Must specify only one
- """
- my_vpc = self._create_vpc()
- other_vpc = self._create_vpc()
- with self.assertRaises(SaltInvocationError):
- boto_vpc.request_vpc_peering_connection(
- name="my_peering",
- requester_vpc_id=my_vpc.id,
- requester_vpc_name="foobar",
- peer_vpc_id=other_vpc.id,
- **conn_parameters
- )
- boto_vpc.request_vpc_peering_connection(
- name="my_peering",
- requester_vpc_name="my_peering",
- peer_vpc_id=other_vpc.id,
- **conn_parameters
- )
|