Inherentes Apache Spark Problem

Ich versuche gerade ein interessantes Problem zu lösen. Hier und hier habe ich beschrieben, wie ich den Performance von Spark und Pandas vergleichen will auf die Daten vom Kaggle Quora Wettbewerb.

Dabei stoße ich auf ein interessantes Phänomen. Solange der Zugriff zeilenweise stattfindet, ist Spark OK. Jetzt will ich aber das Maximum einer Spalte berechnen. Das geht erstaunlich schlecht. Ich vermute, dass das mit der verteilte Struktur von Spark zu tun hat, würde es aber gerne verstehen wollen.

Meine Frage nach dem ‚Warum‘ auf Stack Overflow wurde beantwortet mit ‚benutze Spark nur, wenn es absolut notwendig ist, nimm ansonsten Pandas‘. Obwohl das auch meine Schlussfolgerung ist, ist es keine Antwort.

Es gibt verschiedene Arten, das Maximum zu suchen. Alle brauchen mehr als 20 Minuten. Wenn Du diesen Code siehst, verstehst Du, wie merkwürdig das ist.

df.createOrReplaceTempView("kaggle")
biggest = spark.sql("SELECT MAX(levenshtein) FROM kaggle")
biggest.collect()

Das Maximum aus 400.000 Einträgen zu suchen, dauert bei einem Datenbankzugriff keine Sekunde. Und hier 25 Minuten. Es gibt noch mehr Lösungen auf StackOverflow und bei Databricks. Alles Banane. Irgendwann wird die Antwort kommen.

[Edit: 2. Mai] Das Problem saß mal wieder vor dem Bildschirm. Aber schon seit Tage 😦

Es eht um ein inherentes Apache Spark Feature: Lazy Evaluation. Ich habe das Ausmaß des Laziness unterschätzt. Ich hatte gedacht, dass ich die Berechnung der levenshtein-Spalte schon erzwungen hatte, aber dem war nicht so. Also werden jetzt alle Werte berechnet, bevor das MAX geholt werden kann. Also

  1. Dauert die Berechnung des Maximums nicht so lange und
  2. War meine Freude, dass die zeilenweise Berechnungen so flott durchliefen, nicht so angebracht. Die liefen schön flott noch nicht durch.

Inzwischen habe ich aber die Accumulator und AccumulatorParam Klassen kennengelernt, dass ist auch was schönes 🙂

[Edit2: 2. Mai] Trotzdem sollte man den Accumulator benutzen. Ansonsten wird die ganze Berechnung noch mal wiederholt, wenn man den Codeblock oben aufruft.

# create the accumulator to capture the maximum levnshtein
from pyspark.accumulators import AccumulatorParam

class MaxAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return 0
    def addInPlace(self, val1, val2):
        # this method defines what happens with 
        # the value sent to the accumulator
        return max(val1, val2)
    
# create an Accumulator object
max_levenshtein = sc.accumulator(0,MaxAccumulatorParam())

from nltk.metrics import edit_distance

def levenshtein(question1, question2):
    result = edit_distance(question1, question2)
    global max_levenshtein
    # the following call calls the addInPlace method
    # from the MaxAccumulatorParam object
    max_levenshtein += result
    return result

# and create the pyspark.sql.UserDefinedFunction
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
udf_levenshtein = udf(levenshtein, IntegerType())

 

Advertisements

Einen Kommentar schreiben

Required fields are marked *

*
*

%d Bloggern gefällt das: