| Title: | Deterministic, Zero-Copy Parallel Execution for R |
|---|---|
| Description: | Provides a parallel execution runtime for R that emphasizes deterministic memory behavior and efficient handling of large shared inputs. 'shard' enables zero-copy parallel reads via shared/memory-mapped segments, encourages explicit output buffers to avoid large result aggregation, and supervises worker processes to mitigate memory drift via controlled recycling. Diagnostics report peak memory usage, end-of-run memory return, and hidden copy/materialization events to support reproducible performance benchmarking. |
| Authors: | Bradley Buchsbaum [aut, cre, cph] |
| Maintainer: | Bradley Buchsbaum |
| License: | MIT + file LICENSE |
| Version: | 0.1.0 |
| Built: | 2026-06-04 06:34:55 UTC |
| Source: | https://github.com/bbuchsbaum/shard |
Extract Buffer Elements
## S3 method for class 'shard_buffer'
x[i, j, ..., drop = TRUE]
x |
A shard_buffer object. |
i |
Index or indices. |
j |
Optional second index (for matrices). |
... |
Additional indices (for arrays). |
drop |
Whether to drop dimensions. |
A vector or array of values read from the buffer.
buf <- buffer("double", dim = 10)
buf[1:5] <- 1:5
buf[1:3]
buffer_close(buf)
Subset Shard Descriptor
## S3 method for class 'shard_descriptor'
x[i]
x |
A shard_descriptor object. |
i |
Index or indices. |
A subset of the object.
sh <- shards(100, block_size = 25)
sh[1:2]
Subset a shard_descriptor_lazy Object
## S3 method for class 'shard_descriptor_lazy'
x[i]
x |
A shard_descriptor_lazy object. |
i |
Index or indices. |
A subset of the object.
sh <- shards(100, block_size = 25)
sh[1:2]
Get Single Shard
## S3 method for class 'shard_descriptor'
x[[i]]
x |
A shard_descriptor object. |
i |
Index. |
A single shard.
sh <- shards(100, block_size = 25)
sh[[1]]
Extract a Single Shard from a shard_descriptor_lazy Object
## S3 method for class 'shard_descriptor_lazy'
x[[i]]
x |
A shard_descriptor_lazy object. |
i |
Index. |
A single shard.
sh <- shards(100, block_size = 25)
sh[[1]]
Assign to Buffer Elements
## S3 replacement method for class 'shard_buffer'
x[i, j, ...] <- value
x |
A shard_buffer object. |
i |
Index or indices. |
j |
Optional second index (for matrices). |
... |
Additional indices (for arrays). |
value |
Values to assign. |
The modified shard_buffer object, invisibly.
buf <- buffer("double", dim = 10)
buf[1:5] <- rnorm(5)
buffer_close(buf)
Register custom traversal logic for specific classes during deep sharing operations. Adapters allow fine-grained control over how objects are decomposed and reconstructed.
The adapter registry provides a way to customize how specific classes are handled during deep sharing. Instead of generic slot traversal for S4 objects or element-wise traversal for lists, you can provide custom functions to:
Extract the shareable children from an object (children)
Reconstruct the object from shared children (replace)
This is useful for:
Complex S4 objects where only certain slots should be shared
S3 objects with internal structure that differs from list structure
Objects with accessors that should be used instead of direct slot access
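The children/replace contract can be illustrated with a small base-R sketch. The registry, register_adapter_sketch(), and traverse_sketch() below are hypothetical stand-ins, not shard's adapter API; they only show how an adapter narrows traversal to selected parts of an S3 object and then rebuilds the object from the processed parts.

```r
# Hypothetical adapter registry (illustration only, not shard's API).
# An adapter supplies children(x) -> named list of parts to process, and
# replace(x, kids) -> the object rebuilt from the processed parts.
adapter_registry <- new.env(parent = emptyenv())

register_adapter_sketch <- function(class, children, replace) {
  assign(class, list(children = children, replace = replace),
         envir = adapter_registry)
  invisible(TRUE)
}

# Walk an object: use the first registered adapter matching its class,
# otherwise recurse element-wise into plain lists, and apply leaf_fun to leaves.
traverse_sketch <- function(x, leaf_fun) {
  for (cls in class(x)) {
    if (exists(cls, envir = adapter_registry, inherits = FALSE)) {
      ad <- get(cls, envir = adapter_registry, inherits = FALSE)
      kids <- lapply(ad$children(x), traverse_sketch, leaf_fun = leaf_fun)
      return(ad$replace(x, kids))
    }
  }
  if (is.list(x)) return(lapply(x, traverse_sketch, leaf_fun = leaf_fun))
  leaf_fun(x)
}

# Example S3 object: only `coef` should be processed; `call` is left alone.
fit <- structure(list(coef = c(1, 2, 3), call = quote(lm(y ~ x))),
                 class = "model_fit")
register_adapter_sketch(
  "model_fit",
  children = function(x) list(coef = x$coef),
  replace  = function(x, kids) { x$coef <- kids$coef; x }
)

# Stand-in for "share this part": tag the leaf with an attribute.
mark_shared <- function(v) { attr(v, "shared") <- TRUE; v }
out <- traverse_sketch(fit, mark_shared)
```

Without the adapter, traverse_sketch() would fall back to visiting every list element, including `call`.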
share for the main sharing function that uses adapters.
These controls are opt-in and best-effort. On unsupported platforms, they safely no-op (returning FALSE).
Currently supported on Linux only.
affinity_supported()
A logical scalar indicating platform support.
affinity_supported()
ALTREP-backed zero-copy vectors for shared memory.
These functions create ALTREP (Alternative Representation) vectors that are backed by shared memory segments. The key benefits are:
Zero-copy subsetting: Contiguous subsets return views into the same shared memory, not copies.
Diagnostics: Track when data pointers are accessed or when vectors are materialized (copied to standard R vectors).
Read-only protection: Optionally prevent write access to protect shared data.
Supported types: integer, double/numeric, logical, raw.
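As a conceptual model only (shard implements views with compiled ALTREP code, not like this), a view can be represented as an offset and a length into storage that lives in an environment. Contiguous subsetting then yields another view over the same storage, and only materialization copies values. All names below are hypothetical.

```r
# Conceptual model only (hypothetical names): a view records an offset and a
# length into storage held in an environment. Environments are not copied on
# assignment, so every view shares the same underlying vector.
make_store <- function(values) {
  store <- new.env(parent = emptyenv())
  store$data <- values
  store
}

make_view <- function(store, start = 1L, len = length(store$data)) {
  structure(list(store = store, start = as.integer(start), len = as.integer(len)),
            class = "view_sketch")
}

# A contiguous subset of a view is another view on the same store (no copy).
view_range <- function(v, from, to) {
  stopifnot(from >= 1L, to <= v$len, from <= to)
  make_view(v$store, v$start + from - 1L, to - from + 1L)
}

# Materialization is the only step that copies values out of the store.
view_materialize <- function(v) {
  v$store$data[v$start:(v$start + v$len - 1L)]
}

store <- make_store(as.double(1:1000))
v  <- make_view(store)
v2 <- view_range(v, 101L, 200L)  # elements 101..200 of the store
v3 <- view_range(v2, 11L, 20L)   # elements 111..120 of the store
x  <- view_materialize(v3)       # the copy happens here
```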
Semantic scope for scratch memory that signals temporary data should not accumulate. Enables memory-conscious parallel execution.
Evaluates an expression in a semantic scope that signals scratch memory usage. This enables memory-conscious execution where temporaries are expected to be reclaimed after the scope exits.
arena(
  expr,
  strict = FALSE,
  escape_threshold = .arena_escape_threshold,
  gc_after = strict,
  diagnostics = FALSE
)
expr |
An expression to evaluate within the arena scope. |
strict |
Logical. If TRUE, enables strict mode, which warns or errors if large objects escape the scope and triggers aggressive memory reclamation.
Default is FALSE for compatibility and performance. |
escape_threshold |
Numeric. Size in bytes above which returned objects
trigger a warning in strict mode. Default is 1MB (1048576 bytes).
Only used when strict = TRUE. |
gc_after |
Logical. If TRUE, triggers garbage collection after the arena scope exits. Default is TRUE in strict mode, FALSE otherwise. |
diagnostics |
Logical. If TRUE, returns diagnostics about memory usage along with the result. Default is FALSE. |
The arena() function provides a semantic scope that signals "this code
produces scratch data that should not outlive the scope." It serves two
purposes:
For compiled kernels: When Rust-based kernels are available, arena() provides real scratch arenas backed by temporary shared memory segments that are automatically reclaimed.
For arbitrary R code: Triggers post-task memory checks to detect growth and potential memory leaks.
The strict parameter controls escape detection:
strict = FALSE (default): Returns results normally, logs
diagnostics about memory growth.
strict = TRUE: Warns or errors if large objects escape the
scope, and triggers aggressive memory reclamation.
The result of evaluating expr. If diagnostics = TRUE,
returns an arena_result object with elements result and
diagnostics.
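The strict-mode escape check can be approximated in plain R. arena_sketch() below is a hypothetical helper, not shard's arena(): it evaluates the expression in a throwaway environment, measures the result with utils::object.size(), warns when the result exceeds escape_threshold, and then runs gc(). The real function may measure and reclaim memory differently.

```r
# Hypothetical sketch of strict-mode escape detection (not shard's arena()).
arena_sketch <- function(expr, strict = FALSE, escape_threshold = 1048576) {
  caller <- parent.frame()
  scope  <- new.env(parent = caller)          # temporaries are bound here
  result <- eval(substitute(expr), envir = scope)
  rm(scope)                                   # drop the only reference to scratch data
  if (strict) {
    size <- as.numeric(utils::object.size(result))
    if (size > escape_threshold) {
      warning(sprintf("arena_sketch: a %.0f-byte result escaped the scope", size),
              call. = FALSE)
    }
    invisible(gc(verbose = FALSE))            # reclaim the discarded temporaries
  }
  result
}

# Scratch data stays inside the scope; only the small summary escapes.
small <- arena_sketch({
  scratch_tmp <- rnorm(1e5)
  mean(scratch_tmp)
}, strict = TRUE)
```

Returning rnorm(2e5) (about 1.6 MB) from the same helper with strict = TRUE would trigger the warning.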
shard_map for parallel execution,
share for shared memory inputs.
result <- arena({
  tmp <- matrix(rnorm(1e6), nrow = 1000)
  colMeans(tmp)
})

info <- arena({
  x <- rnorm(1e5)
  sum(x)
}, diagnostics = TRUE)
info$diagnostics
Returns the nesting depth of arena scopes. Useful for debugging.
arena_depth()
Integer count of nested arena scopes (0 if not in an arena).
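Nesting depth is commonly tracked with a counter that is incremented on entry and decremented through on.exit(), so it unwinds correctly even when the body errors. The sketch below assumes that approach; scope_sketch(), scope_depth_sketch(), and in_scope_sketch() are hypothetical analogues of arena(), arena_depth(), and in_arena(), not their implementations.

```r
# Hypothetical depth counter kept in a private environment.
.scope_state <- new.env(parent = emptyenv())
.scope_state$depth <- 0L

scope_sketch <- function(expr) {
  .scope_state$depth <- .scope_state$depth + 1L
  # Runs on normal exit and on error, so the depth never drifts.
  on.exit(.scope_state$depth <- .scope_state$depth - 1L, add = TRUE)
  expr  # lazily evaluated here, i.e. after the depth was incremented
}

scope_depth_sketch <- function() .scope_state$depth
in_scope_sketch    <- function() scope_depth_sketch() > 0L

inside <- scope_sketch(in_scope_sketch())
nested <- scope_sketch(scope_sketch(scope_depth_sketch()))
```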
arena_depth()
Materialize a shard table handle as a data.frame/tibble
as_tibble(x, max_bytes = 256 * 1024^2, ...)
x |
A shard table object. |
max_bytes |
Warn if estimated payload exceeds this threshold. |
... |
Reserved for future extensions. |
A data.frame (or tibble if the tibble package is installed).
s <- schema(x = float64(), y = int32())
tb <- table_buffer(s, nrow = 5L)
table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
df <- as_tibble(tb)
Materialize a dataset handle into a data.frame/tibble
## S3 method for class 'shard_dataset'
as_tibble(x, max_bytes = 256 * 1024^2, ...)
x |
A shard_dataset handle. |
max_bytes |
Accepted for API consistency. |
... |
Reserved for future extensions. |
A data.frame (or tibble if the tibble package is installed).
Materialize a row-groups handle into a data.frame/tibble
## S3 method for class 'shard_row_groups'
as_tibble(x, max_bytes = 256 * 1024^2, ...)
x |
A shard_row_groups handle. |
max_bytes |
Accepted for API consistency; currently unused for row-groups. |
... |
Reserved for future extensions. |
A data.frame (or tibble if the tibble package is installed).
Converts a shard_table_buffer to an in-memory data.frame (or tibble if the
tibble package is installed).
## S3 method for class 'shard_table_buffer'
as_tibble(x, max_bytes = 256 * 1024^2, ...)
x |
A shard_table_buffer object. |
max_bytes |
Warn if estimated payload exceeds this threshold. |
... |
Reserved for future extensions. |
A data.frame (or tibble).
Materialize a table handle into a data.frame/tibble
## S3 method for class 'shard_table_handle'
as_tibble(x, max_bytes = 256 * 1024^2, ...)
x |
A shard_table_handle object. |
max_bytes |
Warn if estimated payload exceeds this threshold. |
... |
Reserved for future extensions. |
A data.frame (or tibble if the tibble package is installed).
Coerce a Shared Memory Buffer to Array
## S3 method for class 'shard_buffer'
as.array(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
An array with the buffer contents and the buffer's dimensions, or a plain vector for 1-D buffers.
buf <- buffer("double", dim = c(2, 3, 4))
as.array(buf)
buffer_close(buf)
Coerce a Shared Memory Buffer to Double
## S3 method for class 'shard_buffer'
as.double(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
A double vector with the buffer contents.
buf <- buffer("double", dim = 5)
as.double(buf)
buffer_close(buf)
Coerce a Shared Memory Buffer to Integer
## S3 method for class 'shard_buffer'
as.integer(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
An integer vector with the buffer contents.
buf <- buffer("integer", dim = 5)
as.integer(buf)
buffer_close(buf)
Coerce a Shared Memory Buffer to Logical
## S3 method for class 'shard_buffer'
as.logical(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
A logical vector with the buffer contents.
buf <- buffer("logical", dim = 5)
as.logical(buf)
buffer_close(buf)
Coerce a Shared Memory Buffer to Matrix
## S3 method for class 'shard_buffer'
as.matrix(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
A matrix with the buffer contents and the buffer's dimensions.
buf <- buffer("double", dim = c(3, 4))
as.matrix(buf)
buffer_close(buf)
Coerce a Shared Memory Buffer to Raw
## S3 method for class 'shard_buffer'
as.raw(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
A raw vector with the buffer contents.
buf <- buffer("raw", dim = 5)
as.raw(buf)
buffer_close(buf)
Coerce a Shared Memory Buffer to a Vector
## S3 method for class 'shard_buffer'
as.vector(x, mode = "any")
x |
A shard_buffer object. |
mode |
Storage mode passed to as.vector(). |
A vector of the buffer's type (or coerced to mode).
buf <- buffer("double", dim = 5)
buf[1:5] <- 1:5
as.vector(buf)
buffer_close(buf)
Get available shared memory backing types
available_backings()
A character vector of available backing types on the current platform.
available_backings()
Create typed writable output buffers backed by shared memory for cross-process writes during parallel execution.
Creates a typed output buffer backed by shared memory that can be written to by parallel workers using slice assignment.
buffer(
  type = c("double", "integer", "logical", "raw"),
  dim,
  init = NULL,
  backing = c("auto", "mmap", "shm")
)
type |
Character. Data type: "double" (default), "integer", "logical", or "raw". |
dim |
Integer vector. Dimensions of the buffer. For a vector, specify
the length; for a matrix, specify c(nrow, ncol). |
init |
Initial value to fill the buffer. The default (NULL) fills the buffer
with a type-appropriate zero. |
backing |
Backing type for shared memory: "auto" (default), "mmap", or "shm". |
Buffers provide an explicit output mechanism for shard_map.
Instead of returning results from workers (which requires serialization
and memory copying), workers write directly to shared buffers.
Supported types:
"double": 8-byte floating point (default)
"integer": 4-byte signed integer
"logical": 4-byte logical (stored as integer)
"raw": 1-byte raw data
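Because each type has a fixed element width, a buffer's data payload is the element count times that width. A minimal sketch with a hypothetical helper, buffer_bytes_sketch() (buffer_info() reports a bytes field for real buffers); the widths are cross-checked against R's own binary encoding via writeBin().

```r
# Hypothetical helper: element count times per-element width (bytes).
bytes_per_element <- c(double = 8, integer = 4, logical = 4, raw = 1)

buffer_bytes_sketch <- function(type, dim) {
  prod(dim) * bytes_per_element[[type]]
}

# Cross-check the widths against R's binary encoding of one element.
width_check <- c(
  double  = length(writeBin(0, raw())),
  integer = length(writeBin(0L, raw())),
  logical = length(writeBin(TRUE, raw())),
  raw     = length(writeBin(as.raw(0), raw()))
)

matrix_bytes <- buffer_bytes_sketch("double", c(1000, 1000))  # 1000 x 1000 doubles
```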
Buffers support slice assignment using standard R indexing:
buf[1:100] <- values
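The explicit-output pattern can be sketched without shard by using a temporary file as a stand-in for the mmap backing. Each chunk writes its own disjoint slice at a byte offset, and the parent reads the whole file once at the end, so no per-chunk results are returned or gathered. write_slice() is hypothetical, and the chunks run sequentially here rather than in worker processes.

```r
n    <- 12L
path <- tempfile(fileext = ".bin")
writeBin(numeric(n), path)          # pre-size the file: n doubles, 8 bytes each

# Hypothetical helper: overwrite elements start..(start + length(values) - 1).
write_slice <- function(path, start, values) {
  con <- file(path, open = "r+b")   # read/write without truncating the file
  on.exit(close(con))
  seek(con, where = (start - 1) * 8, rw = "write")
  writeBin(as.double(values), con)
  invisible(NULL)
}

# Three "workers", each owning a disjoint block of indices.
chunks <- split(seq_len(n), rep(1:3, each = 4))
for (idx in chunks) write_slice(path, idx[1], sqrt(idx))

result <- readBin(path, what = "double", n = n)
unlink(path)
```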
An S3 object of class "shard_buffer" that supports:
Slice assignment: buf[idx] <- values
Slice reading: buf[idx]
Full extraction: buf[]
Conversion to R vector: as.vector(buf), as.double(buf), etc.
segment_create for low-level segment operations,
share for read-only shared inputs
out <- buffer("double", dim = 100)
out[1:10] <- rnorm(10)
result <- out[]
Advise access pattern for a buffer
buffer_advise(
  x,
  advice = c("normal", "sequential", "random", "willneed", "dontneed")
)
x |
A shard_buffer. |
advice |
See |
A logical scalar; TRUE if the OS accepted the hint.
buf <- buffer("double", dim = 10L)
buffer_advise(buf, "sequential")
Closes the buffer and releases the underlying shared memory.
buffer_close(x, unlink = NULL)
x |
A shard_buffer object. |
unlink |
Whether to unlink the underlying segment. |
NULL, invisibly.
buf <- buffer("double", dim = 10)
buffer_close(buf)
Returns per-process counters for shard buffer writes. shard_map uses these internally to report write volume/operations in copy_report().
buffer_diagnostics()
A list with elements writes (integer count) and bytes
(total bytes written) accumulated in the current process.
buffer_diagnostics()
Returns information about a buffer.
buffer_info(x)
x |
A shard_buffer object. |
A named list with buffer properties: type, dim,
n, bytes, backing, path, and readonly.
buf <- buffer("integer", dim = c(5, 5))
buffer_info(buf)
buffer_close(buf)
Opens a shared memory buffer that was created in another process. Used by workers to attach to the parent's output buffer.
buffer_open(path, type, dim, backing = c("mmap", "shm"), readonly = FALSE)
path |
Path or shm name of the buffer's segment. |
type |
Character. Data type of the buffer. |
dim |
Integer vector. Dimensions of the buffer. |
backing |
Backing type: "mmap" or "shm". |
readonly |
Logical. Open as read-only? Default FALSE for workers. |
A shard_buffer object attached to the existing segment.
buf <- buffer("double", dim = 10)
path <- buffer_path(buf)
buf2 <- buffer_open(path, type = "double", dim = 10, backing = "mmap")
buffer_close(buf2, unlink = FALSE)
buffer_close(buf)
Returns the path or name of the buffer's underlying segment. Use this to pass buffer location to workers.
buffer_path(x)
x |
A shard_buffer object. |
A character string with the path or name of the segment, or
NULL if the segment is anonymous.
buf <- buffer("double", dim = 10)
buffer_path(buf)
buffer_close(buf)
collect() is a convenience alias for as_tibble() for shard table outputs.
collect(x, ...)
x |
A shard table handle (for example a shard_table_handle, shard_dataset, or shard_row_groups object). |
... |
Passed to as_tibble(). |
A data.frame (or tibble if the tibble package is installed).
s <- schema(x = float64(), y = int32())
tb <- table_buffer(s, nrow = 5L)
table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
handle <- table_finalize(tb)
df <- collect(handle)
Collect a dataset handle into memory
## S3 method for class 'shard_dataset'
collect(x, ...)
x |
A shard_dataset handle. |
... |
Passed to as_tibble(). |
A data.frame (or tibble if the tibble package is installed).
Collect a row-groups handle into memory
## S3 method for class 'shard_row_groups'
collect(x, ...)
x |
A shard_row_groups handle. |
... |
Passed to as_tibble(). |
A data.frame (or tibble if the tibble package is installed).
Collect a table handle into memory
## S3 method for class 'shard_table_handle'
collect(x, ...)
x |
A shard_table_handle object. |
... |
Passed to as_tibble(). |
A data.frame (or tibble if the tibble package is installed).
Type constructors for schema-driven table outputs.
int32()
float64()
bool()
raw_col()
string_col()
A shard_coltype object.
Generates a report of data transfer and copy statistics during parallel execution.
copy_report(result = NULL)
result |
Optional. A result returned by shard_map(). |
An S3 object of class shard_report with type "copy"
containing:
type: "copy"
timestamp: When the report was generated
borrow_exports: Number of borrowed input exports
borrow_bytes: Total bytes in borrowed inputs
result_imports: Number of result imports
result_bytes: Estimated bytes in results
buffer_writes: Number of buffer write operations
buffer_bytes: Total bytes written to buffers
res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
pool_stop()
copy_report(res)
Generates a report of copy-on-write behavior for borrowed inputs.
cow_report(result = NULL)
result |
Optional. A result returned by shard_map(). |
An S3 object of class shard_report with type "cow"
containing:
type: "cow"
timestamp: When the report was generated
policy: The COW policy used ("deny", "audit", "allow")
violations: Count of COW violations detected (audit mode)
copies_triggered: Estimated copies triggered by mutations
res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
pool_stop()
cow_report(res)
Comprehensive diagnostics for shard parallel execution, providing insights into memory usage, worker status, task execution, and shared memory segments.
The diagnostics API provides multiple views into shard's runtime behavior:
report(): Primary entry point with configurable detail levels
mem_report(): Memory usage across workers
cow_report(): Copy-on-write policy tracking
copy_report(): Data transfer statistics
task_report(): Task/chunk execution statistics
segment_report(): Shared memory segment information
All functions return S3 shard_report objects with appropriate print
methods for human-readable output.
Dimensions of a Shared Memory Buffer
## S3 method for class 'shard_buffer'
dim(x)
x |
A shard_buffer object. |
An integer vector of dimensions, or NULL for 1-D buffers.
buf <- buffer("double", dim = c(4, 5))
dim(buf)
buffer_close(buf)
Orchestrates chunk dispatch with worker supervision and failure handling.
Executes a function over chunks using the worker pool with supervision. Handles worker death and recycling transparently by requeuing failed chunks.
dispatch_chunks(
  chunks,
  fun,
  ...,
  pool = NULL,
  health_check_interval = 10L,
  max_retries = 3L,
  timeout = 3600,
  scheduler_policy = NULL,
  on_result = NULL,
  store_results = TRUE,
  retain_chunks = TRUE
)
chunks |
List of chunk descriptors. Each chunk is passed to fun. |
fun |
Function to execute. Receives (chunk, ...) as arguments. |
... |
Additional arguments passed to fun. |
pool |
A shard_pool object. |
health_check_interval |
Integer. Check pool health every N chunks (default 10). |
max_retries |
Integer. Maximum retries per chunk before permanent failure (default 3). |
timeout |
Numeric. Seconds to wait for each chunk (default 3600). |
scheduler_policy |
Optional list of scheduling hints (advanced). Currently:
|
on_result |
Optional callback (advanced). If provided, called on the
master process as |
store_results |
Logical (advanced). If FALSE, successful chunk values are
not retained in the returned shard_dispatch_result object. |
retain_chunks |
Logical (advanced). If FALSE, completed chunk descriptors are stored minimally (avoids retaining large shard lists in memory). |
A shard_dispatch_result object with results and diagnostics.
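The requeue-on-failure behaviour can be sketched sequentially in a single process. dispatch_sketch() is hypothetical and models only the retry bookkeeping, not worker supervision; it assumes max_retries counts retries after the first attempt, so a chunk gets at most max_retries + 1 attempts before it is marked as permanently failed.

```r
# Hypothetical sketch of supervised dispatch bookkeeping (not dispatch_chunks()).
# Failed chunks are requeued at the back until they exhaust max_retries.
dispatch_sketch <- function(chunks, fun, ..., max_retries = 3L) {
  queue    <- seq_along(chunks)
  attempts <- integer(length(chunks))
  results  <- vector("list", length(chunks))
  failed   <- integer(0)
  while (length(queue) > 0L) {
    i <- queue[1L]
    queue <- queue[-1L]
    attempts[i] <- attempts[i] + 1L
    out <- tryCatch(list(ok = TRUE, value = fun(chunks[[i]], ...)),
                    error = function(e) list(ok = FALSE, value = NULL))
    if (out$ok) {
      results[i] <- list(out$value)   # list() keeps NULL results in place
    } else if (attempts[i] <= max_retries) {
      queue <- c(queue, i)            # requeue for another attempt
    } else {
      failed <- c(failed, i)          # permanent failure
    }
  }
  list(results = results, attempts = attempts, failed = failed)
}

# A chunk function that fails once for chunk 2 and always for chunk 3.
calls <- new.env()
flaky <- function(chunk) {
  key <- as.character(chunk$id)
  n <- if (is.null(calls[[key]])) 0L else calls[[key]]
  calls[[key]] <- n + 1L
  if (chunk$id == 2L && n == 0L) stop("simulated worker death")
  if (chunk$id == 3L) stop("simulated persistent error")
  chunk$x * 2
}

chunks <- list(list(id = 1L, x = 1), list(id = 2L, x = 2), list(id = 3L, x = 3))
res <- dispatch_sketch(chunks, flaky)
```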
pool_create(2)
chunks <- list(list(id = 1L, x = 1), list(id = 2L, x = 2))
result <- dispatch_chunks(chunks, function(chunk) chunk$x * 2, pool = pool_get())
pool_stop()
Convenience wrappers that provide apply/lapply-style ergonomics while preserving shard's core contract: shared immutable inputs, supervised execution, and diagnostics.
These functions are intentionally thin wrappers around shard_map() and
related primitives.
Stores factors as int32 codes plus shared levels metadata.
factor_col(levels)
levels |
Character vector of allowed levels. |
A shard_coltype object.
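The encoding is easy to see in base R: a factor is already an integer code vector plus a levels attribute, and it can be rebuilt exactly from those two parts. This sketch only illustrates the storage idea; it does not call factor_col().

```r
# A factor reduces to 32-bit integer codes plus one shared vector of levels.
f <- factor(c("lo", "hi", "mid", "hi"), levels = c("lo", "mid", "hi"))

codes <- as.integer(f)   # 1 = "lo", 2 = "mid", 3 = "hi"
lvls  <- levels(f)       # stored once, alongside the column

# Decoding maps codes back through the levels.
rebuilt <- factor(lvls[codes], levels = lvls)
```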
Retrieves the R object from shared memory by deserializing it. This is the primary way to access shared data in workers.
fetch(x, ...)
## S3 method for class 'shard_shared'
fetch(x, ...)
## S3 method for class 'shard_deep_shared'
fetch(x, ...)
## Default S3 method:
fetch(x, ...)
x |
A shard_shared or shard_deep_shared object. |
... |
Ignored. |
When called in the main process, this reads from the existing segment. When called in a worker process, this opens the segment by path and deserializes the data.
The fetch() function is the primary way to access shared data.
It can also be called as materialize() for compatibility.
The original R object that was shared.
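The share/fetch round trip can be sketched with serialize() and unserialize(), assuming a temporary file stands in for the shared segment. The parent serializes once; any process that knows the path can deserialize its own copy, so only the small handle needs to be sent to a worker. share_sketch() and fetch_sketch() are hypothetical helpers, not shard's share() and fetch().

```r
# Hypothetical stand-ins: a temp file plays the role of the shared segment.
share_sketch <- function(x) {
  path <- tempfile(fileext = ".bin")
  writeBin(serialize(x, connection = NULL), path)   # serialize once
  structure(list(path = path), class = "shared_sketch")
}

# Any process that receives the handle can rebuild the object from the path.
fetch_sketch <- function(handle) {
  bytes <- readBin(handle$path, what = "raw", n = file.size(handle$path))
  unserialize(bytes)
}

x <- list(a = 1:100, b = letters)
h <- share_sketch(x)
recovered <- fetch_sketch(h)
unlink(h$path)
```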
x <- 1:100
shared <- share(x)
recovered <- fetch(shared)
identical(x, recovered)
close(shared)
Creates a compact, serializable range descriptor for contiguous indices. This avoids allocating an explicit index vector for large slices.
idx_range(start, end)
start |
Integer. Start index (1-based, inclusive). |
end |
Integer. End index (1-based, inclusive). |
An object of class shard_idx_range.
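A range descriptor stores only its two endpoints, whatever the slice length, and is expanded into explicit indices only when needed. The sketch below also covers 1..n with consecutive fixed-size blocks, similar in spirit to shards(100, block_size = 25). range_sketch(), range_length(), range_indices(), and block_ranges() are hypothetical, not shard functions.

```r
# Hypothetical compact range: two integers instead of an index vector.
range_sketch <- function(start, end) {
  stopifnot(start >= 1, end >= start)
  structure(list(start = as.integer(start), end = as.integer(end)),
            class = "range_sketch")
}
range_length  <- function(r) r$end - r$start + 1L
range_indices <- function(r) seq.int(r$start, r$end)   # expand only on demand

# Cover 1..n with consecutive blocks of at most block_size elements.
block_ranges <- function(n, block_size) {
  starts <- seq.int(1L, n, by = block_size)
  lapply(starts, function(s) range_sketch(s, min(s + block_size - 1L, n)))
}

blocks <- block_ranges(100L, 30L)   # 1-30, 31-60, 61-90, 91-100
```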
r <- idx_range(1, 100)
r
Returns TRUE if the current execution context is within an arena() scope.
in_arena()
Logical indicating whether we are in an arena scope.
in_arena()
arena({ in_arena() })
View Predicates
is_view(x)
is_block_view(x)
x |
An object. |
Logical. TRUE if x is a shard view (or block view).
m <- share(matrix(1:20, nrow = 4))
v <- view_block(m, cols = idx_range(1, 2))
is_view(v)
is_block_view(v)
Check if running on Windows
is_windows()
A logical scalar: TRUE if running on Windows, FALSE otherwise.
is_windows()
Iterate row groups
iterate_row_groups(x, decode = TRUE)
x |
A shard_row_groups handle. |
decode |
Logical. If TRUE (default), native-encoded partitions are decoded to data.frames. If FALSE, native partitions are returned as their internal representation (advanced). |
A zero-argument iterator function that returns the next data.frame on
each call, or NULL when exhausted.
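The iterator contract (a zero-argument function that returns the next data.frame, then NULL) is naturally a closure over a cursor. make_iterator_sketch() is hypothetical and walks an in-memory list rather than stored row groups; the while loop shows the intended consumption pattern, processing one group at a time.

```r
# Hypothetical iterator: the closure keeps a private cursor `i`.
make_iterator_sketch <- function(groups) {
  i <- 0L
  function() {
    if (i >= length(groups)) return(NULL)   # exhausted
    i <<- i + 1L
    groups[[i]]
  }
}

groups <- list(data.frame(x = 1:3), data.frame(x = 4:5))
it <- make_iterator_sketch(groups)

total <- 0
while (!is.null(chunk <- it())) {
  total <- total + sum(chunk$x)   # only one group is handled per iteration
}
```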
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = rnorm(5)))
rg <- table_finalize(sink)
it <- iterate_row_groups(rg)
chunk <- it()
Length of a Shared Memory Buffer
## S3 method for class 'shard_buffer'
length(x)
x |
A shard_buffer object. |
An integer scalar giving the total number of elements.
buf <- buffer("double", dim = 20)
length(buf)
buffer_close(buf)
Length of a shard_descriptor Object
## S3 method for class 'shard_descriptor'
length(x)
x |
A shard_descriptor object. |
An integer scalar giving the number of shards.
sh <- shards(100, block_size = 25)
length(sh)
Length of a shard_descriptor_lazy Object
## S3 method for class 'shard_descriptor_lazy'
length(x)
x |
A shard_descriptor_lazy object. |
An integer scalar giving the number of shards.
sh <- shards(100, block_size = 25)
length(sh)
List registered kernels
list_kernels()
A character vector of registered kernel names.
list_kernels()
Alias for fetch(). Retrieves the R object from shared memory.
materialize(x)
## S3 method for class 'shard_shared'
materialize(x)
## Default S3 method:
materialize(x)
x |
A shard_shared object. |
The original R object.
shared <- share(1:100)
data <- materialize(shared)
close(shared)
Materialize a block view into an R matrix
## S3 method for class 'shard_view_block'
materialize(x)
x |
A shard_view_block object. |
A standard R matrix containing the selected rows and columns.
Materialize a gather view into an R matrix
## S3 method for class 'shard_view_gather'
materialize(x)
x |
A shard_view_gather object. |
A standard R matrix containing the gathered columns.
Generates a report of memory usage across all workers in the pool.
mem_report(pool = NULL)
pool |
Optional. A shard_pool object. |
An S3 object of class shard_report with type "memory"
containing:
type: "memory"
timestamp: When the report was generated
pool_active: Whether a pool exists
n_workers: Number of workers
rss_limit: RSS limit per worker (bytes)
total_rss: Sum of RSS across all workers
peak_rss: Highest RSS among workers
mean_rss: Mean RSS across workers
workers: Per-worker RSS details
p <- pool_create(2)
mem_report(p)
pool_stop(p)
Best-effort worker pinning to improve cache locality and reduce cross-core migration. Currently supported on Linux only.
pin_workers(pool = NULL, strategy = c("spread", "compact"), cores = NULL)
pool |
Optional shard_pool. Defaults to current pool. |
strategy |
"spread" assigns worker i -> core i mod ncores. "compact" assigns workers to the first cores. |
cores |
Optional integer vector of available cores (0-based). If NULL, uses 0:(detectCores()-1). |
Invisibly, a logical vector per worker indicating success.
affinity_supported()
Spawn and supervise persistent R worker processes with RSS monitoring.
Spawns N R worker processes that persist across multiple shard_map() calls.
Workers are supervised and recycled when RSS drift exceeds thresholds.
pool_create(
  n = parallel::detectCores() - 1L,
  rss_limit = "2GB",
  rss_drift_threshold = 0.5,
  heartbeat_interval = 5,
  min_recycle_interval = 1,
  init_expr = NULL,
  packages = NULL
)
n |
Integer. Number of worker processes to spawn. |
rss_limit |
Numeric or character. Maximum RSS per worker before recycling. Can be bytes (numeric) or human-readable (e.g., "2GB"). Default is "2GB". |
rss_drift_threshold |
Numeric. Fraction of RSS increase from baseline that triggers recycling (default 0.5 = 50% growth). |
heartbeat_interval |
Numeric. Seconds between health checks (default 5). |
min_recycle_interval |
Numeric. Minimum time in seconds between recycling the same worker (default 1.0). This prevents thrashing PSOCK worker creation under extremely tight RSS limits. |
init_expr |
Expression to evaluate in each worker on startup. |
packages |
Character vector. Packages to load in workers. |
A shard_pool object (invisibly). The pool is also stored in the
package environment for reuse.
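The recycling rule can be sketched as a pure predicate. This assumes, without confirmation from this manual, that a worker is recycled when its RSS exceeds rss_limit or has grown by more than rss_drift_threshold relative to its baseline, and that min_recycle_interval suppresses back-to-back recycles of the same worker. should_recycle_sketch() is hypothetical, takes sizes in bytes, and treats "2GB" as 2 * 1024^3 bytes, which is itself an assumption.

```r
# Hypothetical recycle predicate; all sizes in bytes, times in seconds.
should_recycle_sketch <- function(rss, baseline_rss,
                                  rss_limit = 2 * 1024^3,
                                  rss_drift_threshold = 0.5,
                                  seconds_since_recycle = Inf,
                                  min_recycle_interval = 1) {
  # Anti-thrashing guard: never recycle the same worker too soon.
  if (seconds_since_recycle < min_recycle_interval) return(FALSE)
  drift <- (rss - baseline_rss) / baseline_rss   # fractional growth
  rss > rss_limit || drift > rss_drift_threshold
}

grown_60pct <- should_recycle_sketch(rss = 160e6, baseline_rss = 100e6)
grown_40pct <- should_recycle_sketch(rss = 140e6, baseline_rss = 100e6)
```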
p <- pool_create(2)
pool_stop(p)
Sends a task to a specific worker and waits for the result.
pool_dispatch(
  worker_id,
  expr,
  envir = parent.frame(),
  pool = NULL,
  timeout = 3600
)
worker_id |
Integer. Worker to dispatch to. |
expr |
Expression to evaluate. |
envir |
Environment containing variables needed by expr. |
pool |
A shard_pool object. |
timeout |
Numeric. Seconds to wait for result (default 3600). |
The result of evaluating expr in the worker.
p <- pool_create(2)
pool_dispatch(1, quote(1 + 1), pool = p)
pool_stop(p)
Returns the active worker pool, or NULL if none exists.
pool_get()
A shard_pool object or NULL.
p <- pool_get()
is.null(p)
Monitors all workers, recycling those with excessive RSS drift or that have died.
pool_health_check(pool = NULL, busy_workers = NULL)
pool |
A shard_pool object. |
busy_workers |
Optional integer vector of worker ids that are currently running tasks (used internally by the dispatcher to avoid recycling a worker while a result is in flight). |
A list with health status per worker and actions taken.
p <- pool_create(2)
pool_health_check(p)
pool_stop(p)
An alternative dispatch that uses parallel::parLapply-style execution but with supervision. This is a simpler interface for basic parallel apply.
pool_lapply(X, FUN, ..., pool = NULL, chunk_size = 1L)
X |
List or vector to iterate over. |
FUN |
Function to apply to each element. |
... |
Additional arguments to FUN. |
pool |
A shard_pool object. |
chunk_size |
Integer. Elements per chunk (default 1). |
A list of results.
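With chunk_size, each task covers that many consecutive elements, and the per-element results are flattened back into input order. lapply_chunked_sketch() below is a hypothetical sequential stand-in that shows this grouping; it is not how pool_lapply() is implemented.

```r
# Hypothetical sketch: group consecutive elements into tasks of chunk_size.
lapply_chunked_sketch <- function(X, FUN, ..., chunk_size = 1L) {
  idx    <- seq_along(X)
  blocks <- split(idx, ceiling(idx / chunk_size))   # one block per task
  per_block <- lapply(blocks, function(b, ...) lapply(X[b], FUN, ...), ...)
  out <- unlist(per_block, recursive = FALSE, use.names = FALSE)
  names(out) <- names(X)                            # restore input names
  out
}

res <- lapply_chunked_sketch(1:5, function(x, p) x^p, p = 2, chunk_size = 2L)
```

Here the five elements form three tasks (1-2, 3-4, 5); larger chunks mean fewer round trips to workers, smaller chunks balance uneven work more evenly.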
pool_create(2)
result <- pool_lapply(1:4, function(x) x^2, pool = pool_get())
pool_stop()
Parallel sapply with Supervision
pool_sapply(X, FUN, ..., simplify = TRUE, pool = NULL)
X |
List or vector to iterate over. |
FUN |
Function to apply. |
... |
Additional arguments to FUN. |
simplify |
Logical. Simplify result to vector/matrix? |
pool |
A shard_pool object. If NULL, the active pool is used. |
Simplified result if possible, otherwise a list.
pool_create(2)
result <- pool_sapply(1:4, function(x) x^2, pool = pool_get())
pool_stop()
Returns current status of all workers in the pool.
pool_status(pool = NULL)
pool |
A shard_pool object. If NULL, the active pool is used. |
A data frame with worker status information.
p <- pool_create(2)
pool_status(p)
pool_stop(p)
Terminates all worker processes and releases resources. Waits for workers to actually terminate before returning.
pool_stop(pool = NULL, timeout = 5)
pool |
A shard_pool object. If NULL, the active pool is used. |
timeout |
Numeric. Seconds to wait for workers to terminate (default 5). Returns after timeout even if workers are still alive. |
NULL (invisibly).
p <- pool_create(2)
pool_stop(p)
Print an arena_result object
## S3 method for class 'arena_result'
print(x, ...)
x |
An arena_result object. |
... |
Additional arguments passed to |
Returns x invisibly.
info <- arena({
  sum(1:10)
}, diagnostics = TRUE)
print(info)
Print a shard_apply_policy Object
## S3 method for class 'shard_apply_policy'
print(x, ...)
x |
A shard_apply_policy object. |
... |
Ignored. |
The input x, invisibly.
Print a Shared Memory Buffer
## S3 method for class 'shard_buffer'
print(x, ...)
x |
A shard_buffer object. |
... |
Ignored. |
The input x, invisibly.
buf <- buffer("double", dim = 10)
print(buf)
buffer_close(buf)
Print a shard_descriptor Object
## S3 method for class 'shard_descriptor'
print(x, ...)
x |
A shard_descriptor object. |
... |
Further arguments (ignored). |
The input x, invisibly.
sh <- shards(100, block_size = 25)
print(sh)
Print a shard_descriptor_lazy Object
## S3 method for class 'shard_descriptor_lazy'
print(x, ...)
x |
A shard_descriptor_lazy object. |
... |
Further arguments (ignored). |
The input x, invisibly.
sh <- shards(100, block_size = 25)
print(sh)
Print a shard_dispatch_result Object
## S3 method for class 'shard_dispatch_result'
print(x, ...)
x |
A shard_dispatch_result object. |
... |
Further arguments (ignored). |
The input x, invisibly.
pool_create(2)
chunks <- list(list(id = 1L, x = 1), list(id = 2L, x = 2))
result <- dispatch_chunks(chunks, function(chunk) chunk$x, pool = pool_get())
print(result)
pool_stop()
Print a shard_health_report Object
## S3 method for class 'shard_health_report'
print(x, ...)
x |
A shard_health_report object. |
... |
Further arguments (ignored). |
The input x, invisibly.
p <- pool_create(2)
r <- pool_health_check(p)
print(r)
pool_stop(p)
Print a shard_idx_range object
## S3 method for class 'shard_idx_range'
print(x, ...)
x |
A shard_idx_range object. |
... |
Additional arguments (ignored). |
Returns x invisibly.
r <- idx_range(1, 10)
print(r)
Print a shard_pool Object
## S3 method for class 'shard_pool'
print(x, ...)
x |
A shard_pool object. |
... |
Further arguments (ignored). |
The input x, invisibly.
p <- pool_create(2)
print(p)
pool_stop(p)
Print a shard_reduce_result Object
## S3 method for class 'shard_reduce_result'
print(x, ...)
x |
A shard_reduce_result object. |
... |
Further arguments (ignored). |
The input x, invisibly.
res <- shard_reduce(4L, map = function(s) sum(s$idx), combine = `+`, init = 0, workers = 2)
pool_stop()
print(res)
Print a shard_report Object
## S3 method for class 'shard_report'
print(x, ...)
x |
A shard_report object. |
... |
Ignored. |
The input x, invisibly.
res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
pool_stop()
rpt <- report(result = res)
print(rpt)
Print a shard_result Object
## S3 method for class 'shard_result'
print(x, ...)
x |
A shard_result object. |
... |
Further arguments (ignored). |
The input x, invisibly.
result <- shard_map(4L, function(shard) shard$idx, workers = 2)
pool_stop()
print(result)
Print a Shared Memory Segment
## S3 method for class 'shard_segment'
print(x, ...)
x |
A shard_segment object. |
... |
Ignored. |
The input x, invisibly.
seg <- segment_create(1024)
print(seg)
segment_close(seg)
Print a shard_tiles object
## S3 method for class 'shard_tiles'
print(x, ...)
x |
A shard_tiles object. |
... |
Additional arguments (ignored). |
Returns x invisibly.
Print a shard_view_block object
## S3 method for class 'shard_view_block'
print(x, ...)
x |
A shard_view_block object. |
... |
Additional arguments (ignored). |
Returns x invisibly.
Print a shard_view_gather object
## S3 method for class 'shard_view_gather'
print(x, ...)
x |
A shard_view_gather object. |
... |
Additional arguments (ignored). |
Returns x invisibly.
Print a shard_worker Object
## S3 method for class 'shard_worker'
print(x, ...)
x |
A shard_worker object. |
... |
Further arguments (ignored). |
The input x, invisibly.
p <- pool_create(1)
print(p$workers[[1]])
pool_stop(p)
Queue management for dispatching chunks to workers with requeue support.
Uses run telemetry (copy/materialization stats, packing volume, buffer/table writes, scratch pool stats) to produce actionable recommendations.
recommendations(result)
result |
A shard_result object (e.g., from shard_map()). |
A character vector of recommendations (possibly empty).
res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
pool_stop()
recommendations(res)
Registers a named kernel implementation that can be selected by
shard_map(..., kernel = "name").
register_kernel(
  name,
  impl,
  signature = NULL,
  footprint = NULL,
  supports_views = TRUE,
  description = NULL
)
name |
Kernel name (string). |
impl |
Function implementing the kernel. It must accept the shard descriptor as its first argument. |
signature |
Optional short signature string for documentation. |
footprint |
Optional footprint hint. Either a constant (bytes) or a
function |
supports_views |
Logical. Whether the kernel is intended to operate on shard views without slice materialization. |
description |
Optional human-readable description. |
A "kernel" is just a function that shard_map can call for each shard. The registry lets shard_map attach additional metadata (footprint hints, supports_views) for scheduling/autotuning.
Invisibly, the registered kernel metadata.
list_kernels()
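The registry described above maps a kernel name to its implementation plus scheduling metadata. A minimal sketch of such a name-to-metadata store uses a base-R environment. The helpers my_register_kernel() and my_get_kernel() and the field layout are hypothetical; only the argument names mirror register_kernel(). The function form of footprint shown here (bytes as a function of the shard) is also an assumption.

```r
# Hypothetical kernel registry (illustration only, not shard's implementation).
.kernels <- new.env(parent = emptyenv())

my_register_kernel <- function(name, impl, footprint = NULL,
                               supports_views = TRUE, description = NULL) {
  stopifnot(is.character(name), length(name) == 1L, is.function(impl))
  meta <- list(name = name, impl = impl, footprint = footprint,
               supports_views = supports_views, description = description)
  assign(name, meta, envir = .kernels)   # later registrations overwrite
  invisible(meta)
}

my_get_kernel <- function(name) {
  if (!exists(name, envir = .kernels, inherits = FALSE))
    stop("unknown kernel: ", name)
  get(name, envir = .kernels, inherits = FALSE)
}

# The kernel receives the shard descriptor as its first argument.
my_register_kernel("sum_sq", function(shard) sum(shard$idx^2),
                   footprint = function(shard) 8 * length(shard$idx))
k <- my_get_kernel("sum_sq")
k$impl(list(idx = 1:3))   # 14
```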
Primary entry point for shard diagnostics. Generates a comprehensive report of the current runtime state including pool status, memory usage, and execution statistics.
report(level = c("summary", "workers", "tasks", "segments"), result = NULL)
level |
Character. Detail level for the report: one of "summary" (default), "workers", "tasks", or "segments". |
result |
Optional. A shard_result object whose diagnostics are included in the report. |
An S3 object of class shard_report containing:
level: The requested detail level
timestamp: When the report was generated
pool: Pool status information (if pool exists)
memory: Memory usage summary
workers: Per-worker details (if level includes workers)
tasks: Task execution details (if level includes tasks)
segments: Segment details (if level includes segments)
result_diagnostics: Diagnostics from shard_result (if provided)
res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
pool_stop()
report(result = res)
Extract Results from shard_map
results(x, flatten = TRUE)
x |
A shard_result object. |
flatten |
Logical. Flatten nested results? |
List or vector of results.
result <- shard_map(4L, function(shard) shard$idx[[1L]], workers = 2)
pool_stop()
results(result)
Computes disjoint row ranges for each shard via prefix-sum, enabling lock-free writes where each shard writes to a unique region.
row_layout(shards, rows_per_shard)
shards |
A shard_descriptor object (e.g., from shards()). |
rows_per_shard |
Either a scalar integer or a function(shard)->integer. |
A named list mapping shard id (character) to an idx_range(start, end).
sh <- shards(100, block_size = 25)
layout <- row_layout(sh, rows_per_shard = 25L)
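The prefix-sum idea works as follows. The cumulative row counts give each shard a disjoint [start, end] range, so no two shards ever write the same row. row_layout_sketch() is hypothetical. It returns plain named integer pairs instead of idx_range objects, and it takes the per-shard row counts directly instead of a shard_descriptor.

```r
# Hypothetical sketch of prefix-sum row layout (not shard's implementation).
# rows: number of output rows each shard will write, in shard order.
row_layout_sketch <- function(rows) {
  stopifnot(all(rows >= 0))
  end   <- cumsum(rows)          # inclusive last row of each shard
  start <- end - rows + 1L       # first row of each shard
  setNames(Map(function(s, e) c(start = s, end = e), start, end),
           as.character(seq_along(rows)))
}

layout <- row_layout_sketch(c(25L, 25L, 10L, 40L))
layout[["3"]]   # start 51, end 60
```

Because each range starts one row after the previous one ends, writers need no locks.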
A schema is a named set of columns with explicit types. It is used to allocate table buffers and validate writes.
schema(...)
... |
Named columns with type specs (e.g., x = float64(), y = int32(), label = string_col()). |
A shard_schema object.
s <- schema(x = float64(), y = int32(), label = string_col())
s
Scratch pool diagnostics
scratch_diagnostics()
A list with counters and current pool bytes.
scratch_diagnostics()
Allocates (or reuses) a double matrix in the worker scratch pool.
scratch_matrix(nrow, ncol, key = NULL)
nrow, ncol |
Dimensions. |
key |
Optional key to control reuse. Defaults to a shape-derived key. |
A double matrix of dimensions nrow by ncol.
m <- scratch_matrix(10, 5)
dim(m)
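Reuse keyed by shape can be sketched as a cache lookup. A matrix is allocated only the first time a key is seen. The key format "dbl_<nrow>x<ncol>" and the helper scratch_matrix_sketch() are hypothetical. Because R copies on modify, writing into the returned matrix in plain R would still trigger a copy. Genuine in-place reuse presumably needs native support, so this sketch only illustrates the lookup and allocation counting.

```r
# Hypothetical scratch cache (illustration only, not shard's implementation).
.scratch <- new.env(parent = emptyenv())
.scratch_stats <- new.env(parent = emptyenv())
.scratch_stats$allocs <- 0L

scratch_matrix_sketch <- function(nrow, ncol, key = NULL) {
  if (is.null(key)) key <- paste0("dbl_", nrow, "x", ncol)   # shape-derived key
  m <- .scratch[[key]]
  if (is.null(m) || !identical(dim(m), as.integer(c(nrow, ncol)))) {
    m <- matrix(0, nrow, ncol)                               # allocate once per key
    assign(key, m, envir = .scratch)
    .scratch_stats$allocs <- .scratch_stats$allocs + 1L
  }
  m
}

m1 <- scratch_matrix_sketch(10, 5)
m2 <- scratch_matrix_sketch(10, 5)   # same key: served from the cache
m3 <- scratch_matrix_sketch(4, 4)    # new shape: new allocation
.scratch_stats$allocs                # 2
```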
Configure scratch pool limits
scratch_pool_config(max_bytes = Inf)
max_bytes |
Maximum scratch pool bytes allowed in a worker. If exceeded, the worker is flagged for recycle at the next safe point. |
NULL, invisibly.
cfg <- scratch_pool_config(max_bytes = 100 * 1024^2)
Low-level shared memory segment operations for cross-process
data sharing. These functions provide the foundation for the higher-level
share() and buffer() APIs.
Segments can be backed by:
"shm": POSIX shared memory (Linux/macOS) or named file
mapping (Windows). Faster but may have size limitations.
"mmap": File-backed memory mapping. Works on all platforms
and supports larger sizes.
"auto": Let the system choose the best option.
All segments are created with secure permissions (0600 on Unix) and are automatically cleaned up when the R object is garbage collected.
This calls madvise() on the segment mapping when available.
segment_advise(
  seg,
  advice = c("normal", "sequential", "random", "willneed", "dontneed")
)
seg |
A shard_segment. |
advice |
One of "normal", "sequential", "random", "willneed", "dontneed". |
A logical scalar; TRUE if the OS accepted the hint.
seg <- segment_create(1024)
segment_advise(seg, "sequential")
segment_close(seg)
Close a shared memory segment
segment_close(x, unlink = NULL)
x |
A shard_segment object |
unlink |
Whether to unlink the underlying file/shm (default: FALSE for opened segments, TRUE for owned segments) |
NULL, invisibly.
seg <- segment_create(1024)
segment_close(seg)
Create a new shared memory segment
segment_create(
  size,
  backing = c("auto", "mmap", "shm"),
  path = NULL,
  readonly = FALSE
)
size |
Size of the segment in bytes |
backing |
Backing type: "auto", "mmap", or "shm" |
path |
Optional file path for mmap backing (NULL for temp file) |
readonly |
Create as read-only (after initial write) |
A shard_segment object backed by shared memory.
seg <- segment_create(1024 * 1024)
segment_info(seg)
segment_close(seg)
Get segment information
segment_info(x)
x |
A shard_segment object |
A named list with segment metadata including size, backing,
path, readonly, and owns.
seg <- segment_create(1024)
segment_info(seg)
segment_close(seg)
Open an existing shared memory segment
segment_open(path, backing = c("mmap", "shm"), readonly = TRUE)
path |
Path or shm name of the segment |
backing |
Backing type: "mmap" or "shm" |
readonly |
Open as read-only |
A shard_segment object attached to the existing segment.
seg <- segment_create(1024, backing = "mmap")
path <- segment_path(seg)
seg2 <- segment_open(path, backing = "mmap", readonly = TRUE)
segment_close(seg2, unlink = FALSE)
segment_close(seg)
Get the path or name of a segment
segment_path(x)
x |
A shard_segment object |
The path string, or NULL for anonymous segments.
seg <- segment_create(1024, backing = "mmap")
segment_path(seg)
segment_close(seg)
Make a segment read-only
segment_protect(x)
x |
A shard_segment object |
The shard_segment object, invisibly.
seg <- segment_create(1024)
segment_protect(seg)
segment_close(seg)
Read raw data from a segment
segment_read(x, offset = 0, size = NULL)
x |
A shard_segment object |
offset |
Byte offset to start reading (0-based) |
size |
Number of bytes to read |
A raw vector containing the bytes read from the segment.
seg <- segment_create(1024)
segment_write(seg, as.integer(1:4), offset = 0)
segment_read(seg, offset = 0, size = 16)
segment_close(seg)
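segment_write() and segment_read() work in raw bytes with 0-based offsets. Reading typed data back therefore requires knowing each element's width: an R integer takes 4 bytes and a double takes 8. That is why the example reads 16 bytes for as.integer(1:4). The same arithmetic can be checked with base R's writeBin() and readBin() on a plain raw vector. This assumes the segment stores values in native byte order, which the source does not state.

```r
# Encode four R integers as raw bytes (4 bytes each, native byte order).
bytes <- writeBin(as.integer(1:4), raw())
length(bytes)                                  # 16

# Element k (1-based) starts at 0-based byte offset 4 * (k - 1).
offset <- 4L * (3L - 1L)                       # third element -> offset 8
third  <- readBin(bytes[(offset + 1L):(offset + 4L)], what = "integer", n = 1L)
third                                          # 3

# Decode all 16 bytes back into integers.
readBin(bytes, what = "integer", n = 4L)       # 1 2 3 4
```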
Generates a report of active shared memory segments in the current session.
segment_report()
This function reports on segments that are currently accessible. Note that segments are automatically cleaned up when their R objects are garbage collected, so this only shows segments with live references.
An S3 object of class shard_report with type "segment"
containing:
type: "segment"
timestamp: When the report was generated
n_segments: Number of tracked segments
total_bytes: Total bytes across all segments
segments: List of segment details
segment_report()
Get the size of a segment
segment_size(x)
x |
A shard_segment object |
Size in bytes as a numeric scalar.
seg <- segment_create(1024)
segment_size(seg)
segment_close(seg)
Write data to a segment
segment_write(x, data, offset = 0)
x |
A shard_segment object |
data |
Data to write (raw, numeric, integer, or logical vector) |
offset |
Byte offset to start writing (0-based) |
Number of bytes written, invisibly.
seg <- segment_create(1024)
segment_write(seg, as.integer(1:10), offset = 0)
segment_close(seg)
Intended to be called inside a worker process (e.g., via clusterCall()).
On unsupported platforms, returns FALSE.
set_affinity(cores)
cores |
Integer vector of 0-based CPU core ids. |
A logical scalar; TRUE on success, FALSE if not supported.
affinity_supported()
A convenience wrapper for the common "per-column apply" pattern. The matrix is shared once and each worker receives a zero-copy column view when possible.
shard_apply_matrix(
  X,
  MARGIN = 2,
  FUN,
  VARS = NULL,
  workers = NULL,
  ...,
  policy = shard_apply_policy()
)
X |
A numeric/integer/logical matrix (or a shared matrix created by |
MARGIN |
Must be 2 (columns). |
FUN |
Function of the form |
VARS |
Optional named list of extra variables. Large atomic VARS are
auto-shared based on |
workers |
Number of workers (passed to |
... |
Additional arguments forwarded to |
policy |
A shard_apply_policy object (see shard_apply_policy()). |
Current limitation: MARGIN must be 2 (columns). Row-wise apply would require
strided/gather slicing and is intentionally explicit in shard via views/kernels.
An atomic vector of length ncol(X) with the results.
X <- matrix(rnorm(400), 20, 20)
shard_apply_matrix(X, MARGIN = 2, FUN = mean)
pool_stop()
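Per the Value section, shard_apply_matrix() returns one value per column. A serial base-R reference with the same per-column contract is handy for validating a parallel run. This sketch assumes FUN returns a single number per column. apply_cols_reference() is a hypothetical helper that uses no workers or shared memory.

```r
# Serial reference for the per-column apply pattern: one scalar per column.
apply_cols_reference <- function(X, FUN, ...) {
  vapply(seq_len(ncol(X)), function(j) FUN(X[, j], ...), numeric(1))
}

set.seed(1)
X <- matrix(rnorm(400), 20, 20)
ref <- apply_cols_reference(X, mean)
all.equal(ref, colMeans(X))   # TRUE
```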
Centralizes safe defaults and guardrails for apply/lapply convenience wrappers.
shard_apply_policy(
  auto_share_min_bytes = "1MB",
  max_gather_bytes = "256MB",
  cow = c("deny", "audit", "allow"),
  profile = c("default", "memory", "speed"),
  block_size = "auto",
  backing = c("auto", "mmap", "shm")
)
auto_share_min_bytes |
Minimum object size for auto-sharing (default "1MB"). |
max_gather_bytes |
Maximum estimated gathered result bytes before refusing to run (default "256MB"). |
cow |
Copy-on-write policy for borrowed inputs. One of "deny" (default), "audit", or "allow". |
profile |
Execution profile passed through to |
block_size |
Shard block size for apply-style workloads. Default "auto". |
backing |
Backing type used when auto-sharing ("auto", "mmap", or "shm"). |
An object of class shard_apply_policy.
cfg <- shard_apply_policy()
cfg
Computes crossprod(X, Y) (i.e. t(X) %*% Y) using:
shared/mmap-backed inputs (one copy),
block views (no slice materialization),
BLAS-3 dgemm in each tile,
an explicit shared output buffer (no gather/bind spikes).
shard_crossprod(
  X,
  Y,
  workers = NULL,
  block_x = "auto",
  block_y = "auto",
  backing = c("mmap", "shm"),
  materialize = c("auto", "never", "always"),
  materialize_max_bytes = 512 * 1024^2,
  diagnostics = TRUE
)
X, Y |
Double matrices with the same number of rows. |
workers |
Number of worker processes. |
block_x, block_y |
Tile sizes over |
backing |
Backing for shared inputs and output buffer ("mmap" or "shm"). |
materialize |
Whether to return the result as a standard R matrix: one of "auto" (default), "never", or "always". |
materialize_max_bytes |
Threshold for |
diagnostics |
Whether to collect shard_map diagnostics. |
This is intended as an ergonomic entry point to the optimized path listed above: users should not have to call share(), view_block(), buffer(), tiles2d(), and shard_map() manually for common patterns.
A list with:
buffer: shard_buffer for the result (ncol(X) x ncol(Y))
value: materialized matrix if requested, otherwise NULL
run: the underlying shard_result from shard_map
tile: chosen tile sizes
X <- matrix(rnorm(2000), 100, 20)
Y <- matrix(rnorm(2000), 100, 20)
res <- shard_crossprod(X, Y, block_x = 50, block_y = 10, workers = 2)
pool_stop()
res$value
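The tiling strategy can be sketched serially. Each (I, J) tile of the output is crossprod(X[, I], Y[, J]), written into a preallocated result. This sketch assumes that block_x and block_y partition the columns of X and Y respectively, which the truncated argument description does not confirm. tiled_crossprod() is hypothetical and runs on a single process. Because the tiles are disjoint, independent writers would never touch the same output cells.

```r
# Hypothetical serial sketch of tiled crossprod(X, Y) = t(X) %*% Y.
tiled_crossprod <- function(X, Y, block_x, block_y) {
  stopifnot(nrow(X) == nrow(Y))
  p <- ncol(X); v <- ncol(Y)
  out <- matrix(0, p, v)                               # explicit output buffer
  col_tiles <- function(n, b) split(seq_len(n), ceiling(seq_len(n) / b))
  for (I in col_tiles(p, block_x)) {
    for (J in col_tiles(v, block_y)) {
      # One BLAS-backed crossprod per tile, written to a disjoint region.
      out[I, J] <- crossprod(X[, I, drop = FALSE], Y[, J, drop = FALSE])
    }
  }
  out
}

set.seed(42)
X <- matrix(rnorm(2000), 100, 20)
Y <- matrix(rnorm(2000), 100, 20)
all.equal(tiled_crossprod(X, Y, block_x = 8, block_y = 6), crossprod(X, Y))  # TRUE
```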
Retrieves the registered adapter for an object's class. Checks all classes in the object's class hierarchy, returning the first matching adapter.
shard_get_adapter(x)
x |
An R object. |
The adapter list if one is registered for any of the object's classes, or NULL if no adapter is registered.
shard_get_adapter(1:10)
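The lookup rule above checks the classes of class(x) in order and returns the first one with an adapter. A minimal sketch of that rule follows. The registry environment, get_adapter_sketch(), and the adapter's list shape are hypothetical; only the children() component is named in the source.

```r
# Hypothetical adapter registry keyed by class name (illustration only).
.adapters <- new.env(parent = emptyenv())
assign("lm", list(children = function(x) unclass(x)[c("coefficients", "residuals")]),
       envir = .adapters)

get_adapter_sketch <- function(x) {
  for (cl in class(x)) {                       # most specific class first
    if (exists(cl, envir = .adapters, inherits = FALSE))
      return(get(cl, envir = .adapters, inherits = FALSE))
  }
  NULL                                         # no adapter registered
}

fit <- structure(list(coefficients = 1:2, residuals = rnorm(5), call = NULL),
                 class = c("glm", "lm"))
a <- get_adapter_sketch(fit)                   # no "glm" entry, falls back to "lm"
names(a$children(fit))                         # "coefficients" "residuals"
is.null(get_adapter_sketch(1:10))              # TRUE
```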
Returns a character vector of all classes with registered adapters.
shard_list_adapters()
Character vector of class names with registered adapters.
shard_list_adapters()
Core parallel execution engine with supervision, shared inputs, and output buffers.
Executes a function over shards in parallel with worker supervision, shared inputs, and explicit output buffers. This is the primary entry point for shard's parallel execution model.
shard_map(
  shards,
  fun = NULL,
  borrow = list(),
  out = list(),
  kernel = NULL,
  scheduler_policy = NULL,
  autotune = NULL,
  dispatch_mode = c("rpc_chunked", "shm_queue"),
  dispatch_opts = NULL,
  workers = NULL,
  chunk_size = 1L,
  profile = c("default", "memory", "speed"),
  mem_cap = "2GB",
  recycle = TRUE,
  cow = c("deny", "audit", "allow"),
  seed = NULL,
  diagnostics = TRUE,
  packages = NULL,
  init_expr = NULL,
  timeout = 3600,
  max_retries = 3L,
  health_check_interval = 10L
)
shards |
A shard_descriptor (e.g., from shards() or shards_list()) or a scalar integer number of items. |
fun |
Function to execute per shard. Receives the shard descriptor
as first argument, followed by borrowed inputs and outputs. You can also
select a registered kernel via |
borrow |
Named list of shared inputs. These are exported to workers once and reused across shards. Treated as read-only by default. |
out |
Named list of output buffers (from |
kernel |
Optional. Name of a registered kernel (see |
scheduler_policy |
Optional list of scheduling hints (advanced). Currently:
|
autotune |
Optional. Online autotuning for scalar-N sharding (advanced).
When Accepted values:
|
dispatch_mode |
Dispatch mode (advanced). |
dispatch_opts |
Optional list of dispatch-mode specific knobs (advanced). Currently:
|
workers |
Integer. Number of worker processes. If NULL, uses existing
pool or creates one with |
chunk_size |
Integer. Shards to batch per worker dispatch (default 1). Higher values reduce RPC overhead but may hurt load balancing. |
profile |
Execution profile: "default", "memory", or "speed". |
mem_cap |
Memory cap per worker (e.g., "2GB"). Workers exceeding this are recycled. |
recycle |
Logical or numeric. If TRUE, recycle workers on RSS drift. If numeric, specifies drift threshold (default 0.5 = 50% growth). |
cow |
Copy-on-write policy for borrowed inputs: "deny" (default), "audit", or "allow". |
seed |
Integer. RNG seed for reproducibility. If NULL, no seed is set. |
diagnostics |
Logical. Collect detailed diagnostics (default TRUE). |
packages |
Character vector. Additional packages to load in workers. |
init_expr |
Expression to evaluate in each worker on startup. |
timeout |
Numeric. Seconds to wait for each shard (default 3600). |
max_retries |
Integer. Maximum retries per shard on failure (default 3). |
health_check_interval |
Integer. Check worker health every N shards (default 10). |
A shard_result object containing:
results: List of results from each shard (if fun returns values)
failures: Any permanently failed shards
diagnostics: Timing, memory, and worker statistics
pool_stats: Pool-level statistics
blocks <- shards(1000, workers = 2)
result <- shard_map(blocks, function(shard) {
  sum(shard$idx^2)
}, workers = 2)
pool_stop()
Reduce shard results without gathering all per-shard returns on the master.
shard_reduce() executes map() over shards in parallel and combines results
using an associative combine() function. Unlike shard_map(), it does not
accumulate all per-shard results on the master; it streams partials as chunks
complete.
shard_reduce(
  shards,
  map,
  combine,
  init,
  borrow = list(),
  out = list(),
  workers = NULL,
  chunk_size = 1L,
  profile = c("default", "memory", "speed"),
  mem_cap = "2GB",
  recycle = TRUE,
  cow = c("deny", "audit", "allow"),
  seed = NULL,
  diagnostics = TRUE,
  packages = NULL,
  init_expr = NULL,
  timeout = 3600,
  max_retries = 3L,
  health_check_interval = 10L
)
shards |
A shard_descriptor (e.g., from shards() or shards_list()) or a scalar integer number of items. |
map |
Function executed per shard. Receives shard descriptor as first argument, followed by borrowed inputs and outputs. |
combine |
Function (acc, x) returning the updated accumulator; it must be associative. |
init |
Initial accumulator value. |
borrow |
Named list of shared inputs (same semantics as shard_map()). |
out |
Named list of output buffers/sinks (same semantics as shard_map()). |
workers |
Number of worker processes. |
chunk_size |
Shards to batch per worker dispatch (default 1). |
profile |
Execution profile (same semantics as shard_map()). |
mem_cap |
Memory cap per worker (same semantics as shard_map()). |
recycle |
Worker recycling policy (same semantics as shard_map()). |
cow |
Copy-on-write policy for borrowed inputs (same semantics as shard_map()). |
seed |
RNG seed for reproducibility. |
diagnostics |
Logical; collect diagnostics (default TRUE). |
packages |
Additional packages to load in workers. |
init_expr |
Expression to evaluate in each worker on startup. |
timeout |
Seconds to wait for each chunk. |
max_retries |
Maximum retries per chunk. |
health_check_interval |
Check worker health every N completions. |
For performance and memory efficiency, reduction is performed in two stages:
per-chunk partial reduction inside each worker, and
streaming combine of partials on the master.
A shard_reduce_result with fields:
value: final accumulator
failures: any permanently failed chunks
diagnostics: run telemetry including reduction stats
queue_status, pool_stats
res <- shard_reduce(
  100L,
  map = function(s) sum(s$idx),
  combine = function(acc, x) acc + x,
  init = 0,
  workers = 2
)
pool_stop()
res$value
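The two-stage scheme can be sketched serially with Reduce(). In stage 1, each chunk of shards is reduced to one partial on the worker side. In stage 2, the master folds each partial into a running accumulator, so it never holds every per-shard result. two_stage_reduce() is hypothetical, and the chunk layout is an assumption. Here init is used exactly once, and partials are folded in chunk order. Under those conditions, associativity of combine() is enough for the result to match a plain left fold.

```r
# Serial sketch of two-stage reduction (not shard's implementation).
two_stage_reduce <- function(items, map, combine, init, chunk_size = 2L) {
  chunks <- split(items, ceiling(seq_along(items) / chunk_size))
  acc <- init
  for (ch in chunks) {
    partial <- Reduce(combine, lapply(ch, map))   # stage 1: per-chunk partial
    acc <- combine(acc, partial)                  # stage 2: streaming combine
  }
  acc
}

# Ten shards of ten indices each; map sums the indices of one shard.
shards_idx <- split(1:100, rep(1:10, each = 10))
two_stage_reduce(shards_idx, map = function(idx) sum(idx),
                 combine = function(acc, x) acc + x, init = 0, chunk_size = 3L)
# 5050, the same as sum(1:100)
```

With an associative but non-commutative combine, such as c() with init = integer(0) and map = identity, the same routine returns 1:100 in order. That is why chunk order matters in this sketch.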
Registers a custom adapter for a specific class. When deep sharing encounters
an object of this class, it will use the adapter's children() function
to extract shareable components instead of generic traversal.
shard_register_adapter(class, adapter)
class |
A character string naming the class to register the adapter for. |
adapter |
A list containing:
|
Invisibly returns the previous adapter for this class (if any), or NULL if no adapter was registered.
shard_list_adapters()
Removes a previously registered adapter for a class. After unregistration, objects of this class will use default traversal behavior during deep sharing.
shard_unregister_adapter(class)
class |
A character string naming the class to unregister. |
Invisibly returns the removed adapter, or NULL if no adapter was registered for this class.
shard_list_adapters()
Create shard descriptors for parallel execution with autotuning.
Produces shard descriptors (index ranges) for use with shard_map().
Supports autotuning based on worker count and memory constraints.
shards(
  n,
  block_size = "auto",
  workers = NULL,
  strategy = c("contiguous", "strided"),
  min_shards_per_worker = 4L,
  max_shards_per_worker = 64L,
  scratch_bytes_per_item = 0,
  scratch_budget = 0
)
n |
Integer. Total number of items to shard. |
block_size |
Block size specification. Can be:
|
workers |
Integer. Number of workers for autotuning (default: pool size or detectCores - 1). |
strategy |
Sharding strategy: "contiguous" (default) or "strided". |
min_shards_per_worker |
Integer. Minimum shards per worker for load balancing (default 4). |
max_shards_per_worker |
Integer. Maximum shards per worker to limit overhead (default 64). |
scratch_bytes_per_item |
Numeric. Expected scratch memory per item for memory budgeting. |
scratch_budget |
Character or numeric. Total scratch memory budget (e.g., "1GB"). |
A shard_descriptor object containing:
n: Total items
block_size: Computed block size
strategy: Strategy used
shards: List of shard descriptors with id, start, end, idx fields
blocks <- shards(1e6, workers = 8)
length(blocks$shards)
blocks <- shards(1000, block_size = 100)
blocks$shards[[1]]$idx
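The two strategy values can be illustrated in base R. This sketch assumes "contiguous" means consecutive runs of indices and "strided" means round-robin interleaving across shards; the source does not define either. make_index_sets() is hypothetical and performs none of the block-size autotuning described above.

```r
# Hypothetical illustration of contiguous vs strided index assignment.
make_index_sets <- function(n, n_shards, strategy = c("contiguous", "strided")) {
  strategy <- match.arg(strategy)
  i <- seq_len(n)
  if (strategy == "contiguous") {
    block <- ceiling(n / n_shards)
    unname(split(i, ceiling(i / block)))          # consecutive runs of `block` items
  } else {
    unname(split(i, (i - 1L) %% n_shards + 1L))   # item i -> shard ((i - 1) mod k) + 1
  }
}

cs <- make_index_sets(10, 3, "contiguous")   # shards: 1:4, 5:8, 9:10
st <- make_index_sets(10, 3, "strided")      # shards: 1,4,7,10 / 2,5,8 / 3,6,9
```

Contiguous blocks keep each shard's reads local. Interleaving spreads uneven per-item costs more evenly across shards.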
Constructs a shard_descriptor from a user-supplied list of index vectors.
This is useful for non-contiguous workloads like searchlights/feature sets
where each shard operates on an arbitrary subset.
shards_list(idxs)
idxs |
List of integer vectors (1-based indices). Each element becomes
one shard with fields |
A shard_descriptor list describing the chunk layout.
sh <- shards_list(list(1:10, 11:20, 21:30))
length(sh)
Stream row count
stream_count(x)
x |
A shard_row_groups or shard_dataset object. |
A single integer giving the total number of rows across all partitions.
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = rnorm(5)))
rg <- table_finalize(sink)
stream_count(rg)
Reads each partition, filters rows, and writes a new partitioned dataset. Output is written as one partition per input partition (empty partitions are allowed). This avoids materializing all results.
stream_filter(x, predicate, path = NULL, ...)
x |
A shard_row_groups or shard_dataset object. |
predicate |
Function |
path |
Output directory. If NULL, a temp dir is created. |
... |
Passed to |
A shard_dataset handle pointing to the filtered partitions.
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = c(1.0, 2.0, 3.0)))
rg <- table_finalize(sink)
filtered <- stream_filter(rg, predicate = function(chunk) chunk$x > 1.5)
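The partition-at-a-time pattern can be sketched with an in-memory list of data frames standing in for partitions. Here each output partition corresponds to one input partition, and an output partition may be empty. filter_partitions() is hypothetical. A disk-backed version would presumably write each filtered chunk before reading the next; this sketch keeps the results in a list for brevity.

```r
# In-memory stand-ins for partitions (illustration only).
parts <- list(data.frame(x = c(1.0, 2.0, 3.0)),
              data.frame(x = c(0.5, 1.0)),
              data.frame(x = c(4.0, 5.0)))

filter_partitions <- function(parts, predicate) {
  lapply(parts, function(chunk) {
    keep <- predicate(chunk)                      # one logical per row
    chunk[keep & !is.na(keep), , drop = FALSE]    # NA treated as "drop"
  })
}

out <- filter_partitions(parts, function(chunk) chunk$x > 1.5)
vapply(out, nrow, integer(1))   # 2 0 2
```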
Counts rows per group across partitions without collecting the full dataset.
Optimized for factor groups (factor_col()).
stream_group_count(x, group)
x |
A |
group |
Group column name (recommended: |
A data.frame with columns group (factor) and n (integer).
s <- schema(g = factor_col(c("a", "b")), x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(g = factor(c("a", "b", "a"), levels = c("a", "b")),
                                 x = c(1, 2, 3)))
rg <- table_finalize(sink)
stream_group_count(rg, "g")
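A minimal base R sketch of why fixed factor levels help here: when every partition shares the same levels, per-partition counts from tabulate() line up position by position and can simply be added. The partitions and level set below are illustrative assumptions, not shard internals.

```r
levels_g <- c("a", "b")
partitions <- list(
  data.frame(g = factor(c("a", "b", "a"), levels = levels_g)),
  data.frame(g = factor(c("b", "b"), levels = levels_g))
)

# Running count vector with one slot per level; each partition adds its
# tabulate() counts. Shared levels make the slots line up positionally.
counts <- integer(length(levels_g))
for (chunk in partitions) {
  counts <- counts + tabulate(as.integer(chunk$g), nbins = length(levels_g))
}

result <- data.frame(group = factor(levels_g, levels = levels_g), n = counts)
result  # a: 2, b: 3
```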
Computes sum(value) by group across partitions without collecting the full
dataset. This is optimized for factor groups (factor_col()).
stream_group_sum(x, group, value, na_rm = TRUE)
x |
A |
group |
Group column name (recommended: |
value |
Numeric column name to sum. |
na_rm |
Logical; drop rows where value is NA (default TRUE). |
A data.frame with columns group (factor) and sum (numeric).
s <- schema(g = factor_col(c("a", "b")), x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(g = factor(c("a", "b", "a"), levels = c("a", "b")),
                                 x = c(1, 2, 3)))
rg <- table_finalize(sink)
stream_group_sum(rg, "g", "x")
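The same idea applies to sums. The sketch below (base R, illustrative data, not shard's code) keeps one running total per level and, mirroring na_rm = TRUE, drops rows whose value is NA before summing. A level missing from a partition contributes 0. With na_rm = FALSE the NA would instead propagate into that level's total.

```r
levels_g <- c("a", "b")
partitions <- list(
  data.frame(g = factor(c("a", "b", "a"), levels = levels_g), x = c(1, 2, 3)),
  data.frame(g = factor(c("b", "a"), levels = levels_g), x = c(NA, 10))
)
na_rm <- TRUE

# One running sum per level, in level order.
sums <- setNames(numeric(length(levels_g)), levels_g)
for (chunk in partitions) {
  if (na_rm) chunk <- chunk[!is.na(chunk$x), , drop = FALSE]
  # Per-level sum for this partition; an absent level sums to 0.
  part <- vapply(levels_g, function(l) sum(chunk$x[chunk$g == l]), numeric(1))
  sums <- sums + part
}
sums  # a = 14, b = 2
```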
Applies f() to each partition and returns the list of per-partition results.
This is still much cheaper than collecting the full dataset when f() returns
a small summary per partition.
stream_map(x, f, ...)
## S3 method for class 'shard_row_groups'
stream_map(x, f, ...)
## S3 method for class 'shard_dataset'
stream_map(x, f, ...)
x |
A shard_row_groups or shard_dataset handle. |
f |
Function |
... |
Passed to |
A list of per-partition values, one element per row-group file.
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = rnorm(5)))
rg <- table_finalize(sink)
nrows <- stream_map(rg, nrow)
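To show why per-partition summaries stay cheap, here is a base R sketch in which lapply() over a list of data.frames stands in for stream_map() over row-group files (an assumption; shard reads partitions from storage). Each partition returns a two-number summary, and the summaries can still be combined exactly afterwards.

```r
partitions <- list(
  data.frame(x = c(1, 2, 3)),
  data.frame(x = c(10, 20))
)
summarise_chunk <- function(chunk) c(n = nrow(chunk), mean = mean(chunk$x))

per_partition <- lapply(partitions, summarise_chunk)  # one element per partition
m <- do.call(rbind, per_partition)                    # 2 x 2 summary matrix

# Small summaries can still be combined exactly, e.g. a weighted mean:
overall_mean <- sum(m[, "n"] * m[, "mean"]) / sum(m[, "n"])
overall_mean  # 7.2, the mean of all five values
```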
Applies f() to each partition (row-group) and combines results with
combine() into a single accumulator. This keeps peak memory bounded by the
largest single partition (plus your accumulator).
stream_reduce(x, f, init, combine, ...)
## S3 method for class 'shard_row_groups'
stream_reduce(x, f, init, combine, ...)
## S3 method for class 'shard_dataset'
stream_reduce(x, f, init, combine, ...)
x |
A shard_row_groups or shard_dataset handle. |
f |
Function |
init |
Initial accumulator value. |
combine |
Function |
... |
Passed to |
The final accumulator value after processing all partitions.
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = rnorm(5)))
rg <- table_finalize(sink)
total <- stream_reduce(rg, f = nrow, init = 0L, combine = `+`)
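The bounded-memory fold can be sketched in base R as below. read_partition() and reduce_partitions() are hypothetical stand-ins, not shard functions: the loader fabricates a 5-row partition on demand in place of reading a row-group file. Only the current partition and the accumulator are live at any time, which is the property the description of stream_reduce() relies on.

```r
# Hypothetical loader: builds partition i on demand (5 rows each), standing
# in for reading one row-group file from disk.
read_partition <- function(i) data.frame(x = seq_len(5L) * i)

reduce_partitions <- function(n_parts, f, init, combine) {
  acc <- init
  for (i in seq_len(n_parts)) {
    chunk <- read_partition(i)     # only this partition is loaded
    acc <- combine(acc, f(chunk))  # fold its summary into the accumulator
  }
  acc
}

total_rows <- reduce_partitions(3L, f = nrow, init = 0L, combine = `+`)
total_rows  # 15

# A vector accumulator (sum, count) gives an exact streaming mean.
stats <- reduce_partitions(
  3L,
  f = function(chunk) c(sum = sum(chunk$x), n = nrow(chunk)),
  init = c(sum = 0, n = 0),
  combine = `+`
)
stats[["sum"]] / stats[["n"]]  # 6
```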
Computes the sum of col across all partitions without collecting the full
dataset. When partitions are native-encoded, this avoids decoding string
columns entirely.
stream_sum(x, col, na_rm = TRUE)
x |
A |
col |
Column name to sum. |
na_rm |
Logical; drop NAs (default TRUE). |
A single numeric value giving the sum of the column across all partitions.
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = c(1.0, 2.0, 3.0)))
rg <- table_finalize(sink)
stream_sum(rg, "x")
Finds the top k rows by col without collecting the full dataset.
stream_top_k(x, col, k = 10L, decreasing = TRUE, na_drop = TRUE)
x |
A |
col |
Column name to rank by. |
k |
Number of rows to keep. |
decreasing |
Logical; TRUE for largest values (default TRUE). |
na_drop |
Logical; drop rows where |
For native-encoded partitions, this selects candidate rows using the numeric column without decoding strings, then decodes only the chosen rows for the returned result.
A data.frame (or tibble if the tibble package is installed)
with at most k rows ordered by col.
s <- schema(x = float64())
sink <- table_sink(s, mode = "row_groups")
table_write(sink, 1L, data.frame(x = c(3.0, 1.0, 2.0)))
rg <- table_finalize(sink)
stream_top_k(rg, "x", k = 2L)
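One common way to find the top k rows without collecting everything is a running merge: keep at most k candidates, append each partition's rows, and re-trim to k. The base R sketch below illustrates that technique under the assumption that it resembles what stream_top_k() does; shard's actual candidate selection (especially for native-encoded partitions) may differ. Peak memory is roughly k rows plus one partition.

```r
top_k_streaming <- function(partitions, col, k = 10L, decreasing = TRUE) {
  best <- NULL  # running candidates: never more than k rows
  for (chunk in partitions) {
    chunk <- chunk[!is.na(chunk[[col]]), , drop = FALSE]  # like na_drop = TRUE
    pool <- rbind(best, chunk)                            # <= k + nrow(chunk) rows
    ord <- order(pool[[col]], decreasing = decreasing)
    best <- pool[head(ord, k), , drop = FALSE]
  }
  if (!is.null(best)) rownames(best) <- NULL
  best
}

partitions <- list(
  data.frame(x = c(3, 1, 2), id = c("p1a", "p1b", "p1c")),
  data.frame(x = c(5, NA, 0), id = c("p2a", "p2b", "p2c"))
)
res <- top_k_streaming(partitions, "x", k = 2L)
res$x   # 5 3
res$id  # "p2a" "p1a"
```

Whole rows travel with the ranking column, so non-ranking columns (here id) come back alongside the selected values.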
Check if shard_map Succeeded
succeeded(x)
x |
A shard_result object. |
Logical. TRUE if no failures.
result <- shard_map(4L, function(shard) shard$idx[[1L]], workers = 2)
pool_stop()
succeeded(result)
Allocates a columnar table output: one typed buffer per column, each of
length nrow. Intended for lock-free writes from shard_map, where each shard writes a disjoint range of rows.
table_buffer(schema, nrow, backing = c("auto", "mmap", "shm"))
schema |
A |
nrow |
Total number of rows in the final table. |
backing |
Backing type for buffers ( |
A shard_table_buffer object with one shared buffer per schema column.
s <- schema(x = float64(), y = int32())
tb <- table_buffer(s, nrow = 100L)
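Lock-free writes are safe only when shards own disjoint row ranges. shards() already produces such a layout; the hypothetical row_ranges() helper below is a base R sketch of the invariant, splitting 1..nrow into contiguous ranges and checking that every row is covered exactly once. The commented table_write() line shows, as an assumption, how a shard would use its range.

```r
# Hypothetical helper: split rows 1..nrow into contiguous, non-overlapping
# ranges, one per shard.
row_ranges <- function(nrow, n_shards) {
  breaks <- floor(seq(0, nrow, length.out = n_shards + 1))
  lapply(seq_len(n_shards), function(s) c(start = breaks[s] + 1, end = breaks[s + 1]))
}

ranges <- row_ranges(100L, 3L)
ranges[[2]]  # start 34, end 66
# Inside a worker, a shard would then write only its own rows, e.g.:
# table_write(tb, idx_range(r[["start"]], r[["end"]]), chunk)

# Invariant: every row is covered exactly once, so no two writers collide.
covered <- unlist(lapply(ranges, function(r) seq(r[["start"]], r[["end"]])))
stopifnot(length(covered) == 100L, all(sort(covered) == seq_len(100)))
```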
Per-process counters for table writes (number of table_write calls, rows, and bytes written). shard_map uses deltas of these counters to produce run-level diagnostics in copy_report().
table_diagnostics()
A list with writes, rows, and bytes.
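A sketch of the delta computation described above, in base R. The before and after snapshots are made-up values shaped like the documented table_diagnostics() return value (a list with writes, rows, and bytes); in a real session they would come from calling table_diagnostics() around a run.

```r
# Hypothetical snapshots shaped like table_diagnostics() output. In practice:
#   before <- table_diagnostics(); <run>; after <- table_diagnostics()
before <- list(writes = 4, rows = 400, bytes = 3200)
after  <- list(writes = 7, rows = 700, bytes = 5600)

# The per-counter difference isolates activity during the run, even though
# the counters themselves accumulate over the whole process lifetime.
run_delta <- Map(`-`, after, before)
str(run_delta)
```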
For a shard_table_buffer, this returns a lightweight in-memory handle (or a
materialized data.frame/tibble, depending on materialize).
table_finalize(
  target,
  materialize = c("never", "auto", "always"),
  max_bytes = 256 * 1024^2,
  ...
)
target |
A shard_table_buffer or shard_table_sink. |
materialize |
|
max_bytes |
For |
... |
Reserved for future extensions. |
For a shard_table_sink, this returns a row-group handle referencing the
written partitions (or materializes them if requested).
A shard_table_handle, shard_row_groups, or materialized
data.frame/tibble depending on target type and materialize.
s <- schema(x = float64(), y = int32())
tb <- table_buffer(s, nrow = 5L)
table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
handle <- table_finalize(tb)
Finalize a table buffer
## S3 method for class 'shard_table_buffer'
table_finalize(
  target,
  materialize = c("never", "auto", "always"),
  max_bytes = 256 * 1024^2,
  ...
)
target |
A shard_table_buffer. |
materialize |
|
max_bytes |
For |
... |
Reserved for future extensions. |
A shard_table_handle or a materialized data.frame/tibble.
Finalize a sink
## S3 method for class 'shard_table_sink'
table_finalize(
  target,
  materialize = c("never", "auto", "always"),
  max_bytes = 256 * 1024^2,
  ...
)
target |
A shard_table_sink. |
materialize |
|
max_bytes |
For |
... |
Reserved for future extensions. |
A shard_row_groups handle (or a materialized data.frame/tibble).
A table sink supports variable-sized outputs without returning large data.frames to the master. Each shard writes a separate row-group file.
table_sink(
  schema,
  mode = c("row_groups", "partitioned"),
  path = NULL,
  format = c("auto", "rds", "native")
)
schema |
A |
mode |
|
path |
Directory to write row-group files. If NULL, a temp dir is created. |
format |
Storage format for partitions: |
v1.1 implementation notes:
Storage format is per-shard RDS (portable, CRAN-friendly).
This guarantees bounded master memory during execution; final collection may still be large if you materialize.
A shard_table_sink object.
s <- schema(x = float64(), label = string_col())
sink <- table_sink(s, mode = "row_groups")
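To illustrate the row-group idea, the base R sketch below writes each shard's variable-sized output to its own RDS file, named from the shard id, and only combines the files when explicitly asked. The directory, the write_row_group() helper, and the shard-NNNNNN.rds naming are hypothetical; shard's on-disk layout is not specified in this section.

```r
sink_dir <- file.path(tempdir(), "sink_sketch")
dir.create(sink_dir, showWarnings = FALSE)

# Hypothetical per-shard writer: the file name is derived from the shard id,
# so concurrent shards never write to the same file.
write_row_group <- function(shard_id, data) {
  path <- file.path(sink_dir, sprintf("shard-%06d.rds", shard_id))
  saveRDS(data, path)
  invisible(NULL)
}

# Shards produce different numbers of rows (variable-sized output).
for (id in 1:3) {
  write_row_group(id, data.frame(x = seq_len(id), label = paste0("s", id)))
}

files <- sort(list.files(sink_dir, pattern = "^shard-.*\\.rds$", full.names = TRUE))
# Until this point only file paths need to live on the master; collecting
# everything is an explicit, separate step.
combined <- do.call(rbind, lapply(files, readRDS))
nrow(combined)  # 6
```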
table_write() is the common write path for shard table outputs:
For fixed-size outputs, write into a shard_table_buffer using a row selector.
For variable-size outputs, write into a shard_table_sink using a shard id.
table_write(target, rows_or_shard_id, data, ...)
target |
A shard_table_buffer or shard_table_sink. |
rows_or_shard_id |
For buffers: row selector (idx_range or integer vector). For sinks: shard id (integer). |
data |
A data.frame or named list matching the schema columns. |
... |
Reserved for future extensions. |
NULL (invisibly).
s <- schema(x = float64(), y = int32())
tb <- table_buffer(s, nrow = 10L)
table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
Write into a table buffer
## S3 method for class 'shard_table_buffer'
table_write(target, rows_or_shard_id, data, ...)
target |
A shard_table_buffer. |
rows_or_shard_id |
Row selector (idx_range or integer vector). |
data |
A data.frame or named list matching the schema columns. |
... |
Reserved for future extensions. |
NULL (invisibly).
Write a shard's row-group output
## S3 method for class 'shard_table_sink'
table_write(target, rows_or_shard_id, data, ...)
target |
A shard_table_sink. |
rows_or_shard_id |
Integer shard id used to name the row-group file. |
data |
A data.frame matching the sink schema. |
... |
Reserved for future extensions. |
NULL (invisibly).
Generates a report of task/chunk execution statistics from a shard_map result.
task_report(result = NULL)
result |
A |
An S3 object of class shard_report with type "task"
containing:
type: "task"
timestamp: When the report was generated
shards_total: Total number of shards
shards_processed: Number of shards successfully processed
shards_failed: Number of permanently failed shards
chunks_dispatched: Number of chunk batches dispatched
total_retries: Total number of retry attempts
duration: Total execution duration (seconds)
throughput: Shards processed per second
queue_status: Final queue status
res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
pool_stop()
task_report(res)
Create a view over a shared matrix
view(x, rows = NULL, cols = NULL, type = c("auto", "block", "gather"))
x |
A shared (share()d) atomic matrix (double/integer/logical/raw). |
rows |
Row selector. NULL (all rows) or idx_range(). |
cols |
Column selector. NULL (all cols) or idx_range(). |
type |
View type. |
A shard_view_block or shard_view_gather object depending
on the selectors provided.
m <- share(matrix(1:20, nrow = 4))
v <- view(m, cols = idx_range(1, 2))
Create a contiguous block view
view_block(x, rows = NULL, cols = NULL)
x |
A shared (share()d) atomic matrix. |
rows |
NULL or idx_range(). |
cols |
NULL or idx_range(). |
A shard_view_block object representing the contiguous block slice.
m <- share(matrix(1:20, nrow = 4))
v <- view_block(m, cols = idx_range(1, 2))
Returns global counters for view creation and materialization. This is a simple first step; in the future it should be integrated into shard_map run-level diagnostics.
view_diagnostics()
A list with counters.
Gather views describe non-contiguous column (or row) subsets without allocating a slice-sized matrix. shard-aware kernels can then either explicitly pack the requested indices into a scratch buffer (a bounded, reportable copy) or run gather-aware compute paths.
view_gather(x, rows = NULL, cols)
x |
A shared (share()d) atomic matrix (double/integer/logical/raw). |
rows |
Row selector. NULL (all rows) or idx_range(). |
cols |
Integer vector of column indices (1-based). |
v1 note: only column-gather views are implemented (rows may be NULL or idx_range()).
A shard_view_gather object describing the indexed column view.
m <- share(matrix(1:20, nrow = 4))
v <- view_gather(m, cols = c(1L, 3L))
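The sketch below shows, in base R, what packing a column-gather view into scratch involves. Because R matrices are stored column-major, each requested column is a contiguous run of nrow elements, and the packed copy has a size known in advance (here 8 integers, 32 bytes). It uses an ordinary matrix in place of a share()d one and illustrates the layout arithmetic only; it is not shard's kernel code.

```r
m <- matrix(1:20, nrow = 4)
storage <- as.vector(m)  # R stores matrices column-major; this is the flat backing
nr <- nrow(m)
cols <- c(1L, 3L)        # non-contiguous column selection, as in view_gather()

# Element (i, j) sits at offset (j - 1) * nr + i, so each requested column is
# a contiguous run of nr elements; gathering concatenates those runs.
offsets <- unlist(lapply(cols, function(j) ((j - 1L) * nr + 1L):(j * nr)))
scratch <- matrix(storage[offsets], nrow = nr)  # explicit, bounded packed copy

scratch_bytes <- length(offsets) * 4  # 4 bytes per R integer
identical(scratch, m[, cols])         # TRUE
```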
Returns metadata about a view without forcing materialization.
view_info(v)
v |
A shard view. |
A named list with fields: dtype, dim, slice_dim,
rows, cols, layout, fast_path,
nbytes_est, and base_is_shared.
m <- share(matrix(1:20, nrow = 4))
v <- view_block(m, cols = idx_range(1, 2))
view_info(v)
Views are explicit slice descriptors over shared arrays/matrices. They avoid
creating slice-sized allocations (e.g. Y[, a:b]) by carrying only metadata
plus a reference to the shared backing.
This is a low-level optimization handle: arbitrary base R operations may
materialize a view; use materialize() explicitly when you want a standard
matrix/array.
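As a rough model of what a block view carries, the base R sketch below stores only a reference to the backing matrix plus [start, end] row and column ranges, answers metadata queries without reading data, and allocates a slice only on explicit materialization. make_block_view(), view_dim(), view_nbytes_est(), and materialize_view() are hypothetical helpers, and the plain c(start, end) ranges stand in for idx_range(). shard's real views additionally reference shared or memory-mapped storage.

```r
# Hypothetical block-view descriptor: a reference to the backing matrix plus
# [start, end] ranges. R's copy-on-modify semantics mean `base` is shared
# with the caller here, not duplicated.
make_block_view <- function(base, rows = NULL, cols = NULL) {
  if (is.null(rows)) rows <- c(1L, nrow(base))
  if (is.null(cols)) cols <- c(1L, ncol(base))
  list(base = base, rows = rows, cols = cols)
}

# Metadata queries use only the ranges, never the data.
view_dim <- function(v) {
  c(v$rows[2] - v$rows[1] + 1L, v$cols[2] - v$cols[1] + 1L)
}
view_nbytes_est <- function(v) prod(view_dim(v)) * 8  # assumes double storage

# Explicit materialization is the only step that allocates a slice.
materialize_view <- function(v) {
  v$base[v$rows[1]:v$rows[2], v$cols[1]:v$cols[2], drop = FALSE]
}

m <- matrix(as.numeric(1:20), nrow = 4)
v <- make_block_view(m, cols = c(2L, 3L))
view_dim(v)                               # 4 2
view_nbytes_est(v)                        # 64
identical(materialize_view(v), m[, 2:3])  # TRUE
```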