Skip to content

Part 2 : Spark using Scala

Oussama Ben Khiroun edited this page Nov 14, 2021 · 7 revisions

A. Installation et Configuration de Spark

1- Mise à jour de la base des téléchargements sous Ubuntu

sudo apt update

2- Vérification si Java est installé et bien configuré (si la commande donne une version de Java ignorer l'étape suivante)

java -version

3- Installation de Java

sudo apt install default-jdk

4- Installation de Scala

sudo apt install scala

5- Téléchargement de Spark (vérifier le lien depuis le site de Spark)

wget https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

6- Décompression de l'archive Spark

sudo tar -xvzf spark-3.0.3-bin-hadoop2.7.tgz

7- Déplacement du dossier décompressé vers /opt/spark/

sudo mv spark-3.0.3-bin-hadoop2.7 /opt/spark

cd /opt/spark

8- Exportation du chemin de Spark dans la variable d'environnement PATH

export SPARK_HOME=/opt/spark

export PATH=$PATH:SPARK_HOME/bin:SPARK_HOME/sbin

echo $PATH

/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:SPARK_HOME/bin:SPARK_HOME/sbin

Deux nouveaux chemins sont ajoutés à PATH -> /opt/spark/bin/ ET /opt/spark/sbin/

B. Lancement du shell Spark

Remarque : On suppose être sous le dossier /opt/spark/bin/ (vérifier le répertoire courant de travail avec la commande pwd)

./spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
          
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.11)
Type in expressions to have them evaluated.
Type :help for more information.
scala> ▋

C. Manipulation de Spark avec Scala

Exemple 1

Choisir un fichier texte et l'affecter via Spark Context (sc) dans un RDD (changer le nom du fichier hello.txt par un fichier existant)

scala> val rdd1 = sc.textFile("/home/oussama/hello.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/oussama/hello.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> rdd1.count
res0: Long = 3

Remarque : Contenu du fichier hello.txt

Bonjour tout le monde Big Data

Formation Big Data Jour Jour

Jour 2

Exemple 2

scala> val list1 = List("apple","banana","orange")
list1: List[String] = List(apple, banana, orange)

scala> val rdd2 = sc.parallelize(list1)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd2.collect
res1: Array[String] = Array(apple, banana, orange)

Exemple 3

scala> val rdd4 = sc.textFile("/home/oussama/hello.txt")
rdd4: org.apache.spark.rdd.RDD[String] = /home/oussama/hello.txt MapPartitionsRDD[20] at textFile at <console>:23

scala> rdd4.collect
res9: Array[String] = Array(Bonjour tout le monde Big Data, Formation Big Data Jour Jour, Jour 2)

Exemple 4 (Opération Map)

scala> val count1 = rdd4.flatMap(line => line.split(" "))
count1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at flatMap at <console>:23

scala> count1.collect
res10: Array[String] = Array(Bonjour, tout, le, monde, Big, Data, Formation, Big, Data, Jour, Jour, Jour, 2)

scala> val count2 = count1.map(word => (word,1))
count2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:23

scala> count2.collect
res11: Array[(String, Int)] = Array((Bonjour,1), (tout,1), (le,1), (monde,1), (Big,1), (Data,1), (Formation,1), (Big,1), (Data,1), (Jour,1), (Jour,1), (Jour,1), (2,1))

Exemple 5 (Opération Reduce)

scala> val count3 = count2.reduceByKey(_+_)
count3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:23

scala> count3.collect
res12: Array[(String, Int)] = Array((monde,1), (Big,2), (le,1), (2,1), (tout,1), (Bonjour,1), (Jour,3), (Formation,1), (Data,2))
  • Sauvegarde du résultat de MapReduce dans un répertoire
scala> count3.saveAsTextFile("/home/oussama/resultat-spark")
    /home/oussama/resultat-spark/
    └── part-00000
  • Contenu du fichier part-00000
(monde,1)
(Big,2)
(le,1)
(2,1)
(tout,1)
(Bonjour,1)
(Jour,3)
(Formation,1)
(Data,2)

Exemple 6 (Différence entre .map et .flatMap)

scala> val rddmap = sc.parallelize(List("test1 test2", "test3 test4", "test5 test6"))
rddmap: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:23

scala> rddmap.map(x => x.split(" ")).collect
res16: Array[Array[String]] = Array(Array(test1, test2), Array(test3, test4), Array(test5, test6))

scala> rddmap.flatMap(x => x.split(" ")).collect
res17: Array[String] = Array(test1, test2, test3, test4, test5, test6)

Exemple 7 (Data Frames)

scala> val rdd = sc.parallelize(List((1, "toto", "yoyo"),(2,"titi","jiji"),(3,"tata","nono")))
rdd: org.apache.spark.rdd.RDD[(Int, String, String)] = ParallelCollectionRDD[33] at parallelize at <console>:23
  • Transformation en Data Frame
scala> val dataframe = rdd.toDF("id","nom","prénom")
dataframe: org.apache.spark.sql.DataFrame = [id: int, nom: string ... 1 more field]
  • Manipulation et requêtage (SQL) du Data Frame
scala> dataframe.createOrReplaceTempView("personnes")

scala> val dataframeSQL = spark.sql("select * from personnes")
dataframeSQL: org.apache.spark.sql.DataFrame = [id: int, nom: string ... 1 more field]
  • Affichage du résultat de la requête SELECT
scala> dataframeSQL.show()
+---+----+------+
| id| nom|prénom|
+---+----+------+
|  1|toto|  yoyo|
|  2|titi|  jiji|
|  3|tata|  nono|
+---+----+------+
scala> spark.sql("SELECT count(*) FROM personnes").show
+--------+
|count(1)|
+--------+
|       3|
+--------+