# test_swarm.py (2.2 KB)
  1. import pytest
  2. import salt.modules.swarm as swarm
  3. from requests.models import Response
  4. from tests.support.mock import DEFAULT, MagicMock, patch
  5. HAS_DOCKER = False
  6. try:
  7. import docker
  8. HAS_DOCKER = True
  9. except ImportError:
  10. HAS_DOCKER = False
  11. pytestmark = pytest.mark.skipif(
  12. HAS_DOCKER is False, reason="The docker python sdk may not be installed"
  13. )
  14. @pytest.fixture(autouse=True)
  15. def setup_loader():
  16. setup_loader_modules = {swarm: {"__context__": {}}}
  17. with pytest.helpers.loader_mock(setup_loader_modules) as loader_mock:
  18. yield loader_mock
  19. @pytest.fixture
  20. def fake_context_client():
  21. fake_swarm_client = MagicMock()
  22. patch_context = patch.dict(
  23. swarm.__context__, {"client": fake_swarm_client, "server_name": "test swarm"}
  24. )
  25. patch_swarm_token = patch(
  26. "salt.modules.swarm.swarm_tokens", autospec=True, return_value="mocked_token"
  27. )
  28. with patch_context, patch_swarm_token:
  29. yield fake_swarm_client
  30. def test_when_swarm_init_is_called_with_the_same_information_twice_it_should_return_the_docker_error(
  31. fake_context_client,
  32. ):
  33. error_response = Response()
  34. error_response._content = b'{"message":"This node is already part of a swarm. Use \\"docker swarm leave\\" to leave this swarm and join another one."}\n'
  35. error_response.status_code = 503
  36. error_response.reason = "Service Unavailable"
  37. swarm_error_message = 'This node is already part of a swarm. Use "docker swarm leave" to leave this swarm and join another one.'
  38. fake_context_client.swarm.init.side_effect = [
  39. DEFAULT,
  40. docker.errors.APIError(
  41. swarm_error_message,
  42. response=error_response,
  43. explanation=swarm_error_message,
  44. ),
  45. ]
  46. expected_good_result = {
  47. "Comment": "Docker swarm has been initialized on test swarm and the worker/manager Join token is below",
  48. "Tokens": "mocked_token",
  49. }
  50. expected_bad_result = {"Comment": swarm_error_message, "result": False}
  51. first_result = swarm.swarm_init("127.0.0.1", "0.0.0.0", False)
  52. second_result = swarm.swarm_init("127.0.0.1", "0.0.0.0", False)
  53. assert first_result == expected_good_result
  54. assert second_result == expected_bad_result