/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-16 04:32:51 UTC
  • Revision ID: teddy@recompile.se-20190316043251-lqkgv164b7x6bsj5
mandos-ctl: Refactor tests

* mandos-ctl (TestCaseWithAssertLogs): New.
  (Test_string_to_delta): Inherit from TestCaseWithAssertLogs and
                          simply use self.assertLogs() when needed.
  (Test_get_mandos_dbus_object): Likewise.
  (Test_get_mandos_dbus_object.CriticalFilter): Remove.
  (Test_get_managed_objects): Likewise, and use a CountingHandler
                              instead of a CountingFilter.
  (Test_get_managed_objects.CriticalFilter): Remove.
  (Test_SilenceLogger): Remove.

Show diffs side-by-side

added added

removed removed

Lines of Context:
839
839
 
840
840
 
841
841
 
842
 
class Test_string_to_delta(unittest.TestCase):
 
842
class TestCaseWithAssertLogs(unittest.TestCase):
    """unittest.TestCase with a fallback assertLogs() method.

    unittest.TestCase.assertLogs only exists in Python 3.4 and later.
    When the standard method is missing, this class supplies a minimal
    replacement; otherwise nothing is added and the standard method
    is used as-is.
    """

    if not hasattr(unittest.TestCase, "assertLogs"):
        @contextlib.contextmanager
        def assertLogs(self, logger, level=logging.INFO):
            """Context manager asserting that something gets logged.

            "logger" is a logging.Logger object or, like the standard
            assertLogs(), a logger name.  "level" is the lowest level
            a record must have to be captured.

            While the context is active, a capturing handler is
            attached to the logger, the logger's level is set to
            "level", and propagation to ancestor loggers is turned
            off.  All of this is undone on exit, after which the test
            fails unless at least one record was captured.

            Yields a LoggingWatcher named tuple with the fields
            "records" (LogRecord objects) and "output" (formatted
            messages).
            """
            if not isinstance(logger, logging.Logger):
                logger = logging.getLogger(logger)
            capturing_handler = self.CapturingLevelHandler(level)
            old_level = logger.level
            old_propagate = logger.propagate
            logger.addHandler(capturing_handler)
            logger.setLevel(level)
            logger.propagate = False
            try:
                yield capturing_handler.watcher
            finally:
                logger.propagate = old_propagate
                logger.removeHandler(capturing_handler)
                logger.setLevel(old_level)
            # Only reached if the "with" body did not raise
            self.assertGreater(len(capturing_handler.watcher.records),
                               0)

        class CapturingLevelHandler(logging.Handler):
            """Handler storing all records at or above its level"""

            def __init__(self, level, *args, **kwargs):
                logging.Handler.__init__(self, *args, **kwargs)
                # Filter in the handler too: records propagated from
                # child loggers bypass this logger's own level check
                # and would otherwise be captured at any level.
                self.setLevel(level)
                self.watcher = self.LoggingWatcher([], [])

            def emit(self, record):
                self.watcher.records.append(record)
                self.watcher.output.append(self.format(record))

            LoggingWatcher = collections.namedtuple("LoggingWatcher",
                                                    ("records",
                                                     "output"))
 
874
 
 
875
class Test_string_to_delta(TestCaseWithAssertLogs):
843
876
    def test_handles_basic_rfc3339(self):
844
877
        self.assertEqual(string_to_delta("PT0S"),
845
878
                         datetime.timedelta())
851
884
                         datetime.timedelta(0, 7200))
852
885
 
853
886
    def test_falls_back_to_pre_1_6_1_with_warning(self):
854
 
        # assertLogs only exists in Python 3.4
855
 
        if hasattr(self, "assertLogs"):
856
 
            with self.assertLogs(log, logging.WARNING):
857
 
                value = string_to_delta("2h")
858
 
        else:
859
 
            class WarningFilter(logging.Filter):
860
 
                """Don't show, but record the presence of, warnings"""
861
 
                def filter(self, record):
862
 
                    is_warning = record.levelno >= logging.WARNING
863
 
                    self.found = is_warning or getattr(self, "found",
864
 
                                                       False)
865
 
                    return not is_warning
866
 
            warning_filter = WarningFilter()
867
 
            log.addFilter(warning_filter)
868
 
            try:
869
 
                value = string_to_delta("2h")
870
 
            finally:
871
 
                log.removeFilter(warning_filter)
872
 
            self.assertTrue(getattr(warning_filter, "found", False))
 
887
        with self.assertLogs(log, logging.WARNING):
 
888
            value = string_to_delta("2h")
873
889
        self.assertEqual(value, datetime.timedelta(0, 7200))
874
890
 
875
891
 
1005
1021
                self.check_option_syntax(options)
1006
1022
 
1007
1023
 
1008
 
class Test_get_mandos_dbus_object(unittest.TestCase):
 
1024
class Test_get_mandos_dbus_object(TestCaseWithAssertLogs):
1009
1025
    def test_calls_and_returns_get_object_on_bus(self):
1010
1026
        class MockBus(object):
1011
1027
            called = False
1026
1042
            def get_object(self, busname, dbus_path):
1027
1043
                raise dbus.exceptions.DBusException("Test")
1028
1044
 
1029
 
        # assertLogs only exists in Python 3.4
1030
 
        if hasattr(self, "assertLogs"):
1031
 
            with self.assertLogs(log, logging.CRITICAL):
1032
 
                with self.assertRaises(SystemExit) as e:
1033
 
                    bus = get_mandos_dbus_object(bus=MockBus())
1034
 
        else:
1035
 
            critical_filter = self.CriticalFilter()
1036
 
            log.addFilter(critical_filter)
1037
 
            try:
1038
 
                with self.assertRaises(SystemExit) as e:
1039
 
                    get_mandos_dbus_object(bus=MockBusFailing())
1040
 
            finally:
1041
 
                log.removeFilter(critical_filter)
1042
 
            self.assertTrue(critical_filter.found)
 
1045
        with self.assertLogs(log, logging.CRITICAL):
 
1046
            with self.assertRaises(SystemExit) as e:
 
1047
                bus = get_mandos_dbus_object(bus=MockBusFailing())
 
1048
 
1043
1049
        if isinstance(e.exception.code, int):
1044
1050
            self.assertNotEqual(e.exception.code, 0)
1045
1051
        else:
1046
1052
            self.assertIsNotNone(e.exception.code)
1047
1053
 
1048
 
    class CriticalFilter(logging.Filter):
1049
 
        """Don't show, but register, critical messages"""
1050
 
        found = False
1051
 
        def filter(self, record):
1052
 
            is_critical = record.levelno >= logging.CRITICAL
1053
 
            self.found = is_critical or self.found
1054
 
            return not is_critical
1055
 
 
1056
 
 
1057
 
class Test_get_managed_objects(unittest.TestCase):
 
1054
 
 
1055
class Test_get_managed_objects(TestCaseWithAssertLogs):
1058
1056
    def test_calls_and_returns_GetManagedObjects(self):
1059
1057
        managed_objects = {"/clients/foo": { "Name": "foo"}}
1060
1058
        class MockObjectManager(object):
1065
1063
        self.assertDictEqual(managed_objects, retval)
1066
1064
 
1067
1065
    def test_logs_and_exits_on_dbus_error(self):
 
1066
        dbus_logger = logging.getLogger("dbus.proxies")
 
1067
 
1068
1068
        class MockObjectManagerFailing(object):
1069
1069
            @staticmethod
1070
1070
            def GetManagedObjects():
 
1071
                dbus_logger.error("Test")
1071
1072
                raise dbus.exceptions.DBusException("Test")
1072
1073
 
1073
 
        if hasattr(self, "assertLogs"):
1074
 
            with self.assertLogs(log, logging.CRITICAL):
1075
 
                with self.assertRaises(SystemExit):
1076
 
                    get_managed_objects(MockObjectManagerFailing())
1077
 
        else:
1078
 
            critical_filter = self.CriticalFilter()
1079
 
            log.addFilter(critical_filter)
1080
 
            try:
 
1074
        class CountingHandler(logging.Handler):
 
1075
            count = 0
 
1076
            def emit(self, record):
 
1077
                self.count += 1
 
1078
 
 
1079
        counting_handler = CountingHandler()
 
1080
 
 
1081
        dbus_logger.addHandler(counting_handler)
 
1082
 
 
1083
        try:
 
1084
            with self.assertLogs(log, logging.CRITICAL) as watcher:
1081
1085
                with self.assertRaises(SystemExit) as e:
1082
1086
                    get_managed_objects(MockObjectManagerFailing())
1083
 
            finally:
1084
 
                log.removeFilter(critical_filter)
1085
 
            self.assertTrue(critical_filter.found)
 
1087
        finally:
 
1088
            dbus_logger.removeFilter(counting_handler)
 
1089
        self.assertEqual(counting_handler.count, 0)
 
1090
 
 
1091
        # Test that the dbus_logger still works
 
1092
        with self.assertLogs(dbus_logger, logging.ERROR):
 
1093
            dbus_logger.error("Test")
 
1094
 
1086
1095
        if isinstance(e.exception.code, int):
1087
1096
            self.assertNotEqual(e.exception.code, 0)
1088
1097
        else:
1089
1098
            self.assertIsNotNone(e.exception.code)
1090
1099
 
1091
 
    class CriticalFilter(logging.Filter):
1092
 
        """Don't show, but register, critical messages"""
1093
 
        found = False
1094
 
        def filter(self, record):
1095
 
            is_critical = record.levelno >= logging.CRITICAL
1096
 
            self.found = is_critical or self.found
1097
 
            return not is_critical
1098
 
 
1099
 
 
1100
 
class Test_SilenceLogger(unittest.TestCase):
1101
 
    loggername = "mandos-ctl.Test_SilenceLogger"
1102
 
    log = logging.getLogger(loggername)
1103
 
    log.propagate = False
1104
 
    log.addHandler(logging.NullHandler())
1105
 
 
1106
 
    def setUp(self):
1107
 
        self.counting_filter = self.CountingFilter()
1108
 
 
1109
 
    class CountingFilter(logging.Filter):
1110
 
        "Count number of records"
1111
 
        count = 0
1112
 
        def filter(self, record):
1113
 
            self.count += 1
1114
 
            return True
1115
 
 
1116
 
    def test_should_filter_records_only_when_active(self):
1117
 
        try:
1118
 
            with SilenceLogger(self.loggername):
1119
 
                self.log.addFilter(self.counting_filter)
1120
 
                self.log.info("Filtered log message 1")
1121
 
            self.log.info("Non-filtered message 2")
1122
 
            self.log.info("Non-filtered message 3")
1123
 
        finally:
1124
 
            self.log.removeFilter(self.counting_filter)
1125
 
        self.assertEqual(self.counting_filter.count, 2)
1126
 
 
1127
1100
 
1128
1101
class Test_commands_from_options(unittest.TestCase):
1129
1102
    def setUp(self):