Chapter 5 R y Spark
5.1 Introducción
En este capítulo emplearemos Spark junto con R para el procesamiento de datos. Spark es un framework que permite realizar procesos de división y paralelismo en un equipo o múltiples equipos.
5.1.1 Hadoop
Google (2004) publicó un nuevo documento que describe cómo realizar operaciones en todo el Sistema de archivos de Google, un enfoque que se conoció como MapReduce. Como era de esperar, hay dos operaciones en MapReduce: map y reduce. La operación de map proporciona una forma arbitraria de transformar cada archivo en un nuevo archivo, mientras que la operación de reduce combina dos archivos. Ambas operaciones requieren un código de computadora personalizado, pero el marco MapReduce se encarga de ejecutarlas automáticamente en muchas computadoras a la vez. Estas dos operaciones son suficientes para procesar todos los datos disponibles en la web, al tiempo que proporcionan suficiente flexibilidad para extraer información significativa de la misma.
5.1.2 Spark
En 2009, Apache Spark comenzó como un proyecto de investigación en AMPLab de UC Berkeley para mejorar MapReduce. Específicamente, Spark proporcionó un conjunto más rico de verbos más allá de MapReduce para facilitar la optimización del código que se ejecuta en múltiples máquinas. Spark también cargó datos en la memoria, lo que hace que las operaciones sean mucho más rápidas que el almacenamiento en disco de Hadoop

Spark
Componetes:
- Spark Core: Base donde se apoya el resto de los componentes
- Spark SQL: Procesamiento de dato estructurados y no estructurados
- Spark streaming: Procesamiento de datos en tiempo real.
- Spark MLLib: Machine learning
- Spark graph: Procesamiento de grafos
Algunos conceptos importantes
HDFS: Sistema de ficheros
5.1.3 Sparklyr
Oficialmente, sparklyr es una interfaz R para Apache Spark. Está disponible en CRAN y funciona como cualquier otro paquete de CRAN, lo que significa que es independiente de las versiones de Spark, es fácil de instalar, sirve a la comunidad R, abarca otros paquetes y prácticas de la comunidad R, y así sucesivamente. Está alojado en GitHub y tiene licencia de Apache 2.0, que le permite clonar, modificar y contribuir de nuevo a este proyecto.
5.2 Librería dplyr
#install.packages("dplyr")
library(dplyr)
load(url("https://github.com/AlvaroLimber/EST-384/raw/master/data/eh19.RData"))
## Obtener el promedio de edad por sexo de los jefes de hogar
<-eh19p[eh19p$s02a_05=="1.JEFE O JEFA DEL HOGAR",c("s02a_03","s02a_02")]
aux<-subset(eh19p,s02a_05=="1.JEFE O JEFA DEL HOGAR",c("s02a_03","s02a_02"))
auxtapply(aux$s02a_03, aux$s02a_02, mean)
#dplyr
# %>% operador pipe
<-eh19p %>% filter(s02a_05=="1.JEFE O JEFA DEL HOGAR") %>% group_by(s02a_02) %>%
t1summarise(mean(s02a_03))
<-eh19p %>% rename(sexo=s02a_02,edad=s02a_03,paren=s02a_05) %>% filter(paren=="1.JEFE O JEFA DEL HOGAR") %>% group_by(sexo) %>%
t2summarise(edad=mean(edad))
## Obtener el promedio de edad y años de educación por sexo de los jefes de hogar
<-eh19p %>% rename(sexo=s02a_02,edad=s02a_03,paren=s02a_05) %>% filter(paren=="1.JEFE O JEFA DEL HOGAR") %>% group_by(sexo) %>%
t3summarise(edad=mean(edad),aestudio=mean(aestudio,na.rm = T),med_aestudio=median(aestudio,na.rm = T),n=n())
%>% rename(sexo=s02a_02,edad=s02a_03,paren=s02a_05) %>% filter(paren=="1.JEFE O JEFA DEL HOGAR") %>% group_by(depto,area,sexo) %>%
eh19p summarise(edad=mean(edad),aestudio=mean(aestudio,na.rm = T),med_aestudio=median(aestudio,na.rm = T),n=n())
# ctr+mayus+m %>%
<-eh19p %>% mutate(jefe=(s02a_05=="1.JEFE O JEFA DEL HOGAR"))
eh19p
%>% mutate(jefe=(s02a_05=="1.JEFE O JEFA DEL HOGAR")) %>% group_by(jefe,s02a_02) %>% summarise(mean(ylab,na.rm = T))
eh19p
<-eh19p %>% rename(sexo=s02a_02)
eh19p
names(eh19p)[6]<-"edad"
# personas entre 15 y 30 años, por departamento y sexo, el promedio de horas a la semana de trabajo, años de educación y el mínimo y máximo de ingresos laborales y su rango
%>% filter(edad>=15 & edad<=30) %>% group_by(depto,sexo) %>% summarise(horas=mean(tothrs,na.rm = T),edu=mean(aestudio,na.rm = T),min_ylab=min(ylab,na.rm = T),max_ylab=max(ylab,na.rm = T)) %>% mutate(rango=max_ylab-min_ylab)
eh19p
%>% slice_sample(n=20) %>% select(edad) %>% mutate(gedad=cut(edad,c(0,14,25,60,100),include.lowest =T))
eh19p
%>% mutate(gedad=cut(edad,c(0,5,14,25,60,100),include.lowest =T)) %>% group_by(gedad) %>% summarise(mean(aestudio,na.rm=T))
eh19p ## Por departamento, área y sexo de los jefes del hogar, mostrar el porcentaje de hogares con piso de tierra.
<-eh19p %>% filter(jefe==T) %>% select(folio,sexo,area,depto)
aux
<-attributes(eh19v)
aa$variable.labels
aaunique(eh19v$s01a_09)
<-eh19v %>% mutate(tierra=s01a_09=="1.TIERRA") %>% select(folio,tierra)
aux2head(aux2)
<-merge(aux,aux2)## 1:1 (), n:1 , 1:n , n:n (X 30) (Y 20) XY 600
bd
#x
#Nombre materia nota ci
#Juan m1 45 123
#Juan m2 57 123
#Juan m3 57 123
#
#y
#Nombre ci plan_estudio
#Juan 123 2012
#Maria 124 2007
#merge(x,y,by=c("ci","Nombre"),all=T)
#xy
#Nombre materia nota ci plan_estudio
#Juan m1 45 123 2012
#Juan m2 57 123 2012
#Juan m3 57 123 2012
#Maria 124 2007
%>% group_by(depto,area,sexo) %>% summarise(Piso_Tierra=mean(tierra)*100)
bd #se recomienda explorar el comando merge, y la variable folio es la llave para la unión de las bases de datos
# Ejercicio. Obtener una tabla por departamento y área con la siguiente información
# * % de hogares pobres extremos
# * % de hogares con jefes del hogar mujeres
# * Promedio de personas en el hogar
load(url("https://github.com/AlvaroLimber/EST-384/raw/master/data/eh19.RData"))
<-eh19p %>% filter(s02a_02=="2.Mujer") %>% mutate(pobreza=pext0=="Pobre extremo",mujer=s02a_05=="JEFE O JEFA DEL HOGAR" ) %>% group_by(depto,area) %>% summarise(pobre=mean(pobreza)*100,mujer1=mean(mujer))
bd
bd
%>% mutate(pobreza=(pext0=="Pobre extremo")*1,jefa=(s02a_05=="1.JEFE O JEFA DEL HOGAR" & s02a_02=="2.Mujer")*1, nper=1) %>% group_by(depto,area,folio) %>% summarise(nper=sum(nper),jefa=max(jefa),pobreza=max(pobreza)) %>% group_by(depto,area) %>% summarise(pobreza=mean(pobreza,na.rm=T)*100,jefa=mean(jefa)*100,nper=mean(nper))
eh19p
<-eh19p %>% mutate(pobreza=(pext0=="Pobre extremo")*1,jefa=(s02a_05=="1.JEFE O JEFA DEL HOGAR" & s02a_02=="2.Mujer")*1, nper=1) %>% group_by(depto,area,folio) %>% summarise(nper=sum(nper),jefa=max(jefa),pobreza=max(pobreza))
t1
<-t1 %>% group_by(depto,area) %>% summarise(pobreza=mean(pobreza,na.rm=T)*100,jefa=mean(jefa)*100,nper=mean(nper))
t2
t2
::xtable(t2)
xtable::kable(t2) knitr
5.3 Arrancando Spark
5.3.1 Prerequisitos
- R
- RStudio
- Java 8
system("java -version")
5.3.2 Instalación
#instalar la librería sparlyr
install.packages("sparklyr")
#Habilitando la librería
library(sparklyr)
#versiones disponibles
spark_available_versions()
#versiones instaladas
spark_installed_versions()
#instalando spark desde R
spark_install("3.0")
#spark_install(version="2.3")
5.3.3 Soluciones a problemas de instalación
install.packages("rJava")
library(rJava)
5.3.4 Creando una sesión en Spark
#inicia la sesión
<- spark_connect(master = "local")
sc
#sc <- spark_connect(master = "local", spark_home = "C:\\Users\\ALVARO\\AppData\\Local\\spark",version="3.0.1")
5.3.5 Interface web
spark_web(sc)
Jobs
Stages
Storage
Environment
Executors
SQL
5.3.6 Configuración
<- spark_config()
conf $`sparklyr.shell.driver-memory` <- "2G"
conf#conf$spark.memory.fraction <- 0.8
#spark_home="C:\\Users\\ALVARO\\AppData\\Local\\spark\\spark-2.4.3-bin-hadoop2.7"
#conf$sparklyr.gateway.port <- 9090
#conección
#apagar conexión
spark_disconnect(sc)
spark_disconnect_all()
5.4 Análisis con Spark

Pasos en el análisis de datos
5.4.1 Importando los datos
Por lo general, importar significa que R leerá los archivos y los cargará en la memoria; cuando se usa Spark, los datos se importan a Spark, no a R.
Colección de documentos
#csv/importaciones
<-spark_read_csv(sc,
sp_importacionesname="importaciones",
path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv")
###########
#esquema
###########
<- read.csv("C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv\\1993.csv",sep=",", nrows = 5)
top_rows <- sapply(top_rows, class)
spec_with_r 6]<-"factor"
spec_with_r[
<-spark_read_csv(sc,
sp_importaciones2name="importaciones2",
path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv",
columns = spec_with_r)
# memoria
object.size(sp_importaciones)
Comando copy_to
<-data.frame(matrix(rnorm(10^7),ncol=20))
m1object.size(m1)/(10^6)
<-copy_to(sc,m1)
sp_m1rm(m1)
load("C:\\Users\\ALVARO\\Documents\\GitHub\\EST-383\\data\\oct20.RData")
object.size(computo)/10^6
<-copy_to(sc,computo)
sp_computo<-copy_to(sc,eh19p)
sp_eh19prm(computo)
Documento específico
<-spark_read_csv(sc,
sp_covidname="covid",
path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\bigdata\\200614COVID19MEXICO.csv")
<-spark_read_csv(sc,
sp_covid2name="covid2",
path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\bigdata\\201115COVID19MEXICO.csv")
5.4.2 Librería DPLYR (Gramática de manipulación de datos)
Eliminar un objeto en spark
library(dplyr)
db_drop_table(sc,"importaciones2")
db_drop_table(sc,"m1")
DPLYR en Spark
%>% group_by(GESTION) %>% count()
sp_importaciones %>% count()
sp_covid2
%>% group_by(depto) %>% count() # R
eh19v %>% group_by(depto) %>% count() # Spark sp_eh19v
library(dplyr)
load(url("https://github.com/AlvaroLimber/EST-383/raw/master/data/oct20.RData"))
names(computo)[1]<-"pais"
##############################
### Columnas -> variables
##############################
#cambio de nombre de las variables
<-computo %>% rename(mas=`MAS - IPSP`,cc=CC)
computo<-computo %>% rename(idep=`Número departamento`,imun=5)
computonames(computo)
# Selección de variables
<-computo %>% select(pais,cc,mas)
bd1head(bd1)
# Crear nuevas variables
<-bd1 %>% mutate(a=1,s_mas_cc=cc+mas,raiz_mas=sqrt(mas),lcc=log(cc))
bd1<-bd1 %>% mutate(mas100=mas>100,mean_mas=mas>mean(mas))
bd1# ordenando las variables
<-bd1 %>% relocate(lcc:mas,before=a)#pendiente
bd1c("cc","pais","a")]
bd1[,##############################
### Filas-> Observaciones
##############################
#filtrado
# elección presidencial solo Bolivia
<-computo %>% filter(Elección=="Presidente y Vicepresidente")
pv<-computo %>% filter(Elección=="Presidente y Vicepresidente" & pais=="Bolivia")
pvbol##############################
### Resumen estadístico
##############################
# summarise
%>% summarise(media_mas=mean(mas),media_cc=mean(cc),max_mas=max(mas),max_cc=max(cc))
pvbol %>% summarise(p100mas=mean(mas>100)*100,p100cc=mean(cc>100)*100)
pvbol # group_by
%>% group_by(Departamento) %>% summarise(p100mas=mean(mas>100)*100,p100cc=mean(cc>100)*100)
pvbol
%>% group_by(Departamento,Provincia) %>% summarise(p100mas=mean(mas>100)*100,p100cc=mean(cc>100)*100)
pvbol #count tally
%>% group_by(Departamento,Provincia) %>% count()
pvbol %>% group_by(Departamento,Provincia) %>% tally()
pvbol # encadenamiento masivo
%>% filter(Departamento=="Beni") %>% select(mas,cc) %>% plot()
pvbol %>% filter(Departamento=="Beni") %>% select(mas,cc) %>% cor() pvbol
5.4.3 DPLYR en Spark
# 0. Habilitar la librería
library(sparklyr)
library(dplyr)
library(ggplot2)
# 1. Conexión con Spark
<- spark_connect(master = "local", version = "2.4.3")
sc # 2. Definir la data (importación)
<-spark_read_csv(sc,
sp_covidname="covid",
path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\bigdata\\200614COVID19MEXICO.csv")#específico
<-spark_read_csv(sc,
sp_importacionesname="importaciones",
path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv")#carpeta
load(url("https://github.com/AlvaroLimber/EST-383/raw/master/data/oct20.RData"))
<-computo %>% rename(eleccion=Elección) %>% filter(eleccion=="Presidente y Vicepresidente")
pvbol<-copy_to(sc,pvbol)
sp_pvbol
# 3. Exploración de los datos / ("Modelado")
names(pvbol)
glimpse(sp_pvbol)
glimpse(sp_importaciones)
sdf_describe(sp_importaciones)
sdf_describe(sp_pvbol,c("CC"))
summary(pvbol$CC)
%>% group_by(DESADU) %>% count()
sp_importaciones
<-sp_importaciones %>% group_by(DESADU) %>% count() %>% collect() # guardar la salida en R
a1
<-sp_importaciones %>% group_by(DESADU) %>% count() %>% compute("a2") # guardar la salida en Spark
sp_a2
%>% group_by(GESTION,DESADU) %>% count()
sp_importaciones
%>% group_by(DESADU, GESTION) %>% count()
sp_importaciones
%>% group_by(DESADU, GESTION) %>% summarise()
sp_importaciones
%>% count()
sp_importaciones
# Generar un cuadro por gestión y producto, respecto el total de peso bruto y guardarlo en R.
<-sp_importaciones %>% group_by(GESTION,NANDINA,DESNAN) %>% summarise(peso_ton=sum(KILBRU,na.rm=T)/1000) %>% arrange(GESTION,desc(peso_ton)) %>% collect()
t1
library(DBI)
<-dbGetQuery(sc,"Select * from importaciones limit 5")
aux
#lm(FOB~PAG,data=sp_importaciones)
<-sp_importaciones %>% select(FOB,PAG) %>% ml_linear_regression(FOB~.)
mod1
# 4. Análisis de los resultados
# 5. Salida. Visualización
ggplot(sp_importaciones,aes(DESDEPTO))+geom_bar()+facet_wrap(~GESTION)
ggplot(sp_importaciones,aes(DESDEPTO))+geom_bar()
#ggplot(sp_importaciones,aes(KILBRU))+geom_histogram()
library(dbplot)
%>% dbplot_bar(DESDEPTO)
sp_importaciones
# Final. Cerrar la conexión a Spark
spark_disconnect_all()
###########################################################
# cargar el pvbol
<-copy_to(sc,pvbol,name="elecciones")
sp_pvbolrm(pvbol)
# interactuar con la base en spark
%>% count()
sp_pvbol <-sp_pvbol %>% group_by(Departamento) %>% count()
t0#collect es para guardar la salida en la memoria de R
<-sp_pvbol %>% group_by(Departamento) %>% count() %>% collect()
t1# compute es para guardar la salida en la memoria de spark
<-sp_pvbol %>% group_by(Departamento) %>% count() %>% compute("t2")
sp_t2# consultas a spark
%>% group_by(Departamento,Provincia) %>% summarise(media_mas=mean(mas),media_cc=mean(cc),sd(mas))
sp_pvbol
# SQL show_query
%>% group_by(Departamento) %>% count() %>% show_query()
sp_pvbol
%>% group_by(Departamento) %>% select(mas,cc) %>% summarise_all(mean) %>% show_query()
sp_pvbol
# spark SQL
library(DBI)
<-dbGetQuery(sc,"Select * from elecciones limit 10")
e10
# figuras
%>% select(cc) %>% hist()
sp_pvbol
library(ggplot2)
ggplot(sp_pvbol,aes(cc))+geom_histogram()
ggplot(sp_pvbol,aes(cc,mas))+geom_point()
# recomendación ml
load("C:\\Users\\ALVARO\\Documents\\GitHub\\EST-383\\data\\oct20.RData")
head(computo)
#filtrado
<-computo[computo$País=="Bolivia",] #en R
aux<-computo %>% filter(País=="Bolivia") aux2
#spark SQL
library(DBI)
<- dbGetQuery(sc, "Select * from importaciones limit 10")
top10
top10library(dplyr)
%>% tally
sp_importaciones %>% tally
sp_covid2 %>% tally
oct20 <- sp_covid2 %>%
covid_cache compute("covid_ch")
%>% tally covid_cache
Para correr con las bases de la parte de importaciones
#importaciones
dimnames(sp_importaciones)
<-sp_importaciones %>% count(GESTION) %>% compute("t1")
sp_t1
<-sp_importaciones %>% count(GESTION) %>% collect()
aa<-sp_importaciones %>% count(GESTION,DESDEPTO) %>% collect()
aa%>% count(SEXO,NEUMONIA)
sp_covid2 %>% group_by(SEXO) %>% summarise(mean(EDAD),na.rm=T)
sp_covid2
<-sp_importaciones %>% group_by(GESTION,DESDEPTO,NANDINA) %>% summarise(total=sum(FOB,na.rm=T)) %>% collect()
ww%>% tbl(GESTION)
sp_importaciones
%>% select(MAS__IPSP,CC)%>% summarise_all(mean)
oct20 %>% select(MAS__IPSP,CC)%>% summarise_all(mean) %>% show_query()
oct20 %>% summarise(qq=quantile(CC,probs=0.5))
oct20 %>% select(MAS__IPSP,CC) %>% ml_corr()
oct20
#figuras
%>% select(CC) %>% hist()
oct20
library(dbplot)
%>% dbplot_histogram(CC,binwidth = 3)
oct20 %>% dbplot_bar(GESTION)
sp_importaciones %>% dbplot_raster(CC,MAS__IPSP)
oct20
%>% filter(PaAs=="Bolivia")%>% group_by(Departamento) %>% summarise(voto=mean(CC)) %>% ggplot(aes(Departamento,voto))+geom_bar(stat="identity")
oct20
ggplot(oct20,aes(CC,MAS__IPSP))+geom_point()