test_logstash_mod.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import errno
  4. import logging
  5. import socket
  6. import time
  7. import salt.utils.stringutils
  8. import zmq
  9. from salt.log.handlers.logstash_mod import DatagramLogstashHandler, ZMQLogstashHander
  10. from tests.support.helpers import get_unused_localhost_port
  11. from tests.support.unit import TestCase, skipIf
  12. log = logging.getLogger(__name__)
  13. # At the moment of writing this test the `functional` suite is not yet complete
  14. # TODO move to the `functional` suite since this test doesn't require running instance of Salt Master/Minion
  15. class DatagramLogstashHandlerTest(TestCase):
  16. def setUp(self):
  17. self.test_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  18. port = get_unused_localhost_port()
  19. self.test_server.bind(("127.0.0.1", port))
  20. self.test_server.settimeout(2)
  21. self.logger = logging.getLogger("test_logstash_logger")
  22. self.logger.setLevel(logging.DEBUG)
  23. self.logger.addHandler(DatagramLogstashHandler("127.0.0.1", port))
  24. def tearDown(self):
  25. self.test_server.close()
  26. @skipIf(True, "SLOWTEST skip")
  27. def test_log_pickling(self):
  28. # given
  29. the_log = "test message"
  30. # when
  31. self.logger.info(the_log)
  32. # then
  33. try:
  34. received_log, addr = self.test_server.recvfrom(12)
  35. self.assertEqual(received_log, salt.utils.stringutils.to_bytes(the_log))
  36. except socket.timeout:
  37. self.fail(
  38. "Log message was not received.\n"
  39. "Check either pickling failed (and message was not send) or some other error occurred"
  40. )
  41. # At the moment of writing this test the `functional` suite is not yet complete
  42. # TODO move to the `functional` suite since this test doesn't require running instance of Salt Master/Minion
  43. class ZMQLogstashHanderTest(TestCase):
  44. def setUp(self):
  45. self.context = zmq.Context()
  46. port = get_unused_localhost_port()
  47. self.zmq_server = self.context.socket(zmq.SUB)
  48. self.zmq_server.setsockopt(zmq.SUBSCRIBE, b"")
  49. self.zmq_server.bind("tcp://127.0.0.1:{}".format(port))
  50. self.logger = logging.getLogger("test_logstash_logger")
  51. self.logger.setLevel(logging.DEBUG)
  52. self.logger.addHandler(ZMQLogstashHander("tcp://127.0.0.1:{}".format(port)))
  53. def tearDown(self):
  54. self.zmq_server.close()
  55. self.context.term()
  56. @skipIf(True, "SLOWTEST skip")
  57. def test_log_pickling(self):
  58. # given
  59. the_log = "test message"
  60. attempts = 5
  61. received_log = "wrong message"
  62. # I couldn't receive the first log message, that's why it is done using loop...
  63. # https://zeromq.jira.com/browse/LIBZMQ-270 could be related
  64. while attempts >= 0:
  65. try:
  66. # when
  67. self.logger.info(the_log)
  68. time.sleep(1)
  69. received_log = self.zmq_server.recv(zmq.NOBLOCK)
  70. # then
  71. break
  72. except zmq.ZMQError as exc:
  73. if exc.errno == errno.EAGAIN:
  74. attempts -= 1
  75. continue
  76. raise
  77. self.assertEqual(
  78. received_log,
  79. salt.utils.stringutils.to_bytes(the_log),
  80. "Check either pickling failed (and message was not send) or some other error occurred",
  81. )