Estimated time needed: 60 minutes
This Final Project focuses on data transformation and integration using PySpark. You will work with two datasets, perform various transformations such as adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and finally, writing the results into both a Hive warehouse and an HDFS file system.
For this lab assignment, you will use wget, Python, and Spark (PySpark). Make sure the libraries specified below are installed in your lab environment or within Skills Network (SN) Labs.
# Installing required packages
!pip install wget pyspark findspark
import findspark
findspark.init()
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
# Creating a SparkContext object
sc = SparkContext.getOrCreate()
# Creating a Spark Session
spark = SparkSession \
.builder \
.appName("Python Spark DataFrames basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Download the datasets from the following links using wget and load them into Spark DataFrames.
Hint: Import wget
# download the datasets using wget
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'

# Download the files so they are physically present before Spark reads them
wget.download(link_to_data1)
wget.download(link_to_data2)
# load the data into a pyspark dataframe
# header=True reads the column names; inferSchema=True detects the data types
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

# Preview the first rows to verify the load
df1.show(5)
df2.show(5)
Display the schema of df1 and df2 to understand the structure of the datasets.
#print the schema of df1 and df2
df1.printSchema()
df2.printSchema()
Add a new column named year to df1 and quarter to df2 representing the year and quarter of the data.
Hint: use withColumn. Convert the date columns, which are present as strings, to dates before extracting the year and quarter information.
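The `dd/MM/yyyy` pattern Spark's `to_date` expects corresponds to `%d/%m/%Y` in Python's `datetime`. As a plain-Python sketch of what `year()` and `quarter()` extract (the date string here is illustrative, not from the datasets):

```python
from datetime import datetime

# Parse a date string laid out the same dd/MM/yyyy way the Spark code assumes
d = datetime.strptime("15/08/2022", "%d/%m/%Y")

year = d.year                     # what pyspark.sql.functions.year() extracts
quarter = (d.month - 1) // 3 + 1  # what pyspark.sql.functions.quarter() computes

print(year, quarter)  # 2022 3
```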
from pyspark.sql.functions import year, quarter, to_date

# Add new column year to df1, extracted from the string date column
df1 = df1.withColumn('year', year(to_date('date_column', 'dd/MM/yyyy')))

# Add new column quarter to df2, extracted from the string transaction date
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date', 'dd/MM/yyyy')))

# Verify the new columns
df1.show(5)
df2.show(5)
Rename the column amount to transaction_amount in df1 and value to transaction_value in df2.
Hint: Use withColumnRenamed
# Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')

# Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')

# Verify the renamed columns
print(df1.columns)
print(df2.columns)
Drop the columns description and location from df1 and notes from df2.
# Drop columns description and location from df1
# Multiple column names can be passed as separate arguments to drop
df1 = df1.drop('description', 'location')

# Drop column notes from df2
df2 = df2.drop('notes')

# Confirm the columns are gone
df1.printSchema()
df2.printSchema()
Join df1 and df2 based on the common column customer_id and create a new dataframe named joined_df.
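The inner-join semantics used below can be sketched in plain Python (illustrative records only, not the lab's data): only `customer_id` values present in both sides survive, and their fields are merged.

```python
# Each dict maps customer_id -> a record; an inner join keeps only the
# customer_ids present in both sides and merges their fields.
df1_rows = {1: {"transaction_amount": 500}, 2: {"transaction_amount": 1500}}
df2_rows = {2: {"transaction_value": 80}, 3: {"transaction_value": 120}}

joined = {
    cid: {**df1_rows[cid], **df2_rows[cid]}
    for cid in df1_rows.keys() & df2_rows.keys()
}

print(joined)  # {2: {'transaction_amount': 1500, 'transaction_value': 80}}
```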
# join df1 and df2 based on common column customer_id
# An inner join keeps only the records whose customer_id appears in both DataFrames
joined_df = df1.join(df2, 'customer_id', 'inner')

# Verify the combined columns
joined_df.show(5)
joined_df.printSchema()
Filter joined_df to include only transactions where "transaction_amount" is greater than 1000 and create a new dataframe named filtered_df.
# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")

# Preview the filtered result
filtered_df.show(5)
Calculate the total transaction amount for each customer in filtered_df and display the result.
Hint: Use sum from pyspark.sql.functions
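One thing worth noting about this hint: `from pyspark.sql.functions import sum` shadows Python's builtin `sum` in the notebook's namespace. A small sketch of that effect, where `spark_sum` is a stand-in function, not part of PySpark:

```python
import builtins

def spark_sum(col_name):
    # Stand-in for pyspark.sql.functions.sum, which returns a Column expression
    return f"sum({col_name})"

sum = spark_sum  # what `from pyspark.sql.functions import sum` effectively does

print(sum("transaction_amount"))   # the Spark-style sum is now in scope
print(builtins.sum([1, 2, 3]))     # the builtin stays reachable via builtins
```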
from pyspark.sql.functions import sum

# group by customer_id and aggregate the sum of transaction amount
total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

# display the result
total_amount_per_customer.show()
Write total_amount_per_customer to a Hive table named customer_totals.
# Write total_amount_per_customer to a Hive table named customer_totals
# mode("overwrite") replaces the table if it already exists
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")
Write filtered_df to HDFS in parquet format to a file named filtered_data.
# Write filtered_df to HDFS in Parquet format to a file named filtered_data
filtered_df.write.mode("overwrite").parquet("filtered_data")
Add a new column named high_value to df1 indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be Yes. When the value is less than or equal to 5000, the value of the column should be No.
Hint: Use when and lit from pyspark.sql.functions
from pyspark.sql.functions import when, lit

# Add new column with value indicating whether transaction amount is > 5000 or not
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))

# Verify the classification
df1.select("transaction_amount", "high_value").show(10)
Calculate and display the average transaction value for each quarter in df2 and create a new dataframe named average_value_per_quarter with column avg_trans_val.
Hint: Use avg from pyspark.sql.functions
from pyspark.sql.functions import avg

# calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

# show the average transaction value for each quarter in df2
average_value_per_quarter.show()
Write average_value_per_quarter to a Hive table named quarterly_averages.
# Write average_value_per_quarter to a Hive table named quarterly_averages
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")
Calculate and display the total transaction value for each year in df1 and create a new dataframe named total_value_per_year with column total_transaction_val.
from pyspark.sql.functions import sum

# calculate the total transaction value for each year in df1
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

# show the total transaction value for each year in df1
total_value_per_year.show()
Write total_value_per_year to HDFS in CSV format to a file named total_value_per_year.
# Write total_value_per_year to HDFS in the CSV format
# header=True includes the column names in the CSV output
total_value_per_year.write.mode("overwrite").option("header", "true").csv("total_value_per_year")
This Final Project provides hands-on experience with data transformation and integration using PySpark. You've performed various tasks, including adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and writing the results into both a Hive warehouse and an HDFS file system.