/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

Viewing changes to mandos

  • Committer: Teddy Hogeborn
  • Date: 2015-05-23 20:18:34 UTC
  • mto: This revision was merged to the branch mainline in revision 756.
  • Revision ID: teddy@recompile.se-20150523201834-e89ex4ito93yni8x
mandos: Use multiprocessing module to run checkers.

For a long time, the Mandos server has occasionally logged the message
"ERROR: Child process vanished".  This was never a fatal error, but it
has been annoying and slightly worrying, since a definite cause was
not found.  One potential cause could be the "multiprocessing" and
"subprocess" modules conflicting w.r.t. SIGCHLD.  To avoid this,
change the running of checkers from using subprocess.Popen
asynchronously to instead first create a multiprocessing.Process()
(which is asynchronous) calling a function, and have that function
then call subprocess.call() (which is synchronous).  In this way, the
only thing using any asynchronous subprocesses is the multiprocessing
module.
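
The arrangement described above can be sketched in isolation as
follows.  This is a minimal illustration, not the server's code: it
assumes Python 3 on a POSIX host, the names call_and_send and
run_checker are hypothetical, and it waits with a blocking recv()
where the server registers a gobject watch on the pipe instead.

```python
import multiprocessing
import subprocess

# Assumption: POSIX host.  The "fork" start method keeps this sketch
# independent of how the enclosing script is imported.
_mp = multiprocessing.get_context("fork")


def call_and_send(connection, *args, **kwargs):
    """Run a synchronous subprocess.call() and send its return code."""
    connection.send(subprocess.call(*args, **kwargs))
    connection.close()


def run_checker(command):
    """Hypothetical helper: run a shell command in a worker process
    and return the return code reported over the pipe."""
    receiver, sender = _mp.Pipe(duplex=False)
    worker = _mp.Process(target=call_and_send,
                         args=(sender, command),
                         kwargs={"shell": True, "cwd": "/"})
    worker.start()
    sender.close()                # the parent only reads
    returncode = receiver.recv()  # blocks until the worker has sent
    receiver.close()
    worker.join()                 # reap the worker process
    return returncode
```

Here the only child the parent ever waits for is the
multiprocessing worker; the command itself is a grandchild that the
worker waits for synchronously.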

This makes it necessary to change one small thing in the D-Bus API,
since the subprocess.call() function does not expose the raw wait(2)
status value.
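
To illustrate the difference between the two values, here is a small
sketch assuming Python 3 on a POSIX host.  On POSIX, the return code
from subprocess.call() is the exit status, or the negated signal
number if the child was killed by a signal; a raw wait(2) status has
to be decoded with the os.W* functions.  Both function names are
hypothetical, and the (status, signal) pairs only mirror the
convention of last_checker_status and last_checker_signal.

```python
import os


def split_returncode(returncode):
    """Split a subprocess-style return code into (status, signal)."""
    if returncode >= 0:
        return returncode, None   # normal exit
    return -1, -returncode        # killed by signal number -returncode


def split_wait_status(status):
    """Split a raw wait(2) status into the same (status, signal) pair."""
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status), None
    if os.WIFSIGNALED(status):
        return -1, os.WTERMSIG(status)
    return -1, None               # stopped or continued; not an exit
```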

DBUS-API (CheckerCompleted): Change the second value provided by this
                             D-Bus signal from the raw wait(2) status
                             to the actual terminating signal number.
mandos (subprocess_call_pipe): New function to be called by
                               multiprocessing.Process (starting a
                               separate process).
(Client.last_checker_signal): New attribute for signal which
                              terminated last checker.  Like
                              last_checker_status, only not accessible
                              via D-Bus.
(Client.checker_callback): Take new "connection" argument and use it
                           to get returncode; set last_checker_signal.
                           Return False so gobject does not call this
                           callback again.
(Client.start_checker): Start checker using a multiprocessing.Process
                        instead of a subprocess.Popen.
(ClientDBus.checker_callback): Take new "connection" argument.  Call
                               Client.checker_callback early to have
                               it set last_checker_status and
                               last_checker_signal; use those.  Change
                               second value provided to D-Bus signal
                               CheckerCompleted to use
                               last_checker_signal if checker was
                               terminated by signal.
mandos-monitor: Update to reflect D-Bus API change.
(MandosClientWidget.checker_completed): Take "signal" instead of
                                        "condition" argument.  Use it
                                        accordingly.  Remove dead code
                                        (os.WCOREDUMP case).
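
As a rough illustration of what the changed signal means for a
consumer, the function below interprets the three CheckerCompleted
values after this change.  It is a hypothetical sketch in Python 3,
not the code in mandos-monitor, and the message texts are invented
for the example.

```python
import signal


def describe_checker_result(exitstatus, signal_number, command):
    """Summarise one CheckerCompleted-style result as a string."""
    if exitstatus == 0:
        return "Checker {!r} succeeded".format(command)
    if exitstatus > 0:
        return "Checker {!r} failed with exit status {}".format(
            command, exitstatus)
    # exitstatus is negative: the second value is now a plain signal
    # number, so no os.W* decoding is needed.
    if signal_number > 0:
        try:
            name = signal.Signals(signal_number).name
        except ValueError:        # number unknown on this platform
            name = "signal {}".format(signal_number)
        return "Checker {!r} was killed by {}".format(command, name)
    return "Checker {!r} ended abnormally".format(command)
```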

@@ -423,6 +423,15 @@
             .format(self.name)))
         return ret
 
+def subprocess_call_pipe(connection, # : multiprocessing.Connection
+                         *args, **kwargs):
+    """This function is meant to be called by multiprocessing.Process
+    
+    This function runs a synchronous subprocess.call(), and writes the
+    resulting return code on the provided multiprocessing.Connection.
+    """
+    connection.send(subprocess.call(*args, **kwargs))
+    connection.close()
 
 class Client(object):
     """A representation of a client host served by this server.
@@ -455,6 +464,8 @@
     last_checker_status: integer between 0 and 255 reflecting exit
                          status of last checker. -1 reflects crashed
                          checker, -2 means no checker completed yet.
+    last_checker_signal: The signal which killed the last checker, if
+                         last_checker_status is -1
     last_enabled: datetime.datetime(); (UTC) or None
     name:       string; from the config file, used in log messages and
                         D-Bus identifiers
@@ -634,12 +645,18 @@
         # Also start a new checker *right now*.
         self.start_checker()
     
-    def checker_callback(self, pid, condition, command):
+    def checker_callback(self, source, condition,
+                         (connection, command)):
         """The checker has completed, so take appropriate actions."""
         self.checker_callback_tag = None
         self.checker = None
-        if os.WIFEXITED(condition):
-            self.last_checker_status = os.WEXITSTATUS(condition)
+        # Read return code from connection (see subprocess_call_pipe)
+        returncode = connection.recv()
+        connection.close()
+        
+        if returncode >= 0:
+            self.last_checker_status = returncode
+            self.last_checker_signal = None
             if self.last_checker_status == 0:
                 logger.info("Checker for %(name)s succeeded",
                             vars(self))
@@ -648,13 +665,16 @@
                 logger.info("Checker for %(name)s failed", vars(self))
         else:
             self.last_checker_status = -1
+            self.last_checker_signal = -returncode
             logger.warning("Checker for %(name)s crashed?",
                            vars(self))
+        return False
     
     def checked_ok(self):
         """Assert that the client has been seen, alive and well."""
         self.last_checked_ok = datetime.datetime.utcnow()
         self.last_checker_status = 0
+        self.last_checker_signal = None
         self.bump_timeout()
     
     def bump_timeout(self, timeout=None):
@@ -686,20 +706,10 @@
         # than 'timeout' for the client to be disabled, which is as it
         # should be.
         
-        # If a checker exists, make sure it is not a zombie
-        try:
-            pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
-        except AttributeError:
-            pass
-        except OSError as error:
-            if error.errno != errno.ECHILD:
-                raise
-        else:
-            if pid:
-                logger.warning("Checker was a zombie")
-                gobject.source_remove(self.checker_callback_tag)
-                self.checker_callback(pid, status,
-                                      self.current_checker_command)
+        if self.checker is not None and not self.checker.is_alive():
+            logger.warning("Checker was not alive; joining")
+            self.checker.join()
+            self.checker = None
         # Start a new checker if needed
         if self.checker is None:
             # Escape attributes for the shell
@@ -714,46 +724,30 @@
                              exc_info=error)
                 return True     # Try again later
             self.current_checker_command = command
-            try:
-                logger.info("Starting checker %r for %s", command,
-                            self.name)
-                # We don't need to redirect stdout and stderr, since
-                # in normal mode, that is already done by daemon(),
-                # and in debug mode we don't want to.  (Stdin is
-                # always replaced by /dev/null.)
-                # The exception is when not debugging but nevertheless
-                # running in the foreground; use the previously
-                # created wnull.
-                popen_args = {}
-                if (not self.server_settings["debug"]
-                    and self.server_settings["foreground"]):
-                    popen_args.update({"stdout": wnull,
-                                       "stderr": wnull })
-                self.checker = subprocess.Popen(command,
-                                                close_fds=True,
-                                                shell=True,
-                                                cwd="/",
-                                                **popen_args)
-            except OSError as error:
-                logger.error("Failed to start subprocess",
-                             exc_info=error)
-                return True
-            self.checker_callback_tag = gobject.child_watch_add(
-                self.checker.pid, self.checker_callback, data=command)
-            # The checker may have completed before the gobject
-            # watch was added.  Check for this.
-            try:
-                pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
-            except OSError as error:
-                if error.errno == errno.ECHILD:
-                    # This should never happen
-                    logger.error("Child process vanished",
-                                 exc_info=error)
-                    return True
-                raise
-            if pid:
-                gobject.source_remove(self.checker_callback_tag)
-                self.checker_callback(pid, status, command)
+            logger.info("Starting checker %r for %s", command,
+                        self.name)
+            # We don't need to redirect stdout and stderr, since
+            # in normal mode, that is already done by daemon(),
+            # and in debug mode we don't want to.  (Stdin is
+            # always replaced by /dev/null.)
+            # The exception is when not debugging but nevertheless
+            # running in the foreground; use the previously
+            # created wnull.
+            popen_args = { "close_fds": True,
+                           "shell": True,
+                           "cwd": "/" }
+            if (not self.server_settings["debug"]
+                and self.server_settings["foreground"]):
+                popen_args.update({"stdout": wnull,
+                                   "stderr": wnull })
+            pipe = multiprocessing.Pipe(duplex=False)
+            self.checker = multiprocessing.Process(
+                target=subprocess_call_pipe, args=(pipe[1], command),
+                kwargs=popen_args)
+            self.checker.start()
+            self.checker_callback_tag = gobject.io_add_watch(
+                pipe[0].fileno(), gobject.IO_IN,
+                self.checker_callback, (pipe[0], command))
         # Re-run this periodically if run by gobject.timeout_add
         return True
     
@@ -765,14 +759,7 @@
         if getattr(self, "checker", None) is None:
             return
         logger.debug("Stopping checker for %(name)s", vars(self))
-        try:
-            self.checker.terminate()
-            #time.sleep(0.5)
-            #if self.checker.poll() is None:
-            #    self.checker.kill()
-        except OSError as error:
-            if error.errno != errno.ESRCH: # No such process
-                raise
+        self.checker.terminate()
         self.checker = None
 
 
@@ -1355,24 +1342,24 @@
             DBusObjectWithProperties.__del__(self, *args, **kwargs)
         Client.__del__(self, *args, **kwargs)
     
-    def checker_callback(self, pid, condition, command,
-                         *args, **kwargs):
-        self.checker_callback_tag = None
-        self.checker = None
-        if os.WIFEXITED(condition):
-            exitstatus = os.WEXITSTATUS(condition)
+    def checker_callback(self, source, condition,
+                         (connection, command), *args, **kwargs):
+        ret = Client.checker_callback(self, source, condition,
+                                      (connection, command), *args,
+                                      **kwargs)
+        exitstatus = self.last_checker_status
+        if exitstatus >= 0:
             # Emit D-Bus signal
             self.CheckerCompleted(dbus.Int16(exitstatus),
-                                  dbus.Int64(condition),
+                                  dbus.Int64(0),
                                   dbus.String(command))
         else:
             # Emit D-Bus signal
             self.CheckerCompleted(dbus.Int16(-1),
-                                  dbus.Int64(condition),
+                                  dbus.Int64(
+                                      self.last_checker_signal),
                                   dbus.String(command))
-        
-        return Client.checker_callback(self, pid, condition, command,
-                                       *args, **kwargs)
+        return ret
     
     def start_checker(self, *args, **kwargs):
         old_checker_pid = getattr(self.checker, "pid", None)
@@ -2654,7 +2641,7 @@
                     pass
             
             # Clients who has passed its expire date can still be
-            # enabled if its last checker was successful.  Clients
+            # enabled if its last checker was successful.  A Client
             # whose checker succeeded before we stored its state is
             # assumed to have successfully run all checkers during
             # downtime.
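
In the start_checker change above, the read end of the pipe is
registered with gobject.io_add_watch, so the callback runs once the
worker has written its return code.  The sketch below shows the same
idea without gobject, using the standard selectors module as a
stand-in for the main loop.  It assumes Python 3 on a POSIX host, and
wait_for_checkers and _call_and_send are hypothetical names.

```python
import multiprocessing
import selectors
import subprocess

# Assumption: POSIX host, "fork" start method.
_mp = multiprocessing.get_context("fork")


def _call_and_send(connection, command):
    """Worker: run the command synchronously, report its return code."""
    connection.send(subprocess.call(command, shell=True))
    connection.close()


def wait_for_checkers(commands, timeout=10.0):
    """Start one worker per command and collect each return code as
    soon as the corresponding pipe becomes readable."""
    selector = selectors.DefaultSelector()
    workers = []
    for command in commands:
        receiver, sender = _mp.Pipe(duplex=False)
        worker = _mp.Process(target=_call_and_send,
                             args=(sender, command))
        worker.start()
        sender.close()            # the parent only reads
        # Watch the file descriptor, carrying (connection, command)
        # along as callback data, much like the io_add_watch call.
        selector.register(receiver.fileno(), selectors.EVENT_READ,
                          (receiver, command))
        workers.append(worker)
    results = {}
    pending = len(workers)
    while pending:
        events = selector.select(timeout)
        if not events:
            break                 # timed out; give up waiting
        for key, _mask in events:
            receiver, command = key.data
            selector.unregister(key.fd)
            try:
                results[command] = receiver.recv()
            except EOFError:      # worker died before sending
                results[command] = None
            receiver.close()
            pending -= 1
    for worker in workers:
        worker.join()             # reap the worker processes
    selector.close()
    return results
```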