1
0

test_mysql_grants.py 9.0 KB


  1. # -*- coding: utf-8 -*-
  2. '''
  3. Tests for the MySQL states
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import logging
  8. # Import Salt Testing libs
  9. from tests.support.case import ModuleCase
  10. from tests.support.unit import skipIf
  11. from tests.support.helpers import destructiveTest
  12. from tests.support.mixins import SaltReturnAssertsMixin
  13. # Import salt libs
  14. import salt.utils.path
  15. from salt.ext import six
  16. from salt.modules import mysql as mysqlmod
  17. log = logging.getLogger(__name__)
  18. NO_MYSQL = False
  19. try:
  20. import MySQLdb # pylint: disable=import-error,unused-import
  21. except ImportError:
  22. NO_MYSQL = True
  23. if not salt.utils.path.which('mysqladmin'):
  24. NO_MYSQL = True
  25. @skipIf(
  26. NO_MYSQL,
  27. 'Please install MySQL bindings and a MySQL Server before running'
  28. 'MySQL integration tests.'
  29. )
  30. class MysqlGrantsStateTest(ModuleCase, SaltReturnAssertsMixin):
  31. '''
  32. Validate the mysql_grants states
  33. '''
  34. user = 'root'
  35. password = 'poney'
  36. # yep, theses are valid MySQL db names
  37. # very special chars are _ % and .
  38. testdb1 = 'tes.t\'"saltdb'
  39. testdb2 = 't_st `(:=salt%b)'
  40. testdb3 = 'test `(:=salteeb)'
  41. table1 = 'foo'
  42. table2 = "foo `\'%_bar"
  43. users = {
  44. 'user1': {
  45. 'name': 'foo',
  46. 'pwd': 'bar',
  47. },
  48. 'user2': {
  49. 'name': 'user ";--,?:&/\\',
  50. 'pwd': '";--(),?:@=&/\\',
  51. },
  52. # this is : passwd 標標
  53. 'user3': {
  54. 'name': 'user( @ )=foobar',
  55. 'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
  56. },
  57. # this is : user/password containing 標標
  58. 'user4': {
  59. 'name': 'user \xe6\xa8\x99',
  60. 'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
  61. },
  62. }
  63. @destructiveTest
  64. def setUp(self):
  65. '''
  66. Test presence of MySQL server, enforce a root password
  67. '''
  68. super(MysqlGrantsStateTest, self).setUp()
  69. NO_MYSQL_SERVER = True
  70. # now ensure we know the mysql root password
  71. # one of theses two at least should work
  72. ret1 = self.run_state(
  73. 'cmd.run',
  74. name='mysqladmin --host="localhost" -u '
  75. + self.user
  76. + ' flush-privileges password "'
  77. + self.password
  78. + '"'
  79. )
  80. ret2 = self.run_state(
  81. 'cmd.run',
  82. name='mysqladmin --host="localhost" -u '
  83. + self.user
  84. + ' --password="'
  85. + self.password
  86. + '" flush-privileges password "'
  87. + self.password
  88. + '"'
  89. )
  90. key, value = ret2.popitem()
  91. if value['result']:
  92. NO_MYSQL_SERVER = False
  93. else:
  94. self.skipTest('No MySQL Server running, or no root access on it.')
  95. # Create some users and a test db
  96. for user, userdef in six.iteritems(self.users):
  97. self._userCreation(uname=userdef['name'], password=userdef['pwd'])
  98. self.run_state(
  99. 'mysql_database.present',
  100. name=self.testdb1,
  101. character_set='utf8',
  102. collate='utf8_general_ci',
  103. connection_user=self.user,
  104. connection_pass=self.password,
  105. )
  106. self.run_state(
  107. 'mysql_database.present',
  108. name=self.testdb2,
  109. character_set='utf8',
  110. collate='utf8_general_ci',
  111. connection_user=self.user,
  112. connection_pass=self.password,
  113. )
  114. create_query = ('CREATE TABLE {tblname} ('
  115. ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
  116. ' data VARCHAR(100)) ENGINE={engine};'.format(
  117. tblname=mysqlmod.quote_identifier(self.table1),
  118. engine='MYISAM',
  119. ))
  120. log.info('Adding table \'%s\'', self.table1)
  121. self.run_function(
  122. 'mysql.query',
  123. database=self.testdb2,
  124. query=create_query,
  125. connection_user=self.user,
  126. connection_pass=self.password
  127. )
  128. create_query = ('CREATE TABLE {tblname} ('
  129. ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
  130. ' data VARCHAR(100)) ENGINE={engine};'.format(
  131. tblname=mysqlmod.quote_identifier(self.table2),
  132. engine='MYISAM',
  133. ))
  134. log.info('Adding table \'%s\'', self.table2)
  135. self.run_function(
  136. 'mysql.query',
  137. database=self.testdb2,
  138. query=create_query,
  139. connection_user=self.user,
  140. connection_pass=self.password
  141. )
  142. @destructiveTest
  143. def tearDown(self):
  144. '''
  145. Removes created users and db
  146. '''
  147. for user, userdef in six.iteritems(self.users):
  148. self._userRemoval(uname=userdef['name'], password=userdef['pwd'])
  149. self.run_state(
  150. 'mysql_database.absent',
  151. name=self.testdb1,
  152. connection_user=self.user,
  153. connection_pass=self.password,
  154. )
  155. self.run_function(
  156. 'mysql_database.absent',
  157. name=self.testdb2,
  158. connection_user=self.user,
  159. connection_pass=self.password,
  160. )
  161. def _userCreation(self,
  162. uname,
  163. password=None):
  164. '''
  165. Create a test user
  166. '''
  167. self.run_state(
  168. 'mysql_user.present',
  169. name=uname,
  170. host='localhost',
  171. password=password,
  172. connection_user=self.user,
  173. connection_pass=self.password,
  174. connection_charset='utf8',
  175. saltenv={"LC_ALL": "en_US.utf8"}
  176. )
  177. def _userRemoval(self,
  178. uname,
  179. password=None):
  180. '''
  181. Removes a test user
  182. '''
  183. self.run_state(
  184. 'mysql_user.absent',
  185. name=uname,
  186. host='localhost',
  187. connection_user=self.user,
  188. connection_pass=self.password,
  189. connection_charset='utf8',
  190. saltenv={"LC_ALL": "en_US.utf8"}
  191. )
  192. @destructiveTest
  193. def test_grant_present_absent(self):
  194. '''
  195. mysql_database.present
  196. '''
  197. ret = self.run_state(
  198. 'mysql_grants.present',
  199. name='grant test 1',
  200. grant='SELECT, INSERT',
  201. database=self.testdb1 + '.*',
  202. user=self.users['user1']['name'],
  203. host='localhost',
  204. grant_option=True,
  205. revoke_first=True,
  206. connection_user=self.user,
  207. connection_pass=self.password,
  208. connection_charset='utf8'
  209. )
  210. self.assertSaltTrueReturn(ret)
  211. ret = self.run_state(
  212. 'mysql_grants.present',
  213. name='grant test 2',
  214. grant='SELECT, ALTER,CREATE TEMPORARY tables, execute',
  215. database=self.testdb1 + '.*',
  216. user=self.users['user1']['name'],
  217. host='localhost',
  218. grant_option=True,
  219. revoke_first=True,
  220. connection_user=self.user,
  221. connection_pass=self.password,
  222. connection_charset='utf8'
  223. )
  224. self.assertSaltTrueReturn(ret)
  225. ret = self.run_state(
  226. 'mysql_grants.present',
  227. name='grant test 3',
  228. grant='SELECT, INSERT',
  229. database=self.testdb2 + '.' + self.table2,
  230. user=self.users['user2']['name'],
  231. host='localhost',
  232. grant_option=True,
  233. revoke_first=True,
  234. connection_user=self.user,
  235. connection_pass=self.password,
  236. connection_charset='utf8'
  237. )
  238. self.assertSaltTrueReturn(ret)
  239. ret = self.run_state(
  240. 'mysql_grants.present',
  241. name='grant test 4',
  242. grant='SELECT, INSERT',
  243. database=self.testdb2 + '.' + self.table2,
  244. user=self.users['user2']['name'],
  245. host='localhost',
  246. grant_option=True,
  247. revoke_first=True,
  248. connection_user=self.user,
  249. connection_pass=self.password,
  250. connection_charset='utf8'
  251. )
  252. self.assertSaltTrueReturn(ret)
  253. ret = self.run_state(
  254. 'mysql_grants.present',
  255. name='grant test 5',
  256. grant='SELECT, UPDATE',
  257. database=self.testdb2 + '.*',
  258. user=self.users['user1']['name'],
  259. host='localhost',
  260. grant_option=True,
  261. revoke_first=False,
  262. connection_user=self.user,
  263. connection_pass=self.password,
  264. connection_charset='utf8'
  265. )
  266. self.assertSaltTrueReturn(ret)
  267. ret = self.run_state(
  268. 'mysql_grants.absent',
  269. name='grant test 6',
  270. grant='SELECT,update',
  271. database=self.testdb2 + '.*',
  272. user=self.users['user1']['name'],
  273. host='localhost',
  274. grant_option=True,
  275. revoke_first=False,
  276. connection_user=self.user,
  277. connection_pass=self.password,
  278. connection_charset='utf8'
  279. )
  280. self.assertSaltTrueReturn(ret)