I am looking to move a large amount of data from one database to another, and I have seen that Spark is a good tool for the job. I am trying to understand the process and the ideology behind Spark's big-data ETLs. I would also appreciate it if someone could explain how Spark parallelizes (or splits) the data into the various tasks it spawns. My main aim is to move the data from BigQuery to Amazon Keyspaces, and the data is around 40 GB.
I am putting here the understanding I have gathered already from online.
This is the code to read the data from BigQuery.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('spark-bigquery-ks') \
    .getOrCreate()

# Service-account credentials for the spark-bigquery connector.
spark.conf.set("credentialsFile", "./cred.json")

# Load data from BigQuery. Reading the full table lets the connector split
# the read into partitions; using the 'query' option instead would also
# require viewsEnabled=true and a materializationDataset.
df = spark.read.format('bigquery') \
    .option('parentProject', 'project-id') \
    .option('dataset', 'mydataset') \
    .option('table', 'mytable') \
    .load()

print(df.head())
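On the parallelism question: Spark splits a DataFrame into partitions, and each partition becomes a task that runs on an executor core; the BigQuery connector typically derives those partitions from BigQuery Storage API read streams. A minimal sketch of inspecting and adjusting that split, using only core Spark calls (the partition count of 200 is just an illustrative value, not a recommendation):

# How many partitions (and therefore write tasks) the read produced.
print(df.rdd.getNumPartitions())

# Optionally re-split before writing; repartition() triggers a full shuffle,
# which also breaks up any sorted order coming from the source.
df = df.repartition(200)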
Now I need to figure out the best way to transform the data (which is easy enough and I can do myself), but my most important question is about batching and handling such a large data set. Are there any considerations I need to keep in mind to move data that won't fit in memory into Keyspaces?
from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_TLSv1_2

import boto3
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra_sigv4.auth import SigV4AuthProvider

# Keyspaces requires TLS; verify against the Amazon root CA.
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations('./AmazonRootCA1.pem')
ssl_context.verify_mode = CERT_REQUIRED

# SigV4 authentication against the Keyspaces endpoint.
boto_session = boto3.Session(aws_access_key_id="accesstoken",
                             aws_secret_access_key="accesskey",
                             aws_session_token="token",
                             region_name="region")
auth_provider = SigV4AuthProvider(boto_session)

cluster = Cluster(['cassandra.region.amazonaws.com'],
                  ssl_context=ssl_context,
                  auth_provider=auth_provider,
                  port=9142)
session = cluster.connect()
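One Keyspaces-specific detail: the Python driver defaults to LOCAL_ONE consistency, but Keyspaces accepts only LOCAL_QUORUM for writes, so inserts will be rejected unless you raise it. The snippet above already imports ExecutionProfile and EXEC_PROFILE_DEFAULT without using them; a minimal sketch of wiring them in (reusing the ssl_context and auth_provider defined above):

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT

# Keyspaces rejects writes below LOCAL_QUORUM, so set it on the default profile.
profile = ExecutionProfile(consistency_level=ConsistencyLevel.LOCAL_QUORUM)
cluster = Cluster(['cassandra.region.amazonaws.com'],
                  ssl_context=ssl_context,
                  auth_provider=auth_provider,
                  port=9142,
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect()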
And finally, pushing the data to Keyspaces would look something like this:
# Write data to Amazon Keyspaces, row by row, from a pandas frame
# (e.g. pdf = df.toPandas(), which pulls everything onto the driver).
for index, row in pdf.iterrows():
    keyfamilyid = row["keyfamilyid"]
    recommendedfamilyid = row["recommendedfamilyid"]
    rank = row["rank"]
    chi = row["chi"]
    recommendationtype = row["recommendationtype"]
    title = row["title"]
    location = row["location"]
    typepriority = row["typepriority"]
    customerid = row["customerid"]

    insert_query = f"INSERT INTO {keyspace_name}.{table_name} (keyfamilyid, recommendedfamilyid, rank, chi, recommendationtype, title, location, typepriority, customerid) VALUES ('{keyfamilyid}', '{recommendedfamilyid}', {rank}, {chi}, '{recommendationtype}', '{title}', '{location}', '{typepriority}', '{customerid}')"

    try:
        # Use the Cassandra session from the connection code above;
        # driver errors are not boto3 ClientErrors.
        session.execute(insert_query)
    except Exception as e:
        print(f"Error writing data for row {index}: {e}")
You can use the Spark Cassandra Connector to copy data to Amazon Keyspaces. In the following example I write to Keyspaces from S3, but BigQuery would be similar. The biggest things when writing to Keyspaces are the following.
Prewarm the table using provisioned capacity mode. Provision a high write capacity to make sure the table has enough resources for the initial write rate.
Shuffle the data coming from BigQuery, since it will most likely be exported in sorted order. When inserting into a NoSQL store you should write in a random access pattern.
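For the connector route, a hedged sketch of what the write side could look like. This assumes Keyspaces service-specific credentials rather than the SigV4 plugin, that the connector JAR is on the classpath (e.g. via --packages com.datastax.spark:spark-cassandra-connector), and that the connection settings are supplied when the session is first created or via --conf at spark-submit; the keyspace and table names are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Connector connection settings for Keyspaces; the credentials here are
# Keyspaces service-specific credentials (an assumption, not SigV4 as above).
spark = SparkSession.builder \
    .appName('spark-bigquery-ks') \
    .config("spark.cassandra.connection.host", "cassandra.region.amazonaws.com") \
    .config("spark.cassandra.connection.port", "9142") \
    .config("spark.cassandra.connection.ssl.enabled", "true") \
    .config("spark.cassandra.auth.username", "SERVICE_USER_NAME") \
    .config("spark.cassandra.auth.password", "SERVICE_PASSWORD") \
    .getOrCreate()

# Shuffle so inserts arrive in a random access pattern instead of the
# sorted order of the BigQuery export.
shuffled = df.orderBy(rand())

shuffled.write \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "mykeyspace") \
    .option("table", "mytable") \
    .mode("append") \
    .save()

The orderBy(rand()) forces a full shuffle, which covers the "random access pattern" point above; each Spark partition is then written out as its own task in parallel.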