test_logstash_mod.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import errno
  4. import logging
  5. import socket
  6. import time
  7. import salt.utils.stringutils
  8. import zmq
  9. from salt.log.handlers.logstash_mod import DatagramLogstashHandler, ZMQLogstashHander
  10. from saltfactories.utils.ports import get_unused_localhost_port
  11. from tests.support.helpers import slowTest
  12. from tests.support.unit import TestCase
  13. log = logging.getLogger(__name__)
  14. # At the moment of writing this test the `functional` suite is not yet complete
  15. # TODO move to the `functional` suite since this test doesn't require running instance of Salt Master/Minion
  class DatagramLogstashHandlerTest(TestCase):
      """
      Check that ``DatagramLogstashHandler`` delivers a log record over UDP.

      A UDP socket bound to an unused localhost port stands in for the receiving
      server. The handler under test is attached to a dedicated logger and pointed
      at that same port.
      """

      def setUp(self):
          self.test_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
          port = get_unused_localhost_port()
          self.test_server.bind(("127.0.0.1", port))
          # Bound the wait so a lost datagram fails the test instead of hanging it.
          self.test_server.settimeout(2)

          self.logger = logging.getLogger("test_logstash_logger")
          self.logger.setLevel(logging.DEBUG)
          # Keep a reference so tearDown can detach exactly this handler.
          self.handler = DatagramLogstashHandler("127.0.0.1", port)
          self.logger.addHandler(self.handler)

      def tearDown(self):
          # ``logging.getLogger`` returns the same logger object for the same name,
          # so a handler left attached here would still be present the next time
          # any test asks for "test_logstash_logger". Detach and close it.
          self.logger.removeHandler(self.handler)
          self.handler.close()
          self.test_server.close()

      @slowTest
      def test_log_pickling(self):
          # given
          the_log = "test message"

          # when
          self.logger.info(the_log)

          # then
          try:
              # Read with a buffer much larger than the expected payload: a buffer
              # of exactly len(the_log) bytes could hide trailing bytes of a longer
              # (i.e. wrong) datagram and let the comparison below pass anyway.
              received_log, _ = self.test_server.recvfrom(4096)
          except socket.timeout:
              self.fail(
                  "Log message was not received.\n"
                  "Check either pickling failed (and message was not send) or some other error occurred"
              )
          self.assertEqual(received_log, salt.utils.stringutils.to_bytes(the_log))
  42. # At the moment of writing this test the `functional` suite is not yet complete
  43. # TODO move to the `functional` suite since this test doesn't require running instance of Salt Master/Minion
  class ZMQLogstashHanderTest(TestCase):
      """
      Check that ``ZMQLogstashHander`` delivers a log record over ZeroMQ.

      A SUB socket with an empty subscription prefix is bound to an unused
      localhost port. The handler under test is attached to a dedicated logger
      and given the same ``tcp://`` address.
      """

      def setUp(self):
          self.context = zmq.Context()
          port = get_unused_localhost_port()
          address = "tcp://127.0.0.1:{}".format(port)

          self.zmq_server = self.context.socket(zmq.SUB)
          # An empty prefix subscribes the socket to every incoming message.
          self.zmq_server.setsockopt(zmq.SUBSCRIBE, b"")
          self.zmq_server.bind(address)

          self.logger = logging.getLogger("test_logstash_logger")
          self.logger.setLevel(logging.DEBUG)
          # Keep a reference so tearDown can detach exactly this handler.
          self.handler = ZMQLogstashHander(address)
          self.logger.addHandler(self.handler)

      def tearDown(self):
          # ``logging.getLogger`` returns the same logger object for the same name,
          # so a handler left attached here would still be present the next time
          # any test asks for "test_logstash_logger". Detach it and let it release
          # whatever it holds via the standard ``Handler.close()`` hook.
          self.logger.removeHandler(self.handler)
          self.handler.close()
          self.zmq_server.close()
          self.context.term()

      @slowTest
      def test_log_pickling(self):
          # given
          the_log = "test message"
          attempts = 5
          # Sentinel that can never equal the expected bytes; it survives only if
          # every attempt below comes back empty, making the final assert fail.
          received_log = "wrong message"

          # I couldn't receive the first log message, that's why it is done using loop...
          # https://zeromq.jira.com/browse/LIBZMQ-270 could be related
          for _ in range(attempts):
              # when
              self.logger.info(the_log)
              time.sleep(1)
              try:
                  received_log = self.zmq_server.recv(zmq.NOBLOCK)
              except zmq.ZMQError as exc:
                  # EAGAIN only means "nothing queued yet" for a non-blocking
                  # recv; anything else is a real failure and must propagate.
                  if exc.errno != errno.EAGAIN:
                      raise
                  continue
              break

          # then
          self.assertEqual(
              received_log,
              salt.utils.stringutils.to_bytes(the_log),
              "Check either pickling failed (and message was not send) or some other error occurred",
          )