Pytanie Jak korzystać z wieloprocesowości z instancjami klasy w Pythonie?


Próbuję utworzyć klasę, niż można uruchomić oddzielny proces, aby wykonać jakąś pracę, która zajmuje dużo czasu, uruchomić kilka z nich z głównego modułu, a następnie poczekać, aż wszystkie się zakończą. Chcę uruchomić procesy raz, a następnie karmić je rzeczami do zrobienia, zamiast tworzyć i niszcząc procesy. Na przykład, może mam 10 serwerów z komendą dd, a potem chcę je wszystkie scp pliku itp.

Moim ostatecznym celem jest stworzenie klasy dla każdego systemu, który śledzi informacje dla systemu, w którym jest powiązany z adresem IP, logami, środowiskiem uruchomieniowym itp. Ale ta klasa musi być w stanie uruchomić polecenie systemowe, a następnie powrócić wykonanie z powrotem do wywołującego, podczas gdy polecenie systemowe jest uruchamiane, aby później sprawdzić wynik polecenia systemowego.

Moja próba się nie udała, ponieważ nie mogę wysłać metody instancji klasy przez potok do podprocesu przez pikle. Te są nie do wybrania. Dlatego starałem się to naprawić na różne sposoby, ale nie mogę tego rozgryźć. W jaki sposób mogę załatać mój kod, aby to zrobić? Co to jest wieloprocesorowość, jeśli nie możesz przesłać niczego użytecznego?

Czy istnieje jakaś dobra dokumentacja z wykorzystaniem procesu wieloprocesowego w instancjach klasy? Jedynym sposobem, w jaki mogę uruchomić moduł wieloprocesowy, są proste funkcje. Każda próba użycia go w instancji klasy nie powiodła się. Może zamiast tego powinienem przekazywać wydarzenia? Nie rozumiem, jak to jeszcze zrobić.

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launce the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place an command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retreive the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running number % as process "%number, self.name

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

Edycja: próbowałem przenieść ProcessWorker klasa i tworzenie kolejek do przetwarzania wieloprocesowego na zewnątrz Worker klasa, a następnie próbował ręcznie podlać instancję pracującą. Nawet to nie działa i pojawia się błąd

RuntimeError: Obiekty kolejki powinny być udostępniane tylko między procesami   przez dziedziczenie

. Ale tylko przekazuję odniesienia do tych kolejek do instancji pracownika? Brakuje mi czegoś fundamentalnego. Oto zmodyfikowany kod z głównej sekcji:

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")

21
2018-01-05 07:10


pochodzenie


Widziałeś dispy? To może uratować ból głowy lub dwa :) - Alex L
Nie mogłem znaleźć żadnych przykładów dispy, które używane klasy. Wszystko wydaje się odbiegać od Główny i nie jest tak, jak zamierzam go użyć. Moje przykłady z wykorzystaniem wieloprocesowości. Przetwarzanie w porządku Główny ale kończy się niepowodzeniem, gdy próbuję użyć klas i metod ze stanem - David Lynch
Wiem, że jest późno, ale jeśli użyjesz widelca multiprocessing nazywa pathos.multiprocessingmożna łatwo pobierać przykłady klas. Jeśli potrzebujesz dink z Queue obiekty i co tam, następnie można uzyskać dostęp do rozszerzonego rozwidlone Queues przez importowanie from processing import Queue. pathos.multiprocessing używa dill, który robi serializować i wysyłać definicje klas wraz z instancjami. - Mike McKerns
Możesz także użyć dill i pathos.multiprocessing wysłać metodę klasy (powiązaną lub niezwiązaną). - Mike McKerns


Odpowiedzi:


Zamiast próbować wysłać samą metodę (co jest niepraktyczne), spróbuj wysłać wiadomość Nazwa metody do wykonania.

Pod warunkiem, że każdy pracownik uruchamia ten sam kod, jest to kwestia prosta getattr(self, task_name).

Zdałbym krotki (task_name, task_args), gdzie task_args były dyktowane bezpośrednio do metody zadania:

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
  task = getattr(self, next_task_name)
  answer = task(**next_task_args)
  ...
else:
  # poison pill, shut down
  break

8
2018-01-05 07:44



To nie działa ... Dostaję błąd "Obiekt AttributeError:" ProcessWorker "nie ma atrybutu" run_long_command "". Nie spodziewałbym się, że to zadziała, ponieważ ProcessWorker nie ma żadnej z metod istniejących w klasie Worker. Chcę wysłać metodę przez potok (z informacją o stanie), aby zdalny proces mógł wykorzystać wszystkie te informacje o stanie. Naprawdę nie widzę sensu modułu wieloprocesowego, jeśli wszystko, co zrobi, to uruchomić bezpaństwową funkcję po drugiej stronie. - David Lynch
Przykro mi, ale muszę powtórzyć. ty nie może wyślij metodę przez potok. Dlatego pickle narzeka na to. Wysyłanie kodu wykonywalnego nie jest niemożliwe, ale dostaje dużo bardziej zaangażowane niż deserializacja obiektu kodu. Powinieneś wcześniej zaimplementować metody, które chcesz uruchomić w klasie Worker. Jeśli potrzebujesz wysłać kod, którego nie znasz z wyprzedzeniem, najlepszym rozwiązaniem jest wysłanie źródła Pythona jako napisu, a następnie wywołanie compile i eval na tym. Jeśli chcesz wysłać metodę ze stanem, umieść cały stan w argumentach metody lub użyj udostępnionej bazy danych. - 9000
WRT działa bezstanowe metody: masz rury, które mogą trzymać państwo. Rozdziel swój początkowy stan na kilka procesów, a następnie odbierz wyniki. Jeśli chcesz mieć stan wysoce współdzielony (np. Geometria do śledzenia promieni), korzystasz z bazy danych (w pamięci), od memcached do zwykłego RDBMS. Korzystanie z globalnego zmienny Stan jest zazwyczaj wystarczająco zły. Jeśli musisz, użyj procesu arbitra, który odczytuje z potoków i rozwiązuje konflikty (np. Bazy danych). - 9000
Kiedy uruchamiam proces potomny, czy jest to kopia procesu nadrzędnego? Nie potrzebuję kompilować nieznanego kodu, po prostu muszę wymyślić, jak przywołać kopię istniejącą w nowym procesie. Nie sądziłem, że mogę mieć własne funkcje w klasie ProcessWorker, oparte na znalezionych przeze mnie przykładach online. - David Lynch
Dzięki 9000 za pomoc w tej sprawie. Twoje odpowiedzi zdecydowanie wysłały mi właściwą ścieżkę, aby pomóc mi rozwiązać ten problem! - David Lynch


Problem polegał na tym, że zakładałem, że Python robi jakąś magię, która różni się nieco od sposobu działania C ++ / fork (). Myślałem, że Python skopiował klasę, a nie cały program w oddzielny proces. Poważnie zmarnowałem kilka dni, próbując zmusić to do działania, ponieważ cała rozmowa na temat serializacji pikla sprawiła, że ​​pomyślałem, że to wszystko wysłało wszystko przez rurę. Wiedziałem, że pewnych rzeczy nie można wysłać przez rurę, ale myślałem, że moim problemem jest to, że nie pakowałem właściwie.

Tego wszystkiego można by było uniknąć, gdyby dokumentacja Pythona dała mi widok 10.000 stóp, co dzieje się, gdy ten moduł jest używany. Jasne, mówi mi, co robią metody modułu wieloprocesowego i podaje mi kilka podstawowych przykładów, ale chcę wiedzieć, czym jest "Teoria działania" za kulisami! Oto rodzaj informacji, które mogłem wykorzystać. Zadzwoń, jeśli moja odpowiedź jest wyłączona. Pomoże mi się uczyć.

Po uruchomieniu procesu uruchamiającego ten moduł cały program jest kopiowany do innego procesu. Ale skoro to nie jest "__main__"proces i mój kod sprawdzał to, nie uruchamia on jeszcze jednego procesu w nieskończoność, po prostu zatrzymuje się i siada tam czekając na coś do zrobienia, jak zombie. Wszystko, co zostało zainicjalizowane u rodzica w momencie wywoływania multiprocess.Process () jest skonfigurowany i gotowy do pracy Po umieszczeniu czegoś w multiprocess.Queue lub pamięci współdzielonej, potoku itp. (jednak komunikujesz się), oddzielny proces otrzymuje go i zaczyna działać. Może pobierać wszystkie importowane moduły i konfigurować tak, jakby był rodzicem, jednak gdy niektóre wewnętrzne zmienne stanu ulegną zmianie w procesie rodzica lub osobnym, te zmiany są izolowane. Gdy proces zostanie zainicjowany, teraz staje się Twoim zadaniem utrzymanie je zsynchronizować, jeśli to konieczne, przez kolejkę, potok, pamięć współdzieloną itp.

Wyrzuciłem kod i zacząłem od nowa, ale teraz kładę tylko jedną dodatkową funkcję w ProcessWorker, metoda "uruchom", która uruchamia wiersz polecenia. Dość proste. Nie muszę się martwić o uruchamianie, a następnie zamykanie wielu procesów w ten sposób, co spowodowało u mnie wszelkie problemy niestabilności i wydajności w przeszłości w C ++. Kiedy przestawiłem się na uruchamianie procesów na początku, a następnie przekazywanie wiadomości do tych oczekujących procesów, moja wydajność uległa poprawie i była bardzo stabilna.

Przy okazji, spojrzałem na to łącze, by uzyskać pomoc, co mnie wyrzuciło, ponieważ ten przykład sprawił, że pomyślałem, że metody były transportowane przez kolejki: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html W drugim przykładzie pierwszej sekcji użyto "next_task ()", która pojawiła się (dla mnie), aby wykonać zadanie odebrane przez kolejkę.


20
2018-01-06 06:03



Jak wspomniano w moim komentarzu do twojego pytania, jeśli chcesz wyłowić instancję klasy bez obaw o zależności tak bardzo ... powinieneś użyć dill, które mogą zarówno dopełniać definicję klasy z instancją klasy, lub pickle kod źródłowy i zależności dla większości obiektów, w tym klas zdefiniowanych przez użytkownika. Widelec multiprocessing (wspomniane w komentarzu do pytania) używa dill do serializacji ... w ten sposób unikając większości problemów, które opisujesz. - Mike McKerns


REF: https://stackoverflow.com/a/14179779

Odpowiedź 6 stycznia o 6:03 napisana przez Davida Lyncha nie jest prawdą, gdy mówi, że został zwiedziony przez http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html.

Podany kod i przykłady są poprawne i działają zgodnie z ogłoszeniem. next_task()  jest wykonywanie zadania otrzymanego za pośrednictwem kolejki - spróbuj i zrozum, co Task.__call__() metoda robi.

W moim przypadku potknęło mnie błędy składniowe w mojej implementacji run(). Wygląda na to, że pod-proces nie zgłosi tego i po prostu nie powiedzie się po cichu - zostawiając rzeczy w dziwnych pętlach! Upewnij się, że masz jakiś mechanizm sprawdzania składni np. Flymake / Pyflakes w Emacs.

Debugowanie za pośrednictwem multiprocessing.log_to_stderr()F pomógł mi zawęzić problem.


0
2017-11-28 04:06