test_mysql_grants.py 8.8 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests for the MySQL states
  4. """
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import logging
  8. # Import salt libs
  9. import salt.utils.path
  10. from salt.ext import six
  11. from salt.modules import mysql as mysqlmod
  12. # Import Salt Testing libs
  13. from tests.support.case import ModuleCase
  14. from tests.support.helpers import destructiveTest
  15. from tests.support.mixins import SaltReturnAssertsMixin
  16. from tests.support.unit import skipIf
  17. log = logging.getLogger(__name__)
  18. NO_MYSQL = False
  19. try:
  20. import MySQLdb # pylint: disable=import-error,unused-import
  21. except ImportError:
  22. NO_MYSQL = True
  23. if not salt.utils.path.which("mysqladmin"):
  24. NO_MYSQL = True
  25. @skipIf(
  26. NO_MYSQL,
  27. "Please install MySQL bindings and a MySQL Server before running"
  28. "MySQL integration tests.",
  29. )
  30. class MysqlGrantsStateTest(ModuleCase, SaltReturnAssertsMixin):
  31. """
  32. Validate the mysql_grants states
  33. """
  34. user = "root"
  35. password = "poney"
  36. # yep, theses are valid MySQL db names
  37. # very special chars are _ % and .
  38. testdb1 = "tes.t'\"saltdb"
  39. testdb2 = "t_st `(:=salt%b)"
  40. testdb3 = "test `(:=salteeb)"
  41. table1 = "foo"
  42. table2 = "foo `'%_bar"
  43. users = {
  44. "user1": {"name": "foo", "pwd": "bar"},
  45. "user2": {"name": 'user ";--,?:&/\\', "pwd": '";--(),?:@=&/\\'},
  46. # this is : passwd 標標
  47. "user3": {"name": "user( @ )=foobar", "pwd": "\xe6\xa8\x99\xe6\xa8\x99"},
  48. # this is : user/password containing 標標
  49. "user4": {"name": "user \xe6\xa8\x99", "pwd": "\xe6\xa8\x99\xe6\xa8\x99"},
  50. }
  51. @destructiveTest
  52. def setUp(self):
  53. """
  54. Test presence of MySQL server, enforce a root password
  55. """
  56. super(MysqlGrantsStateTest, self).setUp()
  57. NO_MYSQL_SERVER = True
  58. # now ensure we know the mysql root password
  59. # one of theses two at least should work
  60. ret1 = self.run_state(
  61. "cmd.run",
  62. name='mysqladmin --host="localhost" -u '
  63. + self.user
  64. + ' flush-privileges password "'
  65. + self.password
  66. + '"',
  67. )
  68. ret2 = self.run_state(
  69. "cmd.run",
  70. name='mysqladmin --host="localhost" -u '
  71. + self.user
  72. + ' --password="'
  73. + self.password
  74. + '" flush-privileges password "'
  75. + self.password
  76. + '"',
  77. )
  78. key, value = ret2.popitem()
  79. if value["result"]:
  80. NO_MYSQL_SERVER = False
  81. else:
  82. self.skipTest("No MySQL Server running, or no root access on it.")
  83. # Create some users and a test db
  84. for user, userdef in six.iteritems(self.users):
  85. self._userCreation(uname=userdef["name"], password=userdef["pwd"])
  86. self.run_state(
  87. "mysql_database.present",
  88. name=self.testdb1,
  89. character_set="utf8",
  90. collate="utf8_general_ci",
  91. connection_user=self.user,
  92. connection_pass=self.password,
  93. )
  94. self.run_state(
  95. "mysql_database.present",
  96. name=self.testdb2,
  97. character_set="utf8",
  98. collate="utf8_general_ci",
  99. connection_user=self.user,
  100. connection_pass=self.password,
  101. )
  102. create_query = (
  103. "CREATE TABLE {tblname} ("
  104. " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
  105. " data VARCHAR(100)) ENGINE={engine};".format(
  106. tblname=mysqlmod.quote_identifier(self.table1), engine="MYISAM",
  107. )
  108. )
  109. log.info("Adding table '%s'", self.table1)
  110. self.run_function(
  111. "mysql.query",
  112. database=self.testdb2,
  113. query=create_query,
  114. connection_user=self.user,
  115. connection_pass=self.password,
  116. )
  117. create_query = (
  118. "CREATE TABLE {tblname} ("
  119. " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
  120. " data VARCHAR(100)) ENGINE={engine};".format(
  121. tblname=mysqlmod.quote_identifier(self.table2), engine="MYISAM",
  122. )
  123. )
  124. log.info("Adding table '%s'", self.table2)
  125. self.run_function(
  126. "mysql.query",
  127. database=self.testdb2,
  128. query=create_query,
  129. connection_user=self.user,
  130. connection_pass=self.password,
  131. )
  132. @destructiveTest
  133. def tearDown(self):
  134. """
  135. Removes created users and db
  136. """
  137. for user, userdef in six.iteritems(self.users):
  138. self._userRemoval(uname=userdef["name"], password=userdef["pwd"])
  139. self.run_state(
  140. "mysql_database.absent",
  141. name=self.testdb1,
  142. connection_user=self.user,
  143. connection_pass=self.password,
  144. )
  145. self.run_function(
  146. "mysql_database.absent",
  147. name=self.testdb2,
  148. connection_user=self.user,
  149. connection_pass=self.password,
  150. )
  151. def _userCreation(self, uname, password=None):
  152. """
  153. Create a test user
  154. """
  155. self.run_state(
  156. "mysql_user.present",
  157. name=uname,
  158. host="localhost",
  159. password=password,
  160. connection_user=self.user,
  161. connection_pass=self.password,
  162. connection_charset="utf8",
  163. saltenv={"LC_ALL": "en_US.utf8"},
  164. )
  165. def _userRemoval(self, uname, password=None):
  166. """
  167. Removes a test user
  168. """
  169. self.run_state(
  170. "mysql_user.absent",
  171. name=uname,
  172. host="localhost",
  173. connection_user=self.user,
  174. connection_pass=self.password,
  175. connection_charset="utf8",
  176. saltenv={"LC_ALL": "en_US.utf8"},
  177. )
  178. @destructiveTest
  179. def test_grant_present_absent(self):
  180. """
  181. mysql_database.present
  182. """
  183. ret = self.run_state(
  184. "mysql_grants.present",
  185. name="grant test 1",
  186. grant="SELECT, INSERT",
  187. database=self.testdb1 + ".*",
  188. user=self.users["user1"]["name"],
  189. host="localhost",
  190. grant_option=True,
  191. revoke_first=True,
  192. connection_user=self.user,
  193. connection_pass=self.password,
  194. connection_charset="utf8",
  195. )
  196. self.assertSaltTrueReturn(ret)
  197. ret = self.run_state(
  198. "mysql_grants.present",
  199. name="grant test 2",
  200. grant="SELECT, ALTER,CREATE TEMPORARY tables, execute",
  201. database=self.testdb1 + ".*",
  202. user=self.users["user1"]["name"],
  203. host="localhost",
  204. grant_option=True,
  205. revoke_first=True,
  206. connection_user=self.user,
  207. connection_pass=self.password,
  208. connection_charset="utf8",
  209. )
  210. self.assertSaltTrueReturn(ret)
  211. ret = self.run_state(
  212. "mysql_grants.present",
  213. name="grant test 3",
  214. grant="SELECT, INSERT",
  215. database=self.testdb2 + "." + self.table2,
  216. user=self.users["user2"]["name"],
  217. host="localhost",
  218. grant_option=True,
  219. revoke_first=True,
  220. connection_user=self.user,
  221. connection_pass=self.password,
  222. connection_charset="utf8",
  223. )
  224. self.assertSaltTrueReturn(ret)
  225. ret = self.run_state(
  226. "mysql_grants.present",
  227. name="grant test 4",
  228. grant="SELECT, INSERT",
  229. database=self.testdb2 + "." + self.table2,
  230. user=self.users["user2"]["name"],
  231. host="localhost",
  232. grant_option=True,
  233. revoke_first=True,
  234. connection_user=self.user,
  235. connection_pass=self.password,
  236. connection_charset="utf8",
  237. )
  238. self.assertSaltTrueReturn(ret)
  239. ret = self.run_state(
  240. "mysql_grants.present",
  241. name="grant test 5",
  242. grant="SELECT, UPDATE",
  243. database=self.testdb2 + ".*",
  244. user=self.users["user1"]["name"],
  245. host="localhost",
  246. grant_option=True,
  247. revoke_first=False,
  248. connection_user=self.user,
  249. connection_pass=self.password,
  250. connection_charset="utf8",
  251. )
  252. self.assertSaltTrueReturn(ret)
  253. ret = self.run_state(
  254. "mysql_grants.absent",
  255. name="grant test 6",
  256. grant="SELECT,update",
  257. database=self.testdb2 + ".*",
  258. user=self.users["user1"]["name"],
  259. host="localhost",
  260. grant_option=True,
  261. revoke_first=False,
  262. connection_user=self.user,
  263. connection_pass=self.password,
  264. connection_charset="utf8",
  265. )
  266. self.assertSaltTrueReturn(ret)