# -*- coding: utf-8 -*- ''' Tests for the MySQL states ''' # Import python libs from __future__ import absolute_import, print_function, unicode_literals import logging # Import Salt Testing libs from tests.support.case import ModuleCase from tests.support.unit import skipIf from tests.support.helpers import destructiveTest from tests.support.mixins import SaltReturnAssertsMixin # Import salt libs import salt.utils.path from salt.ext import six from salt.modules import mysql as mysqlmod log = logging.getLogger(__name__) NO_MYSQL = False try: import MySQLdb # pylint: disable=import-error,unused-import except ImportError: NO_MYSQL = True if not salt.utils.path.which('mysqladmin'): NO_MYSQL = True @skipIf( NO_MYSQL, 'Please install MySQL bindings and a MySQL Server before running' 'MySQL integration tests.' ) class MysqlGrantsStateTest(ModuleCase, SaltReturnAssertsMixin): ''' Validate the mysql_grants states ''' user = 'root' password = 'poney' # yep, theses are valid MySQL db names # very special chars are _ % and . testdb1 = 'tes.t\'"saltdb' testdb2 = 't_st `(:=salt%b)' testdb3 = 'test `(:=salteeb)' table1 = 'foo' table2 = "foo `\'%_bar" users = { 'user1': { 'name': 'foo', 'pwd': 'bar', }, 'user2': { 'name': 'user ";--,?:&/\\', 'pwd': '";--(),?:@=&/\\', }, # this is : passwd 標標 'user3': { 'name': 'user( @ )=foobar', 'pwd': '\xe6\xa8\x99\xe6\xa8\x99', }, # this is : user/password containing 標標 'user4': { 'name': 'user \xe6\xa8\x99', 'pwd': '\xe6\xa8\x99\xe6\xa8\x99', }, } @destructiveTest def setUp(self): ''' Test presence of MySQL server, enforce a root password ''' super(MysqlGrantsStateTest, self).setUp() NO_MYSQL_SERVER = True # now ensure we know the mysql root password # one of theses two at least should work ret1 = self.run_state( 'cmd.run', name='mysqladmin --host="localhost" -u ' + self.user + ' flush-privileges password "' + self.password + '"' ) ret2 = self.run_state( 'cmd.run', name='mysqladmin --host="localhost" -u ' + self.user + ' --password="' + self.password + '" flush-privileges password "' + self.password + '"' ) key, value = ret2.popitem() if value['result']: NO_MYSQL_SERVER = False else: self.skipTest('No MySQL Server running, or no root access on it.') # Create some users and a test db for user, userdef in six.iteritems(self.users): self._userCreation(uname=userdef['name'], password=userdef['pwd']) self.run_state( 'mysql_database.present', name=self.testdb1, character_set='utf8', collate='utf8_general_ci', connection_user=self.user, connection_pass=self.password, ) self.run_state( 'mysql_database.present', name=self.testdb2, character_set='utf8', collate='utf8_general_ci', connection_user=self.user, connection_pass=self.password, ) create_query = ('CREATE TABLE {tblname} (' ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,' ' data VARCHAR(100)) ENGINE={engine};'.format( tblname=mysqlmod.quote_identifier(self.table1), engine='MYISAM', )) log.info('Adding table \'%s\'', self.table1) self.run_function( 'mysql.query', database=self.testdb2, query=create_query, connection_user=self.user, connection_pass=self.password ) create_query = ('CREATE TABLE {tblname} (' ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,' ' data VARCHAR(100)) ENGINE={engine};'.format( tblname=mysqlmod.quote_identifier(self.table2), engine='MYISAM', )) log.info('Adding table \'%s\'', self.table2) self.run_function( 'mysql.query', database=self.testdb2, query=create_query, connection_user=self.user, connection_pass=self.password ) @destructiveTest def tearDown(self): ''' Removes created users and db ''' for user, userdef in six.iteritems(self.users): self._userRemoval(uname=userdef['name'], password=userdef['pwd']) self.run_state( 'mysql_database.absent', name=self.testdb1, connection_user=self.user, connection_pass=self.password, ) self.run_function( 'mysql_database.absent', name=self.testdb2, connection_user=self.user, connection_pass=self.password, ) def _userCreation(self, uname, password=None): ''' Create a test user ''' self.run_state( 'mysql_user.present', name=uname, host='localhost', password=password, connection_user=self.user, connection_pass=self.password, connection_charset='utf8', saltenv={"LC_ALL": "en_US.utf8"} ) def _userRemoval(self, uname, password=None): ''' Removes a test user ''' self.run_state( 'mysql_user.absent', name=uname, host='localhost', connection_user=self.user, connection_pass=self.password, connection_charset='utf8', saltenv={"LC_ALL": "en_US.utf8"} ) @destructiveTest def test_grant_present_absent(self): ''' mysql_database.present ''' ret = self.run_state( 'mysql_grants.present', name='grant test 1', grant='SELECT, INSERT', database=self.testdb1 + '.*', user=self.users['user1']['name'], host='localhost', grant_option=True, revoke_first=True, connection_user=self.user, connection_pass=self.password, connection_charset='utf8' ) self.assertSaltTrueReturn(ret) ret = self.run_state( 'mysql_grants.present', name='grant test 2', grant='SELECT, ALTER,CREATE TEMPORARY tables, execute', database=self.testdb1 + '.*', user=self.users['user1']['name'], host='localhost', grant_option=True, revoke_first=True, connection_user=self.user, connection_pass=self.password, connection_charset='utf8' ) self.assertSaltTrueReturn(ret) ret = self.run_state( 'mysql_grants.present', name='grant test 3', grant='SELECT, INSERT', database=self.testdb2 + '.' + self.table2, user=self.users['user2']['name'], host='localhost', grant_option=True, revoke_first=True, connection_user=self.user, connection_pass=self.password, connection_charset='utf8' ) self.assertSaltTrueReturn(ret) ret = self.run_state( 'mysql_grants.present', name='grant test 4', grant='SELECT, INSERT', database=self.testdb2 + '.' + self.table2, user=self.users['user2']['name'], host='localhost', grant_option=True, revoke_first=True, connection_user=self.user, connection_pass=self.password, connection_charset='utf8' ) self.assertSaltTrueReturn(ret) ret = self.run_state( 'mysql_grants.present', name='grant test 5', grant='SELECT, UPDATE', database=self.testdb2 + '.*', user=self.users['user1']['name'], host='localhost', grant_option=True, revoke_first=False, connection_user=self.user, connection_pass=self.password, connection_charset='utf8' ) self.assertSaltTrueReturn(ret) ret = self.run_state( 'mysql_grants.absent', name='grant test 6', grant='SELECT,update', database=self.testdb2 + '.*', user=self.users['user1']['name'], host='localhost', grant_option=True, revoke_first=False, connection_user=self.user, connection_pass=self.password, connection_charset='utf8' ) self.assertSaltTrueReturn(ret)