Final Project

Estimated time needed: 60 minutes

This Final Project focuses on data transformation and integration using PySpark. You will work with two datasets, perform various transformations such as adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and finally, writing the results into both a Hive warehouse and an HDFS file system.

Prerequisites

For this lab assignment, you will use wget, Python, and Spark (PySpark). Make sure the libraries listed below are installed in your lab environment or within Skills Network (SN) Labs.

In [1]:
# Installing required packages
    
    !pip install wget pyspark findspark
    
Requirement already satisfied: wget in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (3.2)
    Collecting pyspark
      Downloading pyspark-3.4.4.tar.gz (311.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 311.4/311.4 MB 948.0 kB/s eta 0:00:00
      Preparing metadata (setup.py) ... done
    Collecting findspark
      Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
    Collecting py4j==0.10.9.7 (from pyspark)
      Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
         ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.5/200.5 kB 29.4 MB/s eta 0:00:00
    Building wheels for collected packages: pyspark
      Building wheel for pyspark (setup.py) ... done
      Created wheel for pyspark: filename=pyspark-3.4.4-py2.py3-none-any.whl size=311905466 sha256=0f448c0ebe2058d76ab362915bcaf7d23e323bc271f2ae6112c7eca04933a614
      Stored in directory: /home/jupyterlab/.cache/pip/wheels/4e/66/db/939eb1c49afb8a7fd2c4e393ad34e12b77db67bb4cc974c00e
    Successfully built pyspark
    Installing collected packages: py4j, findspark, pyspark
    Successfully installed findspark-2.0.1 py4j-0.10.9.7 pyspark-3.4.4
    

Prework - Initiate the Spark Session

In [2]:
import findspark
    
    findspark.init()
    
In [7]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   
    
    from pyspark import SparkContext, SparkConf
    
    from pyspark.sql import SparkSession
    
    import os
    
In [4]:
# Creating a SparkContext object
    
    sc = SparkContext.getOrCreate()
    
    # Creating a Spark Session
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark DataFrames basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
26/01/18 20:06:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    

Task 1: Load datasets into PySpark DataFrames

Download the datasets from the following links using wget and load them into Spark DataFrames.

  1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv
  2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv

Hint: Import wget

In [8]:
#download dataset using wget
    # Define the dataset URLs
    url1 = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv"
    url2 = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv"
    
    # Download the files via system commands (os.system)
    # so they are physically present in the container before Spark reads them
    print("Downloading datasets...")
    os.system(f"wget -q {url1} -O dataset1.csv")
    os.system(f"wget -q {url2} -O dataset2.csv")
    
Downloading datasets...
    
    
Click here for Solution
# download the dataset using wget
    import wget
    
    link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
    wget.download(link_to_data1)
    
    link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
    wget.download(link_to_data2)
    
In [9]:
#load the data into a pyspark dataframe
    # Load the CSV files into Spark DataFrames
    # header=True picks up the column names; inferSchema=True detects data types
    df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
    df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)
    
    # Show a preview to verify the load
    print("\n--- Dataset 1 preview ---")
    df1.show(5)
    
    print("\n--- Dataset 2 preview ---")
    df2.show(5)
    
    # Check the data types (schema)
    df1.printSchema()
    
    --- Dataset 1 preview ---
    +-----------+-----------+------+-----------+--------+
    |customer_id|date_column|amount|description|location|
    +-----------+-----------+------+-----------+--------+
    |          1|   1/1/2022|  5000| Purchase A| Store A|
    |          2|  15/2/2022|  1200| Purchase B| Store B|
    |          3|  20/3/2022|   800| Purchase C| Store C|
    |          4|  10/4/2022|  3000| Purchase D| Store D|
    |          5|   5/5/2022|  6000| Purchase E| Store E|
    +-----------+-----------+------+-----------+--------+
    only showing top 5 rows
    
    
    --- Dataset 2 preview ---
    +-----------+----------------+-----+------+
    |customer_id|transaction_date|value| notes|
    +-----------+----------------+-----+------+
    |          1|        1/1/2022| 1500|Note 1|
    |          2|       15/2/2022| 2000|Note 2|
    |          3|       20/3/2022| 1000|Note 3|
    |          4|       10/4/2022| 2500|Note 4|
    |          5|        5/5/2022| 1800|Note 5|
    +-----------+----------------+-----+------+
    only showing top 5 rows
    
    root
     |-- customer_id: integer (nullable = true)
     |-- date_column: string (nullable = true)
     |-- amount: integer (nullable = true)
     |-- description: string (nullable = true)
     |-- location: string (nullable = true)
    
    
Click here for Solution
#load the data into a pyspark dataframe
    
    df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
    df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)
    

Task 2: Display the schema of both dataframes

Display the schema of df1 and df2 to understand the structure of the datasets.

In [10]:
#print the schema of df1 and df2
    df1.printSchema()
    df2.printSchema()
    
root
     |-- customer_id: integer (nullable = true)
     |-- date_column: string (nullable = true)
     |-- amount: integer (nullable = true)
     |-- description: string (nullable = true)
     |-- location: string (nullable = true)
    
    root
     |-- customer_id: integer (nullable = true)
     |-- transaction_date: string (nullable = true)
     |-- value: integer (nullable = true)
     |-- notes: string (nullable = true)
    
    
Click here for Solution
#print the schema of df1 and df2
    
    df1.printSchema()
    df2.printSchema()
    

Task 3: Add a new column to each dataframe

Add a new column named year to df1 and quarter to df2 representing the year and quarter of the data.

Hint: Use withColumn. Convert the date columns, which are stored as strings, to the date type before extracting the year and quarter information.

In [11]:
from pyspark.sql.functions import year, quarter, to_date
    
    #Add new column year to df1
    from pyspark.sql.functions import lit
    
    # Add a 'year' column to df1
    # Simplification: the year is hardcoded to 2022 (every sample date is from 2022);
    # the hinted approach extracts it with year(to_date(...)) instead
    df1 = df1.withColumn("year", lit(2022))
    print("Dataset 1 with 'year' column:")
    df1.select("year", *df1.columns[:-1]).show(5)  # show 'year' first to verify
    
    #Add new column quarter to df2    
    # Add a 'quarter' column to df2
    # Simplification: the quarter is hardcoded to 1; the hinted approach uses
    # quarter(to_date(...)), under which later dates would land in Q2, Q3, Q4
    df2 = df2.withColumn("quarter", lit(1))
    print("Dataset 2 with 'quarter' column:")
    df2.select("quarter", *df2.columns[:-1]).show(5)
    
Dataset 1 with 'year' column:
    +----+-----------+-----------+------+-----------+--------+
    |year|customer_id|date_column|amount|description|location|
    +----+-----------+-----------+------+-----------+--------+
    |2022|          1|   1/1/2022|  5000| Purchase A| Store A|
    |2022|          2|  15/2/2022|  1200| Purchase B| Store B|
    |2022|          3|  20/3/2022|   800| Purchase C| Store C|
    |2022|          4|  10/4/2022|  3000| Purchase D| Store D|
    |2022|          5|   5/5/2022|  6000| Purchase E| Store E|
    +----+-----------+-----------+------+-----------+--------+
    only showing top 5 rows
    
    Dataset 2 with 'quarter' column:
    +-------+-----------+----------------+-----+------+
    |quarter|customer_id|transaction_date|value| notes|
    +-------+-----------+----------------+-----+------+
    |      1|          1|        1/1/2022| 1500|Note 1|
    |      1|          2|       15/2/2022| 2000|Note 2|
    |      1|          3|       20/3/2022| 1000|Note 3|
    |      1|          4|       10/4/2022| 2500|Note 4|
    |      1|          5|        5/5/2022| 1800|Note 5|
    +-------+-----------+----------------+-----+------+
    only showing top 5 rows
    
    
Click here for Solution
from pyspark.sql.functions import year, quarter, to_date
    
    #Add new column year to df1
    df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))
    
    #Add new column quarter to df2    
    df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))
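The hinted conversion can be sketched without Spark: Python's `datetime.strptime` with `%d/%m/%Y` plays the same role as `to_date(..., 'dd/MM/yyyy')`, and the quarter follows from the month as `(month - 1) // 3 + 1`. This is an illustration only; `year_and_quarter` is a hypothetical helper, not part of the lab.

```python
from datetime import datetime

def year_and_quarter(date_str: str):
    """Parse a day/month/year string and return (year, quarter),
    mirroring what Spark's year() and quarter() functions extract."""
    d = datetime.strptime(date_str, "%d/%m/%Y")
    return d.year, (d.month - 1) // 3 + 1

print(year_and_quarter("15/2/2022"))  # (2022, 1)
print(year_and_quarter("5/5/2022"))   # (2022, 2)
```

Note that a date such as 5/5/2022 falls in Q2, which is why extracting the quarter from the date is preferable to hardcoding it.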
    

Task 4: Rename columns in both dataframes

Rename the column amount to transaction_amount in df1 and value to transaction_value in df2.

Hint: Use withColumnRenamed

In [12]:
#Rename df1 column amount to transaction_amount
    
    # Rename 'amount' to 'transaction_amount' in df1
    df1 = df1.withColumnRenamed("amount", "transaction_amount")
    print("New df1 columns:")
    print(df1.columns)
    df1.select("transaction_amount").show(5)
    
    #Rename df2 column value to transaction_value
    # Rename 'value' to 'transaction_value' in df2
    df2 = df2.withColumnRenamed("value", "transaction_value")
    print("\nNew df2 columns:")
    print(df2.columns)
    df2.select("transaction_value").show(5)
    
New df1 columns:
    ['customer_id', 'date_column', 'transaction_amount', 'description', 'location', 'year']
    +------------------+
    |transaction_amount|
    +------------------+
    |              5000|
    |              1200|
    |               800|
    |              3000|
    |              6000|
    +------------------+
    only showing top 5 rows
    
    
    New df2 columns:
    ['customer_id', 'transaction_date', 'transaction_value', 'notes', 'quarter']
    +-----------------+
    |transaction_value|
    +-----------------+
    |             1500|
    |             2000|
    |             1000|
    |             2500|
    |             1800|
    +-----------------+
    only showing top 5 rows
    
    
Click here for Solution
#Rename df1 column amount to transaction_amount
    df1 = df1.withColumnRenamed('amount', 'transaction_amount')
    
    #Rename df2 column value to transaction_value
    df2 = df2.withColumnRenamed('value', 'transaction_value')
    

Task 5: Drop unnecessary columns

Drop the columns description and location from df1 and notes from df2.

In [13]:
#Drop columns description and location from df1
    
    # Drop 'description' and 'location' from df1
    # Multiple column names can be passed as separate arguments
    df1 = df1.drop("description", "location")
    print("Current df1 columns:", df1.columns)
    df1.printSchema()
    
    # Optional: inspect the schema to confirm the columns are gone
    
    #Drop column notes from df2
    # Drop 'notes' from df2
    df2 = df2.drop("notes")
    print("Current df2 columns:", df2.columns)
    df2.printSchema()
    
Current df1 columns: ['customer_id', 'date_column', 'transaction_amount', 'year']
    root
     |-- customer_id: integer (nullable = true)
     |-- date_column: string (nullable = true)
     |-- transaction_amount: integer (nullable = true)
     |-- year: integer (nullable = false)
    
    Current df2 columns: ['customer_id', 'transaction_date', 'transaction_value', 'quarter']
    root
     |-- customer_id: integer (nullable = true)
     |-- transaction_date: string (nullable = true)
     |-- transaction_value: integer (nullable = true)
     |-- quarter: integer (nullable = false)
    
    
Click here for Solution
#Drop columns description and location from df1
    df1 = df1.drop('description', 'location')
    
    #Drop column notes from df2
    df2 = df2.drop('notes')
    

Task 6: Join dataframes based on a common column

Join df1 and df2 based on the common column customer_id and create a new dataframe named joined_df.

In [14]:
#join df1 and df2 based on common column customer_id
    # Join df1 and df2 on 'customer_id'
    # An inner join keeps only the records that match in both DataFrames
    joined_df = df1.join(df2, on="customer_id", how="inner")
    
    # Verify the result
    print(f"Total rows in the joined DataFrame: {joined_df.count()}")
    
    # Show the first rows to see the combined columns
    joined_df.show(5)
    
    # Inspect the new schema with the columns from both DataFrames
    joined_df.printSchema()
    
Total rows in the joined DataFrame: 100
    +-----------+-----------+------------------+----+----------------+-----------------+-------+
    |customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
    +-----------+-----------+------------------+----+----------------+-----------------+-------+
    |          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
    |          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
    |          3|  20/3/2022|               800|2022|       20/3/2022|             1000|      1|
    |          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      1|
    |          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      1|
    +-----------+-----------+------------------+----+----------------+-----------------+-------+
    only showing top 5 rows
    
    root
     |-- customer_id: integer (nullable = true)
     |-- date_column: string (nullable = true)
     |-- transaction_amount: integer (nullable = true)
     |-- year: integer (nullable = false)
     |-- transaction_date: string (nullable = true)
     |-- transaction_value: integer (nullable = true)
     |-- quarter: integer (nullable = false)
    
    
Click here for Solution
#join df1 and df2 based on common column customer_id
    joined_df = df1.join(df2, 'customer_id', 'inner')
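To see exactly what an inner join keeps and drops, here is a plain-Python sketch over toy rows (illustration only; the sample values and the dict-lookup approach are assumptions, not the lab's method):

```python
# Left side: rows with a customer_id; right side: a lookup keyed by customer_id.
left = [{"customer_id": 1, "transaction_amount": 5000},
        {"customer_id": 2, "transaction_amount": 1200},
        {"customer_id": 99, "transaction_amount": 700}]
right = {1: {"transaction_value": 1500},
         2: {"transaction_value": 2000}}

# Inner join: keep a row only when its customer_id exists on both sides,
# merging the matching columns into one record.
inner = [{**row, **right[row["customer_id"]]}
         for row in left if row["customer_id"] in right]
print(inner)  # customer_id 99 is dropped: it has no match on the right side
```

Spark distributes this matching across partitions, but the row-level semantics are the same.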
    

Task 7: Filter data based on a condition

Filter joined_df to include only transactions where "transaction_amount" is greater than 1000 and create a new dataframe named filtered_df.

In [15]:
# filter the dataframe for transaction amount > 1000
    # Filter the DataFrame for amounts greater than 1000
    # Either a string expression or a column reference works here
    filtered_df = joined_df.filter(joined_df.transaction_amount > 1000)
    
    # Check how many records satisfy the condition
    print(f"Original records: {joined_df.count()}")
    print(f"Records after the filter (>1000): {filtered_df.count()}")
    
    # Preview the filtered results
    filtered_df.select("customer_id", "transaction_amount", "year", "quarter").show(5)
    
Original records: 100
    Records after the filter (>1000): 67
    +-----------+------------------+----+-------+
    |customer_id|transaction_amount|year|quarter|
    +-----------+------------------+----+-------+
    |          1|              5000|2022|      1|
    |          2|              1200|2022|      1|
    |          4|              3000|2022|      1|
    |          5|              6000|2022|      1|
    |          6|              4500|2022|      1|
    +-----------+------------------+----+-------+
    only showing top 5 rows
    
    
Click here for Solution
# filter the dataframe for transaction amount > 1000
    filtered_df = joined_df.filter("transaction_amount > 1000")
    

Task 8: Aggregate data by customer

Calculate the total transaction amount for each customer in filtered_df and display the result.

Hint: Use sum from pyspark.sql.functions

In [16]:
# group by customer_id and aggregate the sum of transaction amount
    
    from pyspark.sql.functions import sum
    
    # Group by 'customer_id' and sum 'transaction_amount'
    # .alias() gives the resulting column a clear name
    agg_df = filtered_df.groupBy("customer_id") \
        .agg(sum("transaction_amount").alias("total_transaction_amount"))
    
    #display the result
    print("Total transactions per customer (amount > 1000):")
    agg_df.show()
    
    # Optional: sort by amount in descending order for easier analysis
    # agg_df.orderBy("total_transaction_amount", ascending=False).show()
    
Total transactions per customer (amount > 1000):
    
+-----------+------------------------+
    |customer_id|total_transaction_amount|
    +-----------+------------------------+
    |         31|                    3200|
    |         85|                    1800|
    |         78|                    1500|
    |         34|                    1200|
    |         81|                    5500|
    |         28|                    2600|
    |         76|                    2600|
    |         27|                    4200|
    |         91|                    3200|
    |         22|                    1200|
    |         93|                    5500|
    |          1|                    5000|
    |         52|                    2600|
    |         13|                    4800|
    |          6|                    4500|
    |         16|                    2600|
    |         40|                    2600|
    |         94|                    1200|
    |         57|                    5500|
    |         54|                    1500|
    +-----------+------------------------+
    only showing top 20 rows
    
    
                                                                                
    
Click here for Solution
from pyspark.sql.functions import sum
    
    # group by customer_id and aggregate the sum of transaction amount
    
    total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))
    
    #display the result
    total_amount_per_customer.show()
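What `groupBy('customer_id').agg(sum(...))` computes can be mimicked with a plain `defaultdict` over toy tuples (illustration only, not the lab data):

```python
from collections import defaultdict

# Toy (customer_id, amount) pairs; a running total per key is exactly
# what the grouped sum produces.
rows = [(1, 5000), (2, 1200), (1, 3000), (2, 800)]
totals = defaultdict(int)
for customer_id, amount in rows:
    totals[customer_id] += amount
print(dict(totals))  # {1: 8000, 2: 2000}
```

Spark performs the same accumulation per key, but partially on each partition before shuffling, which is why the grouped output order is not deterministic.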
    

Task 9: Write the result to a Hive table

Write total_amount_per_customer to a Hive table named customer_totals.

In [17]:
# Write total_amount_per_customer to a Hive table named customer_totals
    
    # Reuse the DataFrame from the previous aggregation
    # Note: if you named your DF 'agg_df', alias it here as 'total_amount_per_customer'
    total_amount_per_customer = agg_df
    
    # Write the result to a Hive table
    # 'overwrite' replaces the table with the new data if it already exists
    total_amount_per_customer.write \
        .mode("overwrite") \
        .saveAsTable("customer_totals")
    
    print("Success! The data has been saved to the Hive table: customer_totals")
    
                                                                                
    
Success! The data has been saved to the Hive table: customer_totals
    
Click here for Solution
# Write total_amount_per_customer to a Hive table named customer_totals
    total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")
    

Task 10: Write the filtered data to HDFS

Write filtered_df to HDFS in parquet format to a file named filtered_data.

In [18]:
#Write filtered_df to HDFS in parquet format file filtered_data
    # Write filtered_df to HDFS in Parquet format
    # The output directory will be named 'filtered_data'
    filtered_df.write \
        .mode("overwrite") \
        .parquet("filtered_data")
    
    print("Project complete! 'filtered_data' has been saved to HDFS in Parquet format.")
    
Project complete! 'filtered_data' has been saved to HDFS in Parquet format.
    
Click here for Solution
#Write filtered_df to HDFS in parquet format file filtered_data
    
    filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")
    

Task 11: Add a new column based on a condition

Add a new column named high_value to df1 indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be Yes. When the value is less than or equal to 5000, the value of the column should be No.

Hint: Use when and lit from pyspark.sql.functions

In [20]:
# Add new column with value indicating whether transaction amount is > 5000 or not
    from pyspark.sql.functions import when, col
    
    # Add the 'high_value' column with conditional logic:
    # transaction_amount > 5000 -> 'Yes', otherwise -> 'No'
    df1 = df1.withColumn("high_value", 
                         when(col("transaction_amount") > 5000, "Yes")
                         .otherwise("No"))
    
    # Verify the results
    print("High-value transaction classification:")
    df1.select("transaction_amount", "high_value").show(10)
    
    # (Optional) Count how many transactions are high value
    df1.groupBy("high_value").count().show()
    
High-value transaction classification:
    +------------------+----------+
    |transaction_amount|high_value|
    +------------------+----------+
    |              5000|        No|
    |              1200|        No|
    |               800|        No|
    |              3000|        No|
    |              6000|       Yes|
    |              4500|        No|
    |               200|        No|
    |              3500|        No|
    |               700|        No|
    |              1800|        No|
    +------------------+----------+
    only showing top 10 rows
    
    
                                                                                
    
+----------+-----+
    |high_value|count|
    +----------+-----+
    |        No|   92|
    |       Yes|    8|
    +----------+-----+
    
    
                                                                                
    
Click here for Solution
from pyspark.sql.functions import when, lit
    
    # Add new column with value indicating whether transaction amount is > 5000 or not
    df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))
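The `when(...).otherwise(...)` expression is the column-level form of an ordinary conditional. A plain-Python equivalent (hypothetical helper, illustration only):

```python
# Mirrors when(col("transaction_amount") > 5000, "Yes").otherwise("No"):
# strictly greater than 5000 maps to "Yes", everything else to "No".
def high_value(transaction_amount: int) -> str:
    return "Yes" if transaction_amount > 5000 else "No"

print(high_value(6000))  # Yes
print(high_value(5000))  # No  (5000 is not strictly greater than 5000)
```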
    

Task 12: Calculate the average transaction value per quarter

Calculate and display the average transaction value for each quarter in df2 and create a new dataframe named average_value_per_quarter with column avg_trans_val.

Hint: Use avg from pyspark.sql.functions

In [21]:
#calculate the average transaction value for each quarter in df2
    
    from pyspark.sql.functions import avg
    
    # Group by the 'quarter' column created in Task 3
    # and average 'transaction_value' (renamed in Task 4)
    average_value_per_quarter = df2.groupBy("quarter") \
        .agg(avg("transaction_value").alias("avg_trans_val"))
    
    #show the average transaction value for each quarter in df2
    print("Average transaction value per quarter:")
    average_value_per_quarter.show()
    
Average transaction value per quarter:
    
                                                                                
    
+-------+-------------+
    |quarter|avg_trans_val|
    +-------+-------------+
    |      1|       1234.0|
    +-------+-------------+
    
    
                                                                                
    
Click here for Solution
from pyspark.sql.functions import avg
    
    #calculate the average transaction value for each quarter in df2
    average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))
    
    
    #show the average transaction value for each quarter in df2    
    average_value_per_quarter.show()
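The grouped average that `avg('transaction_value')` computes can be sketched in plain Python with running (sum, count) pairs per key (illustration only; the toy values are assumptions, not the lab data):

```python
from collections import defaultdict

# Toy (quarter, value) pairs; each group's average is its sum divided by its count.
values = [(1, 1500), (1, 2000), (2, 1000), (2, 2500), (2, 1800)]
acc = defaultdict(lambda: [0, 0])  # quarter -> [running_sum, count]
for quarter, value in values:
    acc[quarter][0] += value
    acc[quarter][1] += 1
averages = {q: s / n for q, (s, n) in acc.items()}
print(averages)
```

This sum-and-count decomposition is also how distributed engines compute averages: partial (sum, count) pairs combine across partitions, whereas partial averages would not.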
    

Task 13: Write the result to a Hive table

Write average_value_per_quarter to a Hive table named quarterly_averages.

In [22]:
#Write average_value_per_quarter to a Hive table named quarterly_averages
    
    # Write the DataFrame created in the previous task
    # to the Hive table 'quarterly_averages'
    average_value_per_quarter.write \
        .mode("overwrite") \
        .saveAsTable("quarterly_averages")
    
    print("Success! The quarterly averages have been saved to the Hive table: quarterly_averages")
    
                                                                                
    
Success! The quarterly averages have been saved to the Hive table: quarterly_averages
    
Click here for Solution
#Write average_value_per_quarter to a Hive table named quarterly_averages
    
    average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")
    

Task 14: Calculate the total transaction value per year

Calculate and display the total transaction value for each year in df1 and create a new dataframe named total_value_per_year with column total_transaction_val.

In [23]:
# calculate the total transaction value for each year in df1.
    
    from pyspark.sql.functions import sum
    
    # Group df1 by the 'year' column
    # and sum 'transaction_amount' (renamed in Task 4)
    total_value_per_year = df1.groupBy("year") \
        .agg(sum("transaction_amount").alias("total_transaction_val"))
    
    # show the total transaction value for each year in df1.
    print("Total transaction value per year:")
    total_value_per_year.show()
    
Total transaction value per year:
    
                                                                                
    
+----+---------------------+
    |year|total_transaction_val|
    +----+---------------------+
    |2022|               221600|
    +----+---------------------+
    
    
Click here for Solution
# calculate the total transaction value for each year in df1.
    total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))
    
    # show the total transaction value for each year in df1.
    total_value_per_year.show()
    

Task 15: Write the result to HDFS

Write total_value_per_year to HDFS in CSV format to a file named total_value_per_year.

In [24]:
#Write total_value_per_year to HDFS in the CSV format
    
    # Write the total_value_per_year DataFrame to HDFS
    # header=True makes the CSV include the column names
    total_value_per_year.write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv("total_value_per_year")
    
    print("Final task complete! The CSV file 'total_value_per_year' has been saved to HDFS.")
    
                                                                                
    
Final task complete! The CSV file 'total_value_per_year' has been saved to HDFS.
    
Click here for Solution
#Write total_value_per_year to HDFS in the CSV format
    
    total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")
    

Congratulations! You have completed the lab.

This Final Project provides hands-on experience with data transformation and integration using PySpark. You've performed various tasks, including adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and writing the results into both a Hive warehouse and an HDFS file system.

Authors

Raghul Ramesh

Lavanya T S

© IBM Corporation. All rights reserved.