|
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import, print_function, unicode_literals
- import logging
- import pytest
- import salt.utils.path
- from salt.ext import six
- from salt.ext.six.moves import range
- from salt.modules import mysql as mysqlmod
- from tests.support.case import ModuleCase
- from tests.support.helpers import destructiveTest
- from tests.support.mixins import SaltReturnAssertsMixin
- from tests.support.unit import skipIf
- log = logging.getLogger(__name__)
- NO_MYSQL = False
- try:
- import MySQLdb # pylint: disable=import-error,unused-import
- except Exception: # pylint: disable=broad-except
- NO_MYSQL = True
- if not salt.utils.path.which("mysqladmin"):
- NO_MYSQL = True
- @skipIf(
- NO_MYSQL,
- "Please install MySQL bindings and a MySQL Server before running"
- "MySQL integration tests.",
- )
- @pytest.mark.windows_whitelisted
- class MysqlModuleDbTest(ModuleCase, SaltReturnAssertsMixin):
- """
- Module testing database creation on a real MySQL Server.
- """
- user = "root"
- password = "poney"
- @destructiveTest
- def setUp(self):
- """
- Test presence of MySQL server, enforce a root password
- """
- super(MysqlModuleDbTest, self).setUp()
- NO_MYSQL_SERVER = True
- # now ensure we know the mysql root password
- # one of theses two at least should work
- ret1 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' flush-privileges password "'
- + self.password
- + '"',
- )
- ret2 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' --password="'
- + self.password
- + '" flush-privileges password "'
- + self.password
- + '"',
- )
- key, value = ret2.popitem()
- if value["result"]:
- NO_MYSQL_SERVER = False
- else:
- self.skipTest("No MySQL Server running, or no root access on it.")
- def _db_creation_loop(self, db_name, returning_name, test_conn=False, **kwargs):
- """
- Used in db testCase, create, check exists, check in list and removes.
- """
- ret = self.run_function("mysql.db_create", name=db_name, **kwargs)
- self.assertEqual(
- True, ret, "Problem while creating db for db name: '{0}'".format(db_name)
- )
- # test db exists
- ret = self.run_function("mysql.db_exists", name=db_name, **kwargs)
- self.assertEqual(
- True,
- ret,
- "Problem while testing db exists for db name: '{0}'".format(db_name),
- )
- # List db names to ensure db is created with the right utf8 string
- ret = self.run_function("mysql.db_list", **kwargs)
- if not isinstance(ret, list):
- raise AssertionError(
- (
- "Unexpected query result while retrieving databases list"
- " '{0}' for '{1}' test"
- ).format(ret, db_name)
- )
- self.assertIn(
- returning_name,
- ret,
- (
- "Problem while testing presence of db name in db lists"
- " for db name: '{0}' in list '{1}'"
- ).format(db_name, ret),
- )
- if test_conn:
- # test connections on database with root user
- ret = self.run_function(
- "mysql.query", database=db_name, query="SELECT 1", **kwargs
- )
- if not isinstance(ret, dict) or "results" not in ret:
- raise AssertionError(
- (
- "Unexpected result while testing connection"
- " on database : {0}"
- ).format(repr(db_name))
- )
- self.assertEqual([["1"]], ret["results"])
- # Now remove database
- ret = self.run_function("mysql.db_remove", name=db_name, **kwargs)
- self.assertEqual(
- True, ret, "Problem while removing db for db name: '{0}'".format(db_name)
- )
- @destructiveTest
- def test_database_creation_level1(self):
- """
- Create database, test presence, then drop db. All theses with complex names.
- """
- # name with space
- db_name = "foo 1"
- self._db_creation_loop(
- db_name=db_name,
- returning_name=db_name,
- test_conn=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- # ```````
- # create
- # also with character_set and collate only
- ret = self.run_function(
- "mysql.db_create",
- name="foo`2",
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- # test db exists
- ret = self.run_function(
- "mysql.db_exists",
- name="foo`2",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- # redoing the same should fail
- # even with other character sets or collations
- ret = self.run_function(
- "mysql.db_create",
- name="foo`2",
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(False, ret)
- # redoing the same should fail
- ret = self.run_function(
- "mysql.db_create",
- name="foo`2",
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(False, ret)
- # Now remove database
- ret = self.run_function(
- "mysql.db_remove",
- name="foo`2",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- # '''''''
- # create
- # also with character_set only
- db_name = "foo'3"
- self._db_creation_loop(
- db_name=db_name,
- returning_name=db_name,
- test_conn=True,
- character_set="utf8",
- connection_user=self.user,
- connection_pass=self.password,
- )
- # """"""""
- # also with collate only
- db_name = 'foo"4'
- self._db_creation_loop(
- db_name=db_name,
- returning_name=db_name,
- test_conn=True,
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- # fuzzy
- db_name = '<foo` --"5>'
- self._db_creation_loop(
- db_name=db_name,
- returning_name=db_name,
- test_conn=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- @destructiveTest
- def test_mysql_dbname_character_percent(self):
- """
- Play with the '%' character problems
- This character should be escaped in the form '%%' on queries, but only
- when theses queries have arguments. It is also a special character
- in LIKE SQL queries. Finally it is used to indicate query arguments.
- """
- db_name1 = "foo%1_"
- db_name2 = "foo%12"
- ret = self.run_function(
- "mysql.db_create",
- name=db_name1,
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- ret = self.run_function(
- "mysql.db_create",
- name=db_name2,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- ret = self.run_function(
- "mysql.db_remove",
- name=db_name1,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- ret = self.run_function(
- "mysql.db_exists",
- name=db_name1,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(False, ret)
- ret = self.run_function(
- "mysql.db_exists",
- name=db_name2,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- ret = self.run_function(
- "mysql.db_remove",
- name=db_name2,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- @destructiveTest
- def test_database_creation_utf8(self):
- """
- Test support of utf8 in database names
- """
- # Simple accents : using utf8 string
- db_name_unicode = "notam\xe9rican"
- # same as 'notamérican' because of file encoding
- # but ensure it on this test
- db_name_utf8 = "notam\xc3\xa9rican"
- # FIXME: MySQLdb problems on conn strings containing
- # utf-8 on user name of db name prevent conn test
- self._db_creation_loop(
- db_name=db_name_utf8,
- returning_name=db_name_utf8,
- test_conn=False,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # test unicode entry will also return utf8 name
- self._db_creation_loop(
- db_name=db_name_unicode,
- returning_name=db_name_utf8,
- test_conn=False,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # Using more complex unicode characters:
- db_name_unicode = "\u6a19\u6e96\u8a9e"
- # same as '標準語' because of file encoding
- # but ensure it on this test
- db_name_utf8 = "\xe6\xa8\x99\xe6\xba\x96\xe8\xaa\x9e"
- self._db_creation_loop(
- db_name=db_name_utf8,
- returning_name=db_name_utf8,
- test_conn=False,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # test unicode entry will also return utf8 name
- self._db_creation_loop(
- db_name=db_name_unicode,
- returning_name=db_name_utf8,
- test_conn=False,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- @destructiveTest
- def test_database_maintenance(self):
- """
- Test maintenance operations on a created database
- """
- dbname = "foo%'-- `\"'"
- # create database
- # but first silently try to remove it
- # in case of previous tests failures
- ret = self.run_function(
- "mysql.db_remove",
- name=dbname,
- connection_user=self.user,
- connection_pass=self.password,
- )
- ret = self.run_function(
- "mysql.db_create",
- name=dbname,
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- # test db exists
- ret = self.run_function(
- "mysql.db_exists",
- name=dbname,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- # Create 3 tables
- tablenames = {
- 'A%table "`1': "MYISAM",
- "B%table '`2": "InnoDB",
- "Ctable --`3": "MEMORY",
- }
- for tablename, engine in sorted(six.iteritems(tablenames)):
- # prepare queries
- create_query = (
- "CREATE TABLE {tblname} ("
- " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
- " data VARCHAR(100)) ENGINE={engine};".format(
- tblname=mysqlmod.quote_identifier(tablename), engine=engine,
- )
- )
- insert_query = "INSERT INTO {tblname} (data)" " VALUES ".format(
- tblname=mysqlmod.quote_identifier(tablename)
- )
- delete_query = "DELETE from {tblname}" " order by rand() limit 50;".format(
- tblname=mysqlmod.quote_identifier(tablename)
- )
- for x in range(100):
- insert_query += "('foo" + six.text_type(x) + "'),"
- insert_query += "('bar');"
- # populate database
- log.info("Adding table '%s'", tablename)
- ret = self.run_function(
- "mysql.query",
- database=dbname,
- query=create_query,
- connection_user=self.user,
- connection_pass=self.password,
- )
- if not isinstance(ret, dict) or "rows affected" not in ret:
- raise AssertionError(
- (
- "Unexpected query result while populating test table"
- " '{0}' : '{1}'"
- ).format(
- tablename, ret,
- )
- )
- self.assertEqual(ret["rows affected"], 0)
- log.info("Populating table '%s'", tablename)
- ret = self.run_function(
- "mysql.query",
- database=dbname,
- query=insert_query,
- connection_user=self.user,
- connection_pass=self.password,
- )
- if not isinstance(ret, dict) or "rows affected" not in ret:
- raise AssertionError(
- (
- "Unexpected query result while populating test table"
- " '{0}' : '{1}'"
- ).format(
- tablename, ret,
- )
- )
- self.assertEqual(ret["rows affected"], 101)
- log.info("Removing some rows on table'%s'", tablename)
- ret = self.run_function(
- "mysql.query",
- database=dbname,
- query=delete_query,
- connection_user=self.user,
- connection_pass=self.password,
- )
- if not isinstance(ret, dict) or "rows affected" not in ret:
- raise AssertionError(
- (
- "Unexpected query result while removing rows on test table"
- " '{0}' : '{1}'"
- ).format(
- tablename, ret,
- )
- )
- self.assertEqual(ret["rows affected"], 50)
- # test check/repair/opimize on 1 table
- tablename = 'A%table "`1'
- ret = self.run_function(
- "mysql.db_check",
- name=dbname,
- table=tablename,
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Note that returned result does not quote_identifier of table and db
- self.assertEqual(
- ret,
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "check",
- }
- ],
- )
- ret = self.run_function(
- "mysql.db_repair",
- name=dbname,
- table=tablename,
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Note that returned result does not quote_identifier of table and db
- self.assertEqual(
- ret,
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "repair",
- }
- ],
- )
- ret = self.run_function(
- "mysql.db_optimize",
- name=dbname,
- table=tablename,
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Note that returned result does not quote_identifier of table and db
- self.assertEqual(
- ret,
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "optimize",
- }
- ],
- )
- # test check/repair/opimize on all tables
- ret = self.run_function(
- "mysql.db_check",
- name=dbname,
- connection_user=self.user,
- connection_pass=self.password,
- )
- expected = []
- for tablename, engine in sorted(six.iteritems(tablenames)):
- if engine == "MEMORY":
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": (
- "The storage engine for the table doesn't"
- " support check"
- ),
- "Msg_type": "note",
- "Op": "check",
- }
- ]
- )
- else:
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "check",
- }
- ]
- )
- self.assertEqual(ret, expected)
- ret = self.run_function(
- "mysql.db_repair",
- name=dbname,
- connection_user=self.user,
- connection_pass=self.password,
- )
- expected = []
- for tablename, engine in sorted(six.iteritems(tablenames)):
- if engine == "MYISAM":
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "repair",
- }
- ]
- )
- else:
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": (
- "The storage engine for the table doesn't"
- " support repair"
- ),
- "Msg_type": "note",
- "Op": "repair",
- }
- ]
- )
- self.assertEqual(ret, expected)
- ret = self.run_function(
- "mysql.db_optimize",
- name=dbname,
- connection_user=self.user,
- connection_pass=self.password,
- )
- expected = []
- for tablename, engine in sorted(six.iteritems(tablenames)):
- if engine == "MYISAM":
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "optimize",
- }
- ]
- )
- elif engine == "InnoDB":
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": (
- "Table does not support optimize, "
- "doing recreate + analyze instead"
- ),
- "Msg_type": "note",
- "Op": "optimize",
- },
- {
- "Table": dbname + "." + tablename,
- "Msg_text": "OK",
- "Msg_type": "status",
- "Op": "optimize",
- },
- ]
- )
- elif engine == "MEMORY":
- expected.append(
- [
- {
- "Table": dbname + "." + tablename,
- "Msg_text": (
- "The storage engine for the table doesn't"
- " support optimize"
- ),
- "Msg_type": "note",
- "Op": "optimize",
- }
- ]
- )
- self.assertEqual(ret, expected)
- # Teardown, remove database
- ret = self.run_function(
- "mysql.db_remove",
- name=dbname,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(True, ret)
- @skipIf(
- NO_MYSQL,
- "Please install MySQL bindings and a MySQL Server before running"
- "MySQL integration tests.",
- )
- @pytest.mark.windows_whitelisted
- class MysqlModuleUserTest(ModuleCase, SaltReturnAssertsMixin):
- """
- User Creation and connection tests
- """
- user = "root"
- password = "poney"
- @destructiveTest
- def setUp(self):
- """
- Test presence of MySQL server, enforce a root password
- """
- super(MysqlModuleUserTest, self).setUp()
- NO_MYSQL_SERVER = True
- # now ensure we know the mysql root password
- # one of theses two at least should work
- ret1 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' flush-privileges password "'
- + self.password
- + '"',
- )
- ret2 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' --password="'
- + self.password
- + '" flush-privileges password "'
- + self.password
- + '"',
- )
- key, value = ret2.popitem()
- if value["result"]:
- NO_MYSQL_SERVER = False
- else:
- self.skipTest("No MySQL Server running, or no root access on it.")
- def _userCreationLoop(
- self,
- uname,
- host,
- password=None,
- new_password=None,
- new_password_hash=None,
- **kwargs
- ):
- """
- Perform some tests around creation of the given user
- """
- # First silently remove it, in case of
- ret = self.run_function("mysql.user_remove", user=uname, host=host, **kwargs)
- # creation
- ret = self.run_function(
- "mysql.user_create", user=uname, host=host, password=password, **kwargs
- )
- self.assertEqual(
- True,
- ret,
- ("Calling user_create on" " user '{0}' did not return True: {1}").format(
- uname, repr(ret)
- ),
- )
- # double creation failure
- ret = self.run_function(
- "mysql.user_create", user=uname, host=host, password=password, **kwargs
- )
- self.assertEqual(
- False,
- ret,
- (
- "Calling user_create a second time on"
- " user '{0}' did not return False: {1}"
- ).format(uname, repr(ret)),
- )
- # Alter password
- if new_password is not None or new_password_hash is not None:
- ret = self.run_function(
- "mysql.user_chpass",
- user=uname,
- host=host,
- password=new_password,
- password_hash=new_password_hash,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- (
- "Calling user_chpass on" " user '{0}' did not return True: {1}"
- ).format(uname, repr(ret)),
- )
- def _chck_userinfo(self, user, host, check_user, check_hash):
- """
- Internal routine to check user_info returned results
- """
- ret = self.run_function(
- "mysql.user_info",
- user=user,
- host=host,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- if not isinstance(ret, dict):
- raise AssertionError(
- "Unexpected result while retrieving user_info for " "'{0}'".format(user)
- )
- self.assertEqual(ret["Host"], host)
- self.assertEqual(ret["Password"], check_hash)
- self.assertEqual(ret["User"], check_user)
- def _chk_remove_user(self, user, host, **kwargs):
- """
- Internal routine to check user_remove
- """
- ret = self.run_function("mysql.user_remove", user=user, host=host, **kwargs)
- self.assertEqual(
- True,
- ret,
- (
- "Assertion failed while removing user" " '{0}' on host '{1}': {2}"
- ).format(user, host, repr(ret)),
- )
- @destructiveTest
- def test_user_management(self):
- """
- Test various users creation settings
- """
- # Create users with rights on this database
- # and rights on other databases
- user1 = "user '1"
- user1_pwd = "pwd`'\"1b"
- user1_pwd_hash = "*4DF33B3B12E43384677050A818327877FAB2F4BA"
- # this is : user "2'標
- user2 = "user \"2'\xe6\xa8\x99"
- user2_pwd = "user \"2'\xe6\xa8\x99b"
- user2_pwd_hash = "*3A38A7B94B024B983687BB9B44FB60B7AA38FE61"
- user3 = 'user "3;,?:@=&/'
- user3_pwd = 'user "3;,?:@=&/'
- user3_pwd_hash = "*AA3B1D4105A45D381C23A5C221C47EA349E1FD7D"
- # this is : user ":=;4標 in unicode instead of utf-8
- # if unicode char is counted as 1 char we hit the max user
- # size (16)
- user4 = 'user":;,?:@=&/4\u6a19'
- user4_utf8 = 'user":;,?:@=&/4\xe6\xa8\x99'
- user4_pwd = 'user "4;,?:@=&/'
- user4_pwd_hash = "*FC8EF8DBF27628E4E113359F8E7478D5CF3DD57C"
- user5 = 'user ``"5'
- user5_utf8 = 'user ``"5'
- # this is 標標標\
- user5_pwd = "\xe6\xa8\x99\xe6\xa8\x99\\"
- # this is password('標標\\')
- user5_pwd_hash = "*3752E65CDD8751AF8D889C62CFFC6C998B12C376"
- user6 = 'user %--"6'
- user6_utf8 = 'user %--"6'
- # this is : --'"% SIX標b
- user6_pwd_u = " --'\"% SIX\u6a19b"
- user6_pwd_utf8 = " --'\"% SIX\xe6\xa8\x99b"
- # this is password(' --\'"% SIX標b')
- user6_pwd_hash = "*90AE800593E2D407CD9E28CCAFBE42D17EEA5369"
- self._userCreationLoop(
- uname=user1,
- host="localhost",
- password="pwd`'\"1",
- new_password="pwd`'\"1b",
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Now check for results
- ret = self.run_function(
- "mysql.user_exists",
- user=user1,
- host="localhost",
- password=user1_pwd,
- password_hash=None,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- ("Testing final user '{0}' on host '{1}'" " existence failed").format(
- user1, "localhost"
- ),
- )
- self._userCreationLoop(
- uname=user2,
- host="localhost",
- password=None,
- # this is his name hash : user "2'標
- password_hash="*EEF6F854748ACF841226BB1C2422BEC70AE7F1FF",
- # and this is the same with a 'b' added
- new_password_hash=user2_pwd_hash,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # user2 can connect from other places with other password
- self._userCreationLoop(
- uname=user2,
- host="10.0.0.1",
- allow_passwordless=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self._userCreationLoop(
- uname=user2,
- host="10.0.0.2",
- allow_passwordless=True,
- unix_socket=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # Now check for results
- ret = self.run_function(
- "mysql.user_exists",
- user=user2,
- host="localhost",
- password=None,
- password_hash=user2_pwd_hash,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- ("Testing final user '{0}' on host '{1}'" " failed").format(
- user2, "localhost"
- ),
- )
- ret = self.run_function(
- "mysql.user_exists",
- user=user2,
- host="10.0.0.1",
- allow_passwordless=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- (
- "Testing final user '{0}' on host '{1}'" " without password failed"
- ).format(user2, "10.0.0.1"),
- )
- ret = self.run_function(
- "mysql.user_exists",
- user=user2,
- host="10.0.0.2",
- allow_passwordless=True,
- unix_socket=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- (
- "Testing final user '{0}' on host '{1}'" " without password failed"
- ).format(user2, "10.0.0.2"),
- )
- # Empty password is not passwordless (or is it a bug?)
- self._userCreationLoop(
- uname=user3,
- host="localhost",
- password="",
- connection_user=self.user,
- connection_pass=self.password,
- )
- # user 3 on another host with a password
- self._userCreationLoop(
- uname=user3,
- host="%",
- password="foo",
- new_password=user3_pwd,
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Now check for results
- ret = self.run_function(
- "mysql.user_exists",
- user=user3,
- host="localhost",
- password="",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(
- True,
- ret,
- (
- "Testing final user '{0}' on host '{1}'"
- " without empty password failed"
- ).format(user3, "localhost"),
- )
- ret = self.run_function(
- "mysql.user_exists",
- user=user3,
- host="%",
- password=user3_pwd,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(
- True,
- ret,
- ("Testing final user '{0}' on host '{1}'" " with password failed").format(
- user3, "%"
- ),
- )
- # check unicode name, and password > password_hash
- self._userCreationLoop(
- uname=user4,
- host="%",
- password=user4_pwd,
- # this is password('foo')
- password_hash="*F3A2A51A9B0F2BE2468926B4132313728C250DBF",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # Now check for results
- ret = self.run_function(
- "mysql.user_exists",
- user=user4_utf8,
- host="%",
- password=user4_pwd,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- (
- "Testing final user '{0}' on host '{1}'"
- " with password take from password and not password_hash"
- " failed"
- ).format(user4_utf8, "%"),
- )
- self._userCreationLoop(
- uname=user5,
- host="localhost",
- password="\xe6\xa8\x99\xe6\xa8\x99",
- new_password=user5_pwd,
- unix_socket=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- ret = self.run_function(
- "mysql.user_exists",
- user=user5_utf8,
- host="localhost",
- password=user5_pwd,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- (
- "Testing final user '{0}' on host '{1}'" " with utf8 password failed"
- ).format(user5_utf8, "localhost"),
- )
- # for this one we give password in unicode and check it in utf-8
- self._userCreationLoop(
- uname=user6,
- host="10.0.0.1",
- password=" foobar",
- new_password=user6_pwd_u,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- # Now check for results
- ret = self.run_function(
- "mysql.user_exists",
- user=user6_utf8,
- host="10.0.0.1",
- password=user6_pwd_utf8,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertEqual(
- True,
- ret,
- (
- "Testing final user '{0}' on host '{1}'" " with unicode password failed"
- ).format(user6_utf8, "10.0.0.1"),
- )
- # Final result should be:
- # mysql> select Host, User, Password from user where user like 'user%';
- # +--------------------+-----------+-------------------------------+
- # | User | Host | Password |
- # +--------------------+-----------+-------------------------------+
- # | user "2'標 | 10.0.0.1 | |
- # | user "2'標 | 10.0.0.2 | |
- # | user "2'標 | localhost | *3A38A7B94B0(...)60B7AA38FE61 |
- # | user "3;,?:@=&/ | % | *AA3B1D4105(...)47EA349E1FD7D |
- # | user "3;,?:@=&/ | localhost | |
- # | user %--"6 | 10.0.0.1 | *90AE800593(...)E42D17EEA5369 |
- # | user '1 | localhost | *4DF33B3B1(...)327877FAB2F4BA |
- # | user ``"5 | localhost | *3752E65CD(...)FC6C998B12C376 |
- # | user":;,?:@=&/4標 | % | *FC8EF8DBF(...)7478D5CF3DD57C |
- # +--------------------+-----------+-------------------------------+
- self._chck_userinfo(
- user=user2, host="10.0.0.1", check_user=user2, check_hash=""
- )
- self._chck_userinfo(
- user=user2, host="10.0.0.2", check_user=user2, check_hash=""
- )
- self._chck_userinfo(
- user=user2, host="localhost", check_user=user2, check_hash=user2_pwd_hash
- )
- self._chck_userinfo(
- user=user3, host="%", check_user=user3, check_hash=user3_pwd_hash
- )
- self._chck_userinfo(
- user=user3, host="localhost", check_user=user3, check_hash=""
- )
- self._chck_userinfo(
- user=user4, host="%", check_user=user4_utf8, check_hash=user4_pwd_hash
- )
- self._chck_userinfo(
- user=user6,
- host="10.0.0.1",
- check_user=user6_utf8,
- check_hash=user6_pwd_hash,
- )
- self._chck_userinfo(
- user=user1, host="localhost", check_user=user1, check_hash=user1_pwd_hash
- )
- self._chck_userinfo(
- user=user5,
- host="localhost",
- check_user=user5_utf8,
- check_hash=user5_pwd_hash,
- )
- # check user_list function
- ret = self.run_function(
- "mysql.user_list",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertIn({"Host": "localhost", "User": user1}, ret)
- self.assertIn({"Host": "localhost", "User": user2}, ret)
- self.assertIn({"Host": "10.0.0.1", "User": user2}, ret)
- self.assertIn({"Host": "10.0.0.2", "User": user2}, ret)
- self.assertIn({"Host": "%", "User": user3}, ret)
- self.assertIn({"Host": "localhost", "User": user3}, ret)
- self.assertIn({"Host": "%", "User": user4_utf8}, ret)
- self.assertIn({"Host": "localhost", "User": user5_utf8}, ret)
- self.assertIn({"Host": "10.0.0.1", "User": user6_utf8}, ret)
- # And finally, test connections on MySQL with theses users
- ret = self.run_function(
- "mysql.query",
- database="information_schema",
- query="SELECT 1",
- connection_user=user1,
- connection_pass="pwd`'\"1b",
- connection_host="localhost",
- )
- if not isinstance(ret, dict) or "results" not in ret:
- raise AssertionError(
- (
- "Unexpected result while testing connection" " with user '{0}': {1}"
- ).format(user1, repr(ret))
- )
- self.assertEqual([["1"]], ret["results"])
- # FIXME: still failing, but works by hand...
- # mysql --user="user \"2'標" --password="user \"2'標b" information_schema
- # Seems to be a python-mysql library problem with user names containing
- # utf8 characters
- # @see https://github.com/farcepest/MySQLdb1/issues/40
- # import urllib
- # ret = self.run_function(
- # 'mysql.query',
- # database='information_schema',
- # query='SELECT 1',
- # connection_user=urllib.quote_plus(user2),
- # connection_pass=urllib.quote_plus(user2_pwd),
- # connection_host='localhost',
- # connection_charset='utf8',
- # saltenv={"LC_ALL": "en_US.utf8"}
- # )
- # if not isinstance(ret, dict) or 'results' not in ret:
- # raise AssertionError(
- # ('Unexpected result while testing connection'
- # ' with user \'{0}\': {1}').format(
- # user2,
- # repr(ret)
- # )
- # )
- # self.assertEqual([['1']], ret['results'])
- ret = self.run_function(
- "mysql.query",
- database="information_schema",
- query="SELECT 1",
- connection_user=user3,
- connection_pass="",
- connection_host="localhost",
- )
- if not isinstance(ret, dict) or "results" not in ret:
- raise AssertionError(
- (
- "Unexpected result while testing connection" " with user '{0}': {1}"
- ).format(user3, repr(ret))
- )
- self.assertEqual([["1"]], ret["results"])
- # FIXME: Failing
- # ret = self.run_function(
- # 'mysql.query',
- # database='information_schema',
- # query='SELECT 1',
- # connection_user=user4_utf8,
- # connection_pass=user4_pwd,
- # connection_host='localhost',
- # connection_charset='utf8',
- # saltenv={"LC_ALL": "en_US.utf8"}
- # )
- # if not isinstance(ret, dict) or 'results' not in ret:
- # raise AssertionError(
- # ('Unexpected result while testing connection'
- # ' with user \'{0}\': {1}').format(
- # user4_utf8,
- # repr(ret)
- # )
- # )
- # self.assertEqual([['1']], ret['results'])
- ret = self.run_function(
- "mysql.query",
- database="information_schema",
- query="SELECT 1",
- connection_user=user5_utf8,
- connection_pass=user5_pwd,
- connection_host="localhost",
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- if not isinstance(ret, dict) or "results" not in ret:
- raise AssertionError(
- (
- "Unexpected result while testing connection" " with user '{0}': {1}"
- ).format(user5_utf8, repr(ret))
- )
- self.assertEqual([["1"]], ret["results"])
- # Teardown by deleting with user_remove
- self._chk_remove_user(
- user=user2,
- host="10.0.0.1",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self._chk_remove_user(
- user=user2,
- host="10.0.0.2",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self._chk_remove_user(
- user=user2,
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self._chk_remove_user(
- user=user3,
- host="%",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._chk_remove_user(
- user=user3,
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._chk_remove_user(
- user=user4,
- host="%",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self._chk_remove_user(
- user=user6,
- host="10.0.0.1",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._chk_remove_user(
- user=user1,
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._chk_remove_user(
- user=user5,
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Final verification of the cleanup
- ret = self.run_function(
- "mysql.user_list",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- self.assertNotIn({"Host": "localhost", "User": user1}, ret)
- self.assertNotIn({"Host": "localhost", "User": user2}, ret)
- self.assertNotIn({"Host": "10.0.0.1", "User": user2}, ret)
- self.assertNotIn({"Host": "10.0.0.2", "User": user2}, ret)
- self.assertNotIn({"Host": "%", "User": user3}, ret)
- self.assertNotIn({"Host": "localhost", "User": user3}, ret)
- self.assertNotIn({"Host": "%", "User": user4_utf8}, ret)
- self.assertNotIn({"Host": "localhost", "User": user5_utf8}, ret)
- self.assertNotIn({"Host": "10.0.0.1", "User": user6_utf8}, ret)
- @skipIf(
- NO_MYSQL,
- "Please install MySQL bindings and a MySQL Server before running"
- "MySQL integration tests.",
- )
- @pytest.mark.windows_whitelisted
- class MysqlModuleUserGrantTest(ModuleCase, SaltReturnAssertsMixin):
- """
- User Creation and connection tests
- """
- user = "root"
- password = "poney"
- # yep, theses are valid MySQL db names
- # very special chars are _ % and .
- testdb1 = "tes.t'\"saltdb"
- testdb2 = "t_st `(:=salt%b)"
- testdb3 = "test `(:=salteeb)"
- test_file_query_db = "test_query"
- table1 = "foo"
- table2 = "foo `'%_bar"
- users = {
- "user1": {"name": "foo", "pwd": "bar"},
- "user2": {"name": 'user ";--,?:&/\\', "pwd": '";--(),?:@=&/\\'},
- # this is : passwd 標標
- "user3": {"name": "user( @ )=foobar", "pwd": "\xe6\xa8\x99\xe6\xa8\x99"},
- # this is : user/password containing 標標
- "user4": {"name": "user \xe6\xa8\x99", "pwd": "\xe6\xa8\x99\xe6\xa8\x99"},
- }
- @destructiveTest
- def setUp(self):
- """
- Test presence of MySQL server, enforce a root password, create users
- """
- super(MysqlModuleUserGrantTest, self).setUp()
- NO_MYSQL_SERVER = True
- # now ensure we know the mysql root password
- # one of theses two at least should work
- ret1 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' flush-privileges password "'
- + self.password
- + '"',
- )
- ret2 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' --password="'
- + self.password
- + '" flush-privileges password "'
- + self.password
- + '"',
- )
- key, value = ret2.popitem()
- if value["result"]:
- NO_MYSQL_SERVER = False
- else:
- self.skipTest("No MySQL Server running, or no root access on it.")
- # Create some users and a test db
- for user, userdef in six.iteritems(self.users):
- self._userCreation(uname=userdef["name"], password=userdef["pwd"])
- self.run_function(
- "mysql.db_create",
- name=self.testdb1,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.run_function(
- "mysql.db_create",
- name=self.testdb2,
- connection_user=self.user,
- connection_pass=self.password,
- )
- create_query = (
- "CREATE TABLE {tblname} ("
- " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
- " data VARCHAR(100)) ENGINE={engine};".format(
- tblname=mysqlmod.quote_identifier(self.table1), engine="MYISAM",
- )
- )
- log.info("Adding table '%s'", self.table1)
- self.run_function(
- "mysql.query",
- database=self.testdb2,
- query=create_query,
- connection_user=self.user,
- connection_pass=self.password,
- )
- create_query = (
- "CREATE TABLE {tblname} ("
- " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
- " data VARCHAR(100)) ENGINE={engine};".format(
- tblname=mysqlmod.quote_identifier(self.table2), engine="MYISAM",
- )
- )
- log.info("Adding table '%s'", self.table2)
- self.run_function(
- "mysql.query",
- database=self.testdb2,
- query=create_query,
- connection_user=self.user,
- connection_pass=self.password,
- )
- @destructiveTest
- def tearDown(self):
- """
- Removes created users and db
- """
- for user, userdef in six.iteritems(self.users):
- self._userRemoval(uname=userdef["name"], password=userdef["pwd"])
- self.run_function(
- "mysql.db_remove",
- name=self.testdb1,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.run_function(
- "mysql.db_remove",
- name=self.testdb2,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.run_function(
- "mysql.db_remove",
- name=self.test_file_query_db,
- connection_user=self.user,
- connection_pass=self.password,
- )
- def _userCreation(self, uname, password=None):
- """
- Create a test user
- """
- self.run_function(
- "mysql.user_create",
- user=uname,
- host="localhost",
- password=password,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- def _userRemoval(self, uname, password=None):
- """
- Removes a test user
- """
- self.run_function(
- "mysql.user_remove",
- user=uname,
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- saltenv={"LC_ALL": "en_US.utf8"},
- )
- def _addGrantRoutine(
- self, grant, user, db, grant_option=False, escape=True, **kwargs
- ):
- """
- Perform some tests around creation of the given grants
- """
- ret = self.run_function(
- "mysql.grant_add",
- grant=grant,
- database=db,
- user=user,
- grant_option=grant_option,
- escape=escape,
- **kwargs
- )
- self.assertEqual(
- True,
- ret,
- (
- "Calling grant_add on"
- " user '{0}' and grants '{1}' did not return True: {2}"
- ).format(user, grant, repr(ret)),
- )
- ret = self.run_function(
- "mysql.grant_exists",
- grant=grant,
- database=db,
- user=user,
- grant_option=grant_option,
- escape=escape,
- **kwargs
- )
- self.assertEqual(
- True,
- ret,
- (
- "Calling grant_exists on"
- " user '{0}' and grants '{1}' did not return True: {2}"
- ).format(user, grant, repr(ret)),
- )
- @destructiveTest
- def testGrants(self):
- """
- Test user grant methods
- """
- self._addGrantRoutine(
- grant="SELECT, INSERT,UPDATE, CREATE",
- user=self.users["user1"]["name"],
- db=self.testdb1 + ".*",
- grant_option=True,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._addGrantRoutine(
- grant="INSERT, SELECT",
- user=self.users["user1"]["name"],
- db=self.testdb2 + "." + self.table1,
- grant_option=True,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._addGrantRoutine(
- grant=" SELECT, UPDATE,DELETE, CREATE TEMPORARY TABLES",
- user=self.users["user2"]["name"],
- db=self.testdb1 + ".*",
- grant_option=True,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._addGrantRoutine(
- grant="select, ALTER,CREATE TEMPORARY TABLES, EXECUTE ",
- user=self.users["user3"]["name"],
- db=self.testdb1 + ".*",
- grant_option=True,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- self._addGrantRoutine(
- grant="SELECT, INSERT",
- user=self.users["user4"]["name"],
- db=self.testdb2 + "." + self.table2,
- grant_option=False,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- )
- self._addGrantRoutine(
- grant="CREATE",
- user=self.users["user4"]["name"],
- db=self.testdb2 + ".*",
- grant_option=False,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- )
- self._addGrantRoutine(
- grant="SELECT, INSERT",
- user=self.users["user4"]["name"],
- db=self.testdb2 + "." + self.table1,
- grant_option=False,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- )
- # '' is valid for anonymous users
- self._addGrantRoutine(
- grant="DELETE",
- user="",
- db=self.testdb3 + ".*",
- grant_option=False,
- escape=True,
- connection_user=self.user,
- connection_pass=self.password,
- )
- # Check result for users
- ret = self.run_function(
- "mysql.user_grants",
- user=self.users["user1"]["name"],
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(
- ret,
- [
- "GRANT USAGE ON *.* TO 'foo'@'localhost'",
- (
- "GRANT SELECT, INSERT, UPDATE, CREATE ON "
- "`tes.t'\"saltdb`.* TO 'foo'@'localhost' WITH GRANT OPTION"
- ),
- (
- "GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo`"
- " TO 'foo'@'localhost' WITH GRANT OPTION"
- ),
- ],
- )
- ret = self.run_function(
- "mysql.user_grants",
- user=self.users["user2"]["name"],
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(
- ret,
- [
- "GRANT USAGE ON *.* TO 'user \";--,?:&/\\'@'localhost'",
- (
- "GRANT SELECT, UPDATE, DELETE, CREATE TEMPORARY TABLES ON `tes.t'"
- "\"saltdb`.* TO 'user \";--,?:&/\\'@'localhost'"
- " WITH GRANT OPTION"
- ),
- ],
- )
- ret = self.run_function(
- "mysql.user_grants",
- user=self.users["user3"]["name"],
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(
- ret,
- [
- "GRANT USAGE ON *.* TO 'user( @ )=foobar'@'localhost'",
- (
- "GRANT SELECT, ALTER, CREATE TEMPORARY TABLES, EXECUTE ON "
- "`tes.t'\"saltdb`.* TO 'user( @ )=foobar'@'localhost' "
- "WITH GRANT OPTION"
- ),
- ],
- )
- ret = self.run_function(
- "mysql.user_grants",
- user=self.users["user4"]["name"],
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- connection_charset="utf8",
- )
- self.assertEqual(
- ret,
- [
- "GRANT USAGE ON *.* TO 'user \xe6\xa8\x99'@'localhost'",
- (
- r"GRANT CREATE ON `t\_st ``(:=salt\%b)`.* TO "
- "'user \xe6\xa8\x99'@'localhost'"
- ),
- (
- "GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo ``'%_bar` TO "
- "'user \xe6\xa8\x99'@'localhost'"
- ),
- (
- "GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo` TO "
- "'user \xe6\xa8\x99'@'localhost'"
- ),
- ],
- )
- ret = self.run_function(
- "mysql.user_grants",
- user="",
- host="localhost",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertEqual(
- ret,
- [
- "GRANT USAGE ON *.* TO ''@'localhost'",
- "GRANT DELETE ON `test ``(:=salteeb)`.* TO ''@'localhost'",
- ],
- )
- @skipIf(
- NO_MYSQL,
- "Please install MySQL bindings and a MySQL Server before running"
- "MySQL integration tests.",
- )
- @pytest.mark.windows_whitelisted
- class MysqlModuleFileQueryTest(ModuleCase, SaltReturnAssertsMixin):
- """
- Test file query module
- """
- user = "root"
- password = "poney"
- testdb = "test_file_query"
- @destructiveTest
- def setUp(self):
- """
- Test presence of MySQL server, enforce a root password, create users
- """
- super(MysqlModuleFileQueryTest, self).setUp()
- NO_MYSQL_SERVER = True
- # now ensure we know the mysql root password
- # one of theses two at least should work
- ret1 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' flush-privileges password "'
- + self.password
- + '"',
- )
- ret2 = self.run_state(
- "cmd.run",
- name='mysqladmin --host="localhost" -u '
- + self.user
- + ' --password="'
- + self.password
- + '" flush-privileges password "'
- + self.password
- + '"',
- )
- key, value = ret2.popitem()
- if value["result"]:
- NO_MYSQL_SERVER = False
- else:
- self.skipTest("No MySQL Server running, or no root access on it.")
- # Create some users and a test db
- self.run_function(
- "mysql.db_create",
- name=self.testdb,
- connection_user=self.user,
- connection_pass=self.password,
- connection_db="mysql",
- )
- @destructiveTest
- def tearDown(self):
- """
- Removes created users and db
- """
- self.run_function(
- "mysql.db_remove",
- name=self.testdb,
- connection_user=self.user,
- connection_pass=self.password,
- connection_db="mysql",
- )
- @destructiveTest
- def test_update_file_query(self):
- """
- Test query without any output
- """
- ret = self.run_function(
- "mysql.file_query",
- database=self.testdb,
- file_name="salt://mysql/update_query.sql",
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- self.assertTrue("query time" in ret)
- ret.pop("query time")
- self.assertEqual(ret, {"rows affected": 2})
- @destructiveTest
- def test_select_file_query(self):
- """
- Test query with table output
- """
- ret = self.run_function(
- "mysql.file_query",
- database=self.testdb,
- file_name="salt://mysql/select_query.sql",
- character_set="utf8",
- collate="utf8_general_ci",
- connection_user=self.user,
- connection_pass=self.password,
- )
- expected = {
- "rows affected": 5,
- "rows returned": 4,
- "results": [[["2"], ["3"], ["4"], ["5"]]],
- "columns": [["a"]],
- }
- self.assertTrue("query time" in ret)
- ret.pop("query time")
- self.assertEqual(ret, expected)
|