1
0

test_smb.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. # -*- coding: utf-8 -*-
  2. """
  3. Test utility methods that communicate with SMB shares.
  4. """
  5. from __future__ import absolute_import
  6. import getpass
  7. import logging
  8. import os
  9. import signal
  10. import subprocess
  11. import tempfile
  12. import time
  13. import pytest
  14. import salt.utils.files
  15. import salt.utils.path
  16. import salt.utils.smb
  17. from tests.support.case import TestCase
  18. log = logging.getLogger(__name__)
  19. CONFIG = (
  20. "[global]\n"
  21. "realm = saltstack.com\n"
  22. "interfaces = lo 127.0.0.0/8\n"
  23. "smb ports = 1445\n"
  24. "log level = 2\n"
  25. "map to guest = Bad User\n"
  26. "enable core files = no\n"
  27. "passdb backend = smbpasswd\n"
  28. "smb passwd file = {passwdb}\n"
  29. "lock directory = {samba_dir}\n"
  30. "state directory = {samba_dir}\n"
  31. "cache directory = {samba_dir}\n"
  32. "pid directory = {samba_dir}\n"
  33. "private dir = {samba_dir}\n"
  34. "ncalrpc dir = {samba_dir}\n"
  35. "socket options = IPTOS_LOWDELAY TCP_NODELAY\n"
  36. "min receivefile size = 0\n"
  37. "write cache size = 0\n"
  38. "client ntlmv2 auth = no\n"
  39. "client min protocol = SMB3_11\n"
  40. "client plaintext auth = no\n"
  41. "\n"
  42. "[public]\n"
  43. "path = {public_dir}\n"
  44. "read only = no\n"
  45. "guest ok = no\n"
  46. "writeable = yes\n"
  47. "force user = {user}\n"
  48. )
  49. TBE = (
  50. "{}:0:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX:AC8E657F8"
  51. "3DF82BEEA5D43BDAF7800CC:[U ]:LCT-507C14C7:"
  52. )
  53. def which_smbd():
  54. """
  55. Find the smbd executable and cache the result if it exits.
  56. """
  57. if hasattr(which_smbd, "cached_result"):
  58. return which_smbd.cached_result
  59. smbd = salt.utils.path.which("smbd")
  60. if smbd:
  61. which_smbd.cached_result = smbd
  62. return smbd
  63. @pytest.mark.skipif(not which_smbd(), reason="smbd binary not found")
  64. @pytest.mark.skipif(
  65. any([salt.utils.smb.HAS_IMPACKET, salt.utils.smb.HAS_SMBPROTOCOL]),
  66. reason='Either "impacket" or "smbprotocol" needs to be installed.',
  67. )
  68. class TestSmb(TestCase):
  69. _smbd = None
  70. @staticmethod
  71. def check_pid(pid):
  72. try:
  73. os.kill(pid, 0)
  74. except OSError:
  75. return False
  76. else:
  77. return True
  78. @classmethod
  79. def setUpClass(cls):
  80. tmpdir = tempfile.mkdtemp()
  81. cls.samba_dir = os.path.join(tmpdir, "samba")
  82. cls.public_dir = os.path.join(tmpdir, "public")
  83. os.makedirs(cls.samba_dir)
  84. os.makedirs(cls.public_dir)
  85. os.chmod(cls.samba_dir, 0o775)
  86. os.chmod(cls.public_dir, 0o775)
  87. passwdb = os.path.join(tmpdir, "passwdb")
  88. cls.username = getpass.getuser()
  89. with salt.utils.files.fopen(passwdb, "w") as fp:
  90. fp.write(TBE.format(cls.username))
  91. samba_conf = os.path.join(tmpdir, "smb.conf")
  92. with salt.utils.files.fopen(samba_conf, "w") as fp:
  93. fp.write(
  94. CONFIG.format(
  95. samba_dir=cls.samba_dir,
  96. public_dir=cls.public_dir,
  97. passwdb=passwdb,
  98. user=cls.username,
  99. )
  100. )
  101. cls._smbd = subprocess.Popen(
  102. "{0} -FS -P0 -s {1}".format(which_smbd(), samba_conf), shell=True
  103. )
  104. time.sleep(1)
  105. pidfile = os.path.join(cls.samba_dir, "smbd.pid")
  106. with salt.utils.files.fopen(pidfile, "r") as fp:
  107. cls._pid = int(fp.read().strip())
  108. if not cls.check_pid(cls._pid):
  109. raise Exception("Unable to locate smbd's pid file")
  110. @classmethod
  111. def tearDownClass(cls):
  112. log.warning("teardown")
  113. os.kill(cls._pid, signal.SIGTERM)
  114. def test_write_file(self):
  115. """
  116. Transfer a file over SMB
  117. """
  118. name = "test_write_file.txt"
  119. content = "write test file content"
  120. share_path = os.path.join(self.public_dir, name)
  121. assert not os.path.exists(share_path)
  122. local_path = tempfile.mktemp()
  123. with salt.utils.files.fopen(local_path, "w") as fp:
  124. fp.write(content)
  125. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  126. salt.utils.smb.put_file(local_path, name, "public", conn=conn)
  127. conn.close()
  128. assert os.path.exists(share_path)
  129. with salt.utils.files.fopen(share_path, "r") as fp:
  130. result = fp.read()
  131. assert result == content
  132. def test_write_str(self):
  133. """
  134. Write a string to a file over SMB
  135. """
  136. name = "test_write_str.txt"
  137. content = "write test file content"
  138. share_path = os.path.join(self.public_dir, name)
  139. assert not os.path.exists(share_path)
  140. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  141. salt.utils.smb.put_str(content, name, "public", conn=conn)
  142. conn.close()
  143. assert os.path.exists(share_path)
  144. with salt.utils.files.fopen(share_path, "r") as fp:
  145. result = fp.read()
  146. assert result == content
  147. def test_delete_file(self):
  148. """
  149. Validate deletion of files over SMB
  150. """
  151. name = "test_delete_file.txt"
  152. content = "read test file content"
  153. share_path = os.path.join(self.public_dir, name)
  154. with salt.utils.files.fopen(share_path, "w") as fp:
  155. fp.write(content)
  156. assert os.path.exists(share_path)
  157. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  158. salt.utils.smb.delete_file(name, "public", conn=conn)
  159. conn.close()
  160. assert not os.path.exists(share_path)
  161. def test_mkdirs(self):
  162. """
  163. Create directories over SMB
  164. """
  165. dir_name = "mkdirs/test"
  166. share_path = os.path.join(self.public_dir, dir_name)
  167. assert not os.path.exists(share_path)
  168. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  169. salt.utils.smb.mkdirs(dir_name, "public", conn=conn)
  170. conn.close()
  171. assert os.path.exists(share_path)
  172. def test_delete_dirs(self):
  173. """
  174. Validate deletion of directoreies over SMB
  175. """
  176. dir_name = "deldirs"
  177. subdir_name = "deldirs/test"
  178. local_path = os.path.join(self.public_dir, subdir_name)
  179. os.makedirs(local_path)
  180. assert os.path.exists(local_path)
  181. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  182. salt.utils.smb.delete_directory(subdir_name, "public", conn=conn)
  183. conn.close()
  184. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  185. salt.utils.smb.delete_directory(dir_name, "public", conn=conn)
  186. conn.close()
  187. assert not os.path.exists(local_path)
  188. assert not os.path.exists(os.path.join(self.public_dir, dir_name))
  189. def test_connection(self):
  190. """
  191. Validate creation of an SMB connection
  192. """
  193. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  194. conn.close()