Aprendizaje automático con Apache Spark ML

Este laboratorio introduce el aprendizaje automático usando Spark ML Lib (sparkml).

Objetivos

La biblioteca Spark ML también se conoce comúnmente como MLlib y se usa para realizar operaciones de aprendizaje automático con APIs basadas en DataFrames.

Después de completar este laboratorio podrás:

  • Importar las bibliotecas de Spark ML y estadística
  • Realizar operaciones básicas de estadística con Spark
  • Construir un modelo simple de regresión lineal usando Spark ML
  • Entrenar el modelo y realizar la evaluación

Configuración

Para este laboratorio vamos a usar Python y Spark (pyspark). Estas bibliotecas deben estar instaladas en tu entorno de laboratorio o en SN Labs.

In [1]:
# When you are executing on SN labs please uncomment the below lines and then run all cells.Next again Restart the kernel and run all cells.
    !pip3 install pyspark==3.1.2
    !pip install findspark
    
Collecting pyspark==3.1.2
      Downloading pyspark-3.1.2.tar.gz (212.4 MB)
         ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 212.4/212.4 MB 2.5 MB/s eta 0:00:0000:0100:01
      Preparing metadata (setup.py) ... done
    Collecting py4j==0.10.9 (from pyspark==3.1.2)
      Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
         ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 198.6/198.6 kB 24.6 MB/s eta 0:00:00
    Building wheels for collected packages: pyspark
      Building wheel for pyspark (setup.py) ... done
      Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880756 sha256=612b6f2a464eab64b5e5e550020214337675a2966f0eeef0bf6c80ecd54354d4
      Stored in directory: /home/jupyterlab/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
    Successfully built pyspark
    Installing collected packages: py4j, pyspark
    Successfully installed py4j-0.10.9 pyspark-3.1.2
    Collecting findspark
      Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
    Installing collected packages: findspark
    Successfully installed findspark-2.0.1
    
In [6]:
import findspark
    findspark.init()
    
In [7]:
# Pandas is a popular data science package for Python. In this lab, we use Pandas to load a CSV file from disc to a pandas dataframe in memory.
    import pandas as pd
    import matplotlib.pyplot as plt
    # pyspark is the Spark API for Python. In this lab, we use pyspark to initialize the spark context. 
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    

Ejercicio 1 - Sesión de Spark

En este ejercicio crearás e inicializarás la sesión de Spark necesaria para cargar los dataframes y operar con ellos.

Tarea 1: Creación de la sesión y el contexto de Spark

In [ ]:
# Creating a spark context class
    sc = SparkContext()
    
    # Creating a spark session
    spark = SparkSession \
        .builder \
        .appName("Python Spark DataFrames basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    

Tarea 2: Inicializar la sesión de Spark

Para trabajar con dataframes solo necesitamos verificar que se haya creado la instancia de sesión de Spark. Puedes hacer clic en el botón "Spark UI" para explorar los elementos de la interfaz de Spark.

In [9]:
spark
    
Out[9]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v2.4.3
Master
local[*]
AppName
pyspark-shell

Tarea 2: Importar bibliotecas de Spark ML

En este ejercicio importaremos 4 funciones de SparkML

  1. (Biblioteca de características) VectorAssembler(): Esta función se usa para crear vectores de características a partir de dataframes/datos en bruto. Estos vectores son necesarios para entrenar un modelo de ML o realizar operaciones estadísticas.
  2. (Biblioteca estadística) Correlation(): Esta función pertenece a la biblioteca de estadística de SparkML. Se usa para calcular la correlación entre vectores de características.
  3. (Biblioteca de características) Normalized(): Esta función se usa para normalizar características. Normalizar características mejora la convergencia y los resultados del entrenamiento del modelo ML.
  4. (Biblioteca de regresión) LinearRegression(): Esta función se usa para crear y entrenar un modelo de regresión lineal.
In [11]:
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
    from pyspark.ml.stat import Correlation
    from pyspark.ml.regression import LinearRegression
    

Ejercicio 2 - Carga de datos y creación de vectores de características

En esta sección, primero leerás el archivo CSV en un dataframe de pandas y luego en un dataframe de Spark.

Pandas es una biblioteca usada para manipulación y análisis de datos. Ofrece estructuras y operaciones para crear y manipular objetos Series y DataFrame. Los datos pueden importarse desde varias fuentes, por ejemplo arreglos de NumPy, diccionarios de Python y archivos CSV. Pandas permite manipular, organizar y visualizar los datos.

En este ejemplo usamos un conjunto de datos que contiene información sobre automóviles.

Tarea 1: Cargar datos en un DataFrame de pandas

In [12]:
# Read the file using `read_csv` function in pandas
    cars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/cars.csv')
    
In [13]:
# Preview a few records
    cars.head()
    
Out[13]:
mpg cylinders displacement horsepower weight acceleration model origin car_name
0 18.0 8 307.0 130.0 3504.0 12.0 70 1 chevrolet chevelle malibu
1 15.0 8 350.0 165.0 3693.0 11.5 70 1 buick skylark 320
2 18.0 8 318.0 150.0 3436.0 11.0 70 1 plymouth satellite
3 16.0 8 304.0 150.0 3433.0 12.0 70 1 amc rebel sst
4 17.0 8 302.0 140.0 3449.0 10.5 70 1 ford torino

Para este ejemplo, preprocesamos los datos y usamos solo 3 columnas. Este conjunto de datos preprocesado se encuentra en el archivo cars2.csv.

In [14]:
cars2 = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/cars2.csv', header=None, names=["mpg", "hp", "weight"])
    cars2.head()
    
Out[14]:
mpg hp weight
0 18.0 130.0 3504.0
1 15.0 165.0 3693.0
2 18.0 150.0 3436.0
3 16.0 150.0 3433.0
4 17.0 140.0 3449.0

Tarea 2: Cargar datos en un DataFrame de Spark

In [15]:
# We use the `createDataFrame` function to load the data into a spark dataframe
    sdf = spark.createDataFrame(cars2)
    
In [16]:
# Let us look at the schema of the loaded spark dataframe
    sdf.printSchema()
    
root
     |-- mpg: double (nullable = true)
     |-- hp: double (nullable = true)
     |-- weight: double (nullable = true)
    
    

Tarea 3: Convertir columnas del dataframe en vectores de características

En esta tarea usamos la función VectorAssembler() para convertir las columnas del dataframe en vectores de características. Para nuestro ejemplo, usamos la potencia ("hp") y el peso del automóvil como características de entrada, y las millas por galón ("mpg") como etiqueta objetivo.

In [17]:
assembler = VectorAssembler(
        inputCols=["hp", "weight"],
        outputCol="features")
    
    output = assembler.transform(sdf).select('features','mpg')
    

Ahora creamos una división entrenamiento-prueba de 75%-25%.

In [18]:
train, test = output.randomSplit([0.75, 0.25])
    

Ejercicio 3 - Estadísticas básicas e ingeniería de características

En este ejercicio determinamos la correlación entre los vectores de características y normalizamos las características.

Tarea 1: Correlación

Spark ML tiene una función de correlación incorporada como parte de la biblioteca Stat. Usamos esta función para determinar los distintos tipos de correlación entre las 2 características: "hp" y "weight".

In [19]:
r1 = Correlation.corr(train, "features").head()
    print("Pearson correlation matrix:\n" + str(r1[0]))
    
[Stage 3:>                                                          (0 + 8) / 8]26/02/02 17:54:01 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    26/02/02 17:54:01 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                    
    
Pearson correlation matrix:
    DenseMatrix([[1.        , 0.88319985],
                 [0.88319985, 1.        ]])
    
In [20]:
r2 = Correlation.corr(train, "features", "spearman").head()
    print("Spearman correlation matrix:\n" + str(r2[0]))
    
Spearman correlation matrix:
    DenseMatrix([[1.        , 0.88485636],
                 [0.88485636, 1.        ]])
    

Podemos ver que existe una correlación de 0.86 (o 86%) entre las características. Esto es lógico, ya que un automóvil con mayor potencia probablemente tiene un motor más grande y por eso pesa más. También podemos visualizar los vectores de características para comprobar que están correlacionados.

In [21]:
plt.figure()
    plt.scatter(cars2["hp"], cars2["weight"])
    plt.xlabel("horsepower")
    plt.ylabel("weight")
    plt.title("Correlation between Horsepower and Weight")
    plt.show()
    
No description has been provided for this image

Tarea 2: Normalización

Para lograr un mejor entrenamiento y convergencia del modelo, es una buena práctica normalizar los vectores de características.

In [22]:
normalizer = Normalizer(inputCol="features", outputCol="features_normalized", p=1.0)
    train_norm = normalizer.transform(train)
    print("Normalized using L^1 norm")
    train_norm.show(5, truncate=False)
    
Normalized using L^1 norm
    +-------------+----+-----------------------------------------+
    |features     |mpg |features_normalized                      |
    +-------------+----+-----------------------------------------+
    |[85.0,2587.0]|21.0|[0.03181137724550898,0.968188622754491]  |
    |[86.0,2220.0]|23.0|[0.0372940156114484,0.9627059843885516]  |
    |[87.0,2672.0]|25.0|[0.031533164189923885,0.9684668358100761]|
    |[88.0,2130.0]|27.0|[0.03967538322813345,0.9603246167718665] |
    |[88.0,2130.0]|27.0|[0.03967538322813345,0.9603246167718665] |
    +-------------+----+-----------------------------------------+
    only showing top 5 rows
    
    
26/02/02 17:57:12 WARN http.HttpParser: Illegal character 0x16 in state=START for buffer HeapByteBuffer@42f12c45[p=1,l=1542,c=8192,r=1541]={\x16<<<\x03\x01\x06\x01\x01\x00\x05\xFd\x03\x03\xAe%\xEf\xBd\xAd\x1d\xAe...\x8dXu<\x1e\xDa\x1f^K\x87\x98=\xEclL>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00}
    26/02/02 17:57:12 WARN http.HttpParser: bad HTTP parsed: 400 Illegal character 0x16 for HttpChannelOverHttp@63925e55{r=0,c=false,a=IDLE,uri=null}
    

Tarea 2: Escalado estándar

Esta es una práctica estándar para escalar las características, de modo que todas las columnas tengan media cero y varianza unitaria.

In [23]:
standard_scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
    train_model = standard_scaler.fit(train)
    train_scaled = train_model.transform(train)
    train_scaled.show(5, truncate=False)
    
                                                                                
    
+-------------+----+--------------------------------------+
    |features     |mpg |features_scaled                       |
    +-------------+----+--------------------------------------+
    |[85.0,2587.0]|21.0|[2.3060027515401957,2.960276789944799]|
    |[86.0,2220.0]|23.0|[2.333132195675963,2.540322564235583] |
    |[87.0,2672.0]|25.0|[2.36026163981173,3.0575413926294943] |
    |[88.0,2130.0]|27.0|[2.387391083947497,2.4373365143341403]|
    |[88.0,2130.0]|27.0|[2.387391083947497,2.4373365143341403]|
    +-------------+----+--------------------------------------+
    only showing top 5 rows
    
    
In [24]:
test_scaled = train_model.transform(test)
    test_scaled.show(5, truncate=False)
    
+--------------+----+---------------------------------------+
    |features      |mpg |features_scaled                        |
    +--------------+----+---------------------------------------+
    |[46.0,1835.0] |26.0|[1.2479544302452825,2.0997711285460787]|
    |[72.0,2408.0] |22.0|[1.9533199777752248,2.755448979585263] |
    |[95.0,2372.0] |24.0|[2.5772971928978663,2.714254559624686] |
    |[100.0,3282.0]|19.0|[2.712944413576701,3.7555579530726053] |
    |[105.0,3439.0]|16.0|[2.848591634255536,3.935211395678455]  |
    +--------------+----+---------------------------------------+
    only showing top 5 rows
    
    

Ejercicio 4 - Construcción y entrenamiento de un modelo de regresión lineal

En este ejercicio entrenamos un modelo de regresión lineal lrModel con nuestro conjunto de entrenamiento. Entrenamos el modelo con la versión de características escaladas de forma estándar. También imprimimos las métricas finales de RMSE y R-cuadrado.

Tarea 1: Crear y entrenar el modelo

Podemos crear el modelo usando la clase LinearRegression() y entrenarlo con la función fit().

In [25]:
# Create a LR model
    lr = LinearRegression(featuresCol='features_scaled', labelCol='mpg', maxIter=100)
    
    # Fit the model
    lrModel = lr.fit(train_scaled)
    
    # Print the coefficients and intercept for linear regression
    print("Coefficients: %s" % str(lrModel.coefficients))
    print("Intercept: %s" % str(lrModel.intercept))
    
    # Summarize the model over the training set and print out some metrics
    trainingSummary = lrModel.summary
    #trainingSummary.residuals.show()
    print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
    print("R-squared: %f" % trainingSummary.r2)
    
26/02/02 18:56:14 WARN util.Instrumentation: [d4ceeea2] regParam is zero, which might cause numerical instability and overfitting.
    26/02/02 18:56:15 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    26/02/02 18:56:15 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                    
    
Coefficients: [-2.2872198197811335,-4.518913388143613]
    Intercept: 45.30044814874303
    RMSE: 4.269107
    R-squared: 0.705989
    

Observamos un RMSE (raíz del error cuadrático medio) de 4.26. Esto significa que nuestro modelo predice el mpg con un error promedio de 4.2 unidades.

Tarea 2: Predecir sobre datos nuevos

Una vez entrenado un modelo, podemos usar transform() sobre datos nuevos no vistos (por ejemplo, los datos de prueba) para generar predicciones. En la celda de abajo, observa la columna "prediction", que contiene el "mpg" predicho.

In [26]:
lrModel.transform(test_scaled).show(5)
    
+--------------+----+--------------------+------------------+
    |      features| mpg|     features_scaled|        prediction|
    +--------------+----+--------------------+------------------+
    | [46.0,1835.0]|26.0|[1.24795443024528...| 32.95741817687805|
    | [72.0,2408.0]|22.0|[1.95331997777522...| 28.38114069700659|
    | [95.0,2372.0]|24.0|[2.57729719289786...| 27.14012165936291|
    |[100.0,3282.0]|19.0|[2.71294441357670...|22.124306801956877|
    |[105.0,3439.0]|16.0|[2.84859163425553...|21.002213443304356|
    +--------------+----+--------------------+------------------+
    only showing top 5 rows
    
    

Pregunta 1 - Correlación

Imprime la matriz de correlación para la partición del conjunto de prueba creada arriba.

In [29]:
# Code block for learners to answer
    r1 = Correlation.corr(test, "features").head()
    print("Pearson correlation matrix:\n" + str(r1[0]))
    
Pearson correlation matrix:
    DenseMatrix([[1.        , 0.83683935],
                 [0.83683935, 1.        ]])
    

Haz doble clic aquí para ver la solución.

Pregunta 2 - Normalización de características

Normaliza las características de entrenamiento usando la norma L2 del vector de características.

In [31]:
# Code block for learners to answer
    normalizer_l2 = Normalizer(inputCol="features", outputCol="features_normalized", p=2.0)
    train_norm_l2 = normalizer_l2.transform(train)
    print("Normalized using L^1 norm\n"+str(train_norm_l2))
    train_norm_l2.show(5, truncate=False)
    
Normalized using L^1 norm
    DataFrame[features: vector, mpg: double, features_normalized: vector]
    +-------------+----+-----------------------------------------+
    |features     |mpg |features_normalized                      |
    +-------------+----+-----------------------------------------+
    |[85.0,2587.0]|21.0|[0.032838869734902014,0.9994606588728414]|
    |[86.0,2220.0]|23.0|[0.03870970399718558,0.9992504985319999] |
    |[87.0,2672.0]|25.0|[0.03254263484012658,0.9994703481933128] |
    |[88.0,2130.0]|27.0|[0.04127933931642943,0.9991476448181215] |
    |[88.0,2130.0]|27.0|[0.04127933931642943,0.9991476448181215] |
    +-------------+----+-----------------------------------------+
    only showing top 5 rows
    
    

Haz doble clic aquí para ver la solución.

Pregunta 3 - Entrenar modelo

Repite el entrenamiento del modelo mostrado arriba durante otras 100 iteraciones y reporta los coeficientes.

In [ ]:
# Code block for Question 3
    

Autores

Otros colaboradores

Copyright © 2021 IBM Corporation. Todos los derechos reservados.