-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyspark_atlastoredshift_customeractivity.py
32 lines (28 loc) · 1.29 KB
/
pyspark_atlastoredshift_customeractivity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# This script migrates data from MongoDB Atlas to AWS Redshift using the
# MongoDB Spark connector (batch read) and the Redshift JDBC driver (write).
from pyspark.sql import SparkSession

# Create a local SparkSession with the MongoDB Spark connector pulled onto
# the classpath via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("streamingExampleRead")
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.0')
    .getOrCreate()
)

# Batch read from the standard MongoDB deployment in Atlas.
# NOTE(review): <username>/<password>/<servername> are placeholders — in real
# use, inject credentials via configuration or a secrets manager rather than
# hard-coding them in source.
df = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb+srv://<username>:<password>@<servername>/?retryWrites=true&w=majority')
    .option('spark.mongodb.database', 'customer_activity')
    .option('spark.mongodb.collection', 'source')
    # NOTE(review): the next two options are change-stream / checkpoint
    # settings that only apply to streaming reads; a batch spark.read ignores
    # them — confirm whether spark.readStream was intended here.
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .load()
)

# Append the DataFrame into the Redshift table over JDBC.
df.write.format('jdbc').options(
    url='jdbc:redshift://<redshift endpoint>:5439/<databasename>',
    driver='com.amazon.redshift.jdbc42.Driver',
    dbtable='public.customer_activity',
    user='<username>',
    password='<password>').mode('append').save()