Agenda
Announcements
- Ch 17 programming notebook not assigned, but highly recommended
- several steps take some non-trivial configuration that I don’t want to require per se
MDSR Ch 17: Towards big data
- 3 V’s of big data
- “big data is when your workflow breaks” –Randy Pruim
- how big is “big” is relative
- pencil/paper workflow: 30 rows & 3 columns is “big”
- TI 83 workflow: larger than 99 rows (or columns) is “big”
- MS Excel workflow: larger than any of the following constraints is “big”
- 1,048,576 rows (\(2^20\))
- 16,384 columns (\(2^14\))
- 255 characters in a column (\(2^8 - 1\))
- Laptop running R: large with respect to available memory
Biggest of the big… (for perspective)
- the Large Hadron Collider in Geneva generates 25 petabytes of data per year
- one petabyte is a million gigabytes
- all the workflows are broken
- they actually only save 0.001% of the data generated
- Others just for fun (based on 2018 Forbes article)
- Google processes 40,000 searches per second
- 300 million photos uploaded to Facebook per day
- The Weather Channel receives 18,055,556 forecast requests per minute
What happens when data start getting big (in R)
- The data may not load into memory
- Analyzing the data may take a (really) long time
- Visualizations get messy
Memory limits in R
- If you’re running 32-bit R on any OS, it’ll be 2 or 3 GB
- note: 2GB of memory used by R is not the same as 2GB on disk
- Overhead for R to keep track of your data
- Memory used for analysis, etc.
- Probably not more than about 500MB on disk?
- If you’re running 64-bit R on a 64-bit OS, the upper limit should be infinite (but it’s not)
- Package
bigmemory
- “Manage massive matrices with shared memory and memory-mapped files”
- use in parallel environments can provide substantial speed and memory efficiencies
- uses a pointer to a C++ data structure
- (con) matrix data structure requires homogeneity
- Package
ff
- “Memory-Efficient Storage of Large Data on Disk and Fast Access Functions”
- provides data structures that are stored on disk but behave (almost) as if they were in RAM
- uses a pointer to a flat binary file stored on disk, and it can be shared across different sessions
- permits heterogeneous data structures
Strategies when data get big
- Make the data smaller
- Get a bigger computer
- Access the data differently
- Split up the dataset for analysis
If things are just slow…
Recall the teller simulation:
any_active <- function(df) {
# return TRUE if someone has not finished
return(max(df$endtime) == Inf)
}
next_customer <- function(df) {
# returns the next customer in line
res <- filter(df, endtime == Inf) %>%
arrange(arrival)
return(head(res, 1))
}
update_customer <- function(df, cust_num, end_time) {
# sets the end time of a specific customer
return(mutate(df, endtime = ifelse(custnum == cust_num, end_time, endtime)))
}
teller_sim <- function(n = 1/2, m = 3/2, hours = 6) {
# simulation of bank where there is just one teller
# n: expected number of customers per minute
# m: expected length of transaction is m minutes
# hours: bank open for this many hours
customers <- rpois(hours * 60, lambda = n)
arrival <- numeric(sum(customers))
position <- 1
for (i in 1:length(customers)) {
numcust <- customers[i]
if (numcust != 0) {
arrival[position:(position + numcust - 1)] <- rep(i, numcust)
position <- position + numcust
}
}
duration <- rexp(length(arrival), rate = 1/m) # E[X]=m
df <- data.frame(arrival, duration, custnum = 1:length(duration),
endtime = Inf, stringsAsFactors = FALSE)
endtime <- 0 # set up beginning of simulation
while (any_active(df)) { # anyone left to serve
next_one <- next_customer(df)
now <- ifelse(next_one$arrival >= endtime, next_one$arrival, endtime)
endtime <- now + next_one$duration
df <- update_customer(df, next_one$custnum, endtime)
}
df <- mutate(df, totaltime = endtime - arrival)
return(favstats(~ totaltime, data = df))
}
Profiling the teller simulation
- let’s see where the time goes…
- not bad in this case, but you could discover interesting things… like
- maybe some modeling function defaults to bootstrap confidence intervals that you don’t care about with 1000 iterations per model
- you did something for every line of your huge data frame and then combine results using
c()
or rbind()
rather than assigning to a preallocated vector or matrix
Rprof("TellerSimProfile")
teller_sim()
Rprof(NULL)
head(summaryRprof("TellerSimProfile")$by.self, 20)
STILL slow? Try biglm
biglm
package has an efficient alternative to the lm
function
- it can even fit generalized linear models (regression & logistic regression) with data frames that are larger than memory
require(biglm)
n <- 20000
p <- 500
d <- as.data.frame(matrix(rnorm(n * (p + 1)), ncol = (p + 1)))
expl_vars <- paste(paste0("V", 2:(p+1)), collapse = " + ")
my_formula <- as.formula(paste("V1 ~ ", expl_vars))
# profile `lm` vs `biglm`
system.time(lm(my_formula, data = d))
user system elapsed
5.779 0.251 6.131
system.time(biglm(my_formula, data = d))
user system elapsed
3.777 0.182 4.002
Next step: parallel processing
- Parallel processing is basically farming out subtasks to independent processors, then merging results
- effectively just allocates more RAM for the problem
my_cores <- detectCores()
my_cores
[1] 4
- embarassingly parallel computing
- I need to repeat the same task many times
- order of implementation doesn’t matter
- I have 4 total cores, but you should always save one for your operating system
- e.g., comparison for several iterations of teller simulation
k <- 5
# without parallel processing
system.time(lapply(1:k, teller_sim))
user system elapsed
12.102 0.258 12.565
# parallelize with 3 cores
system.time(mclapply(1:k, teller_sim, mc.cores = my_cores - 1))
user system elapsed
11.942 0.933 12.079
MapReduce (parallelization that’s not embarassing?)
- programming paradigm for parallel computing
- two phase algorithm
- map–farm out parallizeable task to many machines
- reduce–combine results
- tricky part: you have to define the
map
function and the reduce
function
- needs software implementation
Hadoop & Spark
- Hadoop was first to really tackle MapReduce
- Hadoop MapReduce has been superseded by Spark,
- tools that emerged as the “ecosystem” around it are still popular (HDFS)
- “legacy” projects might still use Hadoop MapReduce
- Apache Spark is considered superior for a few reasons
- had the benefit of implementing lessons learned from Hadoop
- keep the good–HDFS (Hadoop Distributed File System) for disk storage
- improve the weaknesses–prioritize RAM rather than disk storage whenever possible
Interface with Spark
- Spark provides provides an interface for programming entire clusters
- a computer cluster is a set of connected computers that work together as a single system
- the
sparklyr
package in R makes it easy to
- install a local Spark cluster (from within R)
- connect to a local or remote cluster
require(sparklyr)
# spark_install() # only once per machine
Interface with Spark
# modify master to connect to a remote Spark cluster
sc <- spark_connect(master = "local")
* Using Spark: 2.4.0
class(sc)
[1] "spark_connection" "spark_shell_connection" "DBIConnection"
babynames_tbl <-
sc %>%
copy_to(babynames::babynames, "babynames", overwrite = TRUE)
class(babynames_tbl)
[1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
Counting Matthews
babynames_tbl %>%
filter(name == "Matthew") %>%
group_by(year) %>%
summarise(N = n(),
total_births = sum(n)) %>%
arrange(desc(total_births)) %>%
head()
Missing values are always removed in SQL.
Use `SUM(x, na.rm = TRUE)` to silence this warning
This warning is displayed only once per session.
From dplyr
to SQL
- whenever
dplyr
meets an object with class tbl_sql
(like babynames_tbl
), dplyr
automatically translates the R pipeline into SQL
- SQL (structured query language) is a widely used language for querying relational databases, among other purposes
- Queries in SQL start with the
SELECT
keyword and consist several clauses which must to be written in order; basically
SELECT
(required)–like select()
in dplyr
(and possibly combined with mutate()
)
FROM
(required)–like table before the first %>%
in dplyr
JOIN
–like join()
verbs in dplyr
WHERE
–like filter()
verb in dplyr
GROUP BY
–like group_by()
verb in dplyr
HAVING
–like using a second filter()
in dplyr
after the rows have already been
ORDER BY
–like arrange()
verb in dplyr
LIMIT
–sort of like head()
but more versatile
- For example, let’s revisit our previous
dplyr
query
q <-
babynames_tbl %>%
filter(name == "Matthew") %>%
group_by(year) %>%
summarise(N = n(),
total_births = sum(n)) %>%
arrange(desc(total_births)) %>%
head()
q
show_query(q)
<SQL>
SELECT `year`, count(*) AS `N`, SUM(`n`) AS `total_births`
FROM `babynames`
WHERE (`name` = "Matthew")
GROUP BY `year`
ORDER BY `total_births` DESC
LIMIT 6
Querying the Spark cluster
- Spark is a parallelized technology designed to supersede SQL, but it’s still useful to know SQL in order to use Spark
- here, we’ll query the Spark cluster using the connection we’ve defined
sc
with the SQL statement equivalent to our dplyr
wrangling
require(DBI)
dbGetQuery(conn = sc, statement = "
SELECT year, sum(1) as N, sum(n) as total_births
FROM babynames
WHERE name == 'Matthew'
GROUP BY year
ORDER BY total_births desc
LIMIT 6
")
Modeling with Spark
whately_2015
has some weather data from Massachusetts (in the macleish
package)
- Spark has a machine learning library which includes many of the supervised/unsupervised learning tools we’ve discussed this semester
- let’s use Spark to fit a multiple regression model as an example
require(macleish)
weather_tbl <- copy_to(sc, whately_2015, overwrite = TRUE)
weather_tbl %>%
sparklyr::ml_linear_regression(rainfall ~ temperature + pressure + rel_humidity) %>%
summary()
Deviance Residuals:
Min 1Q Median 3Q Max
-0.041290 -0.021761 -0.011632 -0.000576 15.968356
Coefficients:
(Intercept) temperature pressure rel_humidity
0.7177542 0.0004089 -0.0007545 0.0004377
R-Squared: 0.004824
Root Mean Squared Error: 0.1982
Alternatives to SQL (Google BigQuery)
require(bigrquery)
project_id <- "stat-380-class-demo" # Beckman's google cloud project ID
sql <- "SELECT word, count(distinct corpus) as numPlays, sum(word_count) as N
FROM [publicdata:samples.shakespeare]
GROUP BY word
ORDER BY N desc
LIMIT 10
"
query_exec(query = sql, project = project_id)
Waiting for authentication in browser...
Press Esc/Ctrl + C to abort
Authentication complete.
Running job -: 1s:
Running job \: 2s:
Running job |: 2s:
Running job /: 3s:
Running job -: 3s:
Running job \: 3s:
Running job |: 4s:
Running job /: 4s:
Running job -: 5s:
Running job \: 5s:
Running job |: 5s:
Running job /: 5s:
10.0 megabytes processed
Rcpp
RCPP::cppFunction()
allows you to write C++ functions in R
Rcpp::sourceCpp()
loads a C++ file from disk in the same way you use source() to load a file of R code.
- Rcpp will compile the C++ code and construct an R function that connects to the compiled C++ function
- more here: http://adv-r.had.co.nz/Rcpp.html
require(Rcpp)
# write a simple function in C++
cppFunction('int addemup(int x, int y, int z) {
int sum = x + y + z;
return sum;
}')
# R recognizes `addemup` like any other function
addemup
function (x, y, z)
.Call(<pointer: 0x1176de8f0>, x, y, z)
addemup(2, 4, 6)
[1] 12
Stan
- Bayesian statistical inference with MCMC sampling
- Stan model within the code chunk is compiled into a “stanmodel” object
- result assigned to a variable with the name given by the
output.var
option
parameters {
real y[2];
}
model {
y[1] ~ normal(0, 1);
y[2] ~ double_exponential(0, 2);
}
Loading required package: sp
library(rstan)
Loading required package: StanHeaders
rstan (Version 2.19.2, GitRev: 2e1f913d3ca3)
For execution on a local, multicore CPU with excess RAM we recommend calling
options(mc.cores = parallel::detectCores()).
To avoid recompilation of unchanged Stan programs, we recommend calling
rstan_options(auto_write = TRUE)
Attaching package: ‘rstan’
The following object is masked from ‘package:tidyr’:
extract
fit <- sampling(ex1, cores = 3)
starting worker pid=25541 on localhost:11849 at 12:27:28.837
starting worker pid=25555 on localhost:11849 at 12:27:29.119
starting worker pid=25569 on localhost:11849 at 12:27:29.379
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 1).
Chain 1:
Chain 1: Gradient evaluation took 1.1e-05 seconds
Chain 1: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 1: Adjust your expectations accordingly!
Chain 1:
Chain 1:
Chain 1: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 1: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 1: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 1: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 1: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 1: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 1: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 1: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain 1: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 1: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 1: Iteration: 1800 / 2000 [ 90%] (Sampling)
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 2).
Chain 1: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 1:
Chain 1: Elapsed Time: 0.032539 seconds (Warm-up)
Chain 1: 0.031726 seconds (Sampling)
Chain 1: 0.064265 seconds (Total)
Chain 1:
Chain 2:
Chain 2: Gradient evaluation took 1.1e-05 seconds
Chain 2: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 2: Adjust your expectations accordingly!
Chain 2:
Chain 2:
Chain 2: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 2: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 2: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 2: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 2: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 2: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 2: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 2: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 3).
2: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 3:
Chain 3: Gradient evaluation took 9e-06 seconds
Chain 3: 1000 transitions using 10 leapfrog steps per transition would take 0.09 seconds.
Chain 3: Adjust your expectations accordingly!
Chain 3:
Chain 3:
Chain 3: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 2: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 2: Iteration: 1800 / 2000 [ 90%] (Sampling)
Chain 2: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 2:
Chain 2: Elapsed Time: 0.033218 seconds (Warm-up)
Chain 2: 0.029568 seconds (Sampling)
Chain 2: 0.062786 seconds (Total)
Chain 2:
Chain 3: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 3: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 3: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 3: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 3: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 3: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 3: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain 3: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 3: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 3: Iteration: 1800 / 2000 [ 90%] (Sampling)
Chain 3: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 3:
Chain 3: Elapsed Time: 0.033801 seconds (Warm-up)
Chain 3: 0.032489 seconds (Sampling)
Chain 3: 0.06629 seconds (Total)
Chain 3:
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 4).
Chain 4:
Chain 4: Gradient evaluation took 6e-06 seconds
Chain 4: 1000 transitions using 10 leapfrog steps per transition would take 0.06 seconds.
Chain 4: Adjust your expectations accordingly!
Chain 4:
Chain 4:
Chain 4: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 4: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 4: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 4: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 4: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 4: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 4: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 4: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain 4: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 4: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 4: Iteration: 1800 / 2000 [ 90%] (Sampling)
Chain 4: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 4:
Chain 4: Elapsed Time: 0.019408 seconds (Warm-up)
Chain 4: 0.016486 seconds (Sampling)
Chain 4: 0.035894 seconds (Total)
Chain 4:
print(fit)
Inference for Stan model: stan-5a7c20ea0b45.
4 chains, each with iter=2000; warmup=1000; thin=1;
post-warmup draws per chain=1000, total post-warmup draws=4000.
mean se_mean sd 2.5% 25% 50% 75% 97.5% n_eff Rhat
y[1] -0.02 0.02 1.02 -2.01 -0.72 -0.01 0.67 2.01 1897 1
y[2] 0.10 0.06 2.79 -5.72 -1.28 0.06 1.47 6.03 2006 1
lp__ -1.50 0.03 1.22 -4.61 -2.06 -1.22 -0.62 -0.10 1329 1
Samples were drawn using NUTS(diag_e) at Wed Oct 2 12:27:32 2019.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at
convergence, Rhat=1).
# posterior
stan_plot(fit, point_est = "mean", show_density = TRUE, fill_color = "dodgerblue")
ci_level: 0.8 (80% intervals)
outer_level: 0.95 (95% intervals)

# trace
stan_trace(fit) +
scale_color_manual(values = c("red", "blue", "green", "black"))
Scale for 'colour' is already present. Adding another scale for 'colour',
which will replace the existing scale.

Python in R Markdown
https://rstudio.github.io/reticulate/#python-in-r-markdown
reticulate
package includes a Python engine for R Markdown:
- Run Python chunks in a single Python session embedded within your R session (shared variables/state between Python chunks)
- Printing of Python output, including graphical output from
matplotlib
.
- Access to objects created within Python chunks from R using the
py
object (e.g. py$x
would access an x
variable created within Python from R).
- Access to objects created within R chunks from Python using the
r
object (e.g. r.x
would access to x
variable created within R from Python)
- Built in conversion for many Python object types, including
NumPy
arrays and Pandas
data frames.
reticulate
& R Notebooks
reticulate
package does make it pretty easy to go back and forth between R & Python objects
- it “just works” when you compile RMarkdown
- the R Notebook isn’t yet ironed out in the current production release (v1.1)
- preview will throw errors, even when the code is correct
- R Notebook is no better than preview
- This will be fixed in RStudio v1.2 (install preview here…)
- you should install it (after the semester)
- I generally don’t recommend major updates during the semester if you can avoid it
Create object in R code chunk
require(reticulate)
dat <- c(180, 215, 210, 210, 188, 176, 209, 200)
Manipulate r.dat
in Python code chunk
198.5
Access py$avg
object in R code chunk
py$avg
[1] 198.5
---
title: "Extending R to accommodate 'big data' and interface with other software technologies"
subtitle: "MDSR Ch 17--Towards Big Data"
output: 
  slidy_presentation: default
  html_notebook: default  
---


```{r Front Matter, echo=TRUE, message=FALSE, warning=FALSE, include=FALSE}
# clean up R environment
rm(list = ls())

# global options
knitr::opts_chunk$set(eval=TRUE, include=TRUE)
options(digits=4)

# load all packages here
library(mdsr)
library(tidyverse)

library(biglm)
library(bigrquery)
library(DBI)
library(macleish)
library(parallel)
library(Rcpp)
library(reticulate)
library(rstan)
library(sparklyr)


# inputs summary

```


# Agenda


#### Announcements

- Ch 17 programming notebook not assigned, but *highly* recommended
- several steps take some non-trivial configuration that I don't want to require per se


# MDSR Ch 17: Towards big data

- 3 V's of big data
    - Volume
    - Variety
    - Velocity
- "big data is when your workflow breaks" --Randy Pruim
- how big is "big" is relative
    - pencil/paper workflow: 30 rows & 3 columns is "big"
    - TI 83 workflow: larger than 99 rows (or columns) is "big"
    - MS Excel workflow: larger than any of the following constraints is "big"
        - 1,048,576 rows ($2^20$)
        - 16,384 columns ($2^14$)
        - 255 characters in a column ($2^8 - 1$)
    - Laptop running R: large with respect to available memory


# Biggest of the big... (for perspective)

- the Large Hadron Collider in Geneva generates 25 petabytes of data per year
    - one petabyte is a million gigabytes
    - all the workflows are broken
    - they actually only save 0.001% of the data generated
- Others just for fun ([based on 2018 Forbes article](https://www.forbes.com/sites/bernardmarr/2018/05/21/how-much-data-do-we-create-every-day-the-mind-blowing-stats-everyone-should-read/#7ef508c760ba))
    - Google processes 40,000 searches *per second*
    - 300 million photos uploaded to Facebook per day
    - The Weather Channel receives 18,055,556 forecast requests *per minute*

![https://www.iflscience.com/technology/how-much-data-does-the-world-generate-every-minute/](2018bigData.png){width=95%}



# What happens when data start getting big (in R)

- The data may not load into memory
- Analyzing the data may take a (really) long time
- Visualizations get messy

# Sidebar about R vs SAS for large data sets

- SAS allocates memory dynamically to keep data on disk (by default)
- R loads all data into memory (by default)
- bottom line: 
    - by default, SAS handles very large datasets better... it "just works"
    - but don't use the R default!

# Memory limits in R

- If you're running 32-bit R on any OS, it'll be 2 or 3 GB
    - note: 2GB of memory used by R is not the same as 2GB on disk
    - Overhead for R to keep track of your data
    - Memory used for analysis, etc.
    - Probably not more than about 500MB on disk?
- If you're running 64-bit R on a 64-bit OS, the upper limit should be infinite (but it's not)
    - Package `bigmemory`
        - "Manage massive matrices with shared memory and memory-mapped files"
        - use in parallel environments can provide substantial speed and memory efficiencies
        - uses a pointer to a C++ data structure
        - (con) matrix data structure requires homogeneity 
    - Package `ff`
        - "Memory-Efficient Storage of Large Data on Disk and Fast Access Functions"
        - provides data structures that are stored on disk but behave (almost) as if they were in RAM
        - uses a pointer to a flat binary file stored on disk, and it can be shared across different sessions
        - permits heterogeneous data structures 


#  Strategies when data get big

- Make the data smaller 
- Get a bigger computer 
- Access the data differently
- Split up the dataset for analysis


# If things are just slow...

- time your code
    - one-liner: 
    ```
        system.time(<call>)
    ```
    - alternative: 
    ```
        start_time <- proc.time()
        <call>
        proc.time() – start_time
    ```
- profile your code to find out which specific operations are slowing you down
- small changes (e.g., vectorize to avoid a loop) can have huge benefit


#### Recall the teller simulation: 

```{r}

any_active <- function(df) {
  # return TRUE if someone has not finished
  return(max(df$endtime) == Inf)
}

next_customer <- function(df) {
  # returns the next customer in line
  res <- filter(df, endtime == Inf) %>%
    arrange(arrival)
  return(head(res, 1))
}

update_customer <- function(df, cust_num, end_time) {
  # sets the end time of a specific customer
  return(mutate(df, endtime = ifelse(custnum == cust_num, end_time, endtime)))
}


teller_sim <- function(n = 1/2, m = 3/2, hours = 6) {
  # simulation of bank where there is just one teller
  # n: expected number of customers per minute
  # m: expected length of transaction is m minutes
  # hours: bank open for this many hours
  
  customers <- rpois(hours * 60, lambda = n)
  arrival <- numeric(sum(customers))
  position <- 1
  for (i in 1:length(customers)) {
    numcust <- customers[i]
    if (numcust != 0) {
      arrival[position:(position + numcust - 1)] <- rep(i, numcust)
      position <- position + numcust
    }
  }
  duration <- rexp(length(arrival), rate = 1/m)  # E[X]=m
  df <- data.frame(arrival, duration, custnum = 1:length(duration), 
                   endtime = Inf, stringsAsFactors = FALSE)
  
  endtime <- 0 # set up beginning of simulation
  while (any_active(df)) { # anyone left to serve
    next_one <- next_customer(df)
    now <- ifelse(next_one$arrival >= endtime, next_one$arrival, endtime)
    endtime <- now + next_one$duration
    df <- update_customer(df, next_one$custnum, endtime)
  }
  df <- mutate(df, totaltime = endtime - arrival)
  return(favstats(~ totaltime, data = df))
}

```


# Profiling the teller simulation 

- let's see where the time goes...
- not bad in this case, but you could discover interesting things... like
    - maybe some modeling function defaults to bootstrap confidence intervals that you don’t care about with 1000 iterations per model
    - you did something for every line of your huge data frame and then combine results using `c()` or `rbind()` rather than assigning to a preallocated vector or matrix


```{r}
Rprof("TellerSimProfile")
teller_sim()
Rprof(NULL)
head(summaryRprof("TellerSimProfile")$by.self, 20)
```


# STILL slow? Try `biglm`

- `biglm` package has an efficient alternative to the `lm` function
- it can even fit generalized linear models (regression & logistic regression) with data frames that are larger than memory

```{r}
require(biglm)

n <- 20000
p <- 500
d <- as.data.frame(matrix(rnorm(n * (p + 1)), ncol = (p + 1)))
expl_vars <- paste(paste0("V", 2:(p+1)), collapse = " + ")
my_formula <- as.formula(paste("V1 ~ ", expl_vars))

# profile `lm` vs `biglm`
system.time(lm(my_formula, data = d))

system.time(biglm(my_formula, data = d))
```


# Next step: parallel processing

- **Parallel processing** is basically farming out subtasks to independent processors, then merging results
- effectively just allocates more RAM for the problem

```{r}
my_cores <- detectCores()
my_cores
```


- **embarassingly parallel computing**
    - I need to repeat the same task many times
    - order of implementation doesn't matter
- I have 4 total cores, but you should always save one for your operating system
- e.g., comparison for several iterations of teller simulation

```{r}
k <- 5

# without parallel processing
system.time(lapply(1:k, teller_sim))

# parallelize with 3 cores
system.time(mclapply(1:k, teller_sim, mc.cores = my_cores - 1))
```


# MapReduce (parallelization that's not embarassing?)

- programming paradigm for parallel computing
    - two phase algorithm
    - **map**--farm out parallizeable task to many machines
    - **reduce**--combine results
- tricky part: **you** have to define the `map` function and the `reduce` function
- needs software implementation
    - Hadoop
    - Spark

# Hadoop & Spark

- Hadoop was first to really tackle MapReduce 
- Hadoop MapReduce has been superseded by Spark, 
    - tools that emerged as the "ecosystem" around it are still popular (HDFS)
    - "legacy" projects might still use Hadoop MapReduce
- Apache Spark is considered superior for a few reasons
    - had the benefit of implementing lessons learned from Hadoop
    - keep the good--HDFS (Hadoop Distributed File System) for disk storage
    - improve the weaknesses--prioritize RAM rather than disk storage whenever possible


# Interface with Spark

- Spark provides provides an interface for programming entire clusters
- a **computer cluster** is a set of connected computers that work together as a single system
- the `sparklyr` package in R makes it easy to 
    - install a local Spark cluster (from within R)
    - connect to a local or remote cluster

```{r eval=FALSE}
require(sparklyr)
# spark_install()   # only once per machine
```


# Interface with Spark


```{r}
# modify master to connect to a remote Spark cluster
sc <- spark_connect(master = "local")
class(sc)
```


```{r}
babynames_tbl <- 
  sc %>%
  copy_to(babynames::babynames, "babynames", overwrite = TRUE)

class(babynames_tbl)
```


# Counting Matthews

```{r}
babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()
```


# From `dplyr` to SQL

- whenever `dplyr` meets an object with class `tbl_sql` (like `babynames_tbl`), `dplyr` **automatically** translates the R pipeline into **SQL**
- **SQL** (structured query language) is a widely used language for querying relational databases, among other purposes
- Queries in SQL start with the `SELECT` keyword and consist several clauses which **must to be written in order**; basically
    - `SELECT` (**required**)--like `select()` in `dplyr` (and possibly combined with `mutate()`)
    - `FROM` (**required**)--like table before the first `%>%` in `dplyr`
    - `JOIN`--like `join()` verbs in `dplyr`
    - `WHERE`--like `filter()` verb in `dplyr`
    - `GROUP BY`--like `group_by()` verb in `dplyr`
    - `HAVING`--like using a second `filter()` in `dplyr` after the rows have already been 
    - `ORDER BY`--like `arrange()` verb in `dplyr` 
    - `LIMIT`--sort of like `head()` but more versatile
- For example, let's revisit our previous `dplyr` query

```{r}
q <- 
  babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()

q
```

```{r}
show_query(q)
```

# Querying the Spark cluster

- Spark is a parallelized technology designed to supersede SQL, but it's still useful to know SQL in order to use Spark
- here, we'll query the Spark cluster using the connection we've defined `sc` with the SQL statement equivalent to our `dplyr` wrangling

```{r}
require(DBI)

dbGetQuery(conn = sc, statement = "
                         SELECT year, sum(1) as N, sum(n) as total_births
                         FROM babynames
                         WHERE name == 'Matthew'
                         GROUP BY year
                         ORDER BY total_births desc
                         LIMIT 6
                         ")
```


# Modeling with Spark

- `whately_2015` has some weather data from Massachusetts (in the `macleish` package)
- Spark has a machine learning library which includes many of the supervised/unsupervised learning tools we've discussed this semester
- let's use Spark to fit a multiple regression model as an example

```{r}
require(macleish)

weather_tbl <- copy_to(sc, whately_2015, overwrite = TRUE)

weather_tbl %>%
  sparklyr::ml_linear_regression(rainfall ~ temperature + pressure + rel_humidity) %>%
  summary()

```




# Alternatives to SQL (Google BigQuery)


```{r}
require(bigrquery)

project_id <- "stat-380-class-demo"   # Beckman's google cloud project ID

sql <- "SELECT word, count(distinct corpus) as numPlays, sum(word_count) as N
        FROM [publicdata:samples.shakespeare]
        GROUP BY word
        ORDER BY N desc
        LIMIT 10
        "
query_exec(query = sql, project = project_id)

```


# `Rcpp`

- `RCPP::cppFunction()` allows you to write C++ functions in R
- `Rcpp::sourceCpp()` loads a C++ file from disk in the same way you use source() to load a file of R code. 
- Rcpp will compile the C++ code and construct an R function that connects to the compiled C++ function
- more here: <http://adv-r.had.co.nz/Rcpp.html>


```{r}
require(Rcpp)

# write a simple function in C++

cppFunction('int addemup(int x, int y, int z) {
  int sum = x + y + z;
  return sum;
}')


# R recognizes `addemup` like any other function
addemup
```

```{r}
addemup(2, 4, 6)
```



# Stan

- Bayesian statistical inference with MCMC sampling
- Stan model within the code chunk is compiled into a "stanmodel" object 
- result assigned to a variable with the name given by the `output.var` option


```{stan, output.var="ex1"}
parameters {
  real y[2];
}
model {
  y[1] ~ normal(0, 1);
  y[2] ~ double_exponential(0, 2);
}
```


```{r}
library(rstan)
fit <- sampling(ex1, cores = 3)
print(fit)
```

```{r}
# posterior
stan_plot(fit, point_est = "mean", show_density = TRUE, fill_color = "dodgerblue")

# trace
stan_trace(fit) +
  scale_color_manual(values = c("red", "blue", "green", "black"))

```


# Python in R Markdown

<https://rstudio.github.io/reticulate/#python-in-r-markdown>

- `reticulate` package includes a Python engine for R Markdown:
    - Run Python chunks in a single Python session embedded within your R session (shared variables/state between Python chunks)
    - Printing of Python output, including graphical output from `matplotlib`.
    - Access to objects created within Python chunks from R using the `py` object (e.g. `py$x` would access an `x` variable created within Python from R).
    - Access to objects created within R chunks from Python using the `r` object (e.g. `r.x` would access to `x` variable created within R from Python)
- Built in conversion for many Python object types, including `NumPy` arrays and `Pandas` data frames.


# `reticulate` & R Notebooks

- `reticulate` package does make it pretty easy to go back and forth between R & Python objects
- it "just works" when you compile RMarkdown 
- the R Notebook isn't yet ironed out in the current production release (v1.1)
    - preview will throw errors, even when the code is correct
    - R Notebook is no better than preview
    - This will be fixed in RStudio v1.2 [(install preview here...)](https://www.rstudio.com/products/rstudio/download/preview/)  
        - you should install it (after the semester)
        - I generally don't recommend major updates during the semester if you can avoid it


#### Create object in R code chunk

```{r}
require(reticulate)
dat <- c(180, 215, 210, 210, 188, 176, 209, 200)
```


#### Manipulate `r.dat` in Python code chunk

```{python}
# Import the numpy package
import numpy

# Create a Numpy array from data: np_data
np_data = numpy.array(r.dat)

# Print out mean of np_data
avg = numpy.mean(np_data)

print(avg)
```

#### Access `py$avg` object in R code chunk

```{r}
py$avg
```







