-
Notifications
You must be signed in to change notification settings - Fork 0
Part 2 : Spark using Scala
1- Mise à jour de la base des téléchargements sous Ubuntu
sudo apt update
2- Vérification si Java est installé et bien configuré (si la commande donne une version de Java ignorer l'étape suivante)
java -version
3- Installation de Java
sudo apt install default-jdk
4- Installation de Scala
sudo apt install scala
5- Téléchargement de Spark (vérifier le lien depuis le site de Spark)
wget https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
6- Décompression de l'archive Spark
sudo tar -xvzf spark-3.0.3-bin-hadoop2.7.tgz
7- Déplacement du dossier décompressé vers /opt/spark/
sudo mv spark-3.0.3-bin-hadoop2.7 /opt/spark
cd /opt/spark
8- Exportation du chemin de Spark dans la variable d'environnement PATH
export SPARK_HOME=/opt/spark
export PATH=$PATH:SPARK_HOME/bin:SPARK_HOME/sbin
echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:SPARK_HOME/bin:SPARK_HOME/sbin
Deux nouveaux chemins sont ajoutés à PATH -> /opt/spark/bin/ ET /opt/spark/sbin/
Remarque : On suppose être sous le dossier /opt/spark/bin/
(vérifier le répertoire courant de travail avec la commande pwd
)
./spark-shell
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.0
/_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.11)
Type in expressions to have them evaluated.
Type :help for more information.
scala> ▋
Choisir un fichier texte et l'affecter via Spark Context (sc) dans un RDD (changer le nom du fichier hello.txt
par un fichier existant)
scala> val rdd1 = sc.textFile("/home/oussama/hello.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/oussama/hello.txt MapPartitionsRDD[1] at textFile at <console>:23
scala> rdd1.count
res0: Long = 3
Remarque : Contenu du fichier hello.txt
Bonjour tout le monde Big Data
Formation Big Data Jour Jour
Jour 2
scala> val list1 = List("apple","banana","orange")
list1: List[String] = List(apple, banana, orange)
scala> val rdd2 = sc.parallelize(list1)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd2.collect
res1: Array[String] = Array(apple, banana, orange)
scala> val rdd4 = sc.textFile("/home/oussama/hello.txt")
rdd4: org.apache.spark.rdd.RDD[String] = /home/oussama/hello.txt MapPartitionsRDD[20] at textFile at <console>:23
scala> rdd4.collect
res9: Array[String] = Array(Bonjour tout le monde Big Data, Formation Big Data Jour Jour, Jour 2)
scala> val count1 = rdd4.flatMap(line => line.split(" "))
count1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at flatMap at <console>:23
scala> count1.collect
res10: Array[String] = Array(Bonjour, tout, le, monde, Big, Data, Formation, Big, Data, Jour, Jour, Jour, 2)
scala> val count2 = count1.map(word => (word,1))
count2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:23
scala> count2.collect
res11: Array[(String, Int)] = Array((Bonjour,1), (tout,1), (le,1), (monde,1), (Big,1), (Data,1), (Formation,1), (Big,1), (Data,1), (Jour,1), (Jour,1), (Jour,1), (2,1))
scala> val count3 = count2.reduceByKey(_+_)
count3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:23
scala> count3.collect
res12: Array[(String, Int)] = Array((monde,1), (Big,2), (le,1), (2,1), (tout,1), (Bonjour,1), (Jour,3), (Formation,1), (Data,2))
- Sauvegarde du résultat de MapReduce dans un répertoire
scala> count3.saveAsTextFile("/home/oussama/resultat-spark")
/home/oussama/resultat-spark/
└── part-00000
- Contenu du fichier
part-00000
(monde,1)
(Big,2)
(le,1)
(2,1)
(tout,1)
(Bonjour,1)
(Jour,3)
(Formation,1)
(Data,2)
scala> val rddmap = sc.parallelize(List("test1 test2", "test3 test4", "test5 test6"))
rddmap: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:23
scala> rddmap.map(x => x.split(" ")).collect
res16: Array[Array[String]] = Array(Array(test1, test2), Array(test3, test4), Array(test5, test6))
scala> rddmap.flatMap(x => x.split(" ")).collect
res17: Array[String] = Array(test1, test2, test3, test4, test5, test6)
scala> val rdd = sc.parallelize(List((1, "toto", "yoyo"),(2,"titi","jiji"),(3,"tata","nono")))
rdd: org.apache.spark.rdd.RDD[(Int, String, String)] = ParallelCollectionRDD[33] at parallelize at <console>:23
- Transformation en Data Frame
scala> val dataframe = rdd.toDF("id","nom","prénom")
dataframe: org.apache.spark.sql.DataFrame = [id: int, nom: string ... 1 more field]
- Manipulation et requêtage (SQL) du Data Frame
scala> dataframe.createOrReplaceTempView("personnes")
scala> val dataframeSQL = spark.sql("select * from personnes")
dataframeSQL: org.apache.spark.sql.DataFrame = [id: int, nom: string ... 1 more field]
- Affichage du résultat de la requête
SELECT
scala> dataframeSQL.show()
+---+----+------+
| id| nom|prénom|
+---+----+------+
| 1|toto| yoyo|
| 2|titi| jiji|
| 3|tata| nono|
+---+----+------+
scala> spark.sql("SELECT count(*) FROM personnes").show
+--------+
|count(1)|
+--------+
| 3|
+--------+