Skip to content

Beenden eines Python-Multiprozessorprogramms, sobald einer seiner Arbeiter eine bestimmte Bedingung erfüllt

Nach dem Durchsuchen verschiedener Repositories und Websites haben wir am Ende die Lösung entdeckt, die wir Ihnen jetzt zeigen.

Lösung:

Kein Prozess kann einen anderen mit brachialer Gewalt aufhalten os.kill()-wie Vorschlaghämmern. Lass es bleiben.

Um dies vernünftig zu tun, müssen Sie Ihren grundlegenden Ansatz überarbeiten: der Hauptprozess und die Arbeitsprozesse müssen miteinander kommunizieren.

Ich würde es ausarbeiten, aber das bisherige Beispiel ist zu dürftig, um es nützlich zu machen. Zum Beispiel, wie geschrieben, nicht mehr als num_workers Aufrufe an rand() gemacht, also gibt es keinen Grund zu der Annahme, dass einer davon > 0,7 sein muss.

Sobald die Worker-Funktion eine Schleife anlegt, wird es deutlicher. Der Worker könnte zum Beispiel prüfen, ob ein mp.Event am Anfang der Schleife gesetzt ist, und wenn dies der Fall ist, wird die Schleife einfach beendet. Der Hauptprozess würde das Event setzen, wenn er möchte, dass die Arbeiter aufhören.

Und ein Arbeiter könnte ein anderes mp.Event setzen, wenn er einen Wert > 0,7 gefunden hat. Der Hauptprozess würde auf diesen Wert warten. Eventund setzt dann die "Zeit bis zum Anhalten" Event für die Arbeiter zu sehen, dann führen Sie die übliche Schleife .join()-Werker für ein sauberes Herunterfahren.

EDIT

Hier ist die Ausarbeitung einer portablen, sauberen Lösung, unter der Annahme, dass die Arbeiter so lange weiterarbeiten, bis mindestens einer einen Wert > 0,7 findet. Beachten Sie, dass ich entfernt habe numpy entfernt habe, weil es für diesen Code irrelevant ist. Der Code hier sollte unter jedem Standard-Python auf jeder Plattform funktionieren, die multiprocessing:

import random
from time import sleep

def worker(i, quit, foundit):
    print "%d started" % i
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print '%d found %g' % (i, x)
            foundit.set()
            break
        sleep(0.1)
    print "%d is done" % i

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
    foundit.wait()
    quit.set()

Und ein Beispiel für die Ausgabe:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

Alles wird sauber beendet: keine Rückverfolgungen, keine abnormalen Beendigungen, keine zurückgelassenen Zombie-Prozesse ... blitzsauber.

IT TÖTEN

Wie @noxdafox anmerkte, gibt es eine Pool.terminate() Methode, die plattformübergreifend ihr Bestes gibt, um Worker-Prozesse zu beenden, egal was sie gerade tun (z.B. ruft sie unter Windows die Plattform TerminateProcess()). Ich empfehle es nicht für Produktionscode, weil das abrupte Beenden eines Prozesses verschiedene gemeinsam genutzte Ressourcen in einem inkonsistenten Zustand lassen oder sie auslaufen lassen kann. Es gibt verschiedene Warnungen darüber in der multiprocessing Dokumentation, die Sie um Ihre Betriebssystem-Dokumente ergänzen sollten.

Trotzdem kann es sinnvoll sein! Hier ist ein vollständiges Programm mit diesem Ansatz. Beachten Sie, dass ich den Cutoff-Wert auf 0,95 erhöht habe, damit die Wahrscheinlichkeit, dass es länger als einen Wimpernschlag dauert, größer ist:

import random
from time import sleep

def worker(i):
    print "%d started" % i
    while True:
        x = random.random()
        print '%d found %g' % (i, x)
        if x > 0.95:
            return x # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print "quitting with %g" % arg
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

Und ein Beispiel für die Ausgabe:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

Es gibt einen viel saubereren und pythonischen Weg, um das zu tun, was Sie tun wollen, und zwar durch die Verwendung der Callback-Funktionen, die von multiprocessing.Pool.

Sie können diese Frage überprüfen, um ein Beispiel für die Implementierung zu sehen.

Wie einer der anderen Benutzer erwähnte, müssen die Prozesse miteinander kommunizieren, um sie dazu zu bringen, ihre Kollegen zu beenden. Sie können zwar os.kill verwenden, um die Peer-Prozesse zu beenden, aber es ist anständiger, eine Beendigung zu signalisieren.

Die von mir verwendete Lösung ist ziemlich einfach:
1. Finden Sie die Prozess-ID (pid) des Hauptprozesses heraus, der alle anderen Arbeitsprozesse erzeugt. Diese Verbindungsinformationen sind vom Betriebssystem verfügbar, das festhält, welcher Kindprozess von welchem Elternprozess erzeugt wurde.
2. Wenn einer der Arbeitsprozesse Ihre Endbedingung erreicht, verwendet er die ID des übergeordneten Prozesses, um alle Kindprozesse des Hauptprozesses (einschließlich seiner selbst) zu finden, geht dann die Liste durch und signalisiert ihnen das Ende (wobei er sicherstellt, dass er sich nicht selbst signalisiert).
Der folgende Code enthält die funktionierende Lösung.

import time
import numpy as np
import multiprocessing as mp
import time
import sys
import os
import psutil
import signal

pid_array = []

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    current_process = os.getpid()
    print "From i = ",i, "       res = ",res, " with process ID (pid) = ", current_process
    if res>0.7:
        print "find it"
        # solution: use the parent child connection between processes
        parent = psutil.Process(main_process)
        children = parent.children(recursive=True)
        for process in children:
            if not (process.pid == current_process):
                print "Process: ",current_process,  " killed process: ", process.pid
                process.send_signal(signal.SIGTERM)

if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    main_process = os.getpid()
    print "Main process: ", main_process
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

Die Ausgabe gibt eine klare Vorstellung davon, was passiert:

Main process:  30249
From i =  0        res =  0.224609517693  with process ID (pid) =  30259
From i =  1        res =  0.470935062176  with process ID (pid) =  30260
From i =  2        res =  0.493680214732  with process ID (pid) =  30261
From i =  3        res =  0.342349294134  with process ID (pid) =  30262
From i =  4        res =  0.149124648092  with process ID (pid) =  30263
From i =  5        res =  0.0134122107375  with process ID (pid) =  30264
From i =  6        res =  0.719062852901  with process ID (pid) =  30265
find it
From i =  7        res =  0.663682945388  with process ID (pid) =  30266
Process:  30265  killed process:  30259
Process:  30265  killed process:  30260
Process:  30265  killed process:  30261
Process:  30265  killed process:  30262
Process:  30265  killed process:  30263
Process:  30265  killed process:  30264
Process:  30265  killed process:  30266

Kommentare und Bewertungen des Leitfadens

Sie haben die Möglichkeit, unsere Besetzung zu bestätigen, indem Sie einen Kommentar abgeben und eine Bewertung hinterlassen, wir begrüßen Sie.



Nutzen Sie unsere Suchmaschine

Suche
Generic filters

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.