Forum: PC-Programmierung Laufzeit Python3 und Multiprocessing


von Markus W. (dl8mby)


Lesenswert?

Hallo Forum,

heute mal eine Frage an die Python-Experten unter Euch.

>./single_and_parallel_processing.py
Single CPU processing took 0.120847 seconds.
[6, 2, 8, 1, 4, 2, 4, 4, 2, 3, 4, 1, 4, 2, 3, 2, 4, 6, 8, 3, 1, 4, 5, 1, 
4, 2, 2, 0, 2, 3]
Parallel CPU processing took 25.538535 seconds.
[6, 2, 8, 1, 4, 2, 4, 4, 2, 3, 4, 1, 4, 2, 3, 2, 4, 6, 8, 3, 1, 4, 5, 1, 
4, 2, 2, 0, 2, 3]

Habe den u.g. Code der den Unterschied zwischen Single-CPU
und Multi-CPU Abarbeitung testen soll.

Habe mich nach den Beispielen von
  https://www.machinelearningplus.com/python/parallel-processing-python/
orientiert.

Wie kommt es nun, dass die Multiprocessing Verarbeitung länger
dauert, wie die Singleprocessing?

Was habe ich übersehen?

Danke für Eure Hilfe!

Markus


1
#!/usr/bin/python3
2
# https://www.machinelearningplus.com/python/parallel-processing-python/
3
4
import numpy as np
5
from time import time
6
from timeit import default_timer as timer
7
8
# Parallelizing using Pool.apply()
9
import multiprocessing as mp
10
11
12
# Function thar run in single or parallel mode
13
def howmany_within_range(row, minimum, maximum):
14
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
15
    count = 0
16
    for n in row:
17
        if minimum <= n <= maximum:
18
            count = count + 1
19
    return count
20
21
22
# Solution without Paralleization
23
def single_cpu(data):
24
    results = []
25
    results=[howmany_within_range(row, minimum=10, maximum=20) for row in data]
26
    return results
27
28
29
# Solution with Paralleization
30
def multi_cpu(cpus,data):
31
    results = []
32
    # Step 1: Init multiprocessing.Pool()
33
    pool = mp.Pool(cpus)  # use only 30 CPUs
34
    # Step 2: `pool.apply` the `howmany_within_range()`
35
    results = [pool.apply(howmany_within_range, args=(row, 10, 20)) for row in data]
36
    # Step 3: Don't forget to close
37
    pool.close()
38
    return results
39
40
# Main function with reporting of both methods    
41
def main():
42
    cpus = mp.cpu_count() -2  # CPUs to use for parallel calculation
43
44
    # create data array for function howmany_within_range()
45
    np.random.RandomState(100)
46
    arr = np.random.randint(0, 100, size=[100000, cpus])
47
    data = arr.tolist()
48
49
    sresults = []  # singel cpu results
50
    presults = []  # parallel cpu results
51
    # after calculation should be the dame content => sresults = presults !!!
52
  
53
    start = timer()
54
    sresults=single_cpu(data)
55
    cpu_time = timer() - start
56
    print("Single CPU processing took %f seconds." % cpu_time)
57
    print(sresults[:cpus])
58
59
    start = timer()
60
    presults=multi_cpu(cpus,data)
61
    cpu_time = timer() - start
62
    print("Parallel CPU processing took %f seconds." % cpu_time)
63
    print(presults[:cpus])
64
    return
65
66
if __name__ == "__main__":
67
  main()

von Tom K. (ez81)


Lesenswert?

1
    arr = np.random.randint(0, 100, size=[100000, cpus])
2
    data = arr.tolist()
3
    print("len data =", len(data)) # !!!!!!!

Was passiert dann bei
1
    results = [pool.apply(howmany_within_range, args=(row, 10, 20)) for row in data]
?

von Tilo R. (joey5337) Benutzerseite


Lesenswert?

Ich bin jetzt kein Python-Experte, hatte mich vor längerer Zeit aber mal 
mit dem Multithreading-Modell von Python beschäftigt.

Die grundsätzliche Idee damals war, dass Python OS-seitig keine Threads 
braucht sondern das selber macht.
Das GIL (Global Interpreter Lock) verriegelt die Python-Threads. Es gibt 
immer nur einen Thread, der tatsächlich CPU-Zeit abbekommt.

Wenn du irgendwas IO-lastiges machst hilft Multithreading natürlich.
Wenn dein Problem aber CPU-constrained ist, kann man mit diesem Modell 
keinen Blumentopf gewinnen. Obendrauf kommt der zusätzliche 
Verwaltungs-Overhead, der vermutlich in deinem Fall den Unterschied 
macht.

von Tilo R. (joey5337) Benutzerseite


Lesenswert?

Das klingt jetzt alles nicht so toll, bringt aber auch Vorteile.

Weil Python nicht gleichzeitig mehrere Threads ausführt garantiert dir 
die Sprache, dass Operationen auf Arrays, Hashes und Sets atomar sind.

von Tom K. (ez81)


Lesenswert?

Multiprocessing startet mehrere Prozesse (auch wenn das bei diesem 
Problem nicht wie erhofft hilft).

"The multiprocessing package offers both local and remote concurrency, 
effectively side-stepping the Global Interpreter Lock by using 
subprocesses instead of threads. Due to this, the multiprocessing module 
allows the programmer to fully leverage multiple processors on a given 
machine."

Dass der Beispiel-Autor 100k Prozesse für jeweils 30 Zahlen startet 
statt umgekehrt, ist nicht die Schuld von Python.

von Markus W. (dl8mby)


Lesenswert?

@All,

danke für die Hinweise. Aber leider helfen die noch nicht
mein Problem zu lösen.

Wie Ihr dem angegebenen Link
https://www.machinelearningplus.com/python/parallel-processing-python/
entnehmen könnt ist die Funktionalität des
synchronen Multiprocessing mittels
1
    # Step 1: Init multiprocessing.Pool()
2
    pool = mp.Pool(cpus)  # use only 30 CPUs
3
    # Step 2: `pool.apply` the `howmany_within_range()`
4
    results = [pool.apply(howmany_within_range, args=(row, 10, 20)) for row in data]
5
    # Step 3: Don't forget to close
6
    pool.close()

gegeben.

Es werden tatsächlich, in meinem Fall, gleichzeitig 30
Processe gestartet (via htop geprüft!).


arr = np.random.randint(0, 100, size=[100000, cpus])

Die o.g. Zeile erzeugt ein Array mit 100000 x 30 Zufalswerten
im Bereich 0-100.

Ich habe aber rausgefunden, das es besser ist size=[cpus, 100000]
zu schreiben, denn sonst werden aus dem Pool (30) jeweils Processe
erzeugt, die 100000 mal in der Summe laufen.

in Der Variante size=[cpus, 100000] werden nur einmalig 30
Processe aus dem 30-Processe enthaltenem Pool aufgerufen.


Die Zeile

results = [pool.apply(howmany_within_range, args=(row, 10, 20)) for row 
in data]

bewirkt, dass die Funktion "howmany_within_range" in einer Loop
100000x oder 30x (bei size=[cpus, 100000]) entsprechend der Anzahl
der Reihen in dem Datenarray "data" aufgerufen wird, jedes mal in
einem eigenen Process und dort auf die Datenreihe ihre Verarbeitung
anwendet (d.h. Suche und zählen des Vorkommens der Zufallszahlen
in dem Bereich 10-20). Dieses vorkommen wird dann in dem result-
Array abgelegt und angezeigt.

In beiden Fällen Single- und Multiprocessing sollte das angezeigte
Array die selben Werte enthalten.

Dies ist auch der Fall, nur dass die Laufzeit von
single_cpu(data)
kürzer ist als die Laufzeit von
multi_cpu(cpus,data)

was ich nicht ganz verstehe und deshalb die Frage an Euch.

Am Overhead des Process-Initialisieren liegt es nicht, da dieser
nur Bruchteile von Sekunden erfordert.
(Gemessen aber hier nicht reingestellt)

So nun bin ich für weitere Hinweise und Erklärungen dankbar.

Markus

von leo (Gast)


Lesenswert?

Markus W. schrieb:
> So nun bin ich für weitere Hinweise und Erklärungen dankbar.

Wieso soll eine triviale Funktion durch Multithreading verschnellert 
werden? Sicher gibt's einen Overhead der kleiner sein muss als die 
Threadfunktion.

leo

von Sven B. (scummos)


Lesenswert?

Dir ist klar, dass die fünfmal schnellere Lösung

count = np.sum((row >= min) & (row <= max))

ist, oder? Bei numpy-Code muss man erstmal die Python-Loops 
wegoptimieren soweit es geht, bevor man mit irgendwelchem 
multiprocessing-Kram anfängt. Ist leichter und bringt viel mehr.

von Karl (Gast)


Lesenswert?

Dein Programm ist mehr Speicherintensiv als Rechenintensiv probier es 
mal so:
und ließ mal die doku zu .apply

It blocks until the result is ready. Given this blocks, apply_async() is 
better suited for performing work in parallel.

https://docs.python.org/3.4/library/multiprocessing.html?highlight=process

1
#!/usr/bin/python3
2
# https://www.machinelearningplus.com/python/parallel-processing-python/
3
4
import numpy as np
5
from time import time
6
from timeit import default_timer as timer
7
8
# Parallelizing using Pool.apply()
9
import multiprocessing as mp
10
11
12
# Function thar run in single or parallel mode
13
def howmany_within_range(row, minimum, maximum):
14
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
15
    count = 0
16
    for n in row:
17
        if minimum <= n <= maximum:
18
            count = count + 1
19
            a = fakult(count)
20
    return count
21
22
23
def fakult(n):
24
    
25
    if n < 0:
26
        raise ValueError
27
    
28
    if n == 0:
29
        return 1
30
    else:
31
        save = 1
32
        for i in range(2,n+1):
33
            save *= i
34
        return save
35
36
37
# Solution without Paralleization
38
def single_cpu(data):
39
    results = []
40
    results=[howmany_within_range(row, minimum=10, maximum=20) for row in data]
41
    return results
42
43
44
# Solution with Paralleization
45
def multi_cpu(cpus,data):
46
    results = []
47
    # Step 1: Init multiprocessing.Pool()
48
    pool = mp.Pool(cpus)  # use only 30 CPUs
49
    # Step 2: `pool.apply` the `howmany_within_range()`
50
    res = []
51
    i = 0
52
    for row in data:
53
        res.append(pool.apply_async(howmany_within_range, args=(row, 10, 20)))
54
    #results = [pool.apply_async(howmany_within_range, args=(row, 10, 20)) for row in data]
55
    # Step 3: Don't forget to close
56
    pool.close()
57
    for r in res:
58
        results.append(r.get())
59
    return results
60
61
# Main function with reporting of both methods    
62
def main():
63
    cpus = mp.cpu_count() -2  # CPUs to use for parallel calculation
64
    # create data array for function howmany_within_range()
65
    np.random.RandomState(100)
66
    arr = np.random.randint(0, 100, size=[cpus, 30000])  #10000000
67
    data = arr.tolist()
68
69
    
70
    # after calculation should be the dame content => sresults = presults !!!
71
  
72
    start = timer()
73
    sresults=single_cpu(data)
74
    cpu_time = timer() - start
75
    print("Single CPU processing took %f seconds." % cpu_time)
76
    print(sresults[:cpus])
77
78
    start = timer()
79
    presults=multi_cpu(cpus,data)
80
    cpu_time = timer() - start
81
    print("Parallel CPU processing took %f seconds." % cpu_time)
82
    print(presults[:cpus])
83
    return
84
85
if __name__ == "__main__":
86
  main()

von Sven B. (scummos)


Lesenswert?

Zur eigentlichen Frage: vermutlich musst du besser batchen, deine Tasks 
sind zu klein sodass der Overhead alles totschlägt.

von Markus W. (dl8mby)


Lesenswert?

@All,

entweder ich habe mich falsch ausgedrückt, oder Euch ist
die Funktion meines Tests nicht hinreichend klar.

Ich versuche es nochmals, hoffentlich nun besser erklärt.

Ich habe keine spezielle Anwendung und wollte nur die
Funktionalität des Multiprocessings mit Python3 spielerisch
erlernen in der Anlehnung an den genannten Beispiel-Link.

Als Problem wird das Durchsuchen von Zufalls-Zahlen Kolumnen
nach Werten in bestimmten Zahlenbereich vorgenommen.
Die Funktion dazu "howmany_within_range" bekommt im single
Process-Mode der Reihe nach alle Spalten der Datenmatrix
übergeben und sucht darin alle Zahlen die in dem gesuchten
(min/max)-Range liegen und zählt ihr Vorkommen und trägt es
jeweils in die Resultat-liste ein.

Wenn man dieses Problem im Multiprocessing Mode angeht, wird
jede Spalte der Datenmatrix an einen eigenen Prozess übergeben
und die Spalten der Datenmatrix werden gleichzeitig nach den
Zahlen die das (min/max)-Kriterium erfüllen durchsucht und
gezählt.

Dieser Durchlauf sollte um ein Vielfaches schneller, entsprechend
der Anzahl der im Pool definierten Processe, vonstatten gehen.

Bei dieser Annahme gehe ich davon aus, dass jeder Prozess auf
einen eigenen Prozessor (CPU) ausgeführt wird. Dies könnte aber
noch nicht der Fall sein.
Ich sehe zwar in htop die Anzahl der Prozesse habe aber noch nicht
darauf geachtet ob sie simultan auf unterschiedlichen CPU's laufen.
Das muss ich noch überprüfen.

Ich hoffe ich konnte es jetzt besser und verständlicher darstellen.

Markus

von Markus W. (dl8mby)


Lesenswert?

@Karl (Gast)

Ich habe 32 CPU's, von denen ich 30 im Beispiel verwende, und 64GB RAM
zur Verfügung und diese werden noch nicht so belastet, dass der PC
ins swappen kommen würde.

30x100000x32Bit-Int ist noch nicht so riesig, das der Rechner am
Anschlag ist.
Ich habe das Beispiel auch mit 1000000 und 10000000 langen Zahlen-
spalten durchlaufen und der längste Durchlauf liegt bei knapp 500
Sekunden.

Generell geht es nur darum mehrere CPU's via Python3 zu benützen
und den Umgang damit richtig zu verstehen.

Das selbe Problem auf auf der GPU (Nvidia P5000) läuft auf dem selben
Rechner um Faktor 125 schneller als der Single-Process Durchlauf.
Da sieht man die Performance sofort. Bleibe ich auf der CPU sehe ich
leider unter "import multiprocessing" diesen Geschwindigkeitszuwachs
noch nicht, weil ich wahrscheinlich noch ein Verständnisproblem zu der
richtigen Handhabung habe.


Markus

: Bearbeitet durch User
von Markus W. (dl8mby)


Lesenswert?

Bin etwas weiter gekommen

>./mp_asy.py
Single CPU processing took 23.715139 seconds.
[10991052, 10999028, 11006124, 10998257, 10998847, 11006722]
Parallel CPU processing took 17.434246 seconds.
[10991052, 10999028, 11006124, 10998257, 10998847, 11006722]

mpstatus zeigt jetzt mehrere CPU's unter Last
1
12:27:02 AM  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
2
12:27:03 AM  all   62.03    0.00    0.75    0.00    0.00    0.00    0.00    0.00    0.00   37.22
3
12:27:03 AM    0  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
4
12:27:03 AM    1   96.00    0.00    4.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
5
12:27:03 AM    2  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
6
12:27:03 AM    3    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
7
12:27:03 AM    4    0.00    0.00    1.98    0.00    0.00    0.00    0.00    0.00    0.00   98.02
8
12:27:03 AM    5  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
9
12:27:03 AM    6    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
10
12:27:03 AM    7  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00

den zugehörigen Code, (jetzt auf einem Notebook mit 8CPU's) seht Ihr 
darunter.
1
#!/usr/bin/python3
2
# https://www.machinelearningplus.com/python/parallel-processing-python/
3
4
import numpy as np
5
from time import time
6
from timeit import default_timer as timer
7
8
# Parallelizing using Pool.apply()
9
import multiprocessing as mp
10
11
12
# Function thar run in single or parallel mode
13
def howmany_within_range(i,row, minimum, maximum):
14
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
15
    count = 0
16
    for n in row:
17
        if minimum <= n <= maximum:
18
            count = count + 1
19
    return (i,count)
20
21
22
# Solution without Paralleization
23
def single_cpu(data):
24
    results = []
25
    i=0
26
    r=0
27
    #results=[howmany_within_range(i,row, minimum=10, maximum=20) for row in data]
28
    for row in data:
29
        (i,r)=howmany_within_range(i,row, minimum=10, maximum=20)
30
        results.append(r)    
31
    return results
32
33
34
# Solution with Paralleization
35
def multi_cpu(cpus,data):
36
    results = []
37
    i=0
38
    # Step 1: Init multiprocessing.Pool()
39
    pool = mp.Pool(cpus)  # use only 30 CPUs
40
    # Step 2: `pool.apply` the `howmany_within_range()`
41
    #results = [pool.apply(howmany_within_range, args=(row, 10, 20)) for row in data]
42
    # call apply_async() without callback
43
    result_objects = [pool.apply_async(howmany_within_range, args=(i, row, 10, 20)) for i, row in enumerate(data)]
44
    # result_objects is a list of pool.ApplyResult objects
45
    results = [r.get()[1] for r in result_objects]
46
    # Step 3: Don't forget to close
47
    pool.close()
48
    pool.join()
49
    return results
50
51
# Main function with reporting of both methods    
52
def main():
53
    cpus = mp.cpu_count() -2  # CPUs to use for parallel calculation
54
55
    # create data array for function howmany_within_range()
56
    np.random.RandomState(100)
57
    arr = np.random.randint(0, 100, size=[cpus, 100000000])
58
    data = arr.tolist()
59
60
    sresults = []  # singel cpu results
61
    presults = []  # parallel cpu results
62
    # after calculation should be the dame content => sresults = presults !!!
63
  
64
    i=0
65
    start = timer()
66
    sresults=single_cpu(data)
67
    cpu_time = timer() - start
68
    print("Single CPU processing took %f seconds." % cpu_time)
69
    print(sresults[:cpus])
70
71
    start = timer()
72
    presults=multi_cpu(cpus,data)
73
    cpu_time = timer() - start
74
    print("Parallel CPU processing took %f seconds." % cpu_time)
75
    print(presults[:cpus])
76
    return
77
78
if __name__ == "__main__":
79
  main()

Markus

von Markus W. (dl8mby)


Lesenswert?

Noch ein paar Tests mit verschiedenen Spaltengrößen.

Die Entwicklung der Rechenzeit ist ziemlich linear.

>./mp_asy.py mit 200000000 Spaltenlänge
Single CPU processing took 47.986295 seconds.
[21992465, 21996292, 22006039, 22001910, 22004299, 22000972]
Parallel CPU processing took 36.096982 seconds.
[21992465, 21996292, 22006039, 22001910, 22004299, 22000972]


>./mp_asy.py mit 400000000 Spaltenlänge
Single CPU processing took 94.560948 seconds.
[43991017, 43994289, 44014595, 44006955, 43993680, 44004148]
Parallel CPU processing took 76.487460 seconds.
[43991017, 43994289, 44014595, 44006955, 43993680, 44004148]

Dabei ist der Umstand noch nicht Berücksichtigt, das
Single-Tasks

auf der CPU höcher getaktet werden als wenn
mehrere Processoren gleichzeitig rechnen.
Dies wird die gemessenen Ergebnisse etwas verfälschen.


So für heute reicht es.

Markus

von Karl (Gast)


Lesenswert?

Markus W. schrieb:
> @Karl (Gast)
>
> Ich habe 32 CPU's, von denen ich 30 im Beispiel verwende, und 64GB RAM
> zur Verfügung und diese werden noch nicht so belastet, dass der PC
> ins swappen kommen würde.

Wärst du einfach so gut und würdest mein Beispielcode auf deinem PC 
ausführen, dann wirst du sehen, dass bei meinem Beispiel mit etwas mehr 
sinnlosem rechnen, die Multicore-Variante deutlich schneller ist (Bei 
mir ca. Faktor 2, ich habe aber auch nur 2 Phys. Kerne)!
Und das mit Speicherintensiv bedeutet nicht, das dein Arbeitsspeicher 
voll ist, sondern dass es einfach länger dauert die Zufallszahlen vom 
Arbeitsspeicher in den Prozessor und zurück zu schaufel, als damit zu 
rechnen. Du musst den Test so machen, dass die CPU nur den Cache 
braucht.

von Markus W. (dl8mby)


Lesenswert?

@Karl,

wie gewünscht der Durchlauf Deines Beispiels mit 30000 und 60000 Werten
pro Spalte auf meinem NB mit acht log. Cores und Benützung von 6 Cores.

>./karl.py (30000 Zahlen pro Spalte)
Single CPU processing took 11.585195 seconds.
[3259, 3257, 3236, 3298, 3329, 3344]
Parallel CPU processing took 3.243654 seconds.
[3259, 3257, 3236, 3298, 3329, 3344]

Faktor S/P = 3.57

>./karl.py (60000 Zahlen pro Spalte)
Single CPU processing took 90.552562 seconds.
[6552, 6661, 6566, 6577, 6673, 6545]
Parallel CPU processing took 26.587306 seconds.
[6552, 6661, 6566, 6577, 6673, 6545]

Faktor S/P = 3.40


Der Faktor im Geschwindigkeitszuwachs ist deutlich besser

wie bei meinem letzten Beispiel.

Was mir noch nicht klar ist, warum Du fakult(count) in
die howmany_within_range() Funktion einbaust.
Du steigerst damit nur die Rechenkomplexität für den
singel- und die multi-Process(e) so dass die Daten-
bewegung in Relation zum Rechenaufwand geringer wird.
Was ist Deine Absicht dahinter.

Ob jetzt eine Funktion rechnet oder im Speicher Daten
durchsucht hat doch mit der Parallelisierung eines

Problems nur indirekt was zu tun.

Mir ist natürlich klar, dass gewisse Rechenprobleme

im internen CPU Cache schneller ablaufen können als
Probleme die auf Daten zugreifen, die nicht im Cache
vorgehalten werden können.
Da es aber ja nur um einen relativen und nicht absoluten
Vergleich zwischen Singe- und Multi-Processing geht ist
es doch von untergeordneter Bedeutung.

Ich will ja nicht in dieser Episode meiner Lernphase
die beste und schnellste Methode sofort anwenden, sondern
erst mal den Unterschied als solchen herausarbeiten und
vor allem tiefer verstehen wie unter Python3 das MP an-
gewendet wird.

Trotzdem danke für Deine Mühe mir zu diesem Thema

was zeigen zu wollen.

Markus

PS.: zum Vergleich ohne Aufruf von fakult()

>./karl_wo_fak.py (30000 Zahlen pro Spalte)
Single CPU processing took 0.007091 seconds.
[3385, 3097, 3378, 3238, 3332, 3322]
Parallel CPU processing took 0.015615 seconds.
[3385, 3097, 3378, 3238, 3332, 3322]

>./karl_wo_fak.py (60000 Zahlen pro Spalte)
Single CPU processing took 0.014547 seconds.
[6698, 6422, 6612, 6640, 6534, 6562]
Parallel CPU processing took 0.023354 seconds.
[6698, 6422, 6612, 6640, 6534, 6562]




PS2.: Die ollen überflüssigen Leerzeilen waren bei der
Vorschau nicht im Thread zu sehen.
Entweder sind die Zeilen zu lang und nicht umbrochen
oder ich habe Leerzeilen drin. Ich verstehe es nicht!

: Bearbeitet durch User
Bitte melde dich an um einen Beitrag zu schreiben. Anmeldung ist kostenlos und dauert nur eine Minute.
Bestehender Account
Schon ein Account bei Google/GoogleMail? Keine Anmeldung erforderlich!
Mit Google-Account einloggen
Noch kein Account? Hier anmelden.