test_mysql.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Mike Place (mp@saltstack.com)
  4. tests.unit.modules.mysql
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import Python libs
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. # Import Salt Testing libs
  10. from tests.support.mixins import LoaderModuleMockMixin
  11. from tests.support.unit import skipIf, TestCase
  12. from tests.support.mock import MagicMock, patch, call
  13. # Import salt libs
  14. import salt.modules.mysql as mysql
  15. NO_MYSQL = False
  16. try:
  17. import MySQLdb # pylint: disable=W0611
  18. except Exception:
  19. NO_MYSQL = True
  20. __all_privileges__ = [
  21. 'ALTER',
  22. 'ALTER ROUTINE',
  23. 'BACKUP_ADMIN',
  24. 'BINLOG_ADMIN',
  25. 'CONNECTION_ADMIN',
  26. 'CREATE',
  27. 'CREATE ROLE',
  28. 'CREATE ROUTINE',
  29. 'CREATE TABLESPACE',
  30. 'CREATE TEMPORARY TABLES',
  31. 'CREATE USER',
  32. 'CREATE VIEW',
  33. 'DELETE',
  34. 'DROP',
  35. 'DROP ROLE',
  36. 'ENCRYPTION_KEY_ADMIN',
  37. 'EVENT',
  38. 'EXECUTE',
  39. 'FILE',
  40. 'GROUP_REPLICATION_ADMIN',
  41. 'INDEX',
  42. 'INSERT',
  43. 'LOCK TABLES',
  44. 'PERSIST_RO_VARIABLES_ADMIN',
  45. 'PROCESS',
  46. 'REFERENCES',
  47. 'RELOAD',
  48. 'REPLICATION CLIENT',
  49. 'REPLICATION SLAVE',
  50. 'REPLICATION_SLAVE_ADMIN',
  51. 'RESOURCE_GROUP_ADMIN',
  52. 'RESOURCE_GROUP_USER',
  53. 'ROLE_ADMIN',
  54. 'SELECT',
  55. 'SET_USER_ID',
  56. 'SHOW DATABASES',
  57. 'SHOW VIEW',
  58. 'SHUTDOWN',
  59. 'SUPER',
  60. 'SYSTEM_VARIABLES_ADMIN',
  61. 'TRIGGER',
  62. 'UPDATE',
  63. 'XA_RECOVER_ADMIN'
  64. ]
  65. @skipIf(NO_MYSQL, 'Install MySQL bindings before running MySQL unit tests.')
  66. class MySQLTestCase(TestCase, LoaderModuleMockMixin):
  67. def setup_loader_modules(self):
  68. return {mysql: {}}
  69. def test_user_exists(self):
  70. '''
  71. Test to see if mysql module properly forms the MySQL query to see if a user exists
  72. Do it before test_user_create_when_user_exists mocks the user_exists call
  73. '''
  74. with patch.object(mysql, 'version', return_value='8.0.10'):
  75. self._test_call(mysql.user_exists,
  76. {'sql': ('SELECT User,Host FROM mysql.user WHERE '
  77. 'User = %(user)s AND Host = %(host)s AND '
  78. 'Password = PASSWORD(%(password)s)'),
  79. 'sql_args': {'host': 'localhost',
  80. 'password': 'BLUECOW',
  81. 'user': 'mytestuser'
  82. }
  83. },
  84. user='mytestuser',
  85. host='localhost',
  86. password='BLUECOW'
  87. )
  88. with patch.object(mysql, 'version', return_value='10.1.38-MariaDB'):
  89. self._test_call(mysql.user_exists,
  90. {'sql': ('SELECT User,Host FROM mysql.user WHERE '
  91. 'User = %(user)s AND Host = %(host)s AND '
  92. 'Password = PASSWORD(%(password)s)'),
  93. 'sql_args': {'host': 'localhost',
  94. 'password': 'BLUECOW',
  95. 'user': 'mytestuser'
  96. }
  97. },
  98. user='mytestuser',
  99. host='localhost',
  100. password='BLUECOW'
  101. )
  102. with patch.object(mysql, 'version', return_value='8.0.11'):
  103. self._test_call(mysql.user_exists,
  104. {'sql': ('SELECT User,Host FROM mysql.user WHERE '
  105. 'User = %(user)s AND Host = %(host)s'),
  106. 'sql_args': {'host': 'localhost',
  107. 'user': 'mytestuser'
  108. }
  109. },
  110. user='mytestuser',
  111. host='localhost',
  112. password='BLUECOW'
  113. )
  114. with patch.object(mysql, 'version', return_value='8.0.11'):
  115. self._test_call(mysql.user_exists,
  116. {'sql': ('SELECT User,Host FROM mysql.user WHERE '
  117. 'User = %(user)s AND Host = %(host)s'),
  118. 'sql_args': {'host': '%',
  119. 'user': 'mytestuser'
  120. }
  121. },
  122. user='mytestuser',
  123. host='%',
  124. password='BLUECOW'
  125. )
  126. with patch.object(mysql, 'version', return_value='10.2.21-MariaDB'):
  127. self._test_call(mysql.user_exists,
  128. {'sql': ('SELECT User,Host FROM mysql.user WHERE '
  129. 'User = %(user)s AND Host = %(host)s'),
  130. 'sql_args': {'host': 'localhost',
  131. 'user': 'mytestuser'
  132. }
  133. },
  134. user='mytestuser',
  135. host='localhost',
  136. password='BLUECOW'
  137. )
  138. # test_user_create_when_user_exists(self):
  139. # ensure we don't try to create a user when one already exists
  140. # mock the version of MySQL
  141. with patch.object(mysql, 'version', return_value='8.0.10'):
  142. with patch.object(mysql, 'user_exists', MagicMock(return_value=True)):
  143. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  144. ret = mysql.user_create('testuser')
  145. self.assertEqual(False, ret)
  146. # test_user_create_when_user_exists(self):
  147. # ensure we don't try to create a user when one already exists
  148. # mock the version of MySQL
  149. with patch.object(mysql, 'version', return_value='8.0.11'):
  150. with patch.object(mysql, 'user_exists', MagicMock(return_value=True)):
  151. with patch.object(mysql, 'verify_login', MagicMock(return_value=True)):
  152. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  153. ret = mysql.user_create('testuser')
  154. self.assertEqual(False, ret)
  155. def test_user_create(self):
  156. '''
  157. Test the creation of a MySQL user in mysql exec module
  158. '''
  159. self._test_call(mysql.user_create,
  160. {'sql': 'CREATE USER %(user)s@%(host)s IDENTIFIED BY %(password)s',
  161. 'sql_args': {'password': 'BLUECOW',
  162. 'user': 'testuser',
  163. 'host': 'localhost',
  164. }
  165. },
  166. 'testuser',
  167. password='BLUECOW'
  168. )
  169. def test_user_chpass(self):
  170. '''
  171. Test changing a MySQL user password in mysql exec module
  172. '''
  173. connect_mock = MagicMock()
  174. with patch.object(mysql, '_connect', connect_mock):
  175. with patch.object(mysql, 'version', return_value='8.0.10'):
  176. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  177. mysql.user_chpass('testuser', password='BLUECOW')
  178. calls = (
  179. call().cursor().execute(
  180. 'UPDATE mysql.user SET Password=PASSWORD(%(password)s) WHERE User=%(user)s AND Host = %(host)s;',
  181. {'password': 'BLUECOW',
  182. 'user': 'testuser',
  183. 'host': 'localhost',
  184. }
  185. ),
  186. call().cursor().execute('FLUSH PRIVILEGES;'),
  187. )
  188. connect_mock.assert_has_calls(calls, any_order=True)
  189. connect_mock = MagicMock()
  190. with patch.object(mysql, '_connect', connect_mock):
  191. with patch.object(mysql, 'version', return_value='8.0.11'):
  192. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  193. mysql.user_chpass('testuser', password='BLUECOW')
  194. calls = (
  195. call().cursor().execute(
  196. "ALTER USER %(user)s@%(host)s IDENTIFIED BY %(password)s;",
  197. {'password': 'BLUECOW',
  198. 'user': 'testuser',
  199. 'host': 'localhost',
  200. }
  201. ),
  202. call().cursor().execute('FLUSH PRIVILEGES;'),
  203. )
  204. connect_mock.assert_has_calls(calls, any_order=True)
  205. def test_user_remove(self):
  206. '''
  207. Test the removal of a MySQL user in mysql exec module
  208. '''
  209. self._test_call(mysql.user_remove,
  210. {'sql': 'DROP USER %(user)s@%(host)s',
  211. 'sql_args': {'user': 'testuser',
  212. 'host': 'localhost',
  213. }
  214. },
  215. 'testuser'
  216. )
  217. def test_db_check(self):
  218. '''
  219. Test MySQL db check function in mysql exec module
  220. '''
  221. self._test_call(mysql.db_check, 'CHECK TABLE `test``\'" db`.`my``\'" table`', 'test`\'" db', 'my`\'" table')
  222. def test_db_repair(self):
  223. '''
  224. Test MySQL db repair function in mysql exec module
  225. '''
  226. self._test_call(mysql.db_repair, 'REPAIR TABLE `test``\'" db`.`my``\'" table`', 'test`\'" db', 'my`\'" table')
  227. def test_db_optimize(self):
  228. '''
  229. Test MySQL db optimize function in mysql exec module
  230. '''
  231. self._test_call(mysql.db_optimize, 'OPTIMIZE TABLE `test``\'" db`.`my``\'" table`', 'test`\'" db', 'my`\'" table')
  232. def test_db_remove(self):
  233. '''
  234. Test MySQL db remove function in mysql exec module
  235. '''
  236. with patch.object(mysql, 'db_exists', MagicMock(return_value=True)):
  237. self._test_call(mysql.db_remove, 'DROP DATABASE `test``\'" db`;', 'test`\'" db')
  238. def test_db_tables(self):
  239. '''
  240. Test MySQL db_tables function in mysql exec module
  241. '''
  242. with patch.object(mysql, 'db_exists', MagicMock(return_value=True)):
  243. self._test_call(mysql.db_tables, 'SHOW TABLES IN `test``\'" db`', 'test`\'" db')
  244. def test_db_exists(self):
  245. '''
  246. Test MySQL db_exists function in mysql exec module
  247. '''
  248. self._test_call(
  249. mysql.db_exists,
  250. {'sql': 'SHOW DATABASES LIKE %(dbname)s;',
  251. 'sql_args': {'dbname': r'''test%_`" db'''}
  252. },
  253. 'test%_`" db'
  254. )
  255. def test_db_create(self):
  256. '''
  257. Test MySQL db_create function in mysql exec module
  258. '''
  259. self._test_call(
  260. mysql.db_create,
  261. 'CREATE DATABASE IF NOT EXISTS `test``\'" db`;',
  262. 'test`\'" db'
  263. )
  264. def test_user_list(self):
  265. '''
  266. Test MySQL user_list function in mysql exec module
  267. '''
  268. self._test_call(mysql.user_list, 'SELECT User,Host FROM mysql.user')
  269. def test_user_info(self):
  270. '''
  271. Test to see if the mysql execution module correctly forms the SQL for information on a MySQL user.
  272. '''
  273. self._test_call(mysql.user_info,
  274. {'sql': 'SELECT * FROM mysql.user WHERE User = %(user)s AND Host = %(host)s',
  275. 'sql_args': {'host': 'localhost',
  276. 'user': 'mytestuser',
  277. }
  278. },
  279. 'mytestuser'
  280. )
  281. def test_user_grants(self):
  282. '''
  283. Test to ensure the mysql user_grants function returns properly formed SQL for a basic query
  284. '''
  285. with patch.object(mysql, 'user_exists', MagicMock(return_value=True)):
  286. self._test_call(mysql.user_grants,
  287. {'sql': 'SHOW GRANTS FOR %(user)s@%(host)s',
  288. 'sql_args': {'host': 'localhost',
  289. 'user': 'testuser',
  290. }
  291. },
  292. 'testuser')
  293. def test_grant_exists_true(self):
  294. '''
  295. Test to ensure that we can find a grant that exists
  296. '''
  297. mock_grants = [
  298. "GRANT USAGE ON *.* TO 'testuser'@'%'",
  299. "GRANT SELECT, INSERT, UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
  300. "GRANT SELECT ON `testdb`.`testtabletwo` TO 'testuer'@'%'",
  301. "GRANT SELECT ON `testdb`.`testtablethree` TO 'testuser'@'%'",
  302. ]
  303. with patch.object(mysql, 'version', return_value='5.6.41'):
  304. mock = MagicMock(return_value=mock_grants)
  305. with patch.object(mysql, 'user_grants', return_value=mock_grants) as mock_user_grants:
  306. ret = mysql.grant_exists(
  307. 'SELECT, INSERT, UPDATE',
  308. 'testdb.testtableone',
  309. 'testuser',
  310. '%'
  311. )
  312. self.assertEqual(ret, True)
  313. def test_grant_exists_false(self):
  314. '''
  315. Test to ensure that we don't find a grant that doesn't exist
  316. '''
  317. mock_grants = [
  318. "GRANT USAGE ON *.* TO 'testuser'@'%'",
  319. "GRANT SELECT, INSERT, UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
  320. "GRANT SELECT ON `testdb`.`testtablethree` TO 'testuser'@'%'",
  321. ]
  322. with patch.object(mysql, 'version', return_value='5.6.41'):
  323. mock = MagicMock(return_value=mock_grants)
  324. with patch.object(mysql, 'user_grants', return_value=mock_grants) as mock_user_grants:
  325. ret = mysql.grant_exists(
  326. 'SELECT',
  327. 'testdb.testtabletwo',
  328. 'testuser',
  329. '%'
  330. )
  331. self.assertEqual(ret, False)
  332. def test_grant_exists_all(self):
  333. '''
  334. Test to ensure that we can find a grant that exists
  335. '''
  336. mock_grants = [
  337. "GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, SHUTDOWN, PROCESS, FILE, REFERENCES, INDEX, ALTER, SHOW DATABASES, SUPER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, CREATE TABLESPACE, CREATE ROLE, DROP ROLE ON testdb.testtableone TO `testuser`@`%`",
  338. "GRANT BACKUP_ADMIN,BINLOG_ADMIN,CONNECTION_ADMIN,ENCRYPTION_KEY_ADMIN,GROUP_REPLICATION_ADMIN,PERSIST_RO_VARIABLES_ADMIN,REPLICATION_SLAVE_ADMIN,RESOURCE_GROUP_ADMIN,RESOURCE_GROUP_USER,ROLE_ADMIN,SET_USER_ID,SYSTEM_VARIABLES_ADMIN,XA_RECOVER_ADMIN ON testdb.testtableone TO `testuser`@`%`"
  339. ]
  340. with patch.object(mysql, 'version', return_value='8.0.10'):
  341. mock = MagicMock(return_value=mock_grants)
  342. with patch.object(mysql, 'user_grants', return_value=mock_grants) as mock_user_grants:
  343. ret = mysql.grant_exists(
  344. 'ALL',
  345. 'testdb.testtableone',
  346. 'testuser',
  347. '%'
  348. )
  349. self.assertEqual(ret, True)
  350. mock_grants = ["GRANT ALL PRIVILEGES ON testdb.testtableone TO `testuser`@`%`"]
  351. with patch.object(mysql, 'version', return_value='5.6.41'):
  352. mock = MagicMock(return_value=mock_grants)
  353. with patch.object(mysql, 'user_grants', return_value=mock_grants) as mock_user_grants:
  354. ret = mysql.grant_exists(
  355. 'ALL PRIVILEGES',
  356. 'testdb.testtableone',
  357. 'testuser',
  358. '%'
  359. )
  360. self.assertEqual(ret, True)
  361. @skipIf(True, 'TODO: Mock up user_grants()')
  362. def test_grant_add(self):
  363. '''
  364. Test grant_add function in mysql exec module
  365. '''
  366. self._test_call(mysql.grant_add, '', 'SELECT,INSERT,UPDATE', 'database.*', 'frank', 'localhost')
  367. @skipIf(True, 'TODO: Mock up user_grants()')
  368. def test_grant_revoke(self):
  369. '''
  370. Test grant revoke in mysql exec module
  371. '''
  372. self._test_call(mysql.grant_revoke, '', 'SELECT,INSERT,UPDATE', 'database.*', 'frank', 'localhost')
  373. def test_processlist(self):
  374. '''
  375. Test processlist function in mysql exec module
  376. '''
  377. self._test_call(mysql.processlist, 'SHOW FULL PROCESSLIST')
  378. def test_get_master_status(self):
  379. '''
  380. Test get_master_status in the mysql execution module
  381. '''
  382. self._test_call(mysql.get_master_status, 'SHOW MASTER STATUS')
  383. def test_get_slave_status(self):
  384. '''
  385. Test get_slave_status in the mysql execution module
  386. '''
  387. self._test_call(mysql.get_slave_status, 'SHOW SLAVE STATUS')
  388. def test_get_slave_status_bad_server(self):
  389. '''
  390. Test get_slave_status in the mysql execution module, simulating a broken server
  391. '''
  392. connect_mock = MagicMock(return_value=None)
  393. with patch.object(mysql, '_connect', connect_mock):
  394. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  395. rslt = mysql.get_slave_status()
  396. connect_mock.assert_has_calls([call()])
  397. self.assertEqual(rslt, [])
  398. @skipIf(True, 'MySQL module claims this function is not ready for production')
  399. def test_free_slave(self):
  400. pass
  401. def test_query(self):
  402. self._test_call(mysql.query, 'SELECT * FROM testdb', 'testdb', 'SELECT * FROM testdb')
  403. def test_query_error(self):
  404. connect_mock = MagicMock()
  405. with patch.object(mysql, '_connect', connect_mock):
  406. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  407. # Use the OperationalError from the salt mysql module because that
  408. # exception can come from either MySQLdb or pymysql
  409. side_effect = mysql.OperationalError(9999, 'Something Went Wrong')
  410. with patch.object(mysql, '_execute', MagicMock(side_effect=side_effect)):
  411. mysql.query('testdb', 'SELECT * FROM testdb')
  412. self.assertIn('mysql.error', mysql.__context__)
  413. expected = 'MySQL Error 9999: Something Went Wrong'
  414. self.assertEqual(mysql.__context__['mysql.error'], expected)
  415. def _test_call(self, function, expected_sql, *args, **kwargs):
  416. connect_mock = MagicMock()
  417. with patch.object(mysql, '_connect', connect_mock):
  418. with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
  419. function(*args, **kwargs)
  420. if isinstance(expected_sql, dict):
  421. calls = call().cursor().execute('{0}'.format(expected_sql['sql']), expected_sql['sql_args'])
  422. else:
  423. calls = call().cursor().execute('{0}'.format(expected_sql))
  424. connect_mock.assert_has_calls((calls,), True)