Python >> Tutoriel Python >  >> Python Tag >> exec()

PySpark :Configuration des exécuteurs/cœurs et de la mémoire de la machine locale

Bien que la réponse à votre question ne réside que dans l'un des problèmes suivants, permettez-moi de réécrire votre exemple pour expliquer ce qui se passe.

Définir votre configuration

Tout d'abord, vous n'avez pas besoin de démarrer et d'arrêter un contexte pour définir votre configuration. Depuis Spark 2.0, vous pouvez créer la session Spark, puis définir les options de configuration.

from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("yourAwesomeApp").getOrCreate())
spark.conf.set("spark.executor.memory", "40g")
spark.conf.set("spark.executor.cores", "2")

Lire vos données

Spark évaluera paresseusement le DAG. Le temps que vous mesurez dans votre capture n'est pas le chargement des données dans le bloc de données, mais simplement l'inférence de schéma pour le fichier JSON . L'inférence de schéma coûte cher, vous devriez essayer de l'éviter en définissant le schéma de vos données. Vous verrez une grande différence de performances entre :

df = spark.read.json("../data/a_very_large_json.json.gz")

et

from pyspark.sql.types import (
    StructType, 
    StringType, 
    StructField,
)
json_schema = schema = StructType([
    StructField('data', StructType([
        StructField("field1", StringType(), nullable=False),
        StructField("field2", StringType(), nullable=False),
        StructField("field3", StringType(), nullable=True),
        StructField("field4", StringType(), nullable=True),
        StructField("field5", LongType(), nullable=False),
    ])),
])
df = spark.read.json("../data/a_very_large_json.json.gz", schema=json_schema)

Si vous fournissez le schéma, cette instruction devrait être presque instantanée. Comme un autre utilisateur l'a déjà mentionné, pour exécuter la tâche, vous devez avoir une activité, telle que show, head, collect, persist, etc.

df.show()

Vous pouvez définir le nombre d'instances d'exécuteur et de cœurs sur la configuration, mais l'utilisation réelle de ces instances dépend également de vos données d'entrée et des transformations/actions que vous effectuez. D'après votre description, je suppose que vous travaillez en mode autonome, donc avoir une instance d'exécuteur sera la valeur par défaut (utilisant tous les cœurs), et vous devez configurer la mémoire de l'exécuteur pour utiliser celle dont vous disposez. Autant que je m'en souvienne, lorsque vous travaillez en mode autonome, le spark.executor.instances est ignoré et le nombre réel d'exécuteurs est basé sur le nombre de cœurs disponibles et le spark.executor.cores

Comparaison avec les pandas

Si vous travaillez avec un seul nœud, en chargeant les données dans une trame de données, la comparaison entre l'étincelle et les pandas est injuste. Spark aura toujours un surcoût plus élevé. Les étincelles brilleront lorsque vous avez des ensembles de données qui ne tiennent pas sur la mémoire d'une machine et que vous avez plusieurs nœuds pour effectuer le travail de calcul. Si vous êtes à l'aise avec les pandas, je pense que vous pouvez être intéressé par les koalas de Databricks.

Recommandation

Je préfère configurer les détails d'exécution en dehors de l'application (par exemple, en utilisant les paramètres spark-submit). En de rares occasions, pour améliorer les performances, vous devrez en définir certaines dans le code, mais avec chaque nouvelle version de Spark, cela est moins fréquent. Si vous y parvenez, votre application sera plus évolutive et facile à faire évoluer.


spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")

Ajoutez ceci :

df.show() 
##OR
df.persist()

La comparaison que vous faites n'est pas des pommes avec des pommes, Spark effectue une évaluation paresseuse, ce qui signifie que si vous n'appelez pas d'action sur votre opération, il ne fera que compiler et garder le DAG prêt pour vous.

Dans Spark, il existe deux concepts,

  1. Transformation :évaluée paresseusement
  2. Actions :(comme collect(), take(), show(),persist()) évaluées instantanément.

Dans votre cas, read() n'est qu'une transformation, l'ajout d'une action devrait déclencher le calcul.

En savoir plus sur les actions contre la transformation :https://training.databricks.com/visualapi.pdf


La raison pour laquelle votre lecture Spark est plus lente que pandas est que le fichier gz n'est pas divisible, donc Spark doit lire l'intégralité du fichier avec une seule tâche. Cependant, lors de la lecture d'un fichier non compressé, ou d'un fichier compressé avec un format de compression divisible comme bzip2, le Spark déploiera x nombre de tâches en parallèle (jusqu'au nombre de cœurs disponibles dans votre cluster) pour lire le fichier. Essayez de décompresser le fichier avant de le transmettre à Spark.