test_crypt.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. # coding: utf-8
  2. from __future__ import absolute_import
  3. import os
  4. import shutil
  5. import tempfile
  6. import salt.utils.files
  7. from salt import crypt
  8. from salt.ext import six
  9. from tests.support.helpers import slowTest
  10. from tests.support.mock import MagicMock, MockCall, mock_open, patch
  11. from tests.support.unit import TestCase, skipIf
  12. try:
  13. import M2Crypto
  14. HAS_M2 = True
  15. except ImportError:
  16. HAS_M2 = False
  17. try:
  18. from Cryptodome.PublicKey import RSA # pylint: disable=unused-import
  19. HAS_PYCRYPTO_RSA = True
  20. except ImportError:
  21. HAS_PYCRYPTO_RSA = False
  22. if not HAS_PYCRYPTO_RSA:
  23. try:
  24. from Crypto.PublicKey import RSA
  25. HAS_PYCRYPTO_RSA = True
  26. except ImportError:
  27. HAS_PYCRYPTO_RSA = False
# PEM-encoded RSA private key fixture. The tests below load it (via
# RSA.importKey or M2Crypto.RSA.load_key_string) and hand it to
# salt.crypt.sign_message through a patched get_rsa_key.
PRIVKEY_DATA = (
    "-----BEGIN RSA PRIVATE KEY-----\n"
    "MIIEpAIBAAKCAQEA75GR6ZTv5JOv90Vq8tKhKC7YQnhDIo2hM0HVziTEk5R4UQBW\n"
    "a0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD4ZMsYqLzqjWMekLC8bjhxc+EuPo9\n"
    "Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R4hOcMMZNZdi0xLtFoTfwU61UPfFX\n"
    "14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttLP3sMXJvc3EvM0JiDVj4l1TWFUHHz\n"
    "eFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k6ai4tVzwkTmV5PsriP1ju88Lo3MB\n"
    "4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWHAQIDAQABAoIBAGOzBzBYZUWRGOgl\n"
    "IY8QjTT12dY/ymC05GM6gMobjxuD7FZ5d32HDLu/QrknfS3kKlFPUQGDAbQhbbb0\n"
    "zw6VL5NO9mfOPO2W/3FaG1sRgBQcerWonoSSSn8OJwVBHMFLG3a+U1Zh1UvPoiPK\n"
    "S734swIM+zFpNYivGPvOm/muF/waFf8tF/47t1cwt/JGXYQnkG/P7z0vp47Irpsb\n"
    "Yjw7vPe4BnbY6SppSxscW3KoV7GtJLFKIxAXbxsuJMF/rYe3O3w2VKJ1Sug1VDJl\n"
    "/GytwAkSUer84WwP2b07Wn4c5pCnmLslMgXCLkENgi1NnJMhYVOnckxGDZk54hqP\n"
    "9RbLnkkCgYEA/yKuWEvgdzYRYkqpzB0l9ka7Y00CV4Dha9Of6GjQi9i4VCJ/UFVr\n"
    "UlhTo5y0ZzpcDAPcoZf5CFZsD90a/BpQ3YTtdln2MMCL/Kr3QFmetkmDrt+3wYnX\n"
    "sKESfsa2nZdOATRpl1antpwyD4RzsAeOPwBiACj4fkq5iZJBSI0bxrMCgYEA8GFi\n"
    "qAjgKh81/Uai6KWTOW2kX02LEMVRrnZLQ9VPPLGid4KZDDk1/dEfxjjkcyOxX1Ux\n"
    "Klu4W8ZEdZyzPcJrfk7PdopfGOfrhWzkREK9C40H7ou/1jUecq/STPfSOmxh3Y+D\n"
    "ifMNO6z4sQAHx8VaHaxVsJ7SGR/spr0pkZL+NXsCgYEA84rIgBKWB1W+TGRXJzdf\n"
    "yHIGaCjXpm2pQMN3LmP3RrcuZWm0vBt94dHcrR5l+u/zc6iwEDTAjJvqdU4rdyEr\n"
    "tfkwr7v6TNlQB3WvpWanIPyVzfVSNFX/ZWSsAgZvxYjr9ixw6vzWBXOeOb/Gqu7b\n"
    "cvpLkjmJ0wxDhbXtyXKhZA8CgYBZyvcQb+hUs732M4mtQBSD0kohc5TsGdlOQ1AQ\n"
    "McFcmbpnzDghkclyW8jzwdLMk9uxEeDAwuxWE/UEvhlSi6qdzxC+Zifp5NBc0fVe\n"
    "7lMx2mfJGxj5CnSqQLVdHQHB4zSXkAGB6XHbBd0MOUeuvzDPfs2voVQ4IG3FR0oc\n"
    "3/znuwKBgQChZGH3McQcxmLA28aUwOVbWssfXKdDCsiJO+PEXXlL0maO3SbnFn+Q\n"
    "Tyf8oHI5cdP7AbwDSx9bUfRPjg9dKKmATBFr2bn216pjGxK0OjYOCntFTVr0psRB\n"
    "CrKg52Qrq71/2l4V2NLQZU40Dr1bN9V+Ftd9L0pvpCAEAWpIbLXGDw==\n"
    "-----END RSA PRIVATE KEY-----"
)

# PEM-encoded RSA public key fixture, served to salt.crypt.verify_signature
# through a mocked fopen. NOTE(review): presumably the public half of
# PRIVKEY_DATA (the tests verify SIG with it) -- confirm if regenerating.
PUBKEY_DATA = (
    "-----BEGIN PUBLIC KEY-----\n"
    "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75GR6ZTv5JOv90Vq8tKh\n"
    "KC7YQnhDIo2hM0HVziTEk5R4UQBWa0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD\n"
    "4ZMsYqLzqjWMekLC8bjhxc+EuPo9Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R\n"
    "4hOcMMZNZdi0xLtFoTfwU61UPfFX14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttL\n"
    "P3sMXJvc3EvM0JiDVj4l1TWFUHHzeFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k\n"
    "6ai4tVzwkTmV5PsriP1ju88Lo3MB4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWH\n"
    "AQIDAQAB\n"
    "-----END PUBLIC KEY-----"
)

# Message that the sign/verify tests operate on.
MSG = b"It's me, Mario"

# Expected result of salt.crypt.sign_message(..., MSG) when the key is
# PRIVKEY_DATA; the sign tests compare against it byte-for-byte and the
# verify tests check it against PUBKEY_DATA.
SIG = (
    b"\x07\xf3\xb1\xe7\xdb\x06\xf4_\xe2\xdc\xcb!F\xfb\xbex{W\x1d\xe4E"
    b"\xd3\r\xc5\x90\xca(\x05\x1d\x99\x8b\x1aug\x9f\x95>\x94\x7f\xe3+"
    b"\x12\xfa\x9c\xd4\xb8\x02]\x0e\xa5\xa3LL\xc3\xa2\x8f+\x83Z\x1b\x17"
    b'\xbfT\xd3\xc7\xfd\x0b\xf4\xd7J\xfe^\x86q"I\xa3x\xbc\xd3$\xe9M<\xe1'
    b"\x07\xad\xf2_\x9f\xfa\xf7g(~\xd8\xf5\xe7\xda-\xa3Ko\xfc.\x99\xcf"
    b"\x9b\xb9\xc1U\x97\x82'\xcb\xc6\x08\xaa\xa0\xe4\xd0\xc1+\xfc\x86"
    b'\r\xe4y\xb1#\xd3\x1dS\x96D28\xc4\xd5\r\xd4\x98\x1a44"\xd7\xc2\xb4'
    b"]\xa7\x0f\xa7Db\x85G\x8c\xd6\x94!\x8af1O\xf6g\xd7\x03\xfd\xb3\xbc"
    b"\xce\x9f\xe7\x015\xb8\x1d]AHK\xa0\x14m\xda=O\xa7\xde\xf2\xff\x9b"
    b"\x8e\x83\xc8j\x11\x1a\x98\x85\xde\xc5\x91\x07\x84!\x12^4\xcb\xa8"
    b"\x98\x8a\x8a&#\xb9(#?\x80\x15\x9eW\xb5\x12\xd1\x95S\xf2<G\xeb\xf1"
    b"\x14H\xb2\xc4>\xc3A\xed\x86x~\xcfU\xd5Q\xfe~\x10\xd2\x9b"
)
  83. @skipIf(not HAS_PYCRYPTO_RSA, "pycrypto >= 2.6 is not available")
  84. @skipIf(HAS_M2, "m2crypto is used by salt.crypt if installed")
  85. class CryptTestCase(TestCase):
  86. @slowTest
  87. def test_gen_keys(self):
  88. open_priv_wb = MockCall("/keydir{0}keyname.pem".format(os.sep), "wb+")
  89. open_pub_wb = MockCall("/keydir{0}keyname.pub".format(os.sep), "wb+")
  90. with patch.multiple(
  91. os,
  92. umask=MagicMock(),
  93. chmod=MagicMock(),
  94. access=MagicMock(return_value=True),
  95. ):
  96. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  97. "os.path.isfile", return_value=True
  98. ):
  99. result = crypt.gen_keys("/keydir", "keyname", 2048)
  100. assert result == "/keydir{0}keyname.pem".format(os.sep), result
  101. assert open_priv_wb not in m_open.calls
  102. assert open_pub_wb not in m_open.calls
  103. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  104. "os.path.isfile", return_value=False
  105. ):
  106. crypt.gen_keys("/keydir", "keyname", 2048)
  107. assert open_priv_wb in m_open.calls
  108. assert open_pub_wb in m_open.calls
  109. @patch("os.umask", MagicMock())
  110. @patch("os.chmod", MagicMock())
  111. @patch("os.chown", MagicMock(), create=True)
  112. @patch("os.access", MagicMock(return_value=True))
  113. @slowTest
  114. def test_gen_keys_with_passphrase(self):
  115. key_path = os.path.join(os.sep, "keydir")
  116. open_priv_wb = MockCall(os.path.join(key_path, "keyname.pem"), "wb+")
  117. open_pub_wb = MockCall(os.path.join(key_path, "keyname.pub"), "wb+")
  118. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  119. "os.path.isfile", return_value=True
  120. ):
  121. self.assertEqual(
  122. crypt.gen_keys(key_path, "keyname", 2048, passphrase="password"),
  123. os.path.join(key_path, "keyname.pem"),
  124. )
  125. result = crypt.gen_keys(key_path, "keyname", 2048, passphrase="password")
  126. assert result == os.path.join(key_path, "keyname.pem"), result
  127. assert open_priv_wb not in m_open.calls
  128. assert open_pub_wb not in m_open.calls
  129. with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
  130. "os.path.isfile", return_value=False
  131. ):
  132. crypt.gen_keys(key_path, "keyname", 2048)
  133. assert open_priv_wb in m_open.calls
  134. assert open_pub_wb in m_open.calls
  135. def test_sign_message(self):
  136. key = RSA.importKey(PRIVKEY_DATA)
  137. with patch("salt.crypt.get_rsa_key", return_value=key):
  138. self.assertEqual(SIG, salt.crypt.sign_message("/keydir/keyname.pem", MSG))
  139. def test_sign_message_with_passphrase(self):
  140. key = RSA.importKey(PRIVKEY_DATA)
  141. with patch("salt.crypt.get_rsa_key", return_value=key):
  142. self.assertEqual(
  143. SIG,
  144. crypt.sign_message("/keydir/keyname.pem", MSG, passphrase="password"),
  145. )
  146. def test_verify_signature(self):
  147. with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
  148. self.assertTrue(crypt.verify_signature("/keydir/keyname.pub", MSG, SIG))
  149. @skipIf(not HAS_M2, "m2crypto is not available")
  150. class M2CryptTestCase(TestCase):
  151. @patch("os.umask", MagicMock())
  152. @patch("os.chmod", MagicMock())
  153. @patch("os.access", MagicMock(return_value=True))
  154. @slowTest
  155. def test_gen_keys(self):
  156. with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
  157. with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
  158. with patch("os.path.isfile", return_value=True):
  159. self.assertEqual(
  160. crypt.gen_keys("/keydir", "keyname", 2048),
  161. "/keydir{0}keyname.pem".format(os.sep),
  162. )
  163. save_pem.assert_not_called()
  164. save_pub.assert_not_called()
  165. with patch("os.path.isfile", return_value=False):
  166. self.assertEqual(
  167. crypt.gen_keys("/keydir", "keyname", 2048),
  168. "/keydir{0}keyname.pem".format(os.sep),
  169. )
  170. save_pem.assert_called_once_with(
  171. "/keydir{0}keyname.pem".format(os.sep), cipher=None
  172. )
  173. save_pub.assert_called_once_with(
  174. "/keydir{0}keyname.pub".format(os.sep)
  175. )
  176. @patch("os.umask", MagicMock())
  177. @patch("os.chmod", MagicMock())
  178. @patch("os.chown", MagicMock())
  179. @patch("os.access", MagicMock(return_value=True))
  180. @slowTest
  181. def test_gen_keys_with_passphrase(self):
  182. with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
  183. with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
  184. with patch("os.path.isfile", return_value=True):
  185. self.assertEqual(
  186. crypt.gen_keys(
  187. "/keydir", "keyname", 2048, passphrase="password"
  188. ),
  189. "/keydir{0}keyname.pem".format(os.sep),
  190. )
  191. save_pem.assert_not_called()
  192. save_pub.assert_not_called()
  193. with patch("os.path.isfile", return_value=False):
  194. self.assertEqual(
  195. crypt.gen_keys(
  196. "/keydir", "keyname", 2048, passphrase="password"
  197. ),
  198. "/keydir{0}keyname.pem".format(os.sep),
  199. )
  200. callback = save_pem.call_args[1]["callback"]
  201. save_pem.assert_called_once_with(
  202. "/keydir{0}keyname.pem".format(os.sep),
  203. cipher="des_ede3_cbc",
  204. callback=callback,
  205. )
  206. self.assertEqual(callback(None), b"password")
  207. save_pub.assert_called_once_with(
  208. "/keydir{0}keyname.pub".format(os.sep)
  209. )
  210. def test_sign_message(self):
  211. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  212. with patch("salt.crypt.get_rsa_key", return_value=key):
  213. self.assertEqual(SIG, salt.crypt.sign_message("/keydir/keyname.pem", MSG))
  214. def test_sign_message_with_passphrase(self):
  215. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  216. with patch("salt.crypt.get_rsa_key", return_value=key):
  217. self.assertEqual(
  218. SIG,
  219. crypt.sign_message("/keydir/keyname.pem", MSG, passphrase="password"),
  220. )
  221. def test_verify_signature(self):
  222. with patch("salt.utils.files.fopen", mock_open(read_data=six.b(PUBKEY_DATA))):
  223. self.assertTrue(crypt.verify_signature("/keydir/keyname.pub", MSG, SIG))
  224. def test_encrypt_decrypt_bin(self):
  225. priv_key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  226. pub_key = M2Crypto.RSA.load_pub_key_bio(
  227. M2Crypto.BIO.MemoryBuffer(six.b(PUBKEY_DATA))
  228. )
  229. encrypted = salt.crypt.private_encrypt(priv_key, b"salt")
  230. decrypted = salt.crypt.public_decrypt(pub_key, encrypted)
  231. self.assertEqual(b"salt", decrypted)
  232. class TestBadCryptodomePubKey(TestCase):
  233. """
  234. Test that we can load public keys exported by pycrpytodome<=3.4.6
  235. """
  236. TEST_KEY = (
  237. "-----BEGIN RSA PUBLIC KEY-----\n"
  238. "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzLtFhsvfbFDFaUgulSEX\n"
  239. "Gl12XriL1DT78Ef2/u8HHaSMmPie37BLWas/zaHwI6066bIyYQJ/nUCahTaoHM7L\n"
  240. "GlWc0wOU6zyfpihCRQHil05Y6F+olFBoZuYbFPvtp7/hJx/D7I/0n2o/c7M5i3Y2\n"
  241. "3sBxAYNooIQHXHUmPQW6C9iu95ylZDW8JQzYy/EI4vCC8yQMdTK8jK1FQV0Sbwny\n"
  242. "qcMxSyAWDoFbnhh2P2TnO8HOWuUOaXR8ZHOJzVcDl+a6ew+medW090x3K5O1f80D\n"
  243. "+WjgnG6b2HG7VQpOCfM2GALD/FrxicPilvZ38X1aLhJuwjmVE4LAAv8DVNJXohaO\n"
  244. "WQIDAQAB\n"
  245. "-----END RSA PUBLIC KEY-----\n"
  246. )
  247. def setUp(self):
  248. self.test_dir = tempfile.mkdtemp()
  249. self.key_path = os.path.join(self.test_dir, "cryptodom-3.4.6.pub")
  250. with salt.utils.files.fopen(self.key_path, "wb") as fd:
  251. fd.write(self.TEST_KEY.encode())
  252. def tearDown(self):
  253. shutil.rmtree(self.test_dir)
  254. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  255. def test_m2_bad_key(self):
  256. """
  257. Load public key with an invalid header using m2crypto and validate it
  258. """
  259. key = salt.crypt.get_rsa_pub_key(self.key_path)
  260. assert key.check_key() == 1
  261. @skipIf(HAS_M2, "Skip when m2crypto is installed")
  262. def test_crypto_bad_key(self):
  263. """
  264. Load public key with an invalid header and validate it without m2crypto
  265. """
  266. key = salt.crypt.get_rsa_pub_key(self.key_path)
  267. assert key.can_encrypt()
  268. class TestM2CryptoRegression47124(TestCase):
  269. SIGNATURE = (
  270. b"w\xac\xfe18o\xeb\xfb\x14+\x9e\xd1\xb7\x7fe}\xec\xd6\xe1P\x9e\xab"
  271. b"\xb5\x07\xe0\xc1\xfd\xda#\x04Z\x8d\x7f\x0b\x1f}:~\xb2s\x860u\x02N"
  272. b'\xd4q"\xb7\x86*\x8f\x1f\xd0\x9d\x11\x92\xc5~\xa68\xac>\x12H\xc2%y,'
  273. b"\xe6\xceU\x1e\xa3?\x0c,\xf0u\xbb\xd0[g_\xdd\x8b\xb0\x95:Y\x18\xa5*"
  274. b"\x99\xfd\xf3K\x92\x92 ({\xd1\xff\xd9F\xc8\xd6K\x86e\xf9\xa8\xad\xb0z"
  275. b"\xe3\x9dD\xf5k\x8b_<\xe7\xe7\xec\xf3\"'\xd5\xd2M\xb4\xce\x1a\xe3$"
  276. b"\x9c\x81\xad\xf9\x11\xf6\xf5>)\xc7\xdd\x03&\xf7\x86@ks\xa6\x05\xc2"
  277. b"\xd0\xbd\x1a7\xfc\xde\xe6\xb0\xad!\x12#\xc86Y\xea\xc5\xe3\xe2\xb3"
  278. b"\xc9\xaf\xfa\x0c\xf2?\xbf\x93w\x18\x9e\x0b\xa2a\x10:M\x05\x89\xe2W.Q"
  279. b"\xe8;yGT\xb1\xf2\xc6A\xd2\xc4\xbeN\xb3\xcfS\xaf\x03f\xe2\xb4)\xe7\xf6"
  280. b'\xdbs\xd0Z}8\xa4\xd2\x1fW*\xe6\x1c"\x8b\xd0\x18w\xb9\x7f\x9e\x96\xa3'
  281. b"\xd9v\xf7\x833\x8e\x01"
  282. )
  283. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  284. def test_m2crypto_verify_bytes(self):
  285. message = salt.utils.stringutils.to_unicode("meh")
  286. with patch("salt.utils.files.fopen", mock_open(read_data=six.b(PUBKEY_DATA))):
  287. salt.crypt.verify_signature("/keydir/keyname.pub", message, self.SIGNATURE)
  288. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  289. def test_m2crypto_verify_unicode(self):
  290. message = salt.utils.stringutils.to_bytes("meh")
  291. with patch("salt.utils.files.fopen", mock_open(read_data=six.b(PUBKEY_DATA))):
  292. salt.crypt.verify_signature("/keydir/keyname.pub", message, self.SIGNATURE)
  293. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  294. def test_m2crypto_sign_bytes(self):
  295. message = salt.utils.stringutils.to_unicode("meh")
  296. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  297. with patch("salt.crypt.get_rsa_key", return_value=key):
  298. signature = salt.crypt.sign_message(
  299. "/keydir/keyname.pem", message, passphrase="password"
  300. )
  301. self.assertEqual(signature, self.SIGNATURE)
  302. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  303. def test_m2crypto_sign_unicode(self):
  304. message = salt.utils.stringutils.to_bytes("meh")
  305. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  306. with patch("salt.crypt.get_rsa_key", return_value=key):
  307. signature = salt.crypt.sign_message(
  308. "/keydir/keyname.pem", message, passphrase="password"
  309. )
  310. self.assertEqual(signature, self.SIGNATURE)