test_mysql_grants.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests for the MySQL states
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import logging
  7. import pytest
  8. import salt.utils.path
  9. from salt.ext import six
  10. from salt.modules import mysql as mysqlmod
  11. from tests.support.case import ModuleCase
  12. from tests.support.mixins import SaltReturnAssertsMixin
  13. from tests.support.unit import skipIf
  14. log = logging.getLogger(__name__)
  15. NO_MYSQL = False
  16. try:
  17. import MySQLdb # pylint: disable=import-error,unused-import
  18. except ImportError:
  19. NO_MYSQL = True
  20. if not salt.utils.path.which("mysqladmin"):
  21. NO_MYSQL = True
  22. @skipIf(
  23. NO_MYSQL,
  24. "Please install MySQL bindings and a MySQL Server before running"
  25. "MySQL integration tests.",
  26. )
  27. class MysqlGrantsStateTest(ModuleCase, SaltReturnAssertsMixin):
  28. """
  29. Validate the mysql_grants states
  30. """
  31. user = "root"
  32. password = "poney"
  33. # yep, theses are valid MySQL db names
  34. # very special chars are _ % and .
  35. testdb1 = "tes.t'\"saltdb"
  36. testdb2 = "t_st `(:=salt%b)"
  37. testdb3 = "test `(:=salteeb)"
  38. table1 = "foo"
  39. table2 = "foo `'%_bar"
  40. users = {
  41. "user1": {"name": "foo", "pwd": "bar"},
  42. "user2": {"name": 'user ";--,?:&/\\', "pwd": '";--(),?:@=&/\\'},
  43. # this is : passwd 標標
  44. "user3": {"name": "user( @ )=foobar", "pwd": "\xe6\xa8\x99\xe6\xa8\x99"},
  45. # this is : user/password containing 標標
  46. "user4": {"name": "user \xe6\xa8\x99", "pwd": "\xe6\xa8\x99\xe6\xa8\x99"},
  47. }
  48. @pytest.mark.destructive_test
  49. def setUp(self):
  50. """
  51. Test presence of MySQL server, enforce a root password
  52. """
  53. super(MysqlGrantsStateTest, self).setUp()
  54. NO_MYSQL_SERVER = True
  55. # now ensure we know the mysql root password
  56. # one of theses two at least should work
  57. ret1 = self.run_state(
  58. "cmd.run",
  59. name='mysqladmin --host="localhost" -u '
  60. + self.user
  61. + ' flush-privileges password "'
  62. + self.password
  63. + '"',
  64. )
  65. ret2 = self.run_state(
  66. "cmd.run",
  67. name='mysqladmin --host="localhost" -u '
  68. + self.user
  69. + ' --password="'
  70. + self.password
  71. + '" flush-privileges password "'
  72. + self.password
  73. + '"',
  74. )
  75. key, value = ret2.popitem()
  76. if value["result"]:
  77. NO_MYSQL_SERVER = False
  78. else:
  79. self.skipTest("No MySQL Server running, or no root access on it.")
  80. # Create some users and a test db
  81. for user, userdef in six.iteritems(self.users):
  82. self._userCreation(uname=userdef["name"], password=userdef["pwd"])
  83. self.run_state(
  84. "mysql_database.present",
  85. name=self.testdb1,
  86. character_set="utf8",
  87. collate="utf8_general_ci",
  88. connection_user=self.user,
  89. connection_pass=self.password,
  90. )
  91. self.run_state(
  92. "mysql_database.present",
  93. name=self.testdb2,
  94. character_set="utf8",
  95. collate="utf8_general_ci",
  96. connection_user=self.user,
  97. connection_pass=self.password,
  98. )
  99. create_query = (
  100. "CREATE TABLE {tblname} ("
  101. " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
  102. " data VARCHAR(100)) ENGINE={engine};".format(
  103. tblname=mysqlmod.quote_identifier(self.table1), engine="MYISAM",
  104. )
  105. )
  106. log.info("Adding table '%s'", self.table1)
  107. self.run_function(
  108. "mysql.query",
  109. database=self.testdb2,
  110. query=create_query,
  111. connection_user=self.user,
  112. connection_pass=self.password,
  113. )
  114. create_query = (
  115. "CREATE TABLE {tblname} ("
  116. " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
  117. " data VARCHAR(100)) ENGINE={engine};".format(
  118. tblname=mysqlmod.quote_identifier(self.table2), engine="MYISAM",
  119. )
  120. )
  121. log.info("Adding table '%s'", self.table2)
  122. self.run_function(
  123. "mysql.query",
  124. database=self.testdb2,
  125. query=create_query,
  126. connection_user=self.user,
  127. connection_pass=self.password,
  128. )
  129. @pytest.mark.destructive_test
  130. def tearDown(self):
  131. """
  132. Removes created users and db
  133. """
  134. for user, userdef in six.iteritems(self.users):
  135. self._userRemoval(uname=userdef["name"], password=userdef["pwd"])
  136. self.run_state(
  137. "mysql_database.absent",
  138. name=self.testdb1,
  139. connection_user=self.user,
  140. connection_pass=self.password,
  141. )
  142. self.run_function(
  143. "mysql_database.absent",
  144. name=self.testdb2,
  145. connection_user=self.user,
  146. connection_pass=self.password,
  147. )
  148. def _userCreation(self, uname, password=None):
  149. """
  150. Create a test user
  151. """
  152. self.run_state(
  153. "mysql_user.present",
  154. name=uname,
  155. host="localhost",
  156. password=password,
  157. connection_user=self.user,
  158. connection_pass=self.password,
  159. connection_charset="utf8",
  160. saltenv={"LC_ALL": "en_US.utf8"},
  161. )
  162. def _userRemoval(self, uname, password=None):
  163. """
  164. Removes a test user
  165. """
  166. self.run_state(
  167. "mysql_user.absent",
  168. name=uname,
  169. host="localhost",
  170. connection_user=self.user,
  171. connection_pass=self.password,
  172. connection_charset="utf8",
  173. saltenv={"LC_ALL": "en_US.utf8"},
  174. )
  175. @pytest.mark.destructive_test
  176. def test_grant_present_absent(self):
  177. """
  178. mysql_database.present
  179. """
  180. ret = self.run_state(
  181. "mysql_grants.present",
  182. name="grant test 1",
  183. grant="SELECT, INSERT",
  184. database=self.testdb1 + ".*",
  185. user=self.users["user1"]["name"],
  186. host="localhost",
  187. grant_option=True,
  188. revoke_first=True,
  189. connection_user=self.user,
  190. connection_pass=self.password,
  191. connection_charset="utf8",
  192. )
  193. self.assertSaltTrueReturn(ret)
  194. ret = self.run_state(
  195. "mysql_grants.present",
  196. name="grant test 2",
  197. grant="SELECT, ALTER,CREATE TEMPORARY tables, execute",
  198. database=self.testdb1 + ".*",
  199. user=self.users["user1"]["name"],
  200. host="localhost",
  201. grant_option=True,
  202. revoke_first=True,
  203. connection_user=self.user,
  204. connection_pass=self.password,
  205. connection_charset="utf8",
  206. )
  207. self.assertSaltTrueReturn(ret)
  208. ret = self.run_state(
  209. "mysql_grants.present",
  210. name="grant test 3",
  211. grant="SELECT, INSERT",
  212. database=self.testdb2 + "." + self.table2,
  213. user=self.users["user2"]["name"],
  214. host="localhost",
  215. grant_option=True,
  216. revoke_first=True,
  217. connection_user=self.user,
  218. connection_pass=self.password,
  219. connection_charset="utf8",
  220. )
  221. self.assertSaltTrueReturn(ret)
  222. ret = self.run_state(
  223. "mysql_grants.present",
  224. name="grant test 4",
  225. grant="SELECT, INSERT",
  226. database=self.testdb2 + "." + self.table2,
  227. user=self.users["user2"]["name"],
  228. host="localhost",
  229. grant_option=True,
  230. revoke_first=True,
  231. connection_user=self.user,
  232. connection_pass=self.password,
  233. connection_charset="utf8",
  234. )
  235. self.assertSaltTrueReturn(ret)
  236. ret = self.run_state(
  237. "mysql_grants.present",
  238. name="grant test 5",
  239. grant="SELECT, UPDATE",
  240. database=self.testdb2 + ".*",
  241. user=self.users["user1"]["name"],
  242. host="localhost",
  243. grant_option=True,
  244. revoke_first=False,
  245. connection_user=self.user,
  246. connection_pass=self.password,
  247. connection_charset="utf8",
  248. )
  249. self.assertSaltTrueReturn(ret)
  250. ret = self.run_state(
  251. "mysql_grants.absent",
  252. name="grant test 6",
  253. grant="SELECT,update",
  254. database=self.testdb2 + ".*",
  255. user=self.users["user1"]["name"],
  256. host="localhost",
  257. grant_option=True,
  258. revoke_first=False,
  259. connection_user=self.user,
  260. connection_pass=self.password,
  261. connection_charset="utf8",
  262. )
  263. self.assertSaltTrueReturn(ret)