612
613
"Checker", "ExtendedTimeout", "Expires",
613
614
"LastCheckerStatus")
616
def run(self, clients, bus=None, mandos=None):
617
print(self.output(clients.values()))
619
def output(self, clients):
620
raise NotImplementedError()
616
623
class DumpJSON(Output):
617
def run(self, clients, bus=None, mandos=None):
624
def output(self, clients):
618
625
data = {client["Name"]:
619
626
{key: self.dbus_boolean_to_bool(client[key])
620
627
for key in self.all_keywords}
621
for client in clients.values()}
622
print(json.dumps(data, indent=4, separators=(',', ': ')))
628
for client in clients}
629
return json.dumps(data, indent=4, separators=(',', ': '))
625
632
def dbus_boolean_to_bool(value):
746
753
raise NotImplementedError()
749
class Enable(PropertySetter):
756
class Enable(Property):
750
757
propname = "Enabled"
751
758
value_to_set = dbus.Boolean(True)
754
class Disable(PropertySetter):
761
class Disable(Property):
755
762
propname = "Enabled"
756
763
value_to_set = dbus.Boolean(False)
759
class BumpTimeout(PropertySetter):
766
class BumpTimeout(Property):
760
767
propname = "LastCheckedOK"
761
768
value_to_set = ""
764
class StartChecker(PropertySetter):
765
propname = "CheckerRunning"
766
value_to_set = dbus.Boolean(True)
769
class StopChecker(PropertySetter):
770
propname = "CheckerRunning"
771
value_to_set = dbus.Boolean(False)
774
class ApproveByDefault(PropertySetter):
775
propname = "ApprovedByDefault"
776
value_to_set = dbus.Boolean(True)
779
class DenyByDefault(PropertySetter):
780
propname = "ApprovedByDefault"
781
value_to_set = dbus.Boolean(False)
784
class PropertySetterValue(PropertySetter):
785
"""Abstract class for PropertySetter recieving a value as
786
constructor argument instead of a class attribute."""
771
class StartChecker(Property):
772
propname = "CheckerRunning"
773
value_to_set = dbus.Boolean(True)
776
class StopChecker(Property):
777
propname = "CheckerRunning"
778
value_to_set = dbus.Boolean(False)
781
class ApproveByDefault(Property):
782
propname = "ApprovedByDefault"
783
value_to_set = dbus.Boolean(True)
786
class DenyByDefault(Property):
787
propname = "ApprovedByDefault"
788
value_to_set = dbus.Boolean(False)
791
class PropertyValue(Property):
792
"Abstract class for Property recieving a value as argument"
787
793
def __init__(self, value):
788
794
self.value_to_set = value
791
class SetChecker(PropertySetterValue):
797
class SetChecker(PropertyValue):
792
798
propname = "Checker"
795
class SetHost(PropertySetterValue):
801
class SetHost(PropertyValue):
796
802
propname = "Host"
799
class SetSecret(PropertySetterValue):
805
class SetSecret(PropertyValue):
800
806
propname = "Secret"
1107
1112
class Test_get_managed_objects(TestCaseWithAssertLogs):
1108
1113
def test_calls_and_returns_GetManagedObjects(self):
1109
managed_objects = {"/clients/client": { "Name": "client"}}
1110
class ObjectManagerStub(object):
1114
managed_objects = {"/clients/foo": { "Name": "foo"}}
1115
class MockObjectManager(object):
1111
1116
def GetManagedObjects(self):
1112
1117
return managed_objects
1113
retval = get_managed_objects(ObjectManagerStub())
1118
retval = get_managed_objects(MockObjectManager())
1114
1119
self.assertDictEqual(managed_objects, retval)
1116
1121
def test_logs_and_exits_on_dbus_error(self):
1117
1122
dbus_logger = logging.getLogger("dbus.proxies")
1119
class ObjectManagerFailingStub(object):
1124
class MockObjectManagerFailing(object):
1120
1125
def GetManagedObjects(self):
1121
1126
dbus_logger.error("Test")
1122
1127
raise dbus.exceptions.DBusException("Test")
1173
1178
self.assertEqual(value, getattr(command, key))
1175
1180
def test_is_enabled_short(self):
1176
self.assert_command_from_args(["-V", "client"],
1181
self.assert_command_from_args(["-V", "foo"],
1177
1182
command.IsEnabled)
1179
1184
def test_approve(self):
1180
self.assert_command_from_args(["--approve", "client"],
1185
self.assert_command_from_args(["--approve", "foo"],
1181
1186
command.Approve)
1183
1188
def test_approve_short(self):
1184
self.assert_command_from_args(["-A", "client"],
1189
self.assert_command_from_args(["-A", "foo"], command.Approve)
1187
1191
def test_deny(self):
1188
self.assert_command_from_args(["--deny", "client"],
1192
self.assert_command_from_args(["--deny", "foo"], command.Deny)
1191
1194
def test_deny_short(self):
1192
self.assert_command_from_args(["-D", "client"], command.Deny)
1195
self.assert_command_from_args(["-D", "foo"], command.Deny)
1194
1197
def test_remove(self):
1195
self.assert_command_from_args(["--remove", "client"],
1198
self.assert_command_from_args(["--remove", "foo"],
1196
1199
command.Remove)
1198
1201
def test_deny_before_remove(self):
1199
1202
options = self.parser.parse_args(["--deny", "--remove",
1201
1204
check_option_syntax(self.parser, options)
1202
1205
commands = commands_from_options(options)
1203
1206
self.assertEqual(2, len(commands))
1214
1217
self.assertIsInstance(commands[1], command.Remove)
1216
1219
def test_remove_short(self):
1217
self.assert_command_from_args(["-r", "client"],
1220
self.assert_command_from_args(["-r", "foo"], command.Remove)
1220
1222
def test_dump_json(self):
1221
1223
self.assert_command_from_args(["--dump-json"],
1222
1224
command.DumpJSON)
1224
1226
def test_enable(self):
1225
self.assert_command_from_args(["--enable", "client"],
1227
self.assert_command_from_args(["--enable", "foo"],
1226
1228
command.Enable)
1228
1230
def test_enable_short(self):
1229
self.assert_command_from_args(["-e", "client"],
1231
self.assert_command_from_args(["-e", "foo"], command.Enable)
1232
1233
def test_disable(self):
1233
self.assert_command_from_args(["--disable", "client"],
1234
self.assert_command_from_args(["--disable", "foo"],
1234
1235
command.Disable)
1236
1237
def test_disable_short(self):
1237
self.assert_command_from_args(["-d", "client"],
1238
self.assert_command_from_args(["-d", "foo"], command.Disable)
1240
1240
def test_bump_timeout(self):
1241
self.assert_command_from_args(["--bump-timeout", "client"],
1241
self.assert_command_from_args(["--bump-timeout", "foo"],
1242
1242
command.BumpTimeout)
1244
1244
def test_bump_timeout_short(self):
1245
self.assert_command_from_args(["-b", "client"],
1245
self.assert_command_from_args(["-b", "foo"],
1246
1246
command.BumpTimeout)
1248
1248
def test_start_checker(self):
1249
self.assert_command_from_args(["--start-checker", "client"],
1249
self.assert_command_from_args(["--start-checker", "foo"],
1250
1250
command.StartChecker)
1252
1252
def test_stop_checker(self):
1253
self.assert_command_from_args(["--stop-checker", "client"],
1253
self.assert_command_from_args(["--stop-checker", "foo"],
1254
1254
command.StopChecker)
1256
1256
def test_approve_by_default(self):
1257
self.assert_command_from_args(["--approve-by-default",
1257
self.assert_command_from_args(["--approve-by-default", "foo"],
1259
1258
command.ApproveByDefault)
1261
1260
def test_deny_by_default(self):
1262
self.assert_command_from_args(["--deny-by-default", "client"],
1261
self.assert_command_from_args(["--deny-by-default", "foo"],
1263
1262
command.DenyByDefault)
1265
1264
def test_checker(self):
1266
self.assert_command_from_args(["--checker", ":", "client"],
1265
self.assert_command_from_args(["--checker", ":", "foo"],
1267
1266
command.SetChecker,
1268
1267
value_to_set=":")
1270
1269
def test_checker_empty(self):
1271
self.assert_command_from_args(["--checker", "", "client"],
1270
self.assert_command_from_args(["--checker", "", "foo"],
1272
1271
command.SetChecker,
1273
1272
value_to_set="")
1275
1274
def test_checker_short(self):
1276
self.assert_command_from_args(["-c", ":", "client"],
1275
self.assert_command_from_args(["-c", ":", "foo"],
1277
1276
command.SetChecker,
1278
1277
value_to_set=":")
1280
1279
def test_host(self):
1281
self.assert_command_from_args(
1282
["--host", "client.example.org", "client"],
1283
command.SetHost, value_to_set="client.example.org")
1280
self.assert_command_from_args(["--host", "foo.example.org",
1281
"foo"], command.SetHost,
1282
value_to_set="foo.example.org")
1285
1284
def test_host_short(self):
1286
self.assert_command_from_args(
1287
["-H", "client.example.org", "client"], command.SetHost,
1288
value_to_set="client.example.org")
1285
self.assert_command_from_args(["-H", "foo.example.org",
1286
"foo"], command.SetHost,
1287
value_to_set="foo.example.org")
1290
1289
def test_secret_devnull(self):
1291
1290
self.assert_command_from_args(["--secret", os.path.devnull,
1292
"client"], command.SetSecret,
1291
"foo"], command.SetSecret,
1293
1292
value_to_set=b"")
1295
1294
def test_secret_tempfile(self):
1312
1310
value = b"secret\0xyzzy\nbar"
1315
self.assert_command_from_args(["-s", f.name, "client"],
1313
self.assert_command_from_args(["-s", f.name, "foo"],
1316
1314
command.SetSecret,
1317
1315
value_to_set=value)
1319
1317
def test_timeout(self):
1320
self.assert_command_from_args(["--timeout", "PT5M", "client"],
1318
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1321
1319
command.SetTimeout,
1322
1320
value_to_set=300000)
1324
1322
def test_timeout_short(self):
1325
self.assert_command_from_args(["-t", "PT5M", "client"],
1323
self.assert_command_from_args(["-t", "PT5M", "foo"],
1326
1324
command.SetTimeout,
1327
1325
value_to_set=300000)
1329
1327
def test_extended_timeout(self):
1330
1328
self.assert_command_from_args(["--extended-timeout", "PT15M",
1332
1330
command.SetExtendedTimeout,
1333
1331
value_to_set=900000)
1335
1333
def test_interval(self):
1336
self.assert_command_from_args(["--interval", "PT2M",
1337
"client"], command.SetInterval,
1334
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1335
command.SetInterval,
1338
1336
value_to_set=120000)
1340
1338
def test_interval_short(self):
1341
self.assert_command_from_args(["-i", "PT2M", "client"],
1339
self.assert_command_from_args(["-i", "PT2M", "foo"],
1342
1340
command.SetInterval,
1343
1341
value_to_set=120000)
1345
1343
def test_approval_delay(self):
1346
1344
self.assert_command_from_args(["--approval-delay", "PT30S",
1348
1346
command.SetApprovalDelay,
1349
1347
value_to_set=30000)
1351
1349
def test_approval_duration(self):
1352
1350
self.assert_command_from_args(["--approval-duration", "PT1S",
1354
1352
command.SetApprovalDuration,
1355
1353
value_to_set=1000)
1439
1437
LastCheckerStatus=-2)
1440
1438
self.clients = collections.OrderedDict(
1442
(self.client.__dbus_object_path__,
1443
self.client.attributes),
1444
(self.other_client.__dbus_object_path__,
1445
self.other_client.attributes),
1440
("/clients/foo", self.client.attributes),
1441
("/clients/barbar", self.other_client.attributes),
1447
self.one_client = {self.client.__dbus_object_path__:
1448
self.client.attributes}
1443
self.one_client = {"/clients/foo": self.client.attributes}
1452
class MockBus(object):
1454
1449
def get_object(client_bus_name, path):
1455
1450
self.assertEqual(dbus_busname, client_bus_name)
1456
# Note: "self" here is the TestCmd instance, not the
1457
# MockBus instance, since this is a static method!
1458
if path == self.client.__dbus_object_path__:
1460
elif path == self.other_client.__dbus_object_path__:
1461
return self.other_client
1452
# Note: "self" here is the TestCmd instance, not
1453
# the Bus instance, since this is a static method!
1454
"/clients/foo": self.client,
1455
"/clients/barbar": self.other_client,
1465
1460
class TestBaseCommands(TestCommand):
1561
1556
def test_DumpJSON_normal(self):
1562
with self.capture_stdout_to_buffer() as buffer:
1563
command.DumpJSON().run(self.clients)
1564
json_data = json.loads(buffer.getvalue())
1557
output = command.DumpJSON().output(self.clients.values())
1558
json_data = json.loads(output)
1565
1559
self.assertDictEqual(self.expected_json, json_data)
1568
@contextlib.contextmanager
1569
def capture_stdout_to_buffer():
1570
capture_buffer = io.StringIO()
1571
old_stdout = sys.stdout
1572
sys.stdout = capture_buffer
1574
yield capture_buffer
1576
sys.stdout = old_stdout
1578
1561
def test_DumpJSON_one_client(self):
1579
with self.capture_stdout_to_buffer() as buffer:
1580
command.DumpJSON().run(self.one_client)
1581
json_data = json.loads(buffer.getvalue())
1562
output = command.DumpJSON().output(self.one_client.values())
1563
json_data = json.loads(output)
1582
1564
expected_json = {"foo": self.expected_json["foo"]}
1583
1565
self.assertDictEqual(expected_json, json_data)
1585
1567
def test_PrintTable_normal(self):
1586
with self.capture_stdout_to_buffer() as buffer:
1587
command.PrintTable().run(self.clients)
1568
output = command.PrintTable().output(self.clients.values())
1588
1569
expected_output = "\n".join((
1589
1570
"Name Enabled Timeout Last Successful Check",
1590
1571
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1591
1572
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1593
self.assertEqual(expected_output, buffer.getvalue())
1574
self.assertEqual(expected_output, output)
1595
1576
def test_PrintTable_verbose(self):
1596
with self.capture_stdout_to_buffer() as buffer:
1597
command.PrintTable(verbose=True).run(self.clients)
1577
output = command.PrintTable(verbose=True).output(
1578
self.clients.values())
1684
1665
num_lines = max(len(rows) for rows in columns)
1685
expected_output = ("\n".join("".join(rows[line]
1686
for rows in columns)
1687
for line in range(num_lines))
1689
self.assertEqual(expected_output, buffer.getvalue())
1666
expected_output = "\n".join("".join(rows[line]
1667
for rows in columns)
1668
for line in range(num_lines))
1669
self.assertEqual(expected_output, output)
1691
1671
def test_PrintTable_one_client(self):
1692
with self.capture_stdout_to_buffer() as buffer:
1693
command.PrintTable().run(self.one_client)
1672
output = command.PrintTable().output(self.one_client.values())
1694
1673
expected_output = "\n".join((
1695
1674
"Name Enabled Timeout Last Successful Check",
1696
1675
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1698
self.assertEqual(expected_output, buffer.getvalue())
1701
class TestPropertySetterCmd(TestCommand):
1702
"""Abstract class for tests of command.PropertySetter classes"""
1677
self.assertEqual(expected_output, output)
1680
class TestPropertyCmd(TestCommand):
1681
"""Abstract class for tests of command.Property classes"""
1703
1682
def runTest(self):
1704
1683
if not hasattr(self, "command"):
1726
1705
self.command().run(clients, self.bus)
1729
class TestEnableCmd(TestPropertySetterCmd):
1708
class TestEnableCmd(TestPropertyCmd):
1730
1709
command = command.Enable
1731
1710
propname = "Enabled"
1732
1711
values_to_set = [dbus.Boolean(True)]
1735
class TestDisableCmd(TestPropertySetterCmd):
1714
class TestDisableCmd(TestPropertyCmd):
1736
1715
command = command.Disable
1737
1716
propname = "Enabled"
1738
1717
values_to_set = [dbus.Boolean(False)]
1741
class TestBumpTimeoutCmd(TestPropertySetterCmd):
1720
class TestBumpTimeoutCmd(TestPropertyCmd):
1742
1721
command = command.BumpTimeout
1743
1722
propname = "LastCheckedOK"
1744
1723
values_to_set = [""]
1747
class TestStartCheckerCmd(TestPropertySetterCmd):
1726
class TestStartCheckerCmd(TestPropertyCmd):
1748
1727
command = command.StartChecker
1749
1728
propname = "CheckerRunning"
1750
1729
values_to_set = [dbus.Boolean(True)]
1753
class TestStopCheckerCmd(TestPropertySetterCmd):
1732
class TestStopCheckerCmd(TestPropertyCmd):
1754
1733
command = command.StopChecker
1755
1734
propname = "CheckerRunning"
1756
1735
values_to_set = [dbus.Boolean(False)]
1759
class TestApproveByDefaultCmd(TestPropertySetterCmd):
1738
class TestApproveByDefaultCmd(TestPropertyCmd):
1760
1739
command = command.ApproveByDefault
1761
1740
propname = "ApprovedByDefault"
1762
1741
values_to_set = [dbus.Boolean(True)]
1765
class TestDenyByDefaultCmd(TestPropertySetterCmd):
1744
class TestDenyByDefaultCmd(TestPropertyCmd):
1766
1745
command = command.DenyByDefault
1767
1746
propname = "ApprovedByDefault"
1768
1747
values_to_set = [dbus.Boolean(False)]
1771
class TestPropertySetterValueCmd(TestPropertySetterCmd):
1772
"""Abstract class for tests of PropertySetterValueCmd classes"""
1750
class TestPropertyValueCmd(TestPropertyCmd):
1751
"""Abstract class for tests of PropertyValueCmd classes"""
1774
1753
def runTest(self):
1775
if type(self) is TestPropertySetterValueCmd:
1754
if type(self) is TestPropertyValueCmd:
1777
return super(TestPropertySetterValueCmd, self).runTest()
1756
return super(TestPropertyValueCmd, self).runTest()
1779
1758
def run_command(self, value, clients):
1780
1759
self.command(value).run(clients, self.bus)
1783
class TestSetCheckerCmd(TestPropertySetterValueCmd):
1762
class TestSetCheckerCmd(TestPropertyValueCmd):
1784
1763
command = command.SetChecker
1785
1764
propname = "Checker"
1786
1765
values_to_set = ["", ":", "fping -q -- %s"]
1789
class TestSetHostCmd(TestPropertySetterValueCmd):
1768
class TestSetHostCmd(TestPropertyValueCmd):
1790
1769
command = command.SetHost
1791
1770
propname = "Host"
1792
values_to_set = ["192.0.2.3", "client.example.org"]
1795
class TestSetSecretCmd(TestPropertySetterValueCmd):
1771
values_to_set = ["192.0.2.3", "foo.example.org"]
1774
class TestSetSecretCmd(TestPropertyValueCmd):
1796
1775
command = command.SetSecret
1797
1776
propname = "Secret"
1798
1777
values_to_set = [io.BytesIO(b""),
1799
1778
io.BytesIO(b"secret\0xyzzy\nbar")]
1800
values_to_get = [f.getvalue() for f in values_to_set]
1803
class TestSetTimeoutCmd(TestPropertySetterValueCmd):
1779
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1782
class TestSetTimeoutCmd(TestPropertyValueCmd):
1804
1783
command = command.SetTimeout
1805
1784
propname = "Timeout"
1806
1785
values_to_set = [datetime.timedelta(),