/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-18 22:29:25 UTC
  • Revision ID: teddy@recompile.se-20190318222925-jvhek84dgcfgj6g3
mandos-ctl: Refactor tests

* mandos-ctl: Where the clients names "foo" and "barbar" do not refer
              to the actual mock clients in the TestCommand class,
              change all occurrences of these names to "client1" and
              "client2" (or just "client" when only one is used) .
              Also change all test doubles to use correct terminology;
              some things called mocks are actually stubs or spies,
              and rename all true mocks to have "mock" in their names.
              Also eliminate duplicate values in tests; derive values
              from previously defined values whenever possible.

Show diffs side-by-side

added added

removed removed

Lines of Context:
983
983
            options = self.parser.parse_args()
984
984
            setattr(options, action, value)
985
985
            options.verbose = True
986
 
            options.client = ["foo"]
 
986
            options.client = ["client"]
987
987
            with self.assertParseError():
988
988
                self.check_option_syntax(options)
989
989
 
1019
1019
        for action, value in self.actions.items():
1020
1020
            options = self.parser.parse_args()
1021
1021
            setattr(options, action, value)
1022
 
            options.client = ["foo"]
 
1022
            options.client = ["client"]
1023
1023
            self.check_option_syntax(options)
1024
1024
 
1025
1025
    def test_one_client_with_all_actions_except_is_enabled(self):
1028
1028
            if action == "is_enabled":
1029
1029
                continue
1030
1030
            setattr(options, action, value)
1031
 
        options.client = ["foo"]
 
1031
        options.client = ["client"]
1032
1032
        self.check_option_syntax(options)
1033
1033
 
1034
1034
    def test_two_clients_with_all_actions_except_is_enabled(self):
1037
1037
            if action == "is_enabled":
1038
1038
                continue
1039
1039
            setattr(options, action, value)
1040
 
        options.client = ["foo", "barbar"]
 
1040
        options.client = ["client1", "client2"]
1041
1041
        self.check_option_syntax(options)
1042
1042
 
1043
1043
    def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1046
1046
                continue
1047
1047
            options = self.parser.parse_args()
1048
1048
            setattr(options, action, value)
1049
 
            options.client = ["foo", "barbar"]
 
1049
            options.client = ["client1", "client2"]
1050
1050
            self.check_option_syntax(options)
1051
1051
 
1052
1052
    def test_is_enabled_fails_without_client(self):
1058
1058
    def test_is_enabled_fails_with_two_clients(self):
1059
1059
        options = self.parser.parse_args()
1060
1060
        options.is_enabled = True
1061
 
        options.client = ["foo", "barbar"]
 
1061
        options.client = ["client1", "client2"]
1062
1062
        with self.assertParseError():
1063
1063
            self.check_option_syntax(options)
1064
1064
 
1091
1091
        self.assertTrue(mockbus.called)
1092
1092
 
1093
1093
    def test_logs_and_exits_on_dbus_error(self):
1094
 
        class MockBusFailing(object):
 
1094
        class FailingBusStub(object):
1095
1095
            def get_object(self, busname, dbus_path):
1096
1096
                raise dbus.exceptions.DBusException("Test")
1097
1097
 
1098
1098
        with self.assertLogs(log, logging.CRITICAL):
1099
1099
            with self.assertRaises(SystemExit) as e:
1100
 
                bus = get_mandos_dbus_object(bus=MockBusFailing())
 
1100
                bus = get_mandos_dbus_object(bus=FailingBusStub())
1101
1101
 
1102
1102
        if isinstance(e.exception.code, int):
1103
1103
            self.assertNotEqual(0, e.exception.code)
1107
1107
 
1108
1108
class Test_get_managed_objects(TestCaseWithAssertLogs):
1109
1109
    def test_calls_and_returns_GetManagedObjects(self):
1110
 
        managed_objects = {"/clients/foo": { "Name": "foo"}}
1111
 
        class MockObjectManager(object):
 
1110
        managed_objects = {"/clients/client": { "Name": "client"}}
 
1111
        class ObjectManagerStub(object):
1112
1112
            def GetManagedObjects(self):
1113
1113
                return managed_objects
1114
 
        retval = get_managed_objects(MockObjectManager())
 
1114
        retval = get_managed_objects(ObjectManagerStub())
1115
1115
        self.assertDictEqual(managed_objects, retval)
1116
1116
 
1117
1117
    def test_logs_and_exits_on_dbus_error(self):
1118
1118
        dbus_logger = logging.getLogger("dbus.proxies")
1119
1119
 
1120
 
        class MockObjectManagerFailing(object):
 
1120
        class ObjectManagerFailingStub(object):
1121
1121
            def GetManagedObjects(self):
1122
1122
                dbus_logger.error("Test")
1123
1123
                raise dbus.exceptions.DBusException("Test")
1134
1134
        try:
1135
1135
            with self.assertLogs(log, logging.CRITICAL) as watcher:
1136
1136
                with self.assertRaises(SystemExit) as e:
1137
 
                    get_managed_objects(MockObjectManagerFailing())
 
1137
                    get_managed_objects(ObjectManagerFailingStub())
1138
1138
        finally:
1139
1139
            dbus_logger.removeFilter(counting_handler)
1140
1140
 
1157
1157
        add_command_line_options(self.parser)
1158
1158
 
1159
1159
    def test_is_enabled(self):
1160
 
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1160
        self.assert_command_from_args(["--is-enabled", "client"],
1161
1161
                                      command.IsEnabled)
1162
1162
 
1163
1163
    def assert_command_from_args(self, args, command_cls,
1174
1174
            self.assertEqual(value, getattr(command, key))
1175
1175
 
1176
1176
    def test_is_enabled_short(self):
1177
 
        self.assert_command_from_args(["-V", "foo"],
 
1177
        self.assert_command_from_args(["-V", "client"],
1178
1178
                                      command.IsEnabled)
1179
1179
 
1180
1180
    def test_approve(self):
1181
 
        self.assert_command_from_args(["--approve", "foo"],
 
1181
        self.assert_command_from_args(["--approve", "client"],
1182
1182
                                      command.Approve)
1183
1183
 
1184
1184
    def test_approve_short(self):
1185
 
        self.assert_command_from_args(["-A", "foo"], command.Approve)
 
1185
        self.assert_command_from_args(["-A", "client"],
 
1186
                                      command.Approve)
1186
1187
 
1187
1188
    def test_deny(self):
1188
 
        self.assert_command_from_args(["--deny", "foo"], command.Deny)
 
1189
        self.assert_command_from_args(["--deny", "client"],
 
1190
                                      command.Deny)
1189
1191
 
1190
1192
    def test_deny_short(self):
1191
 
        self.assert_command_from_args(["-D", "foo"], command.Deny)
 
1193
        self.assert_command_from_args(["-D", "client"], command.Deny)
1192
1194
 
1193
1195
    def test_remove(self):
1194
 
        self.assert_command_from_args(["--remove", "foo"],
 
1196
        self.assert_command_from_args(["--remove", "client"],
1195
1197
                                      command.Remove)
1196
1198
 
1197
1199
    def test_deny_before_remove(self):
1198
1200
        options = self.parser.parse_args(["--deny", "--remove",
1199
 
                                          "foo"])
 
1201
                                          "client"])
1200
1202
        check_option_syntax(self.parser, options)
1201
1203
        commands = commands_from_options(options)
1202
1204
        self.assertEqual(2, len(commands))
1213
1215
        self.assertIsInstance(commands[1], command.Remove)
1214
1216
 
1215
1217
    def test_remove_short(self):
1216
 
        self.assert_command_from_args(["-r", "foo"], command.Remove)
 
1218
        self.assert_command_from_args(["-r", "client"],
 
1219
                                      command.Remove)
1217
1220
 
1218
1221
    def test_dump_json(self):
1219
1222
        self.assert_command_from_args(["--dump-json"],
1220
1223
                                      command.DumpJSON)
1221
1224
 
1222
1225
    def test_enable(self):
1223
 
        self.assert_command_from_args(["--enable", "foo"],
 
1226
        self.assert_command_from_args(["--enable", "client"],
1224
1227
                                      command.Enable)
1225
1228
 
1226
1229
    def test_enable_short(self):
1227
 
        self.assert_command_from_args(["-e", "foo"], command.Enable)
 
1230
        self.assert_command_from_args(["-e", "client"],
 
1231
                                      command.Enable)
1228
1232
 
1229
1233
    def test_disable(self):
1230
 
        self.assert_command_from_args(["--disable", "foo"],
 
1234
        self.assert_command_from_args(["--disable", "client"],
1231
1235
                                      command.Disable)
1232
1236
 
1233
1237
    def test_disable_short(self):
1234
 
        self.assert_command_from_args(["-d", "foo"], command.Disable)
 
1238
        self.assert_command_from_args(["-d", "client"],
 
1239
                                      command.Disable)
1235
1240
 
1236
1241
    def test_bump_timeout(self):
1237
 
        self.assert_command_from_args(["--bump-timeout", "foo"],
 
1242
        self.assert_command_from_args(["--bump-timeout", "client"],
1238
1243
                                      command.BumpTimeout)
1239
1244
 
1240
1245
    def test_bump_timeout_short(self):
1241
 
        self.assert_command_from_args(["-b", "foo"],
 
1246
        self.assert_command_from_args(["-b", "client"],
1242
1247
                                      command.BumpTimeout)
1243
1248
 
1244
1249
    def test_start_checker(self):
1245
 
        self.assert_command_from_args(["--start-checker", "foo"],
 
1250
        self.assert_command_from_args(["--start-checker", "client"],
1246
1251
                                      command.StartChecker)
1247
1252
 
1248
1253
    def test_stop_checker(self):
1249
 
        self.assert_command_from_args(["--stop-checker", "foo"],
 
1254
        self.assert_command_from_args(["--stop-checker", "client"],
1250
1255
                                      command.StopChecker)
1251
1256
 
1252
1257
    def test_approve_by_default(self):
1253
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1258
        self.assert_command_from_args(["--approve-by-default",
 
1259
                                       "client"],
1254
1260
                                      command.ApproveByDefault)
1255
1261
 
1256
1262
    def test_deny_by_default(self):
1257
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1263
        self.assert_command_from_args(["--deny-by-default", "client"],
1258
1264
                                      command.DenyByDefault)
1259
1265
 
1260
1266
    def test_checker(self):
1261
 
        self.assert_command_from_args(["--checker", ":", "foo"],
 
1267
        self.assert_command_from_args(["--checker", ":", "client"],
1262
1268
                                      command.SetChecker,
1263
1269
                                      value_to_set=":")
1264
1270
 
1265
1271
    def test_checker_empty(self):
1266
 
        self.assert_command_from_args(["--checker", "", "foo"],
 
1272
        self.assert_command_from_args(["--checker", "", "client"],
1267
1273
                                      command.SetChecker,
1268
1274
                                      value_to_set="")
1269
1275
 
1270
1276
    def test_checker_short(self):
1271
 
        self.assert_command_from_args(["-c", ":", "foo"],
 
1277
        self.assert_command_from_args(["-c", ":", "client"],
1272
1278
                                      command.SetChecker,
1273
1279
                                      value_to_set=":")
1274
1280
 
1275
1281
    def test_host(self):
1276
 
        self.assert_command_from_args(["--host", "foo.example.org",
1277
 
                                       "foo"], command.SetHost,
1278
 
                                      value_to_set="foo.example.org")
 
1282
        self.assert_command_from_args(
 
1283
            ["--host", "client.example.org", "client"],
 
1284
            command.SetHost, value_to_set="client.example.org")
1279
1285
 
1280
1286
    def test_host_short(self):
1281
 
        self.assert_command_from_args(["-H", "foo.example.org",
1282
 
                                       "foo"], command.SetHost,
1283
 
                                      value_to_set="foo.example.org")
 
1287
        self.assert_command_from_args(
 
1288
            ["-H", "client.example.org", "client"], command.SetHost,
 
1289
            value_to_set="client.example.org")
1284
1290
 
1285
1291
    def test_secret_devnull(self):
1286
1292
        self.assert_command_from_args(["--secret", os.path.devnull,
1287
 
                                       "foo"], command.SetSecret,
 
1293
                                       "client"], command.SetSecret,
1288
1294
                                      value_to_set=b"")
1289
1295
 
1290
1296
    def test_secret_tempfile(self):
1293
1299
            f.write(value)
1294
1300
            f.seek(0)
1295
1301
            self.assert_command_from_args(["--secret", f.name,
1296
 
                                           "foo"], command.SetSecret,
 
1302
                                           "client"],
 
1303
                                          command.SetSecret,
1297
1304
                                          value_to_set=value)
1298
1305
 
1299
1306
    def test_secret_devnull_short(self):
1300
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1301
 
                                      command.SetSecret,
 
1307
        self.assert_command_from_args(["-s", os.path.devnull,
 
1308
                                       "client"], command.SetSecret,
1302
1309
                                      value_to_set=b"")
1303
1310
 
1304
1311
    def test_secret_tempfile_short(self):
1306
1313
            value = b"secret\0xyzzy\nbar"
1307
1314
            f.write(value)
1308
1315
            f.seek(0)
1309
 
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1316
            self.assert_command_from_args(["-s", f.name, "client"],
1310
1317
                                          command.SetSecret,
1311
1318
                                          value_to_set=value)
1312
1319
 
1313
1320
    def test_timeout(self):
1314
 
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
 
1321
        self.assert_command_from_args(["--timeout", "PT5M", "client"],
1315
1322
                                      command.SetTimeout,
1316
1323
                                      value_to_set=300000)
1317
1324
 
1318
1325
    def test_timeout_short(self):
1319
 
        self.assert_command_from_args(["-t", "PT5M", "foo"],
 
1326
        self.assert_command_from_args(["-t", "PT5M", "client"],
1320
1327
                                      command.SetTimeout,
1321
1328
                                      value_to_set=300000)
1322
1329
 
1323
1330
    def test_extended_timeout(self):
1324
1331
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1325
 
                                       "foo"],
 
1332
                                       "client"],
1326
1333
                                      command.SetExtendedTimeout,
1327
1334
                                      value_to_set=900000)
1328
1335
 
1329
1336
    def test_interval(self):
1330
 
        self.assert_command_from_args(["--interval", "PT2M", "foo"],
1331
 
                                      command.SetInterval,
 
1337
        self.assert_command_from_args(["--interval", "PT2M",
 
1338
                                       "client"], command.SetInterval,
1332
1339
                                      value_to_set=120000)
1333
1340
 
1334
1341
    def test_interval_short(self):
1335
 
        self.assert_command_from_args(["-i", "PT2M", "foo"],
 
1342
        self.assert_command_from_args(["-i", "PT2M", "client"],
1336
1343
                                      command.SetInterval,
1337
1344
                                      value_to_set=120000)
1338
1345
 
1339
1346
    def test_approval_delay(self):
1340
1347
        self.assert_command_from_args(["--approval-delay", "PT30S",
1341
 
                                       "foo"],
 
1348
                                       "client"],
1342
1349
                                      command.SetApprovalDelay,
1343
1350
                                      value_to_set=30000)
1344
1351
 
1345
1352
    def test_approval_duration(self):
1346
1353
        self.assert_command_from_args(["--approval-duration", "PT1S",
1347
 
                                       "foo"],
 
1354
                                       "client"],
1348
1355
                                      command.SetApprovalDuration,
1349
1356
                                      value_to_set=1000)
1350
1357
 
1433
1440
            LastCheckerStatus=-2)
1434
1441
        self.clients =  collections.OrderedDict(
1435
1442
            [
1436
 
                ("/clients/foo", self.client.attributes),
1437
 
                ("/clients/barbar", self.other_client.attributes),
 
1443
                (self.client.__dbus_object_path__,
 
1444
                 self.client.attributes),
 
1445
                (self.other_client.__dbus_object_path__,
 
1446
                 self.other_client.attributes),
1438
1447
            ])
1439
 
        self.one_client = {"/clients/foo": self.client.attributes}
 
1448
        self.one_client = {self.client.__dbus_object_path__:
 
1449
                           self.client.attributes}
1440
1450
 
1441
1451
    @property
1442
1452
    def bus(self):
1443
 
        class Bus(object):
 
1453
        class MockBus(object):
1444
1454
            @staticmethod
1445
1455
            def get_object(client_bus_name, path):
1446
1456
                self.assertEqual(dbus_busname, client_bus_name)
1447
 
                return {
1448
 
                    # Note: "self" here is the TestCmd instance, not
1449
 
                    # the Bus instance, since this is a static method!
1450
 
                    "/clients/foo": self.client,
1451
 
                    "/clients/barbar": self.other_client,
1452
 
                }[path]
1453
 
        return Bus()
 
1457
                # Note: "self" here is the TestCmd instance, not the
 
1458
                # MockBus instance, since this is a static method!
 
1459
                if path == self.client.__dbus_object_path__:
 
1460
                    return self.client
 
1461
                elif path == self.other_client.__dbus_object_path__:
 
1462
                    return self.other_client
 
1463
        return MockBus()
1454
1464
 
1455
1465
 
1456
1466
class TestBaseCommands(TestCommand):
1487
1497
                          client.calls)
1488
1498
 
1489
1499
    def test_Remove(self):
1490
 
        class MockMandos(object):
 
1500
        class MandosSpy(object):
1491
1501
            def __init__(self):
1492
1502
                self.calls = []
1493
1503
            def RemoveClient(self, dbus_path):
1494
1504
                self.calls.append(("RemoveClient", (dbus_path,)))
1495
 
        mandos = MockMandos()
 
1505
        mandos = MandosSpy()
1496
1506
        command.Remove().run(self.clients, self.bus, mandos)
1497
1507
        for clientpath in self.clients:
1498
1508
            self.assertIn(("RemoveClient", (clientpath,)),
1780
1790
class TestSetHostCmd(TestPropertyValueCmd):
1781
1791
    command = command.SetHost
1782
1792
    propname = "Host"
1783
 
    values_to_set = ["192.0.2.3", "foo.example.org"]
 
1793
    values_to_set = ["192.0.2.3", "client.example.org"]
1784
1794
 
1785
1795
 
1786
1796
class TestSetSecretCmd(TestPropertyValueCmd):
1788
1798
    propname = "Secret"
1789
1799
    values_to_set = [io.BytesIO(b""),
1790
1800
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1791
 
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
 
1801
    values_to_get = [f.getvalue() for f in values_to_set]
1792
1802
 
1793
1803
 
1794
1804
class TestSetTimeoutCmd(TestPropertyValueCmd):
1799
1809
                     datetime.timedelta(seconds=1),
1800
1810
                     datetime.timedelta(weeks=1),
1801
1811
                     datetime.timedelta(weeks=52)]
1802
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1812
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1803
1813
 
1804
1814
 
1805
1815
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1810
1820
                     datetime.timedelta(seconds=1),
1811
1821
                     datetime.timedelta(weeks=1),
1812
1822
                     datetime.timedelta(weeks=52)]
1813
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1823
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1814
1824
 
1815
1825
 
1816
1826
class TestSetIntervalCmd(TestPropertyValueCmd):
1821
1831
                     datetime.timedelta(seconds=1),
1822
1832
                     datetime.timedelta(weeks=1),
1823
1833
                     datetime.timedelta(weeks=52)]
1824
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1834
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1825
1835
 
1826
1836
 
1827
1837
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1832
1842
                     datetime.timedelta(seconds=1),
1833
1843
                     datetime.timedelta(weeks=1),
1834
1844
                     datetime.timedelta(weeks=52)]
1835
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1845
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1836
1846
 
1837
1847
 
1838
1848
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1843
1853
                     datetime.timedelta(seconds=1),
1844
1854
                     datetime.timedelta(weeks=1),
1845
1855
                     datetime.timedelta(weeks=52)]
1846
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1856
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1847
1857
 
1848
1858
 
1849
1859