test_smb.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. # -*- coding: utf-8 -*-
  2. """
  3. Test utility methods that communicate with SMB shares.
  4. """
  5. from __future__ import absolute_import
  6. import getpass
  7. import logging
  8. import os
  9. import signal
  10. import subprocess
  11. import tempfile
  12. import time
  13. import salt.utils.files
  14. import salt.utils.network
  15. import salt.utils.path
  16. import salt.utils.smb
  17. from tests.support.case import TestCase
  18. from tests.support.unit import skipIf
  19. log = logging.getLogger(__name__)
  20. CONFIG = (
  21. "[global]\n"
  22. "realm = saltstack.com\n"
  23. "interfaces = lo 127.0.0.0/8\n"
  24. "smb ports = 1445\n"
  25. "log level = 2\n"
  26. "map to guest = Bad User\n"
  27. "enable core files = no\n"
  28. "passdb backend = smbpasswd\n"
  29. "smb passwd file = {passwdb}\n"
  30. "lock directory = {samba_dir}\n"
  31. "state directory = {samba_dir}\n"
  32. "cache directory = {samba_dir}\n"
  33. "pid directory = {samba_dir}\n"
  34. "private dir = {samba_dir}\n"
  35. "ncalrpc dir = {samba_dir}\n"
  36. "socket options = IPTOS_LOWDELAY TCP_NODELAY\n"
  37. "min receivefile size = 0\n"
  38. "write cache size = 0\n"
  39. "client ntlmv2 auth = no\n"
  40. "client min protocol = SMB3_11\n"
  41. "client plaintext auth = no\n"
  42. "\n"
  43. "[public]\n"
  44. "path = {public_dir}\n"
  45. "read only = no\n"
  46. "guest ok = no\n"
  47. "writeable = yes\n"
  48. "force user = {user}\n"
  49. )
  50. TBE = (
  51. "{}:0:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX:AC8E657F8"
  52. "3DF82BEEA5D43BDAF7800CC:[U ]:LCT-507C14C7:"
  53. )
  54. IPV6_ENABLED = bool(salt.utils.network.ip_addrs6(include_loopback=True))
  55. def which_smbd():
  56. """
  57. Find the smbd executable and cache the result if it exits.
  58. """
  59. if hasattr(which_smbd, "cached_result"):
  60. return which_smbd.cached_result
  61. smbd = salt.utils.path.which("smbd")
  62. if smbd:
  63. which_smbd.cached_result = smbd
  64. return smbd
  65. @skipIf(not which_smbd(), reason="smbd binary not found")
  66. @skipIf(
  67. not salt.utils.smb.HAS_SMBPROTOCOL, '"smbprotocol" needs to be installed.',
  68. )
  69. class TestSmb(TestCase):
  70. _smbd = None
  71. @staticmethod
  72. def check_pid(pid):
  73. try:
  74. os.kill(pid, 0)
  75. except OSError:
  76. return False
  77. else:
  78. return True
  79. @classmethod
  80. def setUpClass(cls):
  81. tmpdir = tempfile.mkdtemp()
  82. cls.samba_dir = os.path.join(tmpdir, "samba")
  83. cls.public_dir = os.path.join(tmpdir, "public")
  84. os.makedirs(cls.samba_dir)
  85. os.makedirs(cls.public_dir)
  86. os.chmod(cls.samba_dir, 0o775)
  87. os.chmod(cls.public_dir, 0o775)
  88. passwdb = os.path.join(tmpdir, "passwdb")
  89. cls.username = getpass.getuser()
  90. with salt.utils.files.fopen(passwdb, "w") as fp:
  91. fp.write(TBE.format(cls.username))
  92. samba_conf = os.path.join(tmpdir, "smb.conf")
  93. with salt.utils.files.fopen(samba_conf, "w") as fp:
  94. fp.write(
  95. CONFIG.format(
  96. samba_dir=cls.samba_dir,
  97. public_dir=cls.public_dir,
  98. passwdb=passwdb,
  99. user=cls.username,
  100. )
  101. )
  102. cls._smbd = subprocess.Popen(
  103. "{0} -FS -P0 -s {1}".format(which_smbd(), samba_conf), shell=True
  104. )
  105. time.sleep(1)
  106. pidfile = os.path.join(cls.samba_dir, "smbd.pid")
  107. with salt.utils.files.fopen(pidfile, "r") as fp:
  108. cls._pid = int(fp.read().strip())
  109. if not cls.check_pid(cls._pid):
  110. raise Exception("Unable to locate smbd's pid file")
  111. @classmethod
  112. def tearDownClass(cls):
  113. log.warning("teardown")
  114. os.kill(cls._pid, signal.SIGTERM)
  115. def test_write_file_ipv4(self):
  116. """
  117. Transfer a file over SMB
  118. """
  119. name = "test_write_file_v4.txt"
  120. content = "write test file content ipv4"
  121. share_path = os.path.join(self.public_dir, name)
  122. assert not os.path.exists(share_path)
  123. local_path = tempfile.mktemp()
  124. with salt.utils.files.fopen(local_path, "w") as fp:
  125. fp.write(content)
  126. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  127. salt.utils.smb.put_file(local_path, name, "public", conn=conn)
  128. conn.close()
  129. assert os.path.exists(share_path)
  130. with salt.utils.files.fopen(share_path, "r") as fp:
  131. result = fp.read()
  132. assert result == content
  133. @skipIf(not IPV6_ENABLED, "IPv6 not enabled")
  134. def test_write_file_ipv6(self):
  135. """
  136. Transfer a file over SMB
  137. """
  138. name = "test_write_file_v6.txt"
  139. content = "write test file content ipv6"
  140. share_path = os.path.join(self.public_dir, name)
  141. assert not os.path.exists(share_path)
  142. local_path = tempfile.mktemp()
  143. with salt.utils.files.fopen(local_path, "w") as fp:
  144. fp.write(content)
  145. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  146. salt.utils.smb.put_file(local_path, name, "public", conn=conn)
  147. conn.close()
  148. assert os.path.exists(share_path)
  149. with salt.utils.files.fopen(share_path, "r") as fp:
  150. result = fp.read()
  151. assert result == content
  152. def test_write_str_v4(self):
  153. """
  154. Write a string to a file over SMB
  155. """
  156. name = "test_write_str.txt"
  157. content = "write test file content"
  158. share_path = os.path.join(self.public_dir, name)
  159. assert not os.path.exists(share_path)
  160. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  161. salt.utils.smb.put_str(content, name, "public", conn=conn)
  162. conn.close()
  163. assert os.path.exists(share_path)
  164. with salt.utils.files.fopen(share_path, "r") as fp:
  165. result = fp.read()
  166. assert result == content
  167. @skipIf(not IPV6_ENABLED, "IPv6 not enabled")
  168. def test_write_str_v6(self):
  169. """
  170. Write a string to a file over SMB
  171. """
  172. name = "test_write_str_v6.txt"
  173. content = "write test file content"
  174. share_path = os.path.join(self.public_dir, name)
  175. assert not os.path.exists(share_path)
  176. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  177. salt.utils.smb.put_str(content, name, "public", conn=conn)
  178. conn.close()
  179. assert os.path.exists(share_path)
  180. with salt.utils.files.fopen(share_path, "r") as fp:
  181. result = fp.read()
  182. assert result == content
  183. def test_delete_file_v4(self):
  184. """
  185. Validate deletion of files over SMB
  186. """
  187. name = "test_delete_file.txt"
  188. content = "read test file content"
  189. share_path = os.path.join(self.public_dir, name)
  190. with salt.utils.files.fopen(share_path, "w") as fp:
  191. fp.write(content)
  192. assert os.path.exists(share_path)
  193. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  194. salt.utils.smb.delete_file(name, "public", conn=conn)
  195. conn.close()
  196. assert not os.path.exists(share_path)
  197. @skipIf(not IPV6_ENABLED, "IPv6 not enabled")
  198. def test_delete_file_v6(self):
  199. """
  200. Validate deletion of files over SMB
  201. """
  202. name = "test_delete_file_v6.txt"
  203. content = "read test file content"
  204. share_path = os.path.join(self.public_dir, name)
  205. with salt.utils.files.fopen(share_path, "w") as fp:
  206. fp.write(content)
  207. assert os.path.exists(share_path)
  208. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  209. salt.utils.smb.delete_file(name, "public", conn=conn)
  210. conn.close()
  211. assert not os.path.exists(share_path)
  212. def test_mkdirs_v4(self):
  213. """
  214. Create directories over SMB
  215. """
  216. dir_name = "mkdirs/test"
  217. share_path = os.path.join(self.public_dir, dir_name)
  218. assert not os.path.exists(share_path)
  219. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  220. salt.utils.smb.mkdirs(dir_name, "public", conn=conn)
  221. conn.close()
  222. assert os.path.exists(share_path)
  223. @skipIf(not IPV6_ENABLED, "IPv6 not enabled")
  224. def test_mkdirs_v6(self):
  225. """
  226. Create directories over SMB
  227. """
  228. dir_name = "mkdirs/testv6"
  229. share_path = os.path.join(self.public_dir, dir_name)
  230. assert not os.path.exists(share_path)
  231. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  232. salt.utils.smb.mkdirs(dir_name, "public", conn=conn)
  233. conn.close()
  234. assert os.path.exists(share_path)
  235. def test_delete_dirs_v4(self):
  236. """
  237. Validate deletion of directoreies over SMB
  238. """
  239. dir_name = "deldirs"
  240. subdir_name = "deldirs/test"
  241. local_path = os.path.join(self.public_dir, subdir_name)
  242. os.makedirs(local_path)
  243. assert os.path.exists(local_path)
  244. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  245. salt.utils.smb.delete_directory(subdir_name, "public", conn=conn)
  246. conn.close()
  247. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  248. salt.utils.smb.delete_directory(dir_name, "public", conn=conn)
  249. conn.close()
  250. assert not os.path.exists(local_path)
  251. assert not os.path.exists(os.path.join(self.public_dir, dir_name))
  252. @skipIf(not IPV6_ENABLED, "IPv6 not enabled")
  253. def test_delete_dirs_v6(self):
  254. """
  255. Validate deletion of directoreies over SMB
  256. """
  257. dir_name = "deldirsv6"
  258. subdir_name = "deldirsv6/test"
  259. local_path = os.path.join(self.public_dir, subdir_name)
  260. os.makedirs(local_path)
  261. assert os.path.exists(local_path)
  262. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  263. salt.utils.smb.delete_directory(subdir_name, "public", conn=conn)
  264. conn.close()
  265. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  266. salt.utils.smb.delete_directory(dir_name, "public", conn=conn)
  267. conn.close()
  268. assert not os.path.exists(local_path)
  269. assert not os.path.exists(os.path.join(self.public_dir, dir_name))
  270. def test_connection(self):
  271. """
  272. Validate creation of an SMB connection
  273. """
  274. conn = salt.utils.smb.get_conn("127.0.0.1", self.username, "foo", port=1445)
  275. conn.close()
  276. @skipIf(not IPV6_ENABLED, "IPv6 not enabled")
  277. def test_connection_v6(self):
  278. """
  279. Validate creation of an SMB connection
  280. """
  281. conn = salt.utils.smb.get_conn("::1", self.username, "foo", port=1445)
  282. conn.close()