1
0

test_crypt.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. # coding: utf-8
  2. # python libs
  3. from __future__ import absolute_import
  4. import os
  5. import shutil
  6. import tempfile
  7. import pytest
  8. import salt.utils.files
  9. from salt import crypt
  10. # salt libs
  11. from salt.ext import six
  12. from tests.support.mock import MagicMock, MockCall, mock_open, patch
  13. # salt testing libs
  14. from tests.support.unit import TestCase, skipIf
  15. # third-party libs
  16. try:
  17. import M2Crypto
  18. HAS_M2 = True
  19. except ImportError:
  20. HAS_M2 = False
  21. try:
  22. from Cryptodome.PublicKey import RSA # pylint: disable=unused-import
  23. HAS_PYCRYPTO_RSA = True
  24. except ImportError:
  25. HAS_PYCRYPTO_RSA = False
  26. if not HAS_PYCRYPTO_RSA:
  27. try:
  28. from Crypto.PublicKey import RSA
  29. HAS_PYCRYPTO_RSA = True
  30. except ImportError:
  31. HAS_PYCRYPTO_RSA = False
  32. PRIVKEY_DATA = (
  33. "-----BEGIN RSA PRIVATE KEY-----\n"
  34. "MIIEpAIBAAKCAQEA75GR6ZTv5JOv90Vq8tKhKC7YQnhDIo2hM0HVziTEk5R4UQBW\n"
  35. "a0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD4ZMsYqLzqjWMekLC8bjhxc+EuPo9\n"
  36. "Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R4hOcMMZNZdi0xLtFoTfwU61UPfFX\n"
  37. "14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttLP3sMXJvc3EvM0JiDVj4l1TWFUHHz\n"
  38. "eFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k6ai4tVzwkTmV5PsriP1ju88Lo3MB\n"
  39. "4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWHAQIDAQABAoIBAGOzBzBYZUWRGOgl\n"
  40. "IY8QjTT12dY/ymC05GM6gMobjxuD7FZ5d32HDLu/QrknfS3kKlFPUQGDAbQhbbb0\n"
  41. "zw6VL5NO9mfOPO2W/3FaG1sRgBQcerWonoSSSn8OJwVBHMFLG3a+U1Zh1UvPoiPK\n"
  42. "S734swIM+zFpNYivGPvOm/muF/waFf8tF/47t1cwt/JGXYQnkG/P7z0vp47Irpsb\n"
  43. "Yjw7vPe4BnbY6SppSxscW3KoV7GtJLFKIxAXbxsuJMF/rYe3O3w2VKJ1Sug1VDJl\n"
  44. "/GytwAkSUer84WwP2b07Wn4c5pCnmLslMgXCLkENgi1NnJMhYVOnckxGDZk54hqP\n"
  45. "9RbLnkkCgYEA/yKuWEvgdzYRYkqpzB0l9ka7Y00CV4Dha9Of6GjQi9i4VCJ/UFVr\n"
  46. "UlhTo5y0ZzpcDAPcoZf5CFZsD90a/BpQ3YTtdln2MMCL/Kr3QFmetkmDrt+3wYnX\n"
  47. "sKESfsa2nZdOATRpl1antpwyD4RzsAeOPwBiACj4fkq5iZJBSI0bxrMCgYEA8GFi\n"
  48. "qAjgKh81/Uai6KWTOW2kX02LEMVRrnZLQ9VPPLGid4KZDDk1/dEfxjjkcyOxX1Ux\n"
  49. "Klu4W8ZEdZyzPcJrfk7PdopfGOfrhWzkREK9C40H7ou/1jUecq/STPfSOmxh3Y+D\n"
  50. "ifMNO6z4sQAHx8VaHaxVsJ7SGR/spr0pkZL+NXsCgYEA84rIgBKWB1W+TGRXJzdf\n"
  51. "yHIGaCjXpm2pQMN3LmP3RrcuZWm0vBt94dHcrR5l+u/zc6iwEDTAjJvqdU4rdyEr\n"
  52. "tfkwr7v6TNlQB3WvpWanIPyVzfVSNFX/ZWSsAgZvxYjr9ixw6vzWBXOeOb/Gqu7b\n"
  53. "cvpLkjmJ0wxDhbXtyXKhZA8CgYBZyvcQb+hUs732M4mtQBSD0kohc5TsGdlOQ1AQ\n"
  54. "McFcmbpnzDghkclyW8jzwdLMk9uxEeDAwuxWE/UEvhlSi6qdzxC+Zifp5NBc0fVe\n"
  55. "7lMx2mfJGxj5CnSqQLVdHQHB4zSXkAGB6XHbBd0MOUeuvzDPfs2voVQ4IG3FR0oc\n"
  56. "3/znuwKBgQChZGH3McQcxmLA28aUwOVbWssfXKdDCsiJO+PEXXlL0maO3SbnFn+Q\n"
  57. "Tyf8oHI5cdP7AbwDSx9bUfRPjg9dKKmATBFr2bn216pjGxK0OjYOCntFTVr0psRB\n"
  58. "CrKg52Qrq71/2l4V2NLQZU40Dr1bN9V+Ftd9L0pvpCAEAWpIbLXGDw==\n"
  59. "-----END RSA PRIVATE KEY-----"
  60. )
  61. PUBKEY_DATA = (
  62. "-----BEGIN PUBLIC KEY-----\n"
  63. "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75GR6ZTv5JOv90Vq8tKh\n"
  64. "KC7YQnhDIo2hM0HVziTEk5R4UQBWa0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD\n"
  65. "4ZMsYqLzqjWMekLC8bjhxc+EuPo9Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R\n"
  66. "4hOcMMZNZdi0xLtFoTfwU61UPfFX14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttL\n"
  67. "P3sMXJvc3EvM0JiDVj4l1TWFUHHzeFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k\n"
  68. "6ai4tVzwkTmV5PsriP1ju88Lo3MB4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWH\n"
  69. "AQIDAQAB\n"
  70. "-----END PUBLIC KEY-----"
  71. )
  72. MSG = b"It's me, Mario"
  73. SIG = (
  74. b"\x07\xf3\xb1\xe7\xdb\x06\xf4_\xe2\xdc\xcb!F\xfb\xbex{W\x1d\xe4E"
  75. b"\xd3\r\xc5\x90\xca(\x05\x1d\x99\x8b\x1aug\x9f\x95>\x94\x7f\xe3+"
  76. b"\x12\xfa\x9c\xd4\xb8\x02]\x0e\xa5\xa3LL\xc3\xa2\x8f+\x83Z\x1b\x17"
  77. b'\xbfT\xd3\xc7\xfd\x0b\xf4\xd7J\xfe^\x86q"I\xa3x\xbc\xd3$\xe9M<\xe1'
  78. b"\x07\xad\xf2_\x9f\xfa\xf7g(~\xd8\xf5\xe7\xda-\xa3Ko\xfc.\x99\xcf"
  79. b"\x9b\xb9\xc1U\x97\x82'\xcb\xc6\x08\xaa\xa0\xe4\xd0\xc1+\xfc\x86"
  80. b'\r\xe4y\xb1#\xd3\x1dS\x96D28\xc4\xd5\r\xd4\x98\x1a44"\xd7\xc2\xb4'
  81. b"]\xa7\x0f\xa7Db\x85G\x8c\xd6\x94!\x8af1O\xf6g\xd7\x03\xfd\xb3\xbc"
  82. b"\xce\x9f\xe7\x015\xb8\x1d]AHK\xa0\x14m\xda=O\xa7\xde\xf2\xff\x9b"
  83. b"\x8e\x83\xc8j\x11\x1a\x98\x85\xde\xc5\x91\x07\x84!\x12^4\xcb\xa8"
  84. b"\x98\x8a\x8a&#\xb9(#?\x80\x15\x9eW\xb5\x12\xd1\x95S\xf2<G\xeb\xf1"
  85. b"\x14H\xb2\xc4>\xc3A\xed\x86x~\xcfU\xd5Q\xfe~\x10\xd2\x9b"
  86. )
  87. @skipIf(not HAS_PYCRYPTO_RSA, "pycrypto >= 2.6 is not available")
  88. @skipIf(HAS_M2, "m2crypto is used by salt.crypt if installed")
  89. class CryptTestCase(TestCase):
  90. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  91. def test_gen_keys(self):
  92. open_priv_wb = MockCall("/keydir{0}keyname.pem".format(os.sep), "wb+")
  93. open_pub_wb = MockCall("/keydir{0}keyname.pub".format(os.sep), "wb+")
  94. with patch.multiple(
  95. os,
  96. umask=MagicMock(),
  97. chmod=MagicMock(),
  98. access=MagicMock(return_value=True),
  99. ):
  100. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  101. "os.path.isfile", return_value=True
  102. ):
  103. result = crypt.gen_keys("/keydir", "keyname", 2048)
  104. assert result == "/keydir{0}keyname.pem".format(os.sep), result
  105. assert open_priv_wb not in m_open.calls
  106. assert open_pub_wb not in m_open.calls
  107. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  108. "os.path.isfile", return_value=False
  109. ):
  110. crypt.gen_keys("/keydir", "keyname", 2048)
  111. assert open_priv_wb in m_open.calls
  112. assert open_pub_wb in m_open.calls
  113. @patch("os.umask", MagicMock())
  114. @patch("os.chmod", MagicMock())
  115. @patch("os.chown", MagicMock(), create=True)
  116. @patch("os.access", MagicMock(return_value=True))
  117. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  118. def test_gen_keys_with_passphrase(self):
  119. key_path = os.path.join(os.sep, "keydir")
  120. open_priv_wb = MockCall(os.path.join(key_path, "keyname.pem"), "wb+")
  121. open_pub_wb = MockCall(os.path.join(key_path, "keyname.pub"), "wb+")
  122. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  123. "os.path.isfile", return_value=True
  124. ):
  125. self.assertEqual(
  126. crypt.gen_keys(key_path, "keyname", 2048, passphrase="password"),
  127. os.path.join(key_path, "keyname.pem"),
  128. )
  129. result = crypt.gen_keys(key_path, "keyname", 2048, passphrase="password")
  130. assert result == os.path.join(key_path, "keyname.pem"), result
  131. assert open_priv_wb not in m_open.calls
  132. assert open_pub_wb not in m_open.calls
  133. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  134. "os.path.isfile", return_value=False
  135. ):
  136. crypt.gen_keys(key_path, "keyname", 2048)
  137. assert open_priv_wb in m_open.calls
  138. assert open_pub_wb in m_open.calls
  139. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  140. def test_sign_message(self):
  141. key = RSA.importKey(PRIVKEY_DATA)
  142. with patch("salt.crypt.get_rsa_key", return_value=key):
  143. self.assertEqual(SIG, salt.crypt.sign_message("/keydir/keyname.pem", MSG))
  144. def test_sign_message_with_passphrase(self):
  145. key = RSA.importKey(PRIVKEY_DATA)
  146. with patch("salt.crypt.get_rsa_key", return_value=key):
  147. self.assertEqual(
  148. SIG,
  149. crypt.sign_message("/keydir/keyname.pem", MSG, passphrase="password"),
  150. )
  151. def test_verify_signature(self):
  152. with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
  153. self.assertTrue(crypt.verify_signature("/keydir/keyname.pub", MSG, SIG))
  154. @skipIf(not HAS_M2, "m2crypto is not available")
  155. class M2CryptTestCase(TestCase):
  156. @patch("os.umask", MagicMock())
  157. @patch("os.chmod", MagicMock())
  158. @patch("os.access", MagicMock(return_value=True))
  159. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  160. def test_gen_keys(self):
  161. with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
  162. with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
  163. with patch("os.path.isfile", return_value=True):
  164. self.assertEqual(
  165. crypt.gen_keys("/keydir", "keyname", 2048),
  166. "/keydir{0}keyname.pem".format(os.sep),
  167. )
  168. save_pem.assert_not_called()
  169. save_pub.assert_not_called()
  170. with patch("os.path.isfile", return_value=False):
  171. self.assertEqual(
  172. crypt.gen_keys("/keydir", "keyname", 2048),
  173. "/keydir{0}keyname.pem".format(os.sep),
  174. )
  175. save_pem.assert_called_once_with(
  176. "/keydir{0}keyname.pem".format(os.sep), cipher=None
  177. )
  178. save_pub.assert_called_once_with(
  179. "/keydir{0}keyname.pub".format(os.sep)
  180. )
  181. @patch("os.umask", MagicMock())
  182. @patch("os.chmod", MagicMock())
  183. @patch("os.chown", MagicMock())
  184. @patch("os.access", MagicMock(return_value=True))
  185. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  186. def test_gen_keys_with_passphrase(self):
  187. with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
  188. with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
  189. with patch("os.path.isfile", return_value=True):
  190. self.assertEqual(
  191. crypt.gen_keys(
  192. "/keydir", "keyname", 2048, passphrase="password"
  193. ),
  194. "/keydir{0}keyname.pem".format(os.sep),
  195. )
  196. save_pem.assert_not_called()
  197. save_pub.assert_not_called()
  198. with patch("os.path.isfile", return_value=False):
  199. self.assertEqual(
  200. crypt.gen_keys(
  201. "/keydir", "keyname", 2048, passphrase="password"
  202. ),
  203. "/keydir{0}keyname.pem".format(os.sep),
  204. )
  205. callback = save_pem.call_args[1]["callback"]
  206. save_pem.assert_called_once_with(
  207. "/keydir{0}keyname.pem".format(os.sep),
  208. cipher="des_ede3_cbc",
  209. callback=callback,
  210. )
  211. self.assertEqual(callback(None), b"password")
  212. save_pub.assert_called_once_with(
  213. "/keydir{0}keyname.pub".format(os.sep)
  214. )
  215. def test_sign_message(self):
  216. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  217. with patch("salt.crypt.get_rsa_key", return_value=key):
  218. self.assertEqual(SIG, salt.crypt.sign_message("/keydir/keyname.pem", MSG))
  219. def test_sign_message_with_passphrase(self):
  220. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  221. with patch("salt.crypt.get_rsa_key", return_value=key):
  222. self.assertEqual(
  223. SIG,
  224. crypt.sign_message("/keydir/keyname.pem", MSG, passphrase="password"),
  225. )
  226. def test_verify_signature(self):
  227. with patch("salt.utils.files.fopen", mock_open(read_data=six.b(PUBKEY_DATA))):
  228. self.assertTrue(crypt.verify_signature("/keydir/keyname.pub", MSG, SIG))
  229. def test_encrypt_decrypt_bin(self):
  230. priv_key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  231. pub_key = M2Crypto.RSA.load_pub_key_bio(
  232. M2Crypto.BIO.MemoryBuffer(six.b(PUBKEY_DATA))
  233. )
  234. encrypted = salt.crypt.private_encrypt(priv_key, b"salt")
  235. decrypted = salt.crypt.public_decrypt(pub_key, encrypted)
  236. self.assertEqual(b"salt", decrypted)
  237. class TestBadCryptodomePubKey(TestCase):
  238. """
  239. Test that we can load public keys exported by pycrpytodome<=3.4.6
  240. """
  241. TEST_KEY = (
  242. "-----BEGIN RSA PUBLIC KEY-----\n"
  243. "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzLtFhsvfbFDFaUgulSEX\n"
  244. "Gl12XriL1DT78Ef2/u8HHaSMmPie37BLWas/zaHwI6066bIyYQJ/nUCahTaoHM7L\n"
  245. "GlWc0wOU6zyfpihCRQHil05Y6F+olFBoZuYbFPvtp7/hJx/D7I/0n2o/c7M5i3Y2\n"
  246. "3sBxAYNooIQHXHUmPQW6C9iu95ylZDW8JQzYy/EI4vCC8yQMdTK8jK1FQV0Sbwny\n"
  247. "qcMxSyAWDoFbnhh2P2TnO8HOWuUOaXR8ZHOJzVcDl+a6ew+medW090x3K5O1f80D\n"
  248. "+WjgnG6b2HG7VQpOCfM2GALD/FrxicPilvZ38X1aLhJuwjmVE4LAAv8DVNJXohaO\n"
  249. "WQIDAQAB\n"
  250. "-----END RSA PUBLIC KEY-----\n"
  251. )
  252. def setUp(self):
  253. self.test_dir = tempfile.mkdtemp()
  254. self.key_path = os.path.join(self.test_dir, "cryptodom-3.4.6.pub")
  255. with salt.utils.files.fopen(self.key_path, "wb") as fd:
  256. fd.write(self.TEST_KEY.encode())
  257. def tearDown(self):
  258. shutil.rmtree(self.test_dir)
  259. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  260. def test_m2_bad_key(self):
  261. """
  262. Load public key with an invalid header using m2crypto and validate it
  263. """
  264. key = salt.crypt.get_rsa_pub_key(self.key_path)
  265. assert key.check_key() == 1
  266. @skipIf(HAS_M2, "Skip when m2crypto is installed")
  267. def test_crypto_bad_key(self):
  268. """
  269. Load public key with an invalid header and validate it without m2crypto
  270. """
  271. key = salt.crypt.get_rsa_pub_key(self.key_path)
  272. assert key.can_encrypt()
  273. class TestM2CryptoRegression47124(TestCase):
  274. SIGNATURE = (
  275. b"w\xac\xfe18o\xeb\xfb\x14+\x9e\xd1\xb7\x7fe}\xec\xd6\xe1P\x9e\xab"
  276. b"\xb5\x07\xe0\xc1\xfd\xda#\x04Z\x8d\x7f\x0b\x1f}:~\xb2s\x860u\x02N"
  277. b'\xd4q"\xb7\x86*\x8f\x1f\xd0\x9d\x11\x92\xc5~\xa68\xac>\x12H\xc2%y,'
  278. b"\xe6\xceU\x1e\xa3?\x0c,\xf0u\xbb\xd0[g_\xdd\x8b\xb0\x95:Y\x18\xa5*"
  279. b"\x99\xfd\xf3K\x92\x92 ({\xd1\xff\xd9F\xc8\xd6K\x86e\xf9\xa8\xad\xb0z"
  280. b"\xe3\x9dD\xf5k\x8b_<\xe7\xe7\xec\xf3\"'\xd5\xd2M\xb4\xce\x1a\xe3$"
  281. b"\x9c\x81\xad\xf9\x11\xf6\xf5>)\xc7\xdd\x03&\xf7\x86@ks\xa6\x05\xc2"
  282. b"\xd0\xbd\x1a7\xfc\xde\xe6\xb0\xad!\x12#\xc86Y\xea\xc5\xe3\xe2\xb3"
  283. b"\xc9\xaf\xfa\x0c\xf2?\xbf\x93w\x18\x9e\x0b\xa2a\x10:M\x05\x89\xe2W.Q"
  284. b"\xe8;yGT\xb1\xf2\xc6A\xd2\xc4\xbeN\xb3\xcfS\xaf\x03f\xe2\xb4)\xe7\xf6"
  285. b'\xdbs\xd0Z}8\xa4\xd2\x1fW*\xe6\x1c"\x8b\xd0\x18w\xb9\x7f\x9e\x96\xa3'
  286. b"\xd9v\xf7\x833\x8e\x01"
  287. )
  288. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  289. def test_m2crypto_verify_bytes(self):
  290. message = salt.utils.stringutils.to_unicode("meh")
  291. with patch("salt.utils.files.fopen", mock_open(read_data=six.b(PUBKEY_DATA))):
  292. salt.crypt.verify_signature("/keydir/keyname.pub", message, self.SIGNATURE)
  293. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  294. def test_m2crypto_verify_unicode(self):
  295. message = salt.utils.stringutils.to_bytes("meh")
  296. with patch("salt.utils.files.fopen", mock_open(read_data=six.b(PUBKEY_DATA))):
  297. salt.crypt.verify_signature("/keydir/keyname.pub", message, self.SIGNATURE)
  298. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  299. def test_m2crypto_sign_bytes(self):
  300. message = salt.utils.stringutils.to_unicode("meh")
  301. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  302. with patch("salt.crypt.get_rsa_key", return_value=key):
  303. signature = salt.crypt.sign_message(
  304. "/keydir/keyname.pem", message, passphrase="password"
  305. )
  306. self.assertEqual(signature, self.SIGNATURE)
  307. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  308. def test_m2crypto_sign_unicode(self):
  309. message = salt.utils.stringutils.to_bytes("meh")
  310. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  311. with patch("salt.crypt.get_rsa_key", return_value=key):
  312. signature = salt.crypt.sign_message(
  313. "/keydir/keyname.pem", message, passphrase="password"
  314. )
  315. self.assertEqual(signature, self.SIGNATURE)