Skip to content

Commit 88706f9

Browse files
committed
terminate method for runbg
1 parent 848a637 commit 88706f9

2 files changed

Lines changed: 74 additions & 17 deletions

File tree

R/remoteComputing.R

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
#' Detect number of free cores (on UNIX)
32
#'
43
#' @description Read \code{/proc/loadavg} and subtract from the number of cores
@@ -56,15 +55,16 @@ detectFreeCores <- function(machine = NULL) {
5655
#' They can then be used to get the results of a job without having to do it manually.
5756
#' Requires the correct filename, so if the previous runbg was run with filename = NULL, you have to
5857
#' specify the tmp_filename manually.
59-
#'
60-
#' @return List of functions \code{check}, \code{get()} and \code{purge()}.
61-
#' \code{check()} checks, if the result is ready.
58+
#' @param walltime Optional. Maximum runtime in the format "HH:MM:SS". If exceeded, the job will be terminated.
59+
#' @return List of functions \code{check()}, \code{get()}, \code{purge()} and \code{terminate()}.
60+
#' \code{check()} checks if the result is ready.
6261
#' \code{get()} copies the result file
6362
#' to the working directory and loads it into the workspace as an object called \code{.runbgOutput}.
6463
#' This object is a list named according to the machines that contains the results returned by each
6564
#' machine.
6665
#' \code{purge()} deletes the temporary folder
6766
#' from the working directory and the remote machines.
67+
#' \code{terminate()} kills all running processes associated with this job on the remote machines.
6868
#' @export
6969
#' @examples
7070
#' \dontrun{
@@ -95,7 +95,7 @@ detectFreeCores <- function(machine = NULL) {
9595
#' print(result)
9696
#' out_job1$purge()
9797
#' }
98-
runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.GlobalEnv), compile = FALSE, wait = FALSE, recover = F) {
98+
runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.GlobalEnv), compile = FALSE, wait = FALSE, recover = F, walltime = NULL) {
9999

100100

101101
expr <- as.expression(substitute(...))
@@ -109,7 +109,7 @@ runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.Globa
109109
filename <- paste(filename, 1:nmachines, sep = "_")
110110

111111
# Initialize output
112-
out <- structure(vector("list", 3), names = c("check", "get", "purge"))
112+
out <- structure(vector("list", 4), names = c("check", "get", "purge", "terminate"))
113113

114114
# Check
115115
out[[1]] <- function() {
@@ -152,10 +152,60 @@ runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.Globa
152152
out[[3]] <- function() {
153153

154154
for (m in 1:nmachines) {
155-
system(paste0("ssh ", machine[m], " rm -r ", filename[m], "_folder"))
156-
system(paste0("ssh ", machine[m], " rm ", filename[m], ".Rout"))
155+
# Check if folder exists before trying to remove it
156+
folder_exists <- suppressWarnings(
157+
system(paste0("ssh ", machine[m], " '[ -d ", filename[m], "_folder ] && echo 1 || echo 0'"),
158+
intern = TRUE)
159+
)
160+
if (folder_exists == "1") {
161+
system(paste0("ssh ", machine[m], " rm -r ", filename[m], "_folder"))
162+
}
163+
164+
# Check if .Rout file exists before trying to remove it
165+
rout_exists <- suppressWarnings(
166+
system(paste0("ssh ", machine[m], " '[ -f ", filename[m], ".Rout ] && echo 1 || echo 0'"),
167+
intern = TRUE)
168+
)
169+
if (rout_exists == "1") {
170+
system(paste0("ssh ", machine[m], " rm ", filename[m], ".Rout"))
171+
}
172+
}
173+
174+
# Check if local files exist before trying to remove them
175+
local_files <- list.files(pattern = paste0(filename0, ".*"))
176+
if (length(local_files) > 0) {
177+
system(paste0("rm ", filename0, "*"))
178+
}
179+
}
180+
181+
# Add terminate function
182+
out[[4]] <- function() {
183+
for (m in 1:nmachines) {
184+
# Get process IDs for the R processes running our job
185+
# Use ps to get more detailed process info including state
186+
pids <- suppressWarnings(
187+
system(paste0("ssh ", machine[m],
188+
" 'ps aux | grep \"", filename[m], "\" | grep -v grep'"),
189+
intern = TRUE)
190+
)
191+
192+
if (length(pids) > 0) {
193+
# Extract PIDs of only running (R state) processes
194+
running_pids <- sapply(strsplit(pids, "\\s+"), function(x) {
195+
# Check if process state contains 'R' (running)
196+
if (grepl("R", x[8])) x[2] else NULL
197+
})
198+
running_pids <- running_pids[!sapply(running_pids, is.null)]
199+
200+
if (length(running_pids) > 0) {
201+
# Kill only the running processes
202+
system(paste0("ssh ", machine[m], " 'kill ", paste(running_pids, collapse = " "), "'"))
203+
cat("Terminated", length(running_pids), "running processes on", machine[m], "\n")
204+
} else {
205+
cat("No running processes found on", machine[m], "\n")
206+
}
207+
}
157208
}
158-
system(paste0("rm ", filename0, "*"))
159209
}
160210

161211

@@ -205,16 +255,19 @@ runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.Globa
205255
pack,
206256
paste0("setwd('~/", filename[m], "_folder')"),
207257
"rm(list = ls())",
258+
if (!is.null(walltime)) {
259+
paste0("Sys.setenv(R_TIMEOUT='", walltime, "')")
260+
},
208261
compile.line,
209262
paste0("load('", filename0, ".RData')"),
210-
#"do.call(loadDLL, lapply(lsdMod(c('parfn', 'prdfn', 'obsfn', 'objfn')), get))",
211263
"files <- list.files(pattern = '.so')",
212264
"for (f in files) dyn.load(f)",
213-
#".oldobjects <- ls()",
214265
paste0(".node <- ", m),
215-
paste0(".runbgOutput <- try(", as.character(expr), ")"),
216-
#".newobjects <- ls()",
217-
266+
if (!is.null(walltime)) {
267+
paste0(".runbgOutput <- try(tools::pskill(Sys.getpid(), tools::SIGALRM, ", walltime, "); ", as.character(expr), ")")
268+
} else {
269+
paste0(".runbgOutput <- try(", as.character(expr), ")")
270+
},
218271
paste0("save(", output ,", file = '", filename[m], "_result.RData')"),
219272
sep = "\n"
220273
))

man/runbg.Rd

Lines changed: 7 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)