mirai - Reference Manual

This is a reference vignette of the package’s core functionality. Other package vignettes cover additional features.

1. Introduction

mirai (Japanese for ‘future’) implements the concept of futures in R.

Futures represent results from code that will complete later. Code executes in a separate R process (daemon) and returns results to the main process (host).

mirai

mirai() creates a mirai object from an expression.

It returns immediately without blocking. While the expression evaluates on a daemon, the host process continues working.

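Expressions must be self-contained:

  1. Functions from packages must be namespaced with :: or loaded via library() within the expression.
  2. Any other objects must be passed explicitly via ‘...’ or ‘.args’.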

This explicit design matches message-passing parallelism: attempting to infer global variables introduces unreliability, which we do not compromise on.
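Here is a minimal sketch of what self-contained means in practice. It assumes no daemons are set, so each call runs on an ephemeral daemon. The names x, v, ok and bad are illustrative. The first call passes its data via ‘...’ and namespaces median(). The second references x without passing it, so it should resolve to a ‘miraiError’:

```r
library(mirai)

x <- c(1, 4, 9)

# Self-contained: the data is passed as `v`, and median() is namespaced
ok <- mirai(stats::median(sqrt(v)), v = x)
ok[] # expected: [1] 2

# Not self-contained: `x` exists on the host but is never sent to the daemon
bad <- mirai(sqrt(x))
is_mirai_error(bad[]) # expected: [1] TRUE
```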

This example mimics an expensive calculation:

library(mirai)

m <- mirai(
  {
    Sys.sleep(time)
    rnorm(5L, mean)
  },
  time = 2L,
  mean = 4.5
)

m
#> < mirai [] >
m$data
#> 'unresolved' logi NA
unresolved(m)
#> [1] TRUE

# Do work whilst unresolved

m[]
#> [1] 6.218842 3.650785 3.958701 4.108253 4.619849
m$data
#> [1] 6.218842 3.650785 3.958701 4.108253 4.619849

A mirai remains unresolved until its result is received, at which point it becomes resolved. Use unresolved() to check its state.

Access results via m$data once resolved. This will be the return value, or an ‘errorValue’ if the expression errored, crashed, or timed out (see Error Handling).

Use m[] to efficiently wait for and collect the value instead of repeatedly checking unresolved(m).

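You may also wait efficiently for mirai (or lists of mirai) to resolve using call_mirai(), which waits for all of them to resolve, or collect_mirai(), which waits and then returns their values.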
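The following is a sketch of waiting on a list of mirai. It assumes no daemons are set, so each mirai runs on an ephemeral daemon. The list names slow and fast are illustrative. call_mirai() waits for the whole list and returns it invisibly. collect_mirai() waits and returns the values themselves:

```r
library(mirai)

# Each mirai runs on its own ephemeral daemon, as no daemons() are set
ms <- list(
  slow = mirai({
    Sys.sleep(0.5)
    "slow done"
  }),
  fast = mirai(paste("fast", "done"))
)

# Blocks until every mirai in the list has resolved
call_mirai(ms)
unresolved(ms$slow) # [1] FALSE

# Also waits, but returns the values rather than the mirai objects
vals <- collect_mirai(ms)
vals
```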

mirai (advanced)

For programmatic use, ‘.expr’ accepts a pre-constructed language object and ‘.args’ accepts a named list of arguments. The following is equivalent (here with mean = 4):

expr <- quote({Sys.sleep(time); rnorm(5L, mean)})
args <- list(time = 2L, mean = 4)

m1 <- mirai(.expr = expr, .args = args)
m1[]
#> [1] 3.901930 3.121547 3.593815 2.947067 4.729572

This example performs an asynchronous write operation. Passing environment() to ‘.args’ conveniently provides all objects from the calling environment (like x and file):

write.csv.async <- function(x, file) {
  mirai(write.csv(x, file), .args = environment())
}

m <- write.csv.async(x = rnorm(1e6), file = tempfile())

while (unresolved(m)) {
  cat("Writing file...\n")
  Sys.sleep(0.5) # or do other work
}
#> Writing file...
#> Writing file...
cat("Write complete:", is.null(m$data))
#> Write complete: TRUE

daemons

When writing a mirai() call, don’t worry about where or how it executes. End-users declare available resources using daemons().

Without daemons configured, each mirai() call creates a new local background process (ephemeral daemon).

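daemons() sets up persistent daemons to evaluate mirai expressions. These remain running between tasks, avoiding the cost of starting a new process for each mirai() call.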

See local daemons for setup instructions.

2. Error Handling

Errors are returned as a character string with classes ‘miraiError’ and ‘errorValue’.

Use is_mirai_error() to test for errors:

m1 <- mirai(stop("occurred with a custom message", call. = FALSE))
m1[]
#> 'miraiError' chr Error: occurred with a custom message

m2 <- mirai(mirai::mirai())
m2[]
#> 'miraiError' chr Error in mirai::mirai(): missing expression, perhaps wrap in {}?

is_mirai_error(m2$data)
#> [1] TRUE
is_error_value(m2$data)
#> [1] TRUE

Error objects include $stack.trace for full stack traces and $condition.class for original condition classes:

f <- function(x) if (x > 0) stop("positive")

m3 <- mirai({f(-1); f(1)}, f = f)
m3[]
#> 'miraiError' chr Error in f(1): positive

m3$data$stack.trace
#> [[1]]
#> stop("positive")
#> 
#> [[2]]
#> f(1)
m3$data$condition.class
#> [1] "simpleError" "error"       "condition"

Original error condition elements and rlang::abort() metadata are preserved:

m4 <- mirai(rlang::abort("aborted", meta_uid = "UID001"))
m4[]
#> 'miraiError' chr Error: aborted

m4$data$meta_uid
#> [1] "UID001"

User interrupts resolve to class ‘miraiInterrupt’ and ‘errorValue’. Use is_mirai_interrupt() to test for interrupts:

m4 <- mirai(rlang::interrupt()) # simulates a user interrupt
is_mirai_interrupt(m4[])
#> [1] TRUE

Timeouts (via ‘.timeout’) resolve to ‘errorValue’ of 5L, guarding against hanging processes:

m5 <- mirai(nanonext::msleep(1000), .timeout = 500)
m5[]
#> 'errorValue' int 5 | Timed out

is_mirai_error(m5$data)
#> [1] FALSE
is_mirai_interrupt(m5$data)
#> [1] FALSE
is_error_value(m5$data)
#> [1] TRUE

is_error_value() tests for all mirai execution errors, user interrupts and timeouts.
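Because is_error_value() covers errors, interrupts and timeouts alike, it can serve as a single filter when collecting several results. The sketch below assumes no daemons are set and uses illustrative inputs. Here log("a") fails on the daemon:

```r
library(mirai)

# Illustrative inputs: log("a") fails on the daemon with a 'miraiError'
inputs <- list(1, "a", 100)
ms <- lapply(inputs, function(v) mirai(log(v), v = v))
vals <- collect_mirai(ms)

# One test covers errors, interrupts and timeouts alike
failed <- vapply(vals, is_error_value, logical(1))
which(failed) # [1] 2
vals[!failed] # the successful results only
```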

3. Local Daemons

Daemons are persistent background processes that receive mirai() requests.

Daemons inherit system configuration (‘.Renviron’, ‘.Rprofile’) and load default packages. To load only the base package (cutting startup time in half), set R_SCRIPT_DEFAULT_PACKAGES=NULL before launching.
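Below is a minimal sketch of this setting. It assumes the variable is set from the host session with Sys.setenv(), because child Rscript processes inherit the host's environment variables. It also assumes that no ‘.Rprofile’ attaches extra packages. The check on ‘package:stats’ is illustrative:

```r
library(mirai)

# Set before launching so the daemon's Rscript process inherits it
Sys.setenv(R_SCRIPT_DEFAULT_PACKAGES = "NULL")

daemons(1)
attached <- mirai(search())[]
daemons(0)

# Restore the host setting for any later launches
Sys.unsetenv("R_SCRIPT_DEFAULT_PACKAGES")

# Expected FALSE when only base is loaded on the daemon
"package:stats" %in% attached
```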

Specify the number of daemons to launch:

daemons(6)

For optimal performance, set n to one less than the number of available cores, and also allow for any cores reserved for other purposes.
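One way to compute this is sketched below. parallel::detectCores() can return NA on some platforms, so the code falls back to 1 in that case. The compute profile name "cores" is illustrative (see Compute Profiles). It keeps this sketch separate from the daemons(6) call above:

```r
library(mirai)

# One less than the detected cores, but at least 1
# (detectCores() can return NA on some platforms)
n <- max(1L, parallel::detectCores() - 1L, na.rm = TRUE)

# "cores" is an illustrative compute profile name
daemons(n, .compute = "cores")
status(.compute = "cores")$connections
daemons(0, .compute = "cores")
```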

With Dispatcher (default)

The default dispatcher = TRUE creates a background dispatcher process that manages daemon connections. Tasks are dispatched efficiently in FIFO order, queueing at the dispatcher and being sent to daemons as they become available. This event-driven approach consumes no resources while waiting and stays synchronized with events.

info() provides current statistics as an integer vector:

info()
#> connections  cumulative    awaiting   executing   completed 
#>           6           6           0           0           0

status() provides more detail:

  1. connections: active connections
  2. daemons: connection URL
  3. mirai: task summary

status()
#> $connections
#> [1] 6
#> 
#> $daemons
#> [1] "ipc:///tmp/43df2d10f7016fe4c9bdc344"
#> 
#> $mirai
#>  awaiting executing completed 
#>         0         0         0

Set daemons to zero to reset. This reverts to creating a new background process per request.

daemons(0)

Without Dispatcher

With dispatcher = FALSE, daemons connect directly to the host process:

daemons(6, dispatcher = FALSE)

Tasks are sent immediately in round-robin fashion, ensuring even distribution. However, scheduling isn’t optimal since task durations are unknown beforehand: tasks may queue behind long-running tasks while other daemons sit idle.

This resource-light approach suits tasks of similar length, or cases where the number of concurrent tasks does not exceed the number of daemons.

Status now shows 6 connections and the host URL:

status()
#> $connections
#> [1] 6
#> 
#> $daemons
#> [1] "ipc:///tmp/f7d6e9c065c1e179c922f75a"

everywhere()

everywhere() evaluates expressions on all daemons and persists state regardless of cleanup settings:

everywhere(library(DBI))

This keeps the DBI package loaded. You can also set up common resources like database connections:

everywhere(con <<- dbConnect(RSQLite::SQLite(), file), file = tempfile())

Super-assignment makes ‘con’ available globally in all daemons:

mirai(exists("con"))[]
#> [1] TRUE
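The difference between ordinary assignment and super-assignment can be checked directly. This sketch uses an illustrative separate compute profile, "demo", so that the daemons above are unaffected. As the note on evaluation environments below explains, ‘a’ is assigned in a temporary evaluation environment. By contrast, ‘b’ persists in each daemon's global environment:

```r
library(mirai)

# A separate, illustrative profile so existing daemons are unaffected
daemons(1, .compute = "demo")

# Ordinary assignment: stays in the temporary evaluation environment
everywhere(a <- 1, .compute = "demo")
# Super-assignment: persists in each daemon's global environment
everywhere(b <<- 2, .compute = "demo")

res <- mirai(c(a = exists("a"), b = exists("b")), .compute = "demo")[]
res # expected: a FALSE, b TRUE

daemons(0, .compute = "demo")
```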

Disconnect everywhere:

everywhere(dbDisconnect(con))

To evaluate in the global environment of each daemon (since mirai evaluations occur in an environment inheriting from global), use evalq(envir = globalenv()). Example with box::use():

everywhere(
  evalq(
    box::use(dplyr[select], mirai[...]),
    envir = globalenv()
  )
)

daemons(0)

4. mirai_map

mirai_map() performs asynchronous parallel mapping over lists or vectors.

It requires daemons() to be set, to avoid launching an excessive number of ephemeral daemons.

Basic Usage

Returns immediately. Collect results with x[]:

with(daemons(3, seed = 1234L), mirai_map(1:3, rnorm, .args = list(mean = 20, sd = 2))[])
#> [[1]]
#> [1] 19.86409
#> 
#> [[2]]
#> [1] 19.55834 22.30159
#> 
#> [[3]]
#> [1] 20.62193 23.06144 19.61896

Use .args for constant arguments to .f, and ... for objects referenced in .f:

daemons(4, seed = 2345L)
fn <- function(x, range) runif(x, x, x + range)
ml <- mirai_map(c(a = 1, b = 2, c = 3), \(x) fn(x, x * 2), fn = fn)
ml
#> < mirai map [0/3] >
ml[]
#> $a
#> [1] 2.637793
#> 
#> $b
#> [1] 2.328183 5.649959
#> 
#> $c
#> [1] 5.302906 3.531788 6.389231

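Collecting Options

x[] collects results as a list. Options can be supplied inside the brackets, alone or combined: .stop raises an error at the first failed element, .progress shows a progress bar, and .flat flattens the results into a vector: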

mirai_map(list(a = 1, b = "a", c = 3), function(x) exp(x))[.stop]
#> Error in `mirai_map()`:
#> ℹ In index: 2.
#> ℹ With name: b.
#> Caused by error in `exp()`:
#> ! non-numeric argument to mathematical function

mirai_map(c(0.1, 0.2, 0.3), Sys.sleep)[.progress, .flat]
#> NULL

Multiple Map

Data frames and matrices map over rows. The values in each row are passed as arguments to .f, which must accept as many arguments as there are columns:

fruit <- c("melon", "grapes", "coconut")
df <- data.frame(i = seq_along(fruit), fruit = fruit)

mirai_map(df, sprintf, .args = list(fmt = "%d. %s"))[.flat]
#> [1] "1. melon"   "2. grapes"  "3. coconut"

Matrices also map over rows:

mat <- matrix(1:4, nrow = 2L, dimnames = list(c("a", "b"), c("y", "z")))
mirai_map(mat, function(x = 10, y = 0, z = 0) x + y + z)[.flat]
#>  a  b 
#> 14 16

daemons(0)

To map over columns instead, use as.list() for data frames or t() for matrices.
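The following sketch shows both approaches with illustrative data. as.list() turns each data frame column into a list element. t() turns matrix columns into rows. The values in each row are then passed to .f as arguments named after the columns of t(mat), which are the original row names a and b:

```r
library(mirai)

daemons(2)

# Data frame columns: as.list() gives one list element per column
df <- data.frame(x = c(1, 2, 3), y = c(10, 20, 30))
col_sums <- mirai_map(as.list(df), sum)[.flat]
col_sums # values 6 and 60

# Matrix columns: t() turns columns into rows, passed as arguments a and b
mat <- matrix(1:4, nrow = 2L, dimnames = list(c("a", "b"), c("y", "z")))
col_max <- mirai_map(t(mat), function(a, b) max(a, b))[.flat]
col_max # values 2 and 4

daemons(0)
```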

Nested Maps

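For nested mapping, don’t launch local daemons from within mirai_map(). Instead, set up daemons listening at a local URL, then launch them separately (where n is the number of daemons):

daemons(url = local_url())
launch_local(n)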

5. Remote Infrastructure

This section covers setting up remote daemons, launching them on remote machines, and securing connections with TLS.

Remote Daemons Overview

Remote daemons run on network machines to process tasks remotely.

Call daemons() with a ‘url’ (e.g., ‘tcp://10.75.32.70:5555’) or use host_url() to construct one automatically. The host listens on a single port for daemons to connect.

IPv6 addresses are also supported and must be enclosed in square brackets [] to avoid confusion with the final colon separating the port. For example, port 5555 on the IPv6 address ::ffff:a6f:50d would be specified as tcp://[::ffff:a6f:50d]:5555.

Calling host_url() without a port uses ‘0’, which automatically assigns a free ephemeral port:

daemons(url = host_url())

Query the assigned port with status():

status()
#> $connections
#> [1] 0
#> 
#> $daemons
#> [1] "tcp://10.216.62.38:49515"
#> 
#> $mirai
#>  awaiting executing completed 
#>         0         0         0

Daemons may connect to or disconnect from this URL at any time, so the number of daemons can be scaled up or down dynamically as needed.

Reset all connections:

daemons(0)

Closing connections exits all daemons. With dispatcher, this exits the dispatcher first, then all connected daemons.

Launching Remote Daemons

Launchers deploy daemons on remote machines. Once deployed, daemons connect back to the host via TCP or TLS.

Local launchers run Rscript via a local shell. Remote launchers run Rscript on remote machines.

Supply a remote launch configuration to the ‘remote’ argument of daemons() or launch_remote().

Three configuration options:

  1. ssh_config() for SSH access
  2. cluster_config() for HPC resource managers (Slurm, SGE, Torque/PBS, LSF)
  3. remote_config() for generic/custom launchers

All return simple lists that can be pre-constructed, saved, and reused.
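For example, a configuration can be saved to disk and read back unchanged. The address and file in this sketch are illustrative. Constructing the configuration only builds a list and does not itself contact the remote machine:

```r
library(mirai)

# Illustrative address; building the list does not contact the machine
cfg <- ssh_config("ssh://10.75.32.90")
str(cfg)

# Save the configuration, e.g. alongside a project
path <- tempfile(fileext = ".rds")
saveRDS(cfg, path)

# Later: read it back and pass it to daemons(remote = cfg2)
cfg2 <- readRDS(path)
identical(cfg, cfg2) # [1] TRUE
```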

SSH Direct Connection

Use for internal networks where the host can accept incoming connections. Remote daemons connect back directly to the host port.

TLS is recommended for additional security.

Launch 4 daemons on 10.75.32.90 (SSH port 22 is default):

daemons(
  n = 4,
  url = host_url(tls = TRUE, port = 5555),
  remote = ssh_config("ssh://10.75.32.90")
)

Launch one daemon on each machine using custom SSH port 222:

daemons(
  n = 1,
  url = host_url(tls = TRUE, port = 5555),
  remote = ssh_config(c("ssh://10.75.32.90:222", "ssh://10.75.32.91:222"))
)

SSH Tunnelling

Use SSH tunnelling when firewall policies prevent direct connections. This requires SSH key-based authentication to be set up.

SSH tunnelling creates a tunnel after the initial SSH connection, using the same port on both host and daemon.

Supply a ‘127.0.0.1’ URL to daemons(); local_url(tcp = TRUE) constructs one.

With local_url(tcp = TRUE, port = 5555), the host listens at 127.0.0.1:5555 and daemons dial into 127.0.0.1:5555 on their own machines.

Launch 2 daemons on 10.75.32.90 with tunnelling:

daemons(
  n = 2,
  url = local_url(tcp = TRUE),
  remote = ssh_config("ssh://10.75.32.90", tunnel = TRUE)
)

HPC Cluster Resource Managers

cluster_config() deploys daemons via cluster resource managers.

Specify command: "sbatch" for Slurm, "qsub" for SGE and Torque/PBS, or "bsub" for LSF.

The options argument accepts scheduler options (lines typically preceded by #):

  Slurm: "#SBATCH --job-name=mirai
          #SBATCH --mem=10G
          #SBATCH --output=job.out"
  SGE: "#$ -N mirai
        #$ -l mem_free=10G
        #$ -o job.out"
  Torque/PBS: "#PBS -N mirai
               #PBS -l mem=10gb
               #PBS -o job.out"
  LSF: "#BSUB -J mirai
        #BSUB -M 10000
        #BSUB -o job.out"

Shell commands, for example to load R as an environment module, may also be included in options:

module load R

or for a specific R version:

module load R/4.5.0

The rscript argument defaults to "Rscript" (assumes R is on PATH). Specify full path if needed: file.path(R.home("bin"), "Rscript").
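The following sketch puts command, options and rscript together in a Slurm configuration. The job name, memory and R module version are illustrative. Constructing the configuration only builds the list. The commented daemons() call would submit the job on a machine where Slurm is available:

```r
library(mirai)

cfg <- cluster_config(
  command = "sbatch",
  options = "#SBATCH --job-name=mirai
             #SBATCH --mem=10G
             #SBATCH --output=job.out
             module load R/4.5.0",
  rscript = file.path(R.home("bin"), "Rscript")
)

# On a Slurm login node, this would submit the daemon as a job:
# daemons(n = 1, url = host_url(), remote = cfg)
```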

Job Arrays

For many daemons, use job arrays instead of individual jobs.

Instead of:

daemons(n = 100, url = host_url(), remote = cluster_config())

submit a single job array:

daemons(
  n = 1,
  url = host_url(),
  remote = cluster_config(options = "#SBATCH --array=1-100")
)

Generic Remote Configuration

remote_config() provides a generic framework for custom deployment commands.

The args argument must contain ".", which is replaced with the daemon launch command.

cluster_config() is easier for HPC, but remote_config() offers flexibility. Slurm example:

daemons(
  n = 2,
  url = host_url(),
  remote = remote_config(
    command = "sbatch",
    args = c("--mem 512", "-n 1", "--wrap", "."),
    rscript = file.path(R.home("bin"), "Rscript"),
    quote = TRUE
  )
)

Manual Deployment

Call launch_remote() without ‘remote’ to get shell commands for manual deployment:

daemons(url = host_url())
launch_remote()
#> [1]
#> Rscript -e 'mirai::daemon("tcp://10.216.62.38:49516")'
daemons(0)

TLS Secure Connections

TLS secures communications between host and remote daemons.

Automatic Zero-configuration Default

Use tls+tcp:// scheme or host_url(tls = TRUE):

daemons(url = host_url(tls = TRUE))

Keys and certificates generate automatically. Private keys remain on the host.

Self-signed certificates are included in launch_remote() commands:

launch_remote(1)
#> [1]
#> Rscript -e 'mirai::daemon("tls+tcp://10.216.62.38:49517",tlscert=c("-----BEGIN CERTIFICATE-----
#> MIIFPzCCAyegAwIBAgIBATANBgkqhkiG9w0BAQsFADA3MRUwEwYDVQQDDAwxMC4y
#> MTYuNjIuMzgxETAPBgNVBAoMCE5hbm9uZXh0MQswCQYDVQQGEwJKUDAeFw0wMTAx
#> MDEwMDAwMDBaFw0zMDEyMzEyMzU5NTlaMDcxFTATBgNVBAMMDDEwLjIxNi42Mi4z
#> ODERMA8GA1UECgwITmFub25leHQxCzAJBgNVBAYTAkpQMIICIjANBgkqhkiG9w0B
#> AQEFAAOCAg8AMIICCgKCAgEAydxZw07AviS9yZjnYP9PL+x/TA5RGEbm+G0Iobct
#> ML2a/t8pk+cJ/hwpB8HA0i7eoc52Km8TCz2hturtGe3BS0mvvnzipcs9k2pxga6o
#> 3sqbLXvI19sC/CMu5gUWOU9dcFh6BYavVLpUW4j0xjjcXAr4PBBIjF+/Lt5FNWuZ
#> srPKC7Q6/ay7b8bFANEcYwZWkoXlWqhY/8EmOqECA97cdCybkfXcIFRU2SFMTQtx
#> NbpHqXWvtGKRQxwvYcfnlkLyQwJZr3CsB10gdPCeoXVPKCJf+ZoPQDGA2BH1jdHx
#> sxUVMIg9skMdciGHWM/zu+7urXn7HFMoMmXfSnsQSgJ46EnJSEqFQ5W2ZIstfnQE
#> tIZFmJqyIr3D6NOvwdk54lK7CkfWr17tcvWlTUhxRsJzR3/rWnQKAIx/R+eMsPfK
#> fP8ajpxhFXsfnyZF0U3onvU2L6qjKSUyj3NS4lpb85T5LN56Kp7OauZiqXNBV7b5
#> PCdgSMJljRSXYolMjk7B7KA1LbMigMJwLN5KfnvwUswS6B24XI4F2NmUJXe2nec7
#> Cy7iCiUYI2O1v2MKn0xWUHuJW21EJtPXoNBOBFA6Bpz93/Wf0m6MbSjUH7rf8Xgo
#> accxoffGXlE3oKwO/BMbbNwtHMupsdu5MMyv8TGD8F2NqE7b1eT57oDVkfzjGjwR
#> b1sCAwEAAaNWMFQwEgYDVR0TAQH/BAgwBgEB/wIBADAdBgNVHQ4EFgQU7J2cGBmo
#> ZcqDzBVYVQLcQUCQ5ccwHwYDVR0jBBgwFoAU7J2cGBmoZcqDzBVYVQLcQUCQ5ccw
#> DQYJKoZIhvcNAQELBQADggIBAHhjNztuQAaPsKKZWweaIY06jak24YsSG64zJBGj
#> dbMta7s4r4Sj8DaidEQMcVl4FeVO/3ZLtC9xDpq+QWQkj44Zi1gbOloEkbIy2TK7
#> cmfsPCP3sgjGt9+TAI2Fib5eBXHGH5eITCf99PBUC/rEHhd9LsBqih0gD0iRVZw8
#> PD7KiVUK6R5f0dpGfVvl4l0HEpOFNnq23w6JGZeyMmHbknW6o62VTh+7FtBEci6E
#> wwBoUK6lTujzDK3jTLdAPVipTad6yIBK1wgQBfWuGB8/U4Sbr9ji/+sQ5Yp9rUP6
#> aQis8f+KOmj2mxqsEj/NYwm22BmwufAy/5RmXg3S9gdfg2Lf3g73cJZOppd5B/AH
#> dL4G2Iym1c8PcT1UkkKDIIzfTHxV4vvyUsqoNX5aJcbmp/Qla7aXkO69ilkDzIjT
#> AypracLmZzve8NYtws196/bc1Lk93ezaER+AWODxCoquUDWjl2ybFrfnWhqJ1Z96
#> nK8w4u0LvA6D4JG1lG15XykY/CIj91rRpIaijg3qi0pAoU340YgiT5cKn6tOmBNe
#> EdPlZKBazZ3TXTST5NwiMEslLKyKj9htOEIfO0gDKwvb82bazG7SEn1bjxyvGCMb
#> DLZFvTcRrv+OT6sFYe8ii1umnwuI+PuRAboyU4ltTSoUaIV0kEvkFTzQBZf5mvrY
#> b8kO
#> -----END CERTIFICATE-----
#> ",""))'
daemons(0)

CA Signed Certificates

Alternatively, generate certificates via a Certificate Signing Request (CSR) to a Certificate Authority (public or internal).

  1. Generate a private key and CSR.
  2. Provide the generated CSR to the CA for it to sign a new TLS certificate.
  3. When setting daemons, provide the TLS certificate and private key to the ‘tls’ argument of daemons().
  4. Supply the certificate chain to the CA to the ‘tlscert’ argument of daemons().

6. Compute Profiles

The .compute argument to daemons() creates separate, independent daemon sets (compute profiles) for heterogeneous compute requirements, such as distinct CPU and GPU resources.

Pass a character string to .compute as the profile name (NULL defaults to ‘default’). Settings are saved under this name.

Specify .compute in mirai() to use a profile (NULL uses ‘default’).

Other functions (status(), launch_local(), launch_remote()) also accept .compute.

with_daemons() and local_daemons()

with_daemons() or local_daemons() with a profile name sets the default for all functions within that scope:

daemons(1, .compute = "cpu")
daemons(1, .compute = "gpu")

with_daemons("cpu", {
  s1 <- status()
  m1 <- mirai(Sys.getpid())
})

with_daemons("gpu", {
  s2 <- status()
  m2 <- mirai(Sys.getpid())
  m3 <- mirai(Sys.getpid(), .compute = "cpu")
  local_daemons("cpu")
  m4 <- mirai(Sys.getpid())
})

s1$daemons
#> [1] "ipc:///tmp/79b5fc7a162b5413cdd1b0e6"
m1[]
#> [1] 12700

s2$daemons
#> [1] "ipc:///tmp/5710972c9d0d836a64e8c45a"
m2[] # different to m1
#> [1] 12726

m3[] # same as m1
#> [1] 12700
m4[] # same as m1
#> [1] 12700

with_daemons("cpu", daemons(0))
with_daemons("gpu", daemons(0))

With Method

The with() method creates daemons for an expression’s duration, then automatically resets them. Functions within the scope use the daemons’ compute profile.

Designed for running Shiny apps with specific daemon counts:

with(daemons(4), shiny::runApp(app))
# Or:
with(daemons(4, .compute = "shiny"), shiny::runApp(app))

Note: the app must already be created. Don’t wrap a shiny::shinyApp() call in with(), since the app only runs (via runApp()) when the returned object is printed, which happens after with() returns.

Shiny apps block until they exit, so all their mirai calls complete before with() returns. For other expressions, collect all mirai values within the scope to ensure they complete before the daemons are reset.
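For a non-Shiny expression, see the sketch below, where the task itself is illustrative. The values are collected with collect_mirai() inside the scope, so every task has completed before the daemons are reset:

```r
library(mirai)

res <- with(daemons(2), {
  ms <- lapply(1:4, function(i) mirai(i * 10, i = i))
  # Collect inside the scope: daemons are reset once with() returns
  collect_mirai(ms)
})

unlist(res) # [1] 10 20 30 40
```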

7. Advanced Topics

Random Number Generation

mirai uses L’Ecuyer-CMRG streams (like base R’s parallel package) for statistically-sound parallel RNG.

Streams divide the RNG sequence at far-apart intervals that don’t overlap, ensuring valid parallel results.

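Default (seed = NULL): a new stream per daemon (like base R). Results are statistically sound but not reproducible, since tasks may be sent to any daemon.

Reproducible (seed = integer): a new stream per mirai() call (not per daemon), so results do not depend on which daemon evaluates each task.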
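The sketch below checks reproducibility, assuming the behaviour described above for an integer seed. Two separate runs with the same seed should give identical draws, even if tasks land on different daemons. The seed 42L and the helper draw() are illustrative:

```r
library(mirai)

# Illustrative helper: three uniform draws on two seeded daemons
draw <- function(seed) {
  with(daemons(2, seed = seed), {
    mirai_map(1:3, function(i) stats::runif(1))[.flat]
  })
}

r1 <- draw(42L)
r2 <- draw(42L)
identical(r1, r2) # should be TRUE with an integer seed
```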

Synchronous Mode

daemons(sync = TRUE) enables synchronous mode. In this mode, mirai are evaluated immediately rather than asynchronously, which is useful for testing and for debugging with browser().

Restrict synchronous mode to a specific compute profile by specifying .compute. With sync = TRUE, the only other argument that affects behavior is seed.

Example usage:

# run everything in sync:
daemons(sync = TRUE)
mp <- mirai_map(1:2, \(x) Sys.getpid())
daemons(0)
mp[]
#> [[1]]
#> [1] 4978
#> 
#> [[2]]
#> [1] 4978


# Use sync with the 'sync' compute profile:
daemons(sync = TRUE, .compute = "sync")
with_daemons("sync", {
  mp <- mirai_map(1:2, \(x) Sys.getpid())
})
daemons(0, .compute = "sync")
mp[]
#> [[1]]
#> [1] 4978
#> 
#> [[2]]
#> [1] 4978