test_crypt.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. # coding: utf-8
  2. # python libs
  3. from __future__ import absolute_import
  4. import os
  5. import tempfile
  6. import shutil
  7. # salt testing libs
  8. from tests.support.unit import TestCase, skipIf
  9. from tests.support.mock import (
  10. patch,
  11. mock_open,
  12. NO_MOCK,
  13. NO_MOCK_REASON,
  14. MagicMock,
  15. MockCall,
  16. )
  17. # salt libs
  18. from salt.ext import six
  19. import salt.utils.files
  20. from salt import crypt
  21. # third-party libs
  22. try:
  23. import M2Crypto
  24. HAS_M2 = True
  25. except ImportError:
  26. HAS_M2 = False
  27. try:
  28. from Cryptodome.PublicKey import RSA # pylint: disable=unused-import
  29. HAS_PYCRYPTO_RSA = True
  30. except ImportError:
  31. HAS_PYCRYPTO_RSA = False
  32. if not HAS_PYCRYPTO_RSA:
  33. try:
  34. from Crypto.PublicKey import RSA
  35. HAS_PYCRYPTO_RSA = True
  36. except ImportError:
  37. HAS_PYCRYPTO_RSA = False
  38. PRIVKEY_DATA = (
  39. '-----BEGIN RSA PRIVATE KEY-----\n'
  40. 'MIIEpAIBAAKCAQEA75GR6ZTv5JOv90Vq8tKhKC7YQnhDIo2hM0HVziTEk5R4UQBW\n'
  41. 'a0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD4ZMsYqLzqjWMekLC8bjhxc+EuPo9\n'
  42. 'Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R4hOcMMZNZdi0xLtFoTfwU61UPfFX\n'
  43. '14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttLP3sMXJvc3EvM0JiDVj4l1TWFUHHz\n'
  44. 'eFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k6ai4tVzwkTmV5PsriP1ju88Lo3MB\n'
  45. '4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWHAQIDAQABAoIBAGOzBzBYZUWRGOgl\n'
  46. 'IY8QjTT12dY/ymC05GM6gMobjxuD7FZ5d32HDLu/QrknfS3kKlFPUQGDAbQhbbb0\n'
  47. 'zw6VL5NO9mfOPO2W/3FaG1sRgBQcerWonoSSSn8OJwVBHMFLG3a+U1Zh1UvPoiPK\n'
  48. 'S734swIM+zFpNYivGPvOm/muF/waFf8tF/47t1cwt/JGXYQnkG/P7z0vp47Irpsb\n'
  49. 'Yjw7vPe4BnbY6SppSxscW3KoV7GtJLFKIxAXbxsuJMF/rYe3O3w2VKJ1Sug1VDJl\n'
  50. '/GytwAkSUer84WwP2b07Wn4c5pCnmLslMgXCLkENgi1NnJMhYVOnckxGDZk54hqP\n'
  51. '9RbLnkkCgYEA/yKuWEvgdzYRYkqpzB0l9ka7Y00CV4Dha9Of6GjQi9i4VCJ/UFVr\n'
  52. 'UlhTo5y0ZzpcDAPcoZf5CFZsD90a/BpQ3YTtdln2MMCL/Kr3QFmetkmDrt+3wYnX\n'
  53. 'sKESfsa2nZdOATRpl1antpwyD4RzsAeOPwBiACj4fkq5iZJBSI0bxrMCgYEA8GFi\n'
  54. 'qAjgKh81/Uai6KWTOW2kX02LEMVRrnZLQ9VPPLGid4KZDDk1/dEfxjjkcyOxX1Ux\n'
  55. 'Klu4W8ZEdZyzPcJrfk7PdopfGOfrhWzkREK9C40H7ou/1jUecq/STPfSOmxh3Y+D\n'
  56. 'ifMNO6z4sQAHx8VaHaxVsJ7SGR/spr0pkZL+NXsCgYEA84rIgBKWB1W+TGRXJzdf\n'
  57. 'yHIGaCjXpm2pQMN3LmP3RrcuZWm0vBt94dHcrR5l+u/zc6iwEDTAjJvqdU4rdyEr\n'
  58. 'tfkwr7v6TNlQB3WvpWanIPyVzfVSNFX/ZWSsAgZvxYjr9ixw6vzWBXOeOb/Gqu7b\n'
  59. 'cvpLkjmJ0wxDhbXtyXKhZA8CgYBZyvcQb+hUs732M4mtQBSD0kohc5TsGdlOQ1AQ\n'
  60. 'McFcmbpnzDghkclyW8jzwdLMk9uxEeDAwuxWE/UEvhlSi6qdzxC+Zifp5NBc0fVe\n'
  61. '7lMx2mfJGxj5CnSqQLVdHQHB4zSXkAGB6XHbBd0MOUeuvzDPfs2voVQ4IG3FR0oc\n'
  62. '3/znuwKBgQChZGH3McQcxmLA28aUwOVbWssfXKdDCsiJO+PEXXlL0maO3SbnFn+Q\n'
  63. 'Tyf8oHI5cdP7AbwDSx9bUfRPjg9dKKmATBFr2bn216pjGxK0OjYOCntFTVr0psRB\n'
  64. 'CrKg52Qrq71/2l4V2NLQZU40Dr1bN9V+Ftd9L0pvpCAEAWpIbLXGDw==\n'
  65. '-----END RSA PRIVATE KEY-----')
  66. PUBKEY_DATA = (
  67. '-----BEGIN PUBLIC KEY-----\n'
  68. 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75GR6ZTv5JOv90Vq8tKh\n'
  69. 'KC7YQnhDIo2hM0HVziTEk5R4UQBWa0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD\n'
  70. '4ZMsYqLzqjWMekLC8bjhxc+EuPo9Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R\n'
  71. '4hOcMMZNZdi0xLtFoTfwU61UPfFX14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttL\n'
  72. 'P3sMXJvc3EvM0JiDVj4l1TWFUHHzeFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k\n'
  73. '6ai4tVzwkTmV5PsriP1ju88Lo3MB4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWH\n'
  74. 'AQIDAQAB\n'
  75. '-----END PUBLIC KEY-----')
  76. MSG = b'It\'s me, Mario'
  77. SIG = (
  78. b'\x07\xf3\xb1\xe7\xdb\x06\xf4_\xe2\xdc\xcb!F\xfb\xbex{W\x1d\xe4E'
  79. b'\xd3\r\xc5\x90\xca(\x05\x1d\x99\x8b\x1aug\x9f\x95>\x94\x7f\xe3+'
  80. b'\x12\xfa\x9c\xd4\xb8\x02]\x0e\xa5\xa3LL\xc3\xa2\x8f+\x83Z\x1b\x17'
  81. b'\xbfT\xd3\xc7\xfd\x0b\xf4\xd7J\xfe^\x86q"I\xa3x\xbc\xd3$\xe9M<\xe1'
  82. b'\x07\xad\xf2_\x9f\xfa\xf7g(~\xd8\xf5\xe7\xda-\xa3Ko\xfc.\x99\xcf'
  83. b'\x9b\xb9\xc1U\x97\x82\'\xcb\xc6\x08\xaa\xa0\xe4\xd0\xc1+\xfc\x86'
  84. b'\r\xe4y\xb1#\xd3\x1dS\x96D28\xc4\xd5\r\xd4\x98\x1a44"\xd7\xc2\xb4'
  85. b']\xa7\x0f\xa7Db\x85G\x8c\xd6\x94!\x8af1O\xf6g\xd7\x03\xfd\xb3\xbc'
  86. b'\xce\x9f\xe7\x015\xb8\x1d]AHK\xa0\x14m\xda=O\xa7\xde\xf2\xff\x9b'
  87. b'\x8e\x83\xc8j\x11\x1a\x98\x85\xde\xc5\x91\x07\x84!\x12^4\xcb\xa8'
  88. b'\x98\x8a\x8a&#\xb9(#?\x80\x15\x9eW\xb5\x12\xd1\x95S\xf2<G\xeb\xf1'
  89. b'\x14H\xb2\xc4>\xc3A\xed\x86x~\xcfU\xd5Q\xfe~\x10\xd2\x9b')
  90. @skipIf(NO_MOCK, NO_MOCK_REASON)
  91. @skipIf(not HAS_PYCRYPTO_RSA, 'pycrypto >= 2.6 is not available')
  92. @skipIf(HAS_M2, 'm2crypto is used by salt.crypt if installed')
  93. class CryptTestCase(TestCase):
  94. def test_gen_keys(self):
  95. open_priv_wb = MockCall('/keydir{0}keyname.pem'.format(os.sep), 'wb+')
  96. open_pub_wb = MockCall('/keydir{0}keyname.pub'.format(os.sep), 'wb+')
  97. with patch.multiple(os, umask=MagicMock(), chmod=MagicMock(),
  98. access=MagicMock(return_value=True)):
  99. with patch('salt.utils.files.fopen', mock_open()) as m_open, \
  100. patch('os.path.isfile', return_value=True):
  101. result = crypt.gen_keys('/keydir', 'keyname', 2048)
  102. assert result == '/keydir{0}keyname.pem'.format(os.sep), result
  103. assert open_priv_wb not in m_open.calls
  104. assert open_pub_wb not in m_open.calls
  105. with patch('salt.utils.files.fopen', mock_open()) as m_open, \
  106. patch('os.path.isfile', return_value=False):
  107. crypt.gen_keys('/keydir', 'keyname', 2048)
  108. assert open_priv_wb in m_open.calls
  109. assert open_pub_wb in m_open.calls
  110. @patch('os.umask', MagicMock())
  111. @patch('os.chmod', MagicMock())
  112. @patch('os.chown', MagicMock(), create=True)
  113. @patch('os.access', MagicMock(return_value=True))
  114. def test_gen_keys_with_passphrase(self):
  115. key_path = os.path.join(os.sep, 'keydir')
  116. open_priv_wb = MockCall(os.path.join(key_path, 'keyname.pem'), 'wb+')
  117. open_pub_wb = MockCall(os.path.join(key_path, 'keyname.pub'), 'wb+')
  118. with patch('salt.utils.files.fopen', mock_open()) as m_open, \
  119. patch('os.path.isfile', return_value=True):
  120. self.assertEqual(crypt.gen_keys(key_path, 'keyname', 2048, passphrase='password'), os.path.join(key_path, 'keyname.pem'))
  121. result = crypt.gen_keys(key_path, 'keyname', 2048,
  122. passphrase='password')
  123. assert result == os.path.join(key_path, 'keyname.pem'), result
  124. assert open_priv_wb not in m_open.calls
  125. assert open_pub_wb not in m_open.calls
  126. with patch('salt.utils.files.fopen', mock_open()) as m_open, \
  127. patch('os.path.isfile', return_value=False):
  128. crypt.gen_keys(key_path, 'keyname', 2048)
  129. assert open_priv_wb in m_open.calls
  130. assert open_pub_wb in m_open.calls
  131. def test_sign_message(self):
  132. key = RSA.importKey(PRIVKEY_DATA)
  133. with patch('salt.crypt.get_rsa_key', return_value=key):
  134. self.assertEqual(SIG, salt.crypt.sign_message('/keydir/keyname.pem', MSG))
  135. def test_sign_message_with_passphrase(self):
  136. key = RSA.importKey(PRIVKEY_DATA)
  137. with patch('salt.crypt.get_rsa_key', return_value=key):
  138. self.assertEqual(SIG, crypt.sign_message('/keydir/keyname.pem', MSG, passphrase='password'))
  139. def test_verify_signature(self):
  140. with patch('salt.utils.files.fopen', mock_open(read_data=PUBKEY_DATA)):
  141. self.assertTrue(crypt.verify_signature('/keydir/keyname.pub', MSG, SIG))
  142. @skipIf(NO_MOCK, NO_MOCK_REASON)
  143. @skipIf(not HAS_M2, 'm2crypto is not available')
  144. class M2CryptTestCase(TestCase):
  145. @patch('os.umask', MagicMock())
  146. @patch('os.chmod', MagicMock())
  147. @patch('os.access', MagicMock(return_value=True))
  148. def test_gen_keys(self):
  149. with patch('M2Crypto.RSA.RSA.save_pem', MagicMock()) as save_pem:
  150. with patch('M2Crypto.RSA.RSA.save_pub_key', MagicMock()) as save_pub:
  151. with patch('os.path.isfile', return_value=True):
  152. self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048),
  153. '/keydir{0}keyname.pem'.format(os.sep))
  154. save_pem.assert_not_called()
  155. save_pub.assert_not_called()
  156. with patch('os.path.isfile', return_value=False):
  157. self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048),
  158. '/keydir{0}keyname.pem'.format(os.sep))
  159. save_pem.assert_called_once_with('/keydir{0}keyname.pem'.format(os.sep), cipher=None)
  160. save_pub.assert_called_once_with('/keydir{0}keyname.pub'.format(os.sep))
  161. @patch('os.umask', MagicMock())
  162. @patch('os.chmod', MagicMock())
  163. @patch('os.chown', MagicMock())
  164. @patch('os.access', MagicMock(return_value=True))
  165. def test_gen_keys_with_passphrase(self):
  166. with patch('M2Crypto.RSA.RSA.save_pem', MagicMock()) as save_pem:
  167. with patch('M2Crypto.RSA.RSA.save_pub_key', MagicMock()) as save_pub:
  168. with patch('os.path.isfile', return_value=True):
  169. self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048, passphrase='password'),
  170. '/keydir{0}keyname.pem'.format(os.sep))
  171. save_pem.assert_not_called()
  172. save_pub.assert_not_called()
  173. with patch('os.path.isfile', return_value=False):
  174. self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048, passphrase='password'),
  175. '/keydir{0}keyname.pem'.format(os.sep))
  176. callback = save_pem.call_args[1]['callback']
  177. save_pem.assert_called_once_with('/keydir{0}keyname.pem'.format(os.sep),
  178. cipher='des_ede3_cbc',
  179. callback=callback)
  180. self.assertEqual(callback(None), b'password')
  181. save_pub.assert_called_once_with('/keydir{0}keyname.pub'.format(os.sep))
  182. def test_sign_message(self):
  183. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  184. with patch('salt.crypt.get_rsa_key', return_value=key):
  185. self.assertEqual(SIG, salt.crypt.sign_message('/keydir/keyname.pem', MSG))
  186. def test_sign_message_with_passphrase(self):
  187. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  188. with patch('salt.crypt.get_rsa_key', return_value=key):
  189. self.assertEqual(SIG, crypt.sign_message('/keydir/keyname.pem', MSG, passphrase='password'))
  190. def test_verify_signature(self):
  191. with patch('salt.utils.files.fopen', mock_open(read_data=six.b(PUBKEY_DATA))):
  192. self.assertTrue(crypt.verify_signature('/keydir/keyname.pub', MSG, SIG))
  193. def test_encrypt_decrypt_bin(self):
  194. priv_key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  195. pub_key = M2Crypto.RSA.load_pub_key_bio(M2Crypto.BIO.MemoryBuffer(six.b(PUBKEY_DATA)))
  196. encrypted = salt.crypt.private_encrypt(priv_key, b'salt')
  197. decrypted = salt.crypt.public_decrypt(pub_key, encrypted)
  198. self.assertEqual(b'salt', decrypted)
  199. class TestBadCryptodomePubKey(TestCase):
  200. '''
  201. Test that we can load public keys exported by pycrpytodome<=3.4.6
  202. '''
  203. TEST_KEY = (
  204. '-----BEGIN RSA PUBLIC KEY-----\n'
  205. 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzLtFhsvfbFDFaUgulSEX\n'
  206. 'Gl12XriL1DT78Ef2/u8HHaSMmPie37BLWas/zaHwI6066bIyYQJ/nUCahTaoHM7L\n'
  207. 'GlWc0wOU6zyfpihCRQHil05Y6F+olFBoZuYbFPvtp7/hJx/D7I/0n2o/c7M5i3Y2\n'
  208. '3sBxAYNooIQHXHUmPQW6C9iu95ylZDW8JQzYy/EI4vCC8yQMdTK8jK1FQV0Sbwny\n'
  209. 'qcMxSyAWDoFbnhh2P2TnO8HOWuUOaXR8ZHOJzVcDl+a6ew+medW090x3K5O1f80D\n'
  210. '+WjgnG6b2HG7VQpOCfM2GALD/FrxicPilvZ38X1aLhJuwjmVE4LAAv8DVNJXohaO\n'
  211. 'WQIDAQAB\n'
  212. '-----END RSA PUBLIC KEY-----\n'
  213. )
  214. def setUp(self):
  215. self.test_dir = tempfile.mkdtemp()
  216. self.key_path = os.path.join(self.test_dir, 'cryptodom-3.4.6.pub')
  217. with salt.utils.files.fopen(self.key_path, 'wb') as fd:
  218. fd.write(self.TEST_KEY.encode())
  219. def tearDown(self):
  220. shutil.rmtree(self.test_dir)
  221. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  222. def test_m2_bad_key(self):
  223. '''
  224. Load public key with an invalid header using m2crypto and validate it
  225. '''
  226. key = salt.crypt.get_rsa_pub_key(self.key_path)
  227. assert key.check_key() == 1
  228. @skipIf(HAS_M2, "Skip when m2crypto is installed")
  229. def test_crypto_bad_key(self):
  230. '''
  231. Load public key with an invalid header and validate it without m2crypto
  232. '''
  233. key = salt.crypt.get_rsa_pub_key(self.key_path)
  234. assert key.can_encrypt()
  235. class TestM2CryptoRegression47124(TestCase):
  236. SIGNATURE = (
  237. b'w\xac\xfe18o\xeb\xfb\x14+\x9e\xd1\xb7\x7fe}\xec\xd6\xe1P\x9e\xab'
  238. b'\xb5\x07\xe0\xc1\xfd\xda#\x04Z\x8d\x7f\x0b\x1f}:~\xb2s\x860u\x02N'
  239. b'\xd4q"\xb7\x86*\x8f\x1f\xd0\x9d\x11\x92\xc5~\xa68\xac>\x12H\xc2%y,'
  240. b'\xe6\xceU\x1e\xa3?\x0c,\xf0u\xbb\xd0[g_\xdd\x8b\xb0\x95:Y\x18\xa5*'
  241. b'\x99\xfd\xf3K\x92\x92 ({\xd1\xff\xd9F\xc8\xd6K\x86e\xf9\xa8\xad\xb0z'
  242. b'\xe3\x9dD\xf5k\x8b_<\xe7\xe7\xec\xf3"\'\xd5\xd2M\xb4\xce\x1a\xe3$'
  243. b'\x9c\x81\xad\xf9\x11\xf6\xf5>)\xc7\xdd\x03&\xf7\x86@ks\xa6\x05\xc2'
  244. b'\xd0\xbd\x1a7\xfc\xde\xe6\xb0\xad!\x12#\xc86Y\xea\xc5\xe3\xe2\xb3'
  245. b'\xc9\xaf\xfa\x0c\xf2?\xbf\x93w\x18\x9e\x0b\xa2a\x10:M\x05\x89\xe2W.Q'
  246. b'\xe8;yGT\xb1\xf2\xc6A\xd2\xc4\xbeN\xb3\xcfS\xaf\x03f\xe2\xb4)\xe7\xf6'
  247. b'\xdbs\xd0Z}8\xa4\xd2\x1fW*\xe6\x1c"\x8b\xd0\x18w\xb9\x7f\x9e\x96\xa3'
  248. b'\xd9v\xf7\x833\x8e\x01'
  249. )
  250. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  251. def test_m2crypto_verify_bytes(self):
  252. message = salt.utils.stringutils.to_unicode('meh')
  253. with patch('salt.utils.files.fopen', mock_open(read_data=six.b(PUBKEY_DATA))):
  254. salt.crypt.verify_signature('/keydir/keyname.pub', message, self.SIGNATURE)
  255. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  256. def test_m2crypto_verify_unicode(self):
  257. message = salt.utils.stringutils.to_bytes('meh')
  258. with patch('salt.utils.files.fopen', mock_open(read_data=six.b(PUBKEY_DATA))):
  259. salt.crypt.verify_signature('/keydir/keyname.pub', message, self.SIGNATURE)
  260. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  261. def test_m2crypto_sign_bytes(self):
  262. message = salt.utils.stringutils.to_unicode('meh')
  263. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  264. with patch('salt.crypt.get_rsa_key', return_value=key):
  265. signature = salt.crypt.sign_message('/keydir/keyname.pem', message, passphrase='password')
  266. self.assertEqual(signature, self.SIGNATURE)
  267. @skipIf(not HAS_M2, "Skip when m2crypto is not installed")
  268. def test_m2crypto_sign_unicode(self):
  269. message = salt.utils.stringutils.to_bytes('meh')
  270. key = M2Crypto.RSA.load_key_string(six.b(PRIVKEY_DATA))
  271. with patch('salt.crypt.get_rsa_key', return_value=key):
  272. signature = salt.crypt.sign_message('/keydir/keyname.pem', message, passphrase='password')
  273. self.assertEqual(signature, self.SIGNATURE)