/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos

  • Committer: Teddy Hogeborn
  • Date: 2015-05-23 20:18:34 UTC
  • mto: This revision was merged to the branch mainline in revision 756.
  • Revision ID: teddy@recompile.se-20150523201834-e89ex4ito93yni8x
mandos: Use multiprocessing module to run checkers.

For a long time, the Mandos server has occasionally logged the message
"ERROR: Child process vanished".  This was never a fatal error, but it
has been annoying and slightly worrying, since a definite cause was
not found.  One potential cause could be the "multiprocessing" and
"subprocess" modules conflicting w.r.t. SIGCHLD.  To avoid this,
change the running of checkers from using subprocess.Popen
asynchronously to instead first create a multiprocessing.Process()
(which is asynchronous) calling a function, and have that function
then call subprocess.call() (which is synchronous).  In this way, the
only thing using any asynchronous subprocesses is the multiprocessing
module.

This makes it necessary to change one small thing in the D-Bus API,
since the subprocess.call() function does not expose the raw wait(2)
status value.

DBUS-API (CheckerCompleted): Change the second value provided by this
                             D-Bus signal from the raw wait(2) status
                             to the actual terminating signal number.
mandos (subprocess_call_pipe): New function to be called by
                               multiprocessing.Process (starting a
                               separate process).
(Client.last_checker_signal): New attribute for signal which
                              terminated last checker.  Like
                              last_checker_status, only not accessible
                              via D-Bus.
(Client.checker_callback): Take new "connection" argument and use it
                           to get returncode; set last_checker_signal.
                           Return False so gobject does not call this
                           callback again.
(Client.start_checker): Start checker using a multiprocessing.Process
                        instead of a subprocess.Popen.
(ClientDBus.checker_callback): Take new "connection" argument.  Call
                               Client.checker_callback early to have
                               it set last_checker_status and
                               last_checker_signal; use those.  Change
                               second value provided to D-Bus signal
                               CheckerCompleted to use
                               last_checker_signal if checker was
                               terminated by signal.
mandos-monitor: Update to reflect D-Bus API change.
(MandosClientWidget.checker_completed): Take "signal" instead of
                                        "condition" argument.  Use it
                                        accordingly.  Remove dead code
                                        (os.WCOREDUMP case).

Show diffs side-by-side

added added

removed removed

Lines of Context:
78
78
import tempfile
79
79
import itertools
80
80
import collections
81
 
import codecs
82
81
 
83
82
import dbus
84
83
import dbus.service
395
394
                    logger.error(bad_states[state] + ": %r", error)
396
395
            self.cleanup()
397
396
        elif state == avahi.SERVER_RUNNING:
398
 
            try:
399
 
                self.add()
400
 
            except dbus.exceptions.DBusException as error:
401
 
                if (error.get_dbus_name()
402
 
                    == "org.freedesktop.Avahi.CollisionError"):
403
 
                    logger.info("Local Zeroconf service name"
404
 
                                " collision.")
405
 
                    return self.rename(remove=False)
406
 
                else:
407
 
                    logger.critical("D-Bus Exception", exc_info=error)
408
 
                    self.cleanup()
409
 
                    os._exit(1)
 
397
            self.add()
410
398
        else:
411
399
            if error is None:
412
400
                logger.debug("Unknown state: %r", state)
435
423
            .format(self.name)))
436
424
        return ret
437
425
 
438
 
def call_pipe(connection,       # : multiprocessing.Connection
439
 
              func, *args, **kwargs):
 
426
def subprocess_call_pipe(connection, # : multiprocessing.Connection
 
427
                         *args, **kwargs):
440
428
    """This function is meant to be called by multiprocessing.Process
441
429
    
442
 
    This function runs func(*args, **kwargs), and writes the resulting
443
 
    return value on the provided multiprocessing.Connection.
 
430
    This function runs a synchronous subprocess.call(), and writes the
 
431
    resulting return code on the provided multiprocessing.Connection.
444
432
    """
445
 
    connection.send(func(*args, **kwargs))
 
433
    connection.send(subprocess.call(*args, **kwargs))
446
434
    connection.close()
447
435
 
448
436
class Client(object):
657
645
        # Also start a new checker *right now*.
658
646
        self.start_checker()
659
647
    
660
 
    def checker_callback(self, source, condition, connection,
661
 
                         command):
 
648
    def checker_callback(self, source, condition,
 
649
                         (connection, command)):
662
650
        """The checker has completed, so take appropriate actions."""
663
651
        self.checker_callback_tag = None
664
652
        self.checker = None
665
 
        # Read return code from connection (see call_pipe)
 
653
        # Read return code from connection (see subprocess_call_pipe)
666
654
        returncode = connection.recv()
667
655
        connection.close()
668
656
        
752
740
                and self.server_settings["foreground"]):
753
741
                popen_args.update({"stdout": wnull,
754
742
                                   "stderr": wnull })
755
 
            pipe = multiprocessing.Pipe(duplex = False)
 
743
            pipe = multiprocessing.Pipe(duplex=False)
756
744
            self.checker = multiprocessing.Process(
757
 
                target = call_pipe,
758
 
                args = (pipe[1], subprocess.call, command),
759
 
                kwargs = popen_args)
 
745
                target=subprocess_call_pipe, args=(pipe[1], command),
 
746
                kwargs=popen_args)
760
747
            self.checker.start()
761
748
            self.checker_callback_tag = gobject.io_add_watch(
762
749
                pipe[0].fileno(), gobject.IO_IN,
763
 
                self.checker_callback, pipe[0], command)
 
750
                self.checker_callback, (pipe[0], command))
764
751
        # Re-run this periodically if run by gobject.timeout_add
765
752
        return True
766
753
    
1110
1097
                interface_names.add(alt_interface)
1111
1098
                # Is this a D-Bus signal?
1112
1099
                if getattr(attribute, "_dbus_is_signal", False):
1113
 
                    if sys.version_info.major == 2:
1114
 
                        # Extract the original non-method undecorated
1115
 
                        # function by black magic
1116
 
                        nonmethod_func = (dict(
1117
 
                            zip(attribute.func_code.co_freevars,
1118
 
                                attribute.__closure__))
1119
 
                                          ["func"].cell_contents)
1120
 
                    else:
1121
 
                        nonmethod_func = attribute
 
1100
                    # Extract the original non-method undecorated
 
1101
                    # function by black magic
 
1102
                    nonmethod_func = (dict(
 
1103
                        zip(attribute.func_code.co_freevars,
 
1104
                            attribute.__closure__))
 
1105
                                      ["func"].cell_contents)
1122
1106
                    # Create a new, but exactly alike, function
1123
1107
                    # object, and decorate it to be a new D-Bus signal
1124
1108
                    # with the alternate D-Bus interface name
1125
 
                    if sys.version_info.major == 2:
1126
 
                        new_function = types.FunctionType(
1127
 
                            nonmethod_func.func_code,
1128
 
                            nonmethod_func.func_globals,
1129
 
                            nonmethod_func.func_name,
1130
 
                            nonmethod_func.func_defaults,
1131
 
                            nonmethod_func.func_closure)
1132
 
                    else:
1133
 
                        new_function = types.FunctionType(
1134
 
                            nonmethod_func.__code__,
1135
 
                            nonmethod_func.__globals__,
1136
 
                            nonmethod_func.__name__,
1137
 
                            nonmethod_func.__defaults__,
1138
 
                            nonmethod_func.__closure__)
1139
1109
                    new_function = (dbus.service.signal(
1140
 
                        alt_interface,
1141
 
                        attribute._dbus_signature)(new_function))
 
1110
                        alt_interface, attribute._dbus_signature)
 
1111
                                    (types.FunctionType(
 
1112
                                        nonmethod_func.func_code,
 
1113
                                        nonmethod_func.func_globals,
 
1114
                                        nonmethod_func.func_name,
 
1115
                                        nonmethod_func.func_defaults,
 
1116
                                        nonmethod_func.func_closure)))
1142
1117
                    # Copy annotations, if any
1143
1118
                    try:
1144
1119
                        new_function._dbus_annotations = dict(
1368
1343
        Client.__del__(self, *args, **kwargs)
1369
1344
    
1370
1345
    def checker_callback(self, source, condition,
1371
 
                         connection, command, *args, **kwargs):
 
1346
                         (connection, command), *args, **kwargs):
1372
1347
        ret = Client.checker_callback(self, source, condition,
1373
 
                                      connection, command, *args,
 
1348
                                      (connection, command), *args,
1374
1349
                                      **kwargs)
1375
1350
        exitstatus = self.last_checker_status
1376
1351
        if exitstatus >= 0:
1683
1658
        self._pipe = child_pipe
1684
1659
        self._pipe.send(('init', fpr, address))
1685
1660
        if not self._pipe.recv():
1686
 
            raise KeyError(fpr)
 
1661
            raise KeyError()
1687
1662
    
1688
1663
    def __getattribute__(self, name):
1689
1664
        if name == '_pipe':
2152
2127
        
2153
2128
        if command == 'getattr':
2154
2129
            attrname = request[1]
2155
 
            if isinstance(client_object.__getattribute__(attrname),
2156
 
                          collections.Callable):
 
2130
            if callable(client_object.__getattribute__(attrname)):
2157
2131
                parent_pipe.send(('function', ))
2158
2132
            else:
2159
2133
                parent_pipe.send((
2194
2168
    # avoid excessive use of external libraries.
2195
2169
    
2196
2170
    # New type for defining tokens, syntax, and semantics all-in-one
 
2171
    Token = collections.namedtuple("Token",
 
2172
                                   ("regexp", # To match token; if
 
2173
                                              # "value" is not None,
 
2174
                                              # must have a "group"
 
2175
                                              # containing digits
 
2176
                                    "value",  # datetime.timedelta or
 
2177
                                              # None
 
2178
                                    "followers")) # Tokens valid after
 
2179
                                                  # this token
2197
2180
    Token = collections.namedtuple("Token", (
2198
2181
        "regexp",  # To match token; if "value" is not None, must have
2199
2182
                   # a "group" containing digits
2234
2217
    # Define starting values
2235
2218
    value = datetime.timedelta() # Value so far
2236
2219
    found_token = None
2237
 
    followers = frozenset((token_duration, )) # Following valid tokens
 
2220
    followers = frozenset((token_duration,)) # Following valid tokens
2238
2221
    s = duration                # String left to parse
2239
2222
    # Loop until end token is found
2240
2223
    while found_token is not token_end:
2257
2240
                break
2258
2241
        else:
2259
2242
            # No currently valid tokens were found
2260
 
            raise ValueError("Invalid RFC 3339 duration: {!r}"
2261
 
                             .format(duration))
 
2243
            raise ValueError("Invalid RFC 3339 duration")
2262
2244
    # End token found
2263
2245
    return value
2264
2246
 
2397
2379
                        "debug": "False",
2398
2380
                        "priority":
2399
2381
                        "SECURE256:!CTYPE-X.509:+CTYPE-OPENPGP:!RSA"
2400
 
                        ":+SIGN-DSA-SHA256",
 
2382
                        ":+SIGN-RSA-SHA224:+SIGN-RSA-RMD160",
2401
2383
                        "servicename": "Mandos",
2402
2384
                        "use_dbus": "True",
2403
2385
                        "use_ipv6": "True",
2515
2497
            pidfilename = "/var/run/mandos.pid"
2516
2498
        pidfile = None
2517
2499
        try:
2518
 
            pidfile = codecs.open(pidfilename, "w", encoding="utf-8")
 
2500
            pidfile = open(pidfilename, "w")
2519
2501
        except IOError as e:
2520
2502
            logger.error("Could not open file %r", pidfilename,
2521
2503
                         exc_info=e)
2580
2562
            old_bus_name = dbus.service.BusName(
2581
2563
                "se.bsnet.fukt.Mandos", bus,
2582
2564
                do_not_queue=True)
2583
 
        except dbus.exceptions.DBusException as e:
 
2565
        except dbus.exceptions.NameExistsException as e:
2584
2566
            logger.error("Disabling D-Bus:", exc_info=e)
2585
2567
            use_dbus = False
2586
2568
            server_settings["use_dbus"] = False
2717
2699
    
2718
2700
    if not foreground:
2719
2701
        if pidfile is not None:
2720
 
            pid = os.getpid()
2721
2702
            try:
2722
2703
                with pidfile:
2723
 
                    print(pid, file=pidfile)
 
2704
                    pid = os.getpid()
 
2705
                    pidfile.write("{}\n".format(pid).encode("utf-8"))
2724
2706
            except IOError:
2725
2707
                logger.error("Could not write to file %r with PID %d",
2726
2708
                             pidfilename, pid)