---
title: "Advanced Chunking and Backends in delarr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Advanced Chunking and Backends in delarr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  message = FALSE,
  warning = FALSE
)
has_shard <- requireNamespace("shard", quietly = TRUE)
```

```{r load-delarr}
library(delarr)
```

## When should you reach for the advanced tools?

The basic workflow in `vignette("delarr-getting-started", package = "delarr")` is enough when you only need a lazy pipeline and a final `collect()`. This vignette is for the next step: understanding chunk plans, running several summaries in one pass, streaming to backends, and checking whether an optional parallel path behaves the way you expect.

All examples use one small dense matrix and validate the key claims in code.

```{r make-example-matrix}
set.seed(11)
mat <- matrix(
  rnorm(96),
  nrow = 12,
  ncol = 8,
  dimnames = list(paste0("sample_", 1:12), paste0("feature_", 1:8))
)
```

## What will the execution plan do?

`explain()` shows the effective output shape, the chunk axis, the chosen chunk size, and the recorded operations after optimization.

```{r inspect-plan}
pipe <- delarr(mat)[, -1] |>
  d_map(~ .x^2 + 1) |>
  d_where(function(x) x > 1.25, fill = 0)

plan <- explain(pipe, chunk_size = 3L)
plan

stopifnot(
  identical(plan$output_dim, dim(pipe)),
  identical(plan$chunk_margin, "cols"),
  identical(plan$chunk_count, ceiling(ncol(pipe) / 3))
)
```

## How do you let `delarr` choose a chunk size?

If you do not want to hard-code `chunk_size`, you can pass a memory budget with `target_bytes`.
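
As a rough mental model of what a byte budget implies, the following chunk turns a budget into a number of columns per chunk. It is a sketch under stated assumptions, not `delarr`'s actual rule. The helper `budget_chunk_size()` is hypothetical. It assumes chunks made of whole columns stored as 8-byte doubles and ignores any working memory the pipeline's operations need, so `explain()` may report a different size.

```{r sketch-budget-chunk-size}
# Hypothetical helper, not part of delarr: delarr's own heuristic may differ.
# Assumes each chunk holds whole columns of 8-byte doubles.
budget_chunk_size <- function(n_rows, n_cols, target_bytes, bytes_per_value = 8) {
  bytes_per_col <- n_rows * bytes_per_value
  size <- floor(target_bytes / bytes_per_col)
  # Always process at least one column and never more columns than exist.
  as.integer(min(max(size, 1), n_cols))
}

# The example pipeline has 12 rows and 7 columns after dropping the first.
sketch_size <- budget_chunk_size(n_rows = 12, n_cols = 7, target_bytes = 256)
sketch_size
ceiling(7 / sketch_size)
```

Under these assumptions a smaller budget yields more, narrower chunks. The clamp to at least one column means a tiny budget still makes progress, even if a single column exceeds it.
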
```{r adaptive-collect}
adaptive_plan <- explain(pipe, target_bytes = 256)
adaptive_plan

adaptive_result <- collect(pipe, target_bytes = 256)
fixed_result <- collect(pipe, chunk_size = 3L)

stopifnot(isTRUE(all.equal(adaptive_result, fixed_result)))
dim(adaptive_result)
```

## How do you compute several summaries in one pass?

`d_reduce_many()` runs several built-in reducers together and returns a matrix when the outputs have a common length.

```{r multi-reduce}
row_summary <- d_reduce_many(
  delarr(mat),
  fns = list(sum = sum, mean = mean, max = max),
  dim = "rows",
  chunk_size = 3L
)

stopifnot(
  is.matrix(row_summary),
  isTRUE(all.equal(row_summary[, "sum"], rowSums(mat))),
  isTRUE(all.equal(row_summary[, "mean"], rowMeans(mat))),
  isTRUE(all.equal(row_summary[, "max"], apply(mat, 1L, max)))
)

row_summary[1:4, , drop = FALSE]
```

## How do you work block-by-block?

`block_apply()` is useful when you want chunk-local summaries or diagnostics without materializing the whole array.

```{r block-apply}
col_blocks <- block_apply(
  delarr(mat),
  margin = "cols",
  size = 3L,
  fn = function(block) colMeans(block)
)

block_means <- unlist(col_blocks, use.names = FALSE)
stopifnot(isTRUE(all.equal(block_means, unname(colMeans(mat)))))
block_means
```

## How do delayed matrix products fit into a pipeline?

`d_matmul()` returns another `delarr`, so you can materialize only the block you need from a larger product.

```{r delayed-matmul}
rhs <- matrix(rnorm(30), nrow = 6, ncol = 5)

product_block <- d_matmul(delarr(mat[, 1:6, drop = FALSE]), delarr(rhs))[1:4, 1:3] |>
  collect(chunk_size = 2L)

expected_block <- (mat[, 1:6, drop = FALSE] %*% rhs)[1:4, 1:3, drop = FALSE]

stopifnot(isTRUE(all.equal(product_block, expected_block)))
product_block
```

## How do you stream a transformed matrix to disk?

The writer interface is useful when the result is still large enough that you do not want to hold it in memory.
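
The general idea of streaming is that each chunk is computed, handed to a backend, and released, rather than assembled into one in-memory result. As a hedged base-R sketch of that pattern, the following chunk scales a matrix one column block at a time and appends each block to a file. The helper `stream_scaled_columns()` is hypothetical, and raw doubles written with `writeBin()` stand in for an HDF5 dataset. `hdf5_writer()` itself may work quite differently.

```{r sketch-stream-columns}
# Hypothetical helper illustrating the streaming pattern in base R.
# A raw file of doubles stands in for an HDF5 dataset; this is not how
# hdf5_writer() is implemented.
stream_scaled_columns <- function(x, path, chunk_size) {
  con <- file(path, open = "wb")
  on.exit(close(con))
  for (start in seq(1L, ncol(x), by = chunk_size)) {
    cols <- start:min(start + chunk_size - 1L, ncol(x))
    # Column scaling needs only each column's own mean and sd,
    # so every column block can be finished independently.
    block <- scale(x[, cols, drop = FALSE], center = TRUE, scale = TRUE)
    # Matrices are column-major, so appending blocks in column order
    # reproduces the full matrix layout in the file.
    writeBin(as.vector(block), con)
    # Only one output block is held at a time; it is replaced on the next pass.
  }
  invisible(path)
}

demo <- matrix(sin(seq_len(96)), nrow = 12, ncol = 8)
demo_path <- tempfile(fileext = ".bin")
stream_scaled_columns(demo, demo_path, chunk_size = 3L)

streamed <- matrix(readBin(demo_path, what = "double", n = length(demo)), nrow = nrow(demo))
stopifnot(isTRUE(all.equal(streamed, scale(demo), check.attributes = FALSE)))
unlink(demo_path)
```

This sketch works chunk-by-chunk only because column scaling is local to each column. A transform that needs statistics across chunks would require an extra pass, or a different chunk axis, before anything could be written.
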
```{r stream-scaled-hdf5}
tf_in <- tempfile(fileext = ".h5")
tf_out <- tempfile(fileext = ".h5")

write_hdf5(mat, tf_in, "X")
X <- delarr_hdf5(tf_in, "X")

scaled <- X |>
  d_scale(dim = "cols", center = TRUE, scale = TRUE)

writer <- hdf5_writer(tf_out, "X_scaled", ncol = ncol(X), chunk = c(6L, 4L))
collect(scaled, into = writer, chunk_size = 4L)

disk_result <- read_hdf5(tf_out, "X_scaled")
reference <- collect(
  delarr(mat) |> d_scale(dim = "cols", center = TRUE, scale = TRUE),
  chunk_size = 4L
)

stopifnot(isTRUE(all.equal(unname(disk_result), unname(reference), tolerance = 1e-8)))
round(colMeans(disk_result), 6)

unlink(c(tf_in, tf_out))
```

## How do you use shared-memory workers?

If you install the optional `shard` package, `collect_shard()` can evaluate a supported pipeline in worker processes while keeping the underlying matrix in shared memory. The next chunk runs only when `shard` is installed.

```{r shard-collect, eval = has_shard}
shard_result <- delarr_shard(mat) |>
  d_map(~ .x * 2) |>
  d_reduce(sum, dim = "rows") |>
  collect_shard(workers = 2L, chunk_size = 3L)

stopifnot(isTRUE(all.equal(shard_result, rowSums(mat * 2))))
head(shard_result)
```

## How do you profile a candidate pipeline?

`profile_collect()` repeats `collect()` and records elapsed time plus the size of the realized output.

```{r profile-pipeline}
profile <- profile_collect(pipe, reps = 2L, chunk_size = 3L)
profile

stopifnot(
  identical(profile$reps, 2L),
  all(is.finite(profile$elapsed)),
  profile$min_sec >= 0
)
```

## Where should you go after this?

Return to `vignette("delarr-getting-started", package = "delarr")` for the core lazy workflow, then use `?explain`, `?block_apply`, `?d_reduce_many`, and `?collect_shard` as you tune real pipelines for storage layout, chunking, and execution strategy.