Agenda
Announcements
- Ch 17 programming notebook not assigned, but highly recommended
- several steps take some non-trivial configuration that I don’t want to require per se
MDSR Ch 17: Towards big data
- 3 V’s of big data
- “big data is when your workflow breaks” –Randy Pruim
- how big is “big” is relative
- pencil/paper workflow: 30 rows & 3 columns is “big”
- TI 83 workflow: larger than 99 rows (or columns) is “big”
- MS Excel workflow: larger than any of the following constraints is “big”
- 1,048,576 rows (\(2^20\))
- 16,384 columns (\(2^14\))
- 255 characters in a column (\(2^8 - 1\))
- Laptop running R: large with respect to available memory
Biggest of the big… (for perspective)
- the Large Hadron Collider in Geneva generates 25 petabytes of data per year
- one petabyte is a million gigabytes
- all the workflows are broken
- they actually only save 0.001% of the data generated
- Others just for fun (based on 2018 Forbes article)
- Google processes 40,000 searches per second
- 300 million photos uploaded to Facebook per day
- The Weather Channel receives 18,055,556 forecast requests per minute
What happens when data start getting big (in R)
- The data may not load into memory
- Analyzing the data may take a (really) long time
- Visualizations get messy
Memory limits in R
- If you’re running 32-bit R on any OS, it’ll be 2 or 3 GB
- note: 2GB of memory used by R is not the same as 2GB on disk
- Overhead for R to keep track of your data
- Memory used for analysis, etc.
- Probably not more than about 500MB on disk?
- If you’re running 64-bit R on a 64-bit OS, the upper limit should be infinite (but it’s not)
- Package
bigmemory
- “Manage massive matrices with shared memory and memory-mapped files”
- use in parallel environments can provide substantial speed and memory efficiencies
- uses a pointer to a C++ data structure
- (con) matrix data structure requires homogeneity
- Package
ff
- “Memory-Efficient Storage of Large Data on Disk and Fast Access Functions”
- provides data structures that are stored on disk but behave (almost) as if they were in RAM
- uses a pointer to a flat binary file stored on disk, and it can be shared across different sessions
- permits heterogeneous data structures
Strategies when data get big
- Make the data smaller
- Get a bigger computer
- Access the data differently
- Split up the dataset for analysis
If things are just slow…
Recall the teller simulation:
any_active <- function(df) {
# return TRUE if someone has not finished
return(max(df$endtime) == Inf)
}
next_customer <- function(df) {
# returns the next customer in line
res <- filter(df, endtime == Inf) %>%
arrange(arrival)
return(head(res, 1))
}
update_customer <- function(df, cust_num, end_time) {
# sets the end time of a specific customer
return(mutate(df, endtime = ifelse(custnum == cust_num, end_time, endtime)))
}
teller_sim <- function(n = 1/2, m = 3/2, hours = 6) {
# simulation of bank where there is just one teller
# n: expected number of customers per minute
# m: expected length of transaction is m minutes
# hours: bank open for this many hours
customers <- rpois(hours * 60, lambda = n)
arrival <- numeric(sum(customers))
position <- 1
for (i in 1:length(customers)) {
numcust <- customers[i]
if (numcust != 0) {
arrival[position:(position + numcust - 1)] <- rep(i, numcust)
position <- position + numcust
}
}
duration <- rexp(length(arrival), rate = 1/m) # E[X]=m
df <- data.frame(arrival, duration, custnum = 1:length(duration),
endtime = Inf, stringsAsFactors = FALSE)
endtime <- 0 # set up beginning of simulation
while (any_active(df)) { # anyone left to serve
next_one <- next_customer(df)
now <- ifelse(next_one$arrival >= endtime, next_one$arrival, endtime)
endtime <- now + next_one$duration
df <- update_customer(df, next_one$custnum, endtime)
}
df <- mutate(df, totaltime = endtime - arrival)
return(favstats(~ totaltime, data = df))
}
Profiling the teller simulation
- let’s see where the time goes…
- not bad in this case, but you could discover interesting things… like
- maybe some modeling function defaults to bootstrap confidence intervals that you don’t care about with 1000 iterations per model
- you did something for every line of your huge data frame and then combine results using
c()
or rbind()
rather than assigning to a preallocated vector or matrix
Rprof("TellerSimProfile")
teller_sim()
Rprof(NULL)
head(summaryRprof("TellerSimProfile")$by.self, 20)
STILL slow? Try biglm
biglm
package has an efficient alternative to the lm
function
- it can even fit generalized linear models (regression & logistic regression) with data frames that are larger than memory
require(biglm)
n <- 20000
p <- 500
d <- as.data.frame(matrix(rnorm(n * (p + 1)), ncol = (p + 1)))
expl_vars <- paste(paste0("V", 2:(p+1)), collapse = " + ")
my_formula <- as.formula(paste("V1 ~ ", expl_vars))
# profile `lm` vs `biglm`
system.time(lm(my_formula, data = d))
user system elapsed
5.779 0.251 6.131
system.time(biglm(my_formula, data = d))
user system elapsed
3.777 0.182 4.002
Next step: parallel processing
- Parallel processing is basically farming out subtasks to independent processors, then merging results
- effectively just allocates more RAM for the problem
my_cores <- detectCores()
my_cores
[1] 4
- embarassingly parallel computing
- I need to repeat the same task many times
- order of implementation doesn’t matter
- I have 4 total cores, but you should always save one for your operating system
- e.g., comparison for several iterations of teller simulation
k <- 5
# without parallel processing
system.time(lapply(1:k, teller_sim))
user system elapsed
12.102 0.258 12.565
# parallelize with 3 cores
system.time(mclapply(1:k, teller_sim, mc.cores = my_cores - 1))
user system elapsed
11.942 0.933 12.079
MapReduce (parallelization that’s not embarassing?)
- programming paradigm for parallel computing
- two phase algorithm
- map–farm out parallizeable task to many machines
- reduce–combine results
- tricky part: you have to define the
map
function and the reduce
function
- needs software implementation
Hadoop & Spark
- Hadoop was first to really tackle MapReduce
- Hadoop MapReduce has been superseded by Spark,
- tools that emerged as the “ecosystem” around it are still popular (HDFS)
- “legacy” projects might still use Hadoop MapReduce
- Apache Spark is considered superior for a few reasons
- had the benefit of implementing lessons learned from Hadoop
- keep the good–HDFS (Hadoop Distributed File System) for disk storage
- improve the weaknesses–prioritize RAM rather than disk storage whenever possible
Interface with Spark
- Spark provides provides an interface for programming entire clusters
- a computer cluster is a set of connected computers that work together as a single system
- the
sparklyr
package in R makes it easy to
- install a local Spark cluster (from within R)
- connect to a local or remote cluster
require(sparklyr)
# spark_install() # only once per machine
Interface with Spark
# modify master to connect to a remote Spark cluster
sc <- spark_connect(master = "local")
* Using Spark: 2.4.0
class(sc)
[1] "spark_connection" "spark_shell_connection" "DBIConnection"
babynames_tbl <-
sc %>%
copy_to(babynames::babynames, "babynames", overwrite = TRUE)
class(babynames_tbl)
[1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
Counting Matthews
babynames_tbl %>%
filter(name == "Matthew") %>%
group_by(year) %>%
summarise(N = n(),
total_births = sum(n)) %>%
arrange(desc(total_births)) %>%
head()
Missing values are always removed in SQL.
Use `SUM(x, na.rm = TRUE)` to silence this warning
This warning is displayed only once per session.
From dplyr
to SQL
- whenever
dplyr
meets an object with class tbl_sql
(like babynames_tbl
), dplyr
automatically translates the R pipeline into SQL
- SQL (structured query language) is a widely used language for querying relational databases, among other purposes
- Queries in SQL start with the
SELECT
keyword and consist several clauses which must to be written in order; basically
SELECT
(required)–like select()
in dplyr
(and possibly combined with mutate()
)
FROM
(required)–like table before the first %>%
in dplyr
JOIN
–like join()
verbs in dplyr
WHERE
–like filter()
verb in dplyr
GROUP BY
–like group_by()
verb in dplyr
HAVING
–like using a second filter()
in dplyr
after the rows have already been
ORDER BY
–like arrange()
verb in dplyr
LIMIT
–sort of like head()
but more versatile
- For example, let’s revisit our previous
dplyr
query
q <-
babynames_tbl %>%
filter(name == "Matthew") %>%
group_by(year) %>%
summarise(N = n(),
total_births = sum(n)) %>%
arrange(desc(total_births)) %>%
head()
q
show_query(q)
<SQL>
SELECT `year`, count(*) AS `N`, SUM(`n`) AS `total_births`
FROM `babynames`
WHERE (`name` = "Matthew")
GROUP BY `year`
ORDER BY `total_births` DESC
LIMIT 6
Querying the Spark cluster
- Spark is a parallelized technology designed to supersede SQL, but it’s still useful to know SQL in order to use Spark
- here, we’ll query the Spark cluster using the connection we’ve defined
sc
with the SQL statement equivalent to our dplyr
wrangling
require(DBI)
dbGetQuery(conn = sc, statement = "
SELECT year, sum(1) as N, sum(n) as total_births
FROM babynames
WHERE name == 'Matthew'
GROUP BY year
ORDER BY total_births desc
LIMIT 6
")
Modeling with Spark
whately_2015
has some weather data from Massachusetts (in the macleish
package)
- Spark has a machine learning library which includes many of the supervised/unsupervised learning tools we’ve discussed this semester
- let’s use Spark to fit a multiple regression model as an example
require(macleish)
weather_tbl <- copy_to(sc, whately_2015, overwrite = TRUE)
weather_tbl %>%
sparklyr::ml_linear_regression(rainfall ~ temperature + pressure + rel_humidity) %>%
summary()
Deviance Residuals:
Min 1Q Median 3Q Max
-0.041290 -0.021761 -0.011632 -0.000576 15.968356
Coefficients:
(Intercept) temperature pressure rel_humidity
0.7177542 0.0004089 -0.0007545 0.0004377
R-Squared: 0.004824
Root Mean Squared Error: 0.1982
Alternatives to SQL (Google BigQuery)
require(bigrquery)
project_id <- "stat-380-class-demo" # Beckman's google cloud project ID
sql <- "SELECT word, count(distinct corpus) as numPlays, sum(word_count) as N
FROM [publicdata:samples.shakespeare]
GROUP BY word
ORDER BY N desc
LIMIT 10
"
query_exec(query = sql, project = project_id)
Waiting for authentication in browser...
Press Esc/Ctrl + C to abort
Authentication complete.
Running job -: 1s:
Running job \: 2s:
Running job |: 2s:
Running job /: 3s:
Running job -: 3s:
Running job \: 3s:
Running job |: 4s:
Running job /: 4s:
Running job -: 5s:
Running job \: 5s:
Running job |: 5s:
Running job /: 5s:
10.0 megabytes processed
Rcpp
RCPP::cppFunction()
allows you to write C++ functions in R
Rcpp::sourceCpp()
loads a C++ file from disk in the same way you use source() to load a file of R code.
- Rcpp will compile the C++ code and construct an R function that connects to the compiled C++ function
- more here: http://adv-r.had.co.nz/Rcpp.html
require(Rcpp)
# write a simple function in C++
cppFunction('int addemup(int x, int y, int z) {
int sum = x + y + z;
return sum;
}')
# R recognizes `addemup` like any other function
addemup
function (x, y, z)
.Call(<pointer: 0x1176de8f0>, x, y, z)
addemup(2, 4, 6)
[1] 12
Stan
- Bayesian statistical inference with MCMC sampling
- Stan model within the code chunk is compiled into a “stanmodel” object
- result assigned to a variable with the name given by the
output.var
option
parameters {
real y[2];
}
model {
y[1] ~ normal(0, 1);
y[2] ~ double_exponential(0, 2);
}
Loading required package: sp
library(rstan)
Loading required package: StanHeaders
rstan (Version 2.19.2, GitRev: 2e1f913d3ca3)
For execution on a local, multicore CPU with excess RAM we recommend calling
options(mc.cores = parallel::detectCores()).
To avoid recompilation of unchanged Stan programs, we recommend calling
rstan_options(auto_write = TRUE)
Attaching package: ‘rstan’
The following object is masked from ‘package:tidyr’:
extract
fit <- sampling(ex1, cores = 3)
starting worker pid=25541 on localhost:11849 at 12:27:28.837
starting worker pid=25555 on localhost:11849 at 12:27:29.119
starting worker pid=25569 on localhost:11849 at 12:27:29.379
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 1).
Chain 1:
Chain 1: Gradient evaluation took 1.1e-05 seconds
Chain 1: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 1: Adjust your expectations accordingly!
Chain 1:
Chain 1:
Chain 1: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 1: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 1: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 1: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 1: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 1: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 1: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 1: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain 1: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 1: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 1: Iteration: 1800 / 2000 [ 90%] (Sampling)
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 2).
Chain 1: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 1:
Chain 1: Elapsed Time: 0.032539 seconds (Warm-up)
Chain 1: 0.031726 seconds (Sampling)
Chain 1: 0.064265 seconds (Total)
Chain 1:
Chain 2:
Chain 2: Gradient evaluation took 1.1e-05 seconds
Chain 2: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 2: Adjust your expectations accordingly!
Chain 2:
Chain 2:
Chain 2: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 2: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 2: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 2: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 2: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 2: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 2: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 2: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 3).
2: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 3:
Chain 3: Gradient evaluation took 9e-06 seconds
Chain 3: 1000 transitions using 10 leapfrog steps per transition would take 0.09 seconds.
Chain 3: Adjust your expectations accordingly!
Chain 3:
Chain 3:
Chain 3: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 2: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 2: Iteration: 1800 / 2000 [ 90%] (Sampling)
Chain 2: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 2:
Chain 2: Elapsed Time: 0.033218 seconds (Warm-up)
Chain 2: 0.029568 seconds (Sampling)
Chain 2: 0.062786 seconds (Total)
Chain 2:
Chain 3: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 3: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 3: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 3: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 3: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 3: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 3: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain 3: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 3: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 3: Iteration: 1800 / 2000 [ 90%] (Sampling)
Chain 3: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 3:
Chain 3: Elapsed Time: 0.033801 seconds (Warm-up)
Chain 3: 0.032489 seconds (Sampling)
Chain 3: 0.06629 seconds (Total)
Chain 3:
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 4).
Chain 4:
Chain 4: Gradient evaluation took 6e-06 seconds
Chain 4: 1000 transitions using 10 leapfrog steps per transition would take 0.06 seconds.
Chain 4: Adjust your expectations accordingly!
Chain 4:
Chain 4:
Chain 4: Iteration: 1 / 2000 [ 0%] (Warmup)
Chain 4: Iteration: 200 / 2000 [ 10%] (Warmup)
Chain 4: Iteration: 400 / 2000 [ 20%] (Warmup)
Chain 4: Iteration: 600 / 2000 [ 30%] (Warmup)
Chain 4: Iteration: 800 / 2000 [ 40%] (Warmup)
Chain 4: Iteration: 1000 / 2000 [ 50%] (Warmup)
Chain 4: Iteration: 1001 / 2000 [ 50%] (Sampling)
Chain 4: Iteration: 1200 / 2000 [ 60%] (Sampling)
Chain 4: Iteration: 1400 / 2000 [ 70%] (Sampling)
Chain 4: Iteration: 1600 / 2000 [ 80%] (Sampling)
Chain 4: Iteration: 1800 / 2000 [ 90%] (Sampling)
Chain 4: Iteration: 2000 / 2000 [100%] (Sampling)
Chain 4:
Chain 4: Elapsed Time: 0.019408 seconds (Warm-up)
Chain 4: 0.016486 seconds (Sampling)
Chain 4: 0.035894 seconds (Total)
Chain 4:
print(fit)
Inference for Stan model: stan-5a7c20ea0b45.
4 chains, each with iter=2000; warmup=1000; thin=1;
post-warmup draws per chain=1000, total post-warmup draws=4000.
mean se_mean sd 2.5% 25% 50% 75% 97.5% n_eff Rhat
y[1] -0.02 0.02 1.02 -2.01 -0.72 -0.01 0.67 2.01 1897 1
y[2] 0.10 0.06 2.79 -5.72 -1.28 0.06 1.47 6.03 2006 1
lp__ -1.50 0.03 1.22 -4.61 -2.06 -1.22 -0.62 -0.10 1329 1
Samples were drawn using NUTS(diag_e) at Wed Oct 2 12:27:32 2019.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at
convergence, Rhat=1).
# posterior
stan_plot(fit, point_est = "mean", show_density = TRUE, fill_color = "dodgerblue")
ci_level: 0.8 (80% intervals)
outer_level: 0.95 (95% intervals)

# trace
stan_trace(fit) +
scale_color_manual(values = c("red", "blue", "green", "black"))
Scale for 'colour' is already present. Adding another scale for 'colour',
which will replace the existing scale.

Python in R Markdown
https://rstudio.github.io/reticulate/#python-in-r-markdown
reticulate
package includes a Python engine for R Markdown:
- Run Python chunks in a single Python session embedded within your R session (shared variables/state between Python chunks)
- Printing of Python output, including graphical output from
matplotlib
.
- Access to objects created within Python chunks from R using the
py
object (e.g. py$x
would access an x
variable created within Python from R).
- Access to objects created within R chunks from Python using the
r
object (e.g. r.x
would access to x
variable created within R from Python)
- Built in conversion for many Python object types, including
NumPy
arrays and Pandas
data frames.
reticulate
& R Notebooks
reticulate
package does make it pretty easy to go back and forth between R & Python objects
- it “just works” when you compile RMarkdown
- the R Notebook isn’t yet ironed out in the current production release (v1.1)
- preview will throw errors, even when the code is correct
- R Notebook is no better than preview
- This will be fixed in RStudio v1.2 (install preview here…)
- you should install it (after the semester)
- I generally don’t recommend major updates during the semester if you can avoid it
Create object in R code chunk
require(reticulate)
dat <- c(180, 215, 210, 210, 188, 176, 209, 200)
Manipulate r.dat
in Python code chunk
198.5
Access py$avg
object in R code chunk
py$avg
[1] 198.5
