Chapter 5 R y Spark

5.1 Introducción

En este capítulo emplearemos Spark junto con R para el procesamiento de datos. Spark es un framework que permite realizar procesos de división y paralelismo en un equipo o múltiples equipos.

5.1.1 Hadoop

Google (2004) publicó un nuevo documento que describe cómo realizar operaciones en todo el Sistema de archivos de Google, un enfoque que se conoció como MapReduce. Como era de esperar, hay dos operaciones en MapReduce: map y reduce. La operación de map proporciona una forma arbitraria de transformar cada archivo en un nuevo archivo, mientras que la operación de reduce combina dos archivos. Ambas operaciones requieren un código de computadora personalizado, pero el marco MapReduce se encarga de ejecutarlas automáticamente en muchas computadoras a la vez. Estas dos operaciones son suficientes para procesar todos los datos disponibles en la web, al tiempo que proporcionan suficiente flexibilidad para extraer información significativa de la misma.

5.1.2 Spark

En 2009, Apache Spark comenzó como un proyecto de investigación en AMPLab de UC Berkeley para mejorar MapReduce. Específicamente, Spark proporcionó un conjunto más rico de verbos más allá de MapReduce para facilitar la optimización del código que se ejecuta en múltiples máquinas. Spark también cargó datos en la memoria, lo que hace que las operaciones sean mucho más rápidas que el almacenamiento en disco de Hadoop

Spark

Componetes:

  1. Spark Core: Base donde se apoya el resto de los componentes
  2. Spark SQL: Procesamiento de dato estructurados y no estructurados
  3. Spark streaming: Procesamiento de datos en tiempo real.
  4. Spark MLLib: Machine learning
  5. Spark graph: Procesamiento de grafos

Algunos conceptos importantes

HDFS: Sistema de ficheros

5.1.3 Sparklyr

Oficialmente, sparklyr es una interfaz R para Apache Spark. Está disponible en CRAN y funciona como cualquier otro paquete de CRAN, lo que significa que es independiente de las versiones de Spark, es fácil de instalar, sirve a la comunidad R, abarca otros paquetes y prácticas de la comunidad R, y así sucesivamente. Está alojado en GitHub y tiene licencia de Apache 2.0, que le permite clonar, modificar y contribuir de nuevo a este proyecto.

5.2 Librería dplyr

#install.packages("dplyr")
library(dplyr)
load(url("https://github.com/AlvaroLimber/EST-384/raw/master/data/eh19.RData"))
## Obtener el promedio de edad por sexo de los jefes de hogar
aux<-eh19p[eh19p$s02a_05=="1.JEFE O JEFA DEL HOGAR",c("s02a_03","s02a_02")]
aux<-subset(eh19p,s02a_05=="1.JEFE O JEFA DEL HOGAR",c("s02a_03","s02a_02"))
tapply(aux$s02a_03, aux$s02a_02, mean)
#dplyr
# %>% operador pipe 
t1<-eh19p %>% filter(s02a_05=="1.JEFE O JEFA DEL HOGAR") %>% group_by(s02a_02) %>% 
summarise(mean(s02a_03))

t2<-eh19p %>% rename(sexo=s02a_02,edad=s02a_03,paren=s02a_05) %>%  filter(paren=="1.JEFE O JEFA DEL HOGAR") %>% group_by(sexo) %>% 
summarise(edad=mean(edad))

## Obtener el promedio de edad y años de educación por sexo de los jefes de hogar
t3<-eh19p %>% rename(sexo=s02a_02,edad=s02a_03,paren=s02a_05) %>%  filter(paren=="1.JEFE O JEFA DEL HOGAR") %>% group_by(sexo) %>% 
summarise(edad=mean(edad),aestudio=mean(aestudio,na.rm = T),med_aestudio=median(aestudio,na.rm = T),n=n())

eh19p %>% rename(sexo=s02a_02,edad=s02a_03,paren=s02a_05) %>%  filter(paren=="1.JEFE O JEFA DEL HOGAR") %>% group_by(depto,area,sexo) %>% 
summarise(edad=mean(edad),aestudio=mean(aestudio,na.rm = T),med_aestudio=median(aestudio,na.rm = T),n=n())

# ctr+mayus+m %>% 
eh19p<-eh19p %>% mutate(jefe=(s02a_05=="1.JEFE O JEFA DEL HOGAR"))


eh19p %>% mutate(jefe=(s02a_05=="1.JEFE O JEFA DEL HOGAR")) %>% group_by(jefe,s02a_02) %>% summarise(mean(ylab,na.rm = T))

eh19p<-eh19p %>% rename(sexo=s02a_02)

names(eh19p)[6]<-"edad"
# personas entre 15 y 30 años, por departamento y sexo, el promedio de horas a la semana de trabajo, años de educación y el mínimo y máximo de ingresos laborales y su rango
eh19p %>% filter(edad>=15 & edad<=30) %>% group_by(depto,sexo) %>% summarise(horas=mean(tothrs,na.rm = T),edu=mean(aestudio,na.rm = T),min_ylab=min(ylab,na.rm = T),max_ylab=max(ylab,na.rm = T)) %>% mutate(rango=max_ylab-min_ylab)

eh19p %>% slice_sample(n=20) %>% select(edad) %>% mutate(gedad=cut(edad,c(0,14,25,60,100),include.lowest =T))

eh19p %>% mutate(gedad=cut(edad,c(0,5,14,25,60,100),include.lowest =T)) %>% group_by(gedad) %>% summarise(mean(aestudio,na.rm=T))
## Por departamento, área y sexo de los jefes del hogar, mostrar el porcentaje de hogares con piso de tierra.
aux<-eh19p %>% filter(jefe==T) %>% select(folio,sexo,area,depto)

aa<-attributes(eh19v)
aa$variable.labels
unique(eh19v$s01a_09)

aux2<-eh19v %>% mutate(tierra=s01a_09=="1.TIERRA") %>% select(folio,tierra)
head(aux2)

bd<-merge(aux,aux2)## 1:1 (), n:1 , 1:n , n:n (X 30) (Y 20) XY 600

#x
#Nombre materia  nota ci
#Juan    m1      45   123
#Juan    m2      57   123
#Juan    m3      57   123
#
#y
#Nombre  ci   plan_estudio
#Juan    123  2012
#Maria   124  2007

#merge(x,y,by=c("ci","Nombre"),all=T)

#xy
#Nombre materia  nota ci  plan_estudio
#Juan    m1      45   123  2012
#Juan    m2      57   123  2012  
#Juan    m3      57   123  2012
#Maria                124  2007

bd %>% group_by(depto,area,sexo) %>% summarise(Piso_Tierra=mean(tierra)*100)
#se recomienda explorar el comando merge, y la variable folio es la llave para la unión de las bases de datos

# Ejercicio. Obtener una tabla por departamento y área con la siguiente información
#  * % de hogares pobres extremos
#  * % de hogares con jefes del hogar mujeres
#  * Promedio de personas en el hogar
load(url("https://github.com/AlvaroLimber/EST-384/raw/master/data/eh19.RData"))
bd<-eh19p %>% filter(s02a_02=="2.Mujer") %>% mutate(pobreza=pext0=="Pobre extremo",mujer=s02a_05=="JEFE O JEFA DEL HOGAR" ) %>% group_by(depto,area) %>%  summarise(pobre=mean(pobreza)*100,mujer1=mean(mujer))
bd

eh19p %>% mutate(pobreza=(pext0=="Pobre extremo")*1,jefa=(s02a_05=="1.JEFE O JEFA DEL HOGAR" & s02a_02=="2.Mujer")*1, nper=1) %>% group_by(depto,area,folio) %>% summarise(nper=sum(nper),jefa=max(jefa),pobreza=max(pobreza)) %>% group_by(depto,area) %>% summarise(pobreza=mean(pobreza,na.rm=T)*100,jefa=mean(jefa)*100,nper=mean(nper)) 

t1<-eh19p %>% mutate(pobreza=(pext0=="Pobre extremo")*1,jefa=(s02a_05=="1.JEFE O JEFA DEL HOGAR" & s02a_02=="2.Mujer")*1, nper=1) %>% group_by(depto,area,folio) %>% summarise(nper=sum(nper),jefa=max(jefa),pobreza=max(pobreza)) 

t2<-t1 %>% group_by(depto,area) %>% summarise(pobreza=mean(pobreza,na.rm=T)*100,jefa=mean(jefa)*100,nper=mean(nper)) 
t2

xtable::xtable(t2)
knitr::kable(t2)

5.3 Arrancando Spark

5.3.1 Prerequisitos

  • R
  • RStudio
  • Java 8
system("java -version")

5.3.2 Instalación

#instalar la librería sparlyr
install.packages("sparklyr")
#Habilitando la librería
library(sparklyr)
#versiones disponibles
spark_available_versions()
#versiones instaladas
spark_installed_versions()
#instalando spark desde R
spark_install("3.0")
#spark_install(version="2.3")

5.3.3 Soluciones a problemas de instalación

install.packages("rJava")
library(rJava)

5.3.4 Creando una sesión en Spark

#inicia la sesión
sc <- spark_connect(master = "local")

#sc <- spark_connect(master = "local", spark_home = "C:\\Users\\ALVARO\\AppData\\Local\\spark",version="3.0.1")

5.3.5 Interface web

spark_web(sc)

5.3.6 Configuración

conf <- spark_config()
  conf$`sparklyr.shell.driver-memory` <- "2G"
  #conf$spark.memory.fraction <- 0.8 
  #spark_home="C:\\Users\\ALVARO\\AppData\\Local\\spark\\spark-2.4.3-bin-hadoop2.7"
  #conf$sparklyr.gateway.port <- 9090
#conección  

#apagar conexión
spark_disconnect(sc)

spark_disconnect_all()

5.4 Análisis con Spark

Pasos en el análisis de datos

5.4.1 Importando los datos

Por lo general, importar significa que R leerá los archivos y los cargará en la memoria; cuando se usa Spark, los datos se importan a Spark, no a R.

Colección de documentos

#csv/importaciones
sp_importaciones<-spark_read_csv(sc,
  name="importaciones",
  path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv")
###########
#esquema
###########
top_rows <- read.csv("C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv\\1993.csv",sep=",", nrows = 5)
spec_with_r <- sapply(top_rows, class)
spec_with_r[6]<-"factor"

sp_importaciones2<-spark_read_csv(sc,
  name="importaciones2",
  path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv",
  columns = spec_with_r)
# memoria
object.size(sp_importaciones)

Comando copy_to

m1<-data.frame(matrix(rnorm(10^7),ncol=20))
object.size(m1)/(10^6)
sp_m1<-copy_to(sc,m1)
rm(m1)

load("C:\\Users\\ALVARO\\Documents\\GitHub\\EST-383\\data\\oct20.RData")
object.size(computo)/10^6
sp_computo<-copy_to(sc,computo)
sp_eh19p<-copy_to(sc,eh19p)
rm(computo)

Documento específico

sp_covid<-spark_read_csv(sc,
  name="covid",
  path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\bigdata\\200614COVID19MEXICO.csv")

sp_covid2<-spark_read_csv(sc,
  name="covid2",
  path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\bigdata\\201115COVID19MEXICO.csv")

5.4.2 Librería DPLYR (Gramática de manipulación de datos)

Eliminar un objeto en spark

library(dplyr)
db_drop_table(sc,"importaciones2")
db_drop_table(sc,"m1")

DPLYR en Spark

sp_importaciones %>% group_by(GESTION) %>% count()
sp_covid2 %>% count()

eh19v %>% group_by(depto) %>% count() # R
sp_eh19v %>% group_by(depto) %>% count() # Spark
library(dplyr)
load(url("https://github.com/AlvaroLimber/EST-383/raw/master/data/oct20.RData"))
names(computo)[1]<-"pais"
##############################
### Columnas -> variables
##############################
#cambio de nombre de las variables
computo<-computo %>% rename(mas=`MAS - IPSP`,cc=CC)
computo<-computo %>% rename(idep=`Número departamento`,imun=5)
names(computo)
# Selección de variables
bd1<-computo %>% select(pais,cc,mas)
head(bd1)
# Crear nuevas variables
bd1<-bd1 %>% mutate(a=1,s_mas_cc=cc+mas,raiz_mas=sqrt(mas),lcc=log(cc))
bd1<-bd1 %>% mutate(mas100=mas>100,mean_mas=mas>mean(mas))
# ordenando las variables  
bd1<-bd1 %>% relocate(lcc:mas,before=a)#pendiente
bd1[,c("cc","pais","a")]
##############################
### Filas-> Observaciones
##############################
#filtrado
# elección presidencial solo Bolivia 
pv<-computo %>% filter(Elección=="Presidente y Vicepresidente")
pvbol<-computo %>% filter(Elección=="Presidente y Vicepresidente" & pais=="Bolivia")
##############################
### Resumen estadístico
##############################
# summarise
pvbol %>% summarise(media_mas=mean(mas),media_cc=mean(cc),max_mas=max(mas),max_cc=max(cc))
pvbol %>% summarise(p100mas=mean(mas>100)*100,p100cc=mean(cc>100)*100)
# group_by
pvbol %>% group_by(Departamento) %>% summarise(p100mas=mean(mas>100)*100,p100cc=mean(cc>100)*100)

pvbol %>% group_by(Departamento,Provincia) %>% summarise(p100mas=mean(mas>100)*100,p100cc=mean(cc>100)*100)
#count tally
pvbol %>% group_by(Departamento,Provincia) %>% count()
pvbol %>% group_by(Departamento,Provincia) %>% tally()
# encadenamiento masivo
pvbol %>% filter(Departamento=="Beni") %>% select(mas,cc) %>% plot()
pvbol %>% filter(Departamento=="Beni") %>% select(mas,cc) %>% cor()

5.4.3 DPLYR en Spark

# 0. Habilitar la librería
library(sparklyr)
library(dplyr)
library(ggplot2)
# 1. Conexión con Spark
sc <- spark_connect(master = "local", version = "2.4.3")
# 2. Definir la data (importación)
sp_covid<-spark_read_csv(sc,
  name="covid",
  path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\bigdata\\200614COVID19MEXICO.csv")#específico
sp_importaciones<-spark_read_csv(sc,
  name="importaciones",
  path = "C:\\Users\\ALVARO\\Desktop\\db_bolivia\\importaciones\\importaciones_csv")#carpeta

load(url("https://github.com/AlvaroLimber/EST-383/raw/master/data/oct20.RData"))

pvbol<-computo %>% rename(eleccion=Elección) %>% filter(eleccion=="Presidente y Vicepresidente")
sp_pvbol<-copy_to(sc,pvbol)

# 3. Exploración de los datos / ("Modelado")
names(pvbol)
glimpse(sp_pvbol)
glimpse(sp_importaciones)

sdf_describe(sp_importaciones)
sdf_describe(sp_pvbol,c("CC"))
summary(pvbol$CC)

sp_importaciones %>% group_by(DESADU) %>% count()

a1<-sp_importaciones %>% group_by(DESADU) %>% count() %>% collect() # guardar la salida en R

sp_a2<-sp_importaciones %>% group_by(DESADU) %>% count() %>% compute("a2") # guardar la salida en Spark

sp_importaciones %>% group_by(GESTION,DESADU) %>% count()

sp_importaciones %>% group_by(DESADU, GESTION) %>% count()

 
sp_importaciones %>% group_by(DESADU, GESTION) %>% summarise()

sp_importaciones %>% count()

# Generar un cuadro por gestión y producto, respecto el total de peso bruto y guardarlo en R.

t1<-sp_importaciones %>% group_by(GESTION,NANDINA,DESNAN) %>% summarise(peso_ton=sum(KILBRU,na.rm=T)/1000) %>% arrange(GESTION,desc(peso_ton)) %>% collect()

library(DBI)
aux<-dbGetQuery(sc,"Select * from importaciones limit 5")

#lm(FOB~PAG,data=sp_importaciones)

mod1<-sp_importaciones %>% select(FOB,PAG) %>% ml_linear_regression(FOB~.)

# 4. Análisis de los resultados


# 5. Salida. Visualización  

ggplot(sp_importaciones,aes(DESDEPTO))+geom_bar()+facet_wrap(~GESTION)

ggplot(sp_importaciones,aes(DESDEPTO))+geom_bar()

#ggplot(sp_importaciones,aes(KILBRU))+geom_histogram()

library(dbplot)

sp_importaciones %>% dbplot_bar(DESDEPTO)



# Final. Cerrar la conexión a Spark
spark_disconnect_all()
###########################################################
# cargar el pvbol
sp_pvbol<-copy_to(sc,pvbol,name="elecciones")
rm(pvbol)
# interactuar con la base en spark
sp_pvbol %>% count()
t0<-sp_pvbol %>% group_by(Departamento) %>% count()
#collect es para guardar la salida en la memoria de R
t1<-sp_pvbol %>% group_by(Departamento) %>% count() %>% collect()
# compute es para guardar la salida en la memoria de spark
sp_t2<-sp_pvbol %>% group_by(Departamento) %>% count() %>% compute("t2")
# consultas a spark

sp_pvbol %>%  group_by(Departamento,Provincia) %>% summarise(media_mas=mean(mas),media_cc=mean(cc),sd(mas))

# SQL show_query
sp_pvbol %>% group_by(Departamento) %>% count() %>% show_query()

sp_pvbol %>% group_by(Departamento) %>% select(mas,cc) %>% summarise_all(mean) %>% show_query()

# spark SQL
library(DBI)
e10<-dbGetQuery(sc,"Select * from elecciones limit 10")

# figuras 
sp_pvbol %>% select(cc) %>% hist()

library(ggplot2)
ggplot(sp_pvbol,aes(cc))+geom_histogram()
ggplot(sp_pvbol,aes(cc,mas))+geom_point()

# recomendación ml
load("C:\\Users\\ALVARO\\Documents\\GitHub\\EST-383\\data\\oct20.RData")
head(computo)
#filtrado
aux<-computo[computo$País=="Bolivia",] #en R
aux2<-computo %>% filter(País=="Bolivia") 
#spark SQL
library(DBI)
top10 <- dbGetQuery(sc, "Select * from importaciones limit 10")
top10
library(dplyr)
sp_importaciones %>% tally
sp_covid2 %>% tally
oct20 %>% tally
covid_cache <- sp_covid2 %>% 
    compute("covid_ch")
covid_cache %>% tally

Para correr con las bases de la parte de importaciones

#importaciones
dimnames(sp_importaciones)

sp_t1<-sp_importaciones %>% count(GESTION) %>% compute("t1")

aa<-sp_importaciones %>% count(GESTION) %>% collect()
aa<-sp_importaciones %>% count(GESTION,DESDEPTO) %>% collect()
sp_covid2 %>% count(SEXO,NEUMONIA)
sp_covid2 %>% group_by(SEXO) %>% summarise(mean(EDAD),na.rm=T)

ww<-sp_importaciones %>% group_by(GESTION,DESDEPTO,NANDINA) %>% summarise(total=sum(FOB,na.rm=T)) %>% collect()
sp_importaciones %>%  tbl(GESTION)

oct20 %>% select(MAS__IPSP,CC)%>%  summarise_all(mean)
oct20 %>% select(MAS__IPSP,CC)%>%  summarise_all(mean) %>% show_query()
oct20 %>% summarise(qq=quantile(CC,probs=0.5))
oct20 %>% select(MAS__IPSP,CC) %>% ml_corr()

#figuras
oct20 %>% select(CC) %>% hist()

library(dbplot)
oct20 %>% dbplot_histogram(CC,binwidth = 3)
sp_importaciones %>%  dbplot_bar(GESTION)
oct20 %>%  dbplot_raster(CC,MAS__IPSP)

oct20 %>% filter(PaAs=="Bolivia")%>%  group_by(Departamento) %>% summarise(voto=mean(CC)) %>% ggplot(aes(Departamento,voto))+geom_bar(stat="identity")

ggplot(oct20,aes(CC,MAS__IPSP))+geom_point()
Allaire, JJ, Yihui Xie, Jonathan McPherson, Javier Luraschi, Kevin Ushey, Aron Atkins, Hadley Wickham, Joe Cheng, Winston Chang, and Richard Iannone. 2020. Rmarkdown: Dynamic Documents for r. https://CRAN.R-project.org/package=rmarkdown.
R Core Team. 2019. R: A Language and Environment for Statistical Computing. Vienna, Austria: R Foundation for Statistical Computing. https://www.R-project.org/.
Xie, Yihui. 2019. Knitr: A General-Purpose Package for Dynamic Report Generation in r. https://CRAN.R-project.org/package=knitr.
———. 2020. Bookdown: Authoring Books and Technical Documents with r Markdown. https://CRAN.R-project.org/package=bookdown.
Xie, Yihui, J. J. Allaire, and Garrett Grolemund. 2018. How to Read This Book.” Transforming Climate Finance and Green Investment with Blockchains, 1. https://doi.org/10.1016/b978-0-12-814447-3.00041-0.