test_ldap.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. # -*- coding: utf-8 -*-
  2. """Test cases for the ``ldap`` state module
  3. This code is gross. I started out trying to remove some of the
  4. duplicate code in the test cases, and before I knew it the test code
  5. was an ugly second implementation.
  6. I'm leaving it for now, but this should really be gutted and replaced
  7. with something sensible.
  8. """
  9. from __future__ import absolute_import, print_function, unicode_literals
  10. import copy
  11. import salt.states.ldap
  12. from salt.ext import six
  13. from salt.utils.oset import OrderedSet
  14. from tests.support.mixins import LoaderModuleMockMixin
  15. from tests.support.unit import TestCase
# Emulates the LDAP database used by the fake ``ldap3`` functions in this
# module.  Each key is the DN of an entry; it maps to a dict which in turn
# maps attribute names to sets of values.
db = {}
  19. def _init_db(newdb=None):
  20. if newdb is None:
  21. newdb = {}
  22. global db
  23. db = newdb
  24. def _complex_db():
  25. return {
  26. "dnfoo": {
  27. "attrfoo1": OrderedSet(("valfoo1.1", "valfoo1.2",)),
  28. "attrfoo2": OrderedSet(("valfoo2.1",)),
  29. },
  30. "dnbar": {
  31. "attrbar1": OrderedSet(("valbar1.1", "valbar1.2",)),
  32. "attrbar2": OrderedSet(("valbar2.1",)),
  33. },
  34. }
  35. class _dummy_ctx(object):
  36. def __init__(self):
  37. pass
  38. def __enter__(self):
  39. return self
  40. def __exit__(self, *exc):
  41. pass
  42. def _dummy_connect(connect_spec):
  43. return _dummy_ctx()
  44. def _dummy_search(connect_spec, base, scope):
  45. if base not in db:
  46. return {}
  47. return {
  48. base: dict(
  49. ((attr, list(db[base][attr])) for attr in db[base] if len(db[base][attr]))
  50. )
  51. }
  52. def _dummy_add(connect_spec, dn, attributes):
  53. assert dn not in db
  54. assert attributes
  55. db[dn] = {}
  56. for attr, vals in six.iteritems(attributes):
  57. assert vals
  58. db[dn][attr] = OrderedSet(vals)
  59. return True
  60. def _dummy_delete(connect_spec, dn):
  61. assert dn in db
  62. del db[dn]
  63. return True
  64. def _dummy_change(connect_spec, dn, before, after):
  65. assert before != after
  66. assert before
  67. assert after
  68. assert dn in db
  69. e = db[dn]
  70. assert e == before
  71. all_attrs = OrderedSet()
  72. all_attrs.update(before)
  73. all_attrs.update(after)
  74. directives = []
  75. for attr in all_attrs:
  76. if attr not in before:
  77. assert attr in after
  78. assert after[attr]
  79. directives.append(("add", attr, after[attr]))
  80. elif attr not in after:
  81. assert attr in before
  82. assert before[attr]
  83. directives.append(("delete", attr, ()))
  84. else:
  85. assert before[attr]
  86. assert after[attr]
  87. to_del = before[attr] - after[attr]
  88. if to_del:
  89. directives.append(("delete", attr, to_del))
  90. to_add = after[attr] - before[attr]
  91. if to_add:
  92. directives.append(("add", attr, to_add))
  93. return _dummy_modify(connect_spec, dn, directives)
  94. def _dummy_modify(connect_spec, dn, directives):
  95. assert dn in db
  96. e = db[dn]
  97. for op, attr, vals in directives:
  98. if op == "add":
  99. assert vals
  100. existing_vals = e.setdefault(attr, OrderedSet())
  101. for val in vals:
  102. assert val not in existing_vals
  103. existing_vals.add(val)
  104. elif op == "delete":
  105. assert attr in e
  106. existing_vals = e[attr]
  107. assert existing_vals
  108. if not vals:
  109. del e[attr]
  110. continue
  111. for val in vals:
  112. assert val in existing_vals
  113. existing_vals.remove(val)
  114. if not existing_vals:
  115. del e[attr]
  116. elif op == "replace":
  117. e.pop(attr, None)
  118. e[attr] = OrderedSet(vals)
  119. else:
  120. raise ValueError()
  121. return True
  122. def _dump_db(d=None):
  123. if d is None:
  124. d = db
  125. return dict(((dn, dict(((attr, list(d[dn][attr])) for attr in d[dn]))) for dn in d))
  126. class LDAPTestCase(TestCase, LoaderModuleMockMixin):
  127. def setup_loader_modules(self):
  128. salt_dunder = {}
  129. for fname in ("connect", "search", "add", "delete", "change", "modify"):
  130. salt_dunder["ldap3.{}".format(fname)] = globals()["_dummy_" + fname]
  131. return {
  132. salt.states.ldap: {"__opts__": {"test": False}, "__salt__": salt_dunder}
  133. }
  134. def _test_helper(self, init_db, expected_ret, replace, delete_others=False):
  135. _init_db(copy.deepcopy(init_db))
  136. old = _dump_db()
  137. new = _dump_db()
  138. expected_db = copy.deepcopy(init_db)
  139. for dn, attrs in six.iteritems(replace):
  140. for attr, vals in six.iteritems(attrs):
  141. if vals:
  142. new.setdefault(dn, {})[attr] = list(OrderedSet(vals))
  143. expected_db.setdefault(dn, {})[attr] = OrderedSet(vals)
  144. elif dn in expected_db:
  145. new[dn].pop(attr, None)
  146. expected_db[dn].pop(attr, None)
  147. if not expected_db.get(dn, {}):
  148. new.pop(dn, None)
  149. expected_db.pop(dn, None)
  150. if delete_others:
  151. dn_to_delete = OrderedSet()
  152. for dn, attrs in six.iteritems(expected_db):
  153. if dn in replace:
  154. to_delete = OrderedSet()
  155. for attr, vals in six.iteritems(attrs):
  156. if attr not in replace[dn]:
  157. to_delete.add(attr)
  158. for attr in to_delete:
  159. del attrs[attr]
  160. del new[dn][attr]
  161. if not attrs:
  162. dn_to_delete.add(dn)
  163. for dn in dn_to_delete:
  164. del new[dn]
  165. del expected_db[dn]
  166. name = "ldapi:///"
  167. expected_ret["name"] = name
  168. expected_ret.setdefault("result", True)
  169. expected_ret.setdefault("comment", "Successfully updated LDAP entries")
  170. expected_ret.setdefault(
  171. "changes",
  172. dict(
  173. (
  174. (
  175. dn,
  176. {
  177. "old": dict(
  178. (attr, vals)
  179. for attr, vals in six.iteritems(old[dn])
  180. if vals != new.get(dn, {}).get(attr, ())
  181. )
  182. if dn in old
  183. else None,
  184. "new": dict(
  185. (attr, vals)
  186. for attr, vals in six.iteritems(new[dn])
  187. if vals != old.get(dn, {}).get(attr, ())
  188. )
  189. if dn in new
  190. else None,
  191. },
  192. )
  193. for dn in replace
  194. if old.get(dn, {}) != new.get(dn, {})
  195. )
  196. ),
  197. )
  198. entries = [
  199. {dn: [{"replace": attrs}, {"delete_others": delete_others}]}
  200. for dn, attrs in six.iteritems(replace)
  201. ]
  202. actual = salt.states.ldap.managed(name, entries)
  203. self.assertDictEqual(expected_ret, actual)
  204. self.assertDictEqual(expected_db, db)
  205. def _test_helper_success(self, init_db, replace, delete_others=False):
  206. self._test_helper(init_db, {}, replace, delete_others)
  207. def _test_helper_nochange(self, init_db, replace, delete_others=False):
  208. expected = {
  209. "changes": {},
  210. "comment": "LDAP entries already set",
  211. }
  212. self._test_helper(init_db, expected, replace, delete_others)
  213. def test_managed_empty(self):
  214. _init_db()
  215. name = "ldapi:///"
  216. expected = {
  217. "name": name,
  218. "changes": {},
  219. "result": True,
  220. "comment": "LDAP entries already set",
  221. }
  222. actual = salt.states.ldap.managed(name, {})
  223. self.assertDictEqual(expected, actual)
  224. def test_managed_add_entry(self):
  225. self._test_helper_success({}, {"dummydn": {"foo": ["bar", "baz"]}})
  226. def test_managed_add_attr(self):
  227. self._test_helper_success(_complex_db(), {"dnfoo": {"attrfoo3": ["valfoo3.1"]}})
  228. def test_managed_simplereplace(self):
  229. self._test_helper_success(_complex_db(), {"dnfoo": {"attrfoo1": ["valfoo1.3"]}})
  230. def test_managed_deleteattr(self):
  231. self._test_helper_success(_complex_db(), {"dnfoo": {"attrfoo1": []}})
  232. def test_managed_deletenonexistattr(self):
  233. self._test_helper_nochange(_complex_db(), {"dnfoo": {"dummyattr": []}})
  234. def test_managed_deleteentry(self):
  235. self._test_helper_success(_complex_db(), {"dnfoo": {}}, True)
  236. def test_managed_deletenonexistentry(self):
  237. self._test_helper_nochange(_complex_db(), {"dummydn": {}}, True)
  238. def test_managed_deletenonexistattrinnonexistentry(self):
  239. self._test_helper_nochange(_complex_db(), {"dummydn": {"dummyattr": []}})
  240. def test_managed_add_attr_delete_others(self):
  241. self._test_helper_success(
  242. _complex_db(), {"dnfoo": {"dummyattr": ["dummyval"]}}, True
  243. )
  244. def test_managed_no_net_change(self):
  245. self._test_helper_nochange(
  246. _complex_db(), {"dnfoo": {"attrfoo1": ["valfoo1.1", "valfoo1.2"]}}
  247. )
  248. def test_managed_repeated_values(self):
  249. self._test_helper_success(
  250. {}, {"dummydn": {"dummyattr": ["dummyval", "dummyval"]}}
  251. )