Getting Started with delarr

library(delarr)

What problem does delarr solve?

delarr lets you write matrix pipelines as if everything were already in memory while deferring the actual work until collect(). That matters when the source matrix lives on disk, when you want to avoid intermediate allocations, or when you need to stream the result directly into another backend.

This vignette covers one small lazy pipeline, one streaming write to HDF5, and one custom backend. For chunk planning, profiling, and optional shared-memory workers, see vignette("advanced", package = "delarr").

What does a lazy pipeline look like?

set.seed(1)
mat <- matrix(
  rnorm(24),
  nrow = 6,
  ncol = 4,
  dimnames = list(paste0("sample_", 1:6), paste0("feature_", 1:4))
)

lazy_mean <- delarr(mat) |>
  d_center(dim = "rows") |>
  d_map(~ .x * 0.5) |>
  d_reduce(mean, dim = "rows")

lazy_mean
#> <delarr> 6 x 1 - ops: center(rows) -> map -> reduce(rows)

Nothing has been materialized yet. lazy_mean is still a delarr, and printing it shows the recorded plan (center, then map, then reduce) rather than any computed values.
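
The idea of a recorded plan can be illustrated with a toy in base R. This is only a sketch of deferred evaluation in general: toy_lazy(), toy_map(), and toy_collect() are hypothetical helpers invented for this illustration and say nothing about how delarr is implemented internally.

```r
# Toy illustration only: hypothetical helpers, not delarr internals.
toy_lazy <- function(x) list(source = x, ops = list())

toy_map <- function(obj, f) {
  obj$ops <- c(obj$ops, list(f))  # record the step; do no work yet
  obj
}

toy_collect <- function(obj) {
  # Apply the recorded steps in order, starting from the source matrix.
  Reduce(function(acc, f) f(acc), obj$ops, obj$source)
}

plan <- toy_lazy(matrix(1:6, nrow = 2)) |>
  toy_map(function(m) m - rowMeans(m)) |>
  toy_map(function(m) m * 0.5)

length(plan$ops)              # two steps recorded, nothing computed so far
result <- toy_collect(plan)   # the work happens here
stopifnot(all(rowMeans(result) == 0))
```

Until toy_collect() runs, plan holds only the source matrix and a list of functions, which mirrors the split between building a pipeline and calling collect().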

row_summary <- collect(lazy_mean, chunk_size = 2L)
row_summary
#>      sample_1      sample_2      sample_3      sample_4      sample_5 
#> -6.938894e-18  6.245005e-17  1.387779e-17  2.775558e-17 -1.387779e-17 
#>      sample_6 
#>  6.938894e-18
stopifnot(all(abs(row_summary) < 1e-10))

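After row-centering, every row has mean zero, and scaling by 0.5 does not change that, so the collected values are zero up to floating-point error (on the order of 1e-17 here). The stopifnot() turns that claim into a check that would fail if the pipeline changed unexpectedly.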
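
For comparison, the same computation can be written eagerly in base R. This sketch assumes that d_center(dim = "rows") subtracts each row's mean, which is what the zero row means above suggest; it allocates every intermediate matrix that the lazy pipeline is meant to avoid.

```r
set.seed(1)
mat <- matrix(rnorm(24), nrow = 6, ncol = 4)

# Assumed meaning of d_center(dim = "rows"): subtract each row's mean.
centered <- mat - rowMeans(mat)

# Scale every element, then reduce each row to its mean.
eager_mean <- rowMeans(centered * 0.5)

stopifnot(all(abs(eager_mean) < 1e-10))
```

On a 6 x 4 matrix the extra allocations are irrelevant, but they are the cost that grows with the data when the source lives on disk.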

How do row and column vectors broadcast?

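Scalars, row-sized vectors, and column-sized vectors stay lazy too. delarr infers the broadcast direction from the vector's length: a vector of length nrow supplies one value per row, and a vector of length ncol supplies one value per column. In this example the matrix is 6 x 4, so the two lengths cannot be confused.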

row_bias <- c(-1, 0, 1, 2, 3, 4)
col_scale <- c(1, 0.5, 2, 1.5)

broadcasted <- collect((delarr(mat) + row_bias) * col_scale, chunk_size = 2L)
expected <- sweep(sweep(mat, 1L, row_bias, "+"), 2L, col_scale, "*")

stopifnot(isTRUE(all.equal(broadcasted, expected)))
broadcasted[1:3, , drop = FALSE]
#>           feature_1  feature_2 feature_3  feature_4
#> sample_1 -1.6264538 -0.2562855 -3.242481 -0.2681682
#> sample_2  0.1836433  0.3691624 -4.429400  0.8908520
#> sample_3  0.1643714  0.7878907  4.249862  2.8784661
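
The expected value above is built with sweep() because plain base R arithmetic does not broadcast by length. The sketch below uses only base R to show the difference: multiplying a 6 x 4 matrix by a length-4 vector recycles the vector down the columns instead of applying one factor per column.

```r
set.seed(1)
mat <- matrix(rnorm(24), nrow = 6, ncol = 4)
col_scale <- c(1, 0.5, 2, 1.5)

recycled   <- mat * col_scale                  # base R recycles element by element
per_column <- sweep(mat, 2L, col_scale, "*")   # one factor per column

isTRUE(all.equal(recycled, per_column))
#> [1] FALSE
```

Because 24 is a multiple of 4, base R does not even warn here, which is why an explicit sweep() is the safer reference when checking a broadcast result.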

How do you stream a result to HDF5?

delarr_hdf5() reads a dataset lazily, and hdf5_writer() lets you stream the transformed result back to disk without materializing the full output matrix in R.

tf_in <- tempfile(fileext = ".h5")
tf_out <- tempfile(fileext = ".h5")

input <- matrix(runif(30), 5, 6)
write_hdf5(input, tf_in, "X")

X <- delarr_hdf5(tf_in, "X")
writer <- hdf5_writer(tf_out, "X_z", ncol = ncol(X), chunk = c(5L, 3L))

collect(X |> d_zscore(dim = "cols"), into = writer, chunk_size = 3L)

z <- read_hdf5(tf_out, "X_z")
round(colMeans(z), 6)
#> [1] 0 0 0 0 0 0
stopifnot(all(abs(colMeans(z)) < 1e-8))

unlink(c(tf_in, tf_out))
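
Conceptually, a streaming write processes one block at a time and hands each finished block to the destination. The base R sketch below illustrates that pattern only: plain matrices stand in for the HDF5 source and writer, the column blocks of width 3 are an assumption about how chunks are formed, and scale() (which divides by the n - 1 standard deviation) is an assumed stand-in for d_zscore().

```r
# Conceptual sketch only: no HDF5 involved, and not delarr's implementation.
set.seed(1)
input <- matrix(runif(30), 5, 6)
out <- matrix(NA_real_, nrow(input), ncol(input))  # stand-in for the output dataset

starts <- seq(1L, ncol(input), by = 3L)            # assumed block width of 3 columns
for (s in starts) {
  cols <- s:min(s + 2L, ncol(input))
  block <- input[, cols, drop = FALSE]             # read one block
  out[, cols] <- scale(block)                      # z-score its columns, write it out
}

stopifnot(all(abs(colMeans(out)) < 1e-8))
```

Column z-scores depend only on the column itself, so each block can be finished and written without seeing the rest of the matrix.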

How do you wrap your own storage layer?

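A custom backend only needs matrix dimensions and a pull() function that can return arbitrary row and column slices. Here the backing store is just another matrix, but the same pattern works for databases, APIs, or memory-mapped files. In this example, pull() treats a NULL rows or cols argument as a request for every row or column, and it uses drop = FALSE so that a single-row or single-column slice is still a matrix.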

source_mat <- matrix(
  seq_len(60),
  nrow = 10,
  ncol = 6,
  dimnames = list(paste0("row_", 1:10), paste0("col_", 1:6))
)

seed <- delarr_seed(
  nrow = nrow(source_mat),
  ncol = ncol(source_mat),
  pull = function(rows = NULL, cols = NULL) {
    if (is.null(rows)) rows <- seq_len(nrow(source_mat))
    if (is.null(cols)) cols <- seq_len(ncol(source_mat))
    source_mat[rows, cols, drop = FALSE]
  },
  dimnames = dimnames(source_mat)
)

custom_result <- delarr(seed)[1:4, 2:5] |>
  d_map(~ .x^2) |>
  collect(chunk_size = 2L)

stopifnot(isTRUE(all.equal(custom_result, source_mat[1:4, 2:5]^2)))
custom_result
#>       col_2 col_3 col_4 col_5
#> row_1   121   441   961  1681
#> row_2   144   484  1024  1764
#> row_3   169   529  1089  1849
#> row_4   196   576  1156  1936
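
The backing store does not have to hold any data at all. As a sketch, here is a hypothetical pull function whose values are computed on request; pull_virtual, n_row, and n_col are names invented for this illustration, and the function is exercised on its own rather than through delarr.

```r
# Hypothetical backend: values are computed on request, nothing is stored.
n_row <- 10L
n_col <- 6L

pull_virtual <- function(rows = NULL, cols = NULL) {
  if (is.null(rows)) rows <- seq_len(n_row)   # NULL means every row
  if (is.null(cols)) cols <- seq_len(n_col)   # NULL means every column
  # outer() always returns a matrix, even for a single row or column.
  outer(rows, cols, function(i, j) i * 10 + j)
}

slice <- pull_virtual(1:2, 2:3)
stopifnot(identical(dim(slice), c(2L, 2L)))
stopifnot(identical(dim(pull_virtual()), c(n_row, n_col)))
```

A function with this shape could be supplied as the pull argument of delarr_seed() in the same way as the matrix-backed version above, with nrow = n_row and ncol = n_col.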

Where should you go next?

Use ?collect when you want to control chunk size or stream into a writer, ?delarr_seed when you need a custom backend, and vignette("advanced", package = "delarr") for execution plans, streamed multi-reducer summaries, block-wise workflows, delayed matrix products, and optional shared-memory workers.