1-
21# ' Detect number of free cores (on UNIX)
32# '
43# ' @description Read \code{/proc/loadavg} and subtract from the number of cores
@@ -56,15 +55,16 @@ detectFreeCores <- function(machine = NULL) {
5655# ' They can then be used to get the results of a job without having to do it manually.
5756# ' Requires the correct filename, so if the previous runbg was run with filename = NULL, you have to
5857# ' specify the tmp_filename manually.
59- # '
60- # ' @return List of functions \code{check}, \code{get()} and \code{purge ()}.
61- # ' \code{check()} checks, if the result is ready.
58+ # ' @param walltime Optional. Maximum runtime in the format "HH:MM:SS". If exceeded, the job will be terminated.
59+ # ' @return List of functions \code{check()}, \code{get()}, \code{purge()} and \code{terminate()}.
60+ # ' \code{check()} checks if the result is ready.
6261# ' \code{get()} copies the result file
6362# ' to the working directory and loads it into the workspace as an object called \code{.runbgOutput}.
6463# ' This object is a list named according to the machines that contains the results returned by each
6564# ' machine.
6665# ' \code{purge()} deletes the temporary folder
6766# ' from the working directory and the remote machines.
67+ # ' \code{terminate()} kills all running processes associated with this job on the remote machines.
6868# ' @export
6969# ' @examples
7070# ' \dontrun{
@@ -95,7 +95,7 @@ detectFreeCores <- function(machine = NULL) {
9595# ' print(result)
9696# ' out_job1$purge()
9797# ' }
98- runbg <- function (... , machine = " localhost" , filename = NULL , input = ls(.GlobalEnv ), compile = FALSE , wait = FALSE , recover = F ) {
98+ runbg <- function (... , machine = " localhost" , filename = NULL , input = ls(.GlobalEnv ), compile = FALSE , wait = FALSE , recover = F , walltime = NULL ) {
9999
100100
101101 expr <- as.expression(substitute(... ))
@@ -109,7 +109,7 @@ runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.Globa
109109 filename <- paste(filename , 1 : nmachines , sep = " _" )
110110
111111 # Initialize output
112- out <- structure(vector(" list" , 3 ), names = c(" check" , " get" , " purge" ))
112+ out <- structure(vector(" list" , 4 ), names = c(" check" , " get" , " purge" , " terminate " ))
113113
114114 # Check
115115 out [[1 ]] <- function () {
@@ -152,10 +152,60 @@ runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.Globa
152152 out [[3 ]] <- function () {
153153
154154 for (m in 1 : nmachines ) {
155- system(paste0(" ssh " , machine [m ], " rm -r " , filename [m ], " _folder" ))
156- system(paste0(" ssh " , machine [m ], " rm " , filename [m ], " .Rout" ))
155+ # Check if folder exists before trying to remove it
156+ folder_exists <- suppressWarnings(
157+ system(paste0(" ssh " , machine [m ], " '[ -d " , filename [m ], " _folder ] && echo 1 || echo 0'" ),
158+ intern = TRUE )
159+ )
160+ if (folder_exists == " 1" ) {
161+ system(paste0(" ssh " , machine [m ], " rm -r " , filename [m ], " _folder" ))
162+ }
163+
164+ # Check if .Rout file exists before trying to remove it
165+ rout_exists <- suppressWarnings(
166+ system(paste0(" ssh " , machine [m ], " '[ -f " , filename [m ], " .Rout ] && echo 1 || echo 0'" ),
167+ intern = TRUE )
168+ )
169+ if (rout_exists == " 1" ) {
170+ system(paste0(" ssh " , machine [m ], " rm " , filename [m ], " .Rout" ))
171+ }
172+ }
173+
174+ # Check if local files exist before trying to remove them
175+ local_files <- list.files(pattern = paste0(filename0 , " .*" ))
176+ if (length(local_files ) > 0 ) {
177+ system(paste0(" rm " , filename0 , " *" ))
178+ }
179+ }
180+
181+ # Add terminate function
182+ out [[4 ]] <- function () {
183+ for (m in 1 : nmachines ) {
184+ # Get process IDs for the R processes running our job
185+ # Use ps to get more detailed process info including state
186+ pids <- suppressWarnings(
187+ system(paste0(" ssh " , machine [m ],
188+ " 'ps aux | grep \" " , filename [m ], " \" | grep -v grep'" ),
189+ intern = TRUE )
190+ )
191+
192+ if (length(pids ) > 0 ) {
193+ # Extract PIDs of only running (R state) processes
194+ running_pids <- sapply(strsplit(pids , " \\ s+" ), function (x ) {
195+ # Check if process state contains 'R' (running)
196+ if (grepl(" R" , x [8 ])) x [2 ] else NULL
197+ })
198+ running_pids <- running_pids [! sapply(running_pids , is.null )]
199+
200+ if (length(running_pids ) > 0 ) {
201+ # Kill only the running processes
202+ system(paste0(" ssh " , machine [m ], " 'kill " , paste(running_pids , collapse = " " ), " '" ))
203+ cat(" Terminated" , length(running_pids ), " running processes on" , machine [m ], " \n " )
204+ } else {
205+ cat(" No running processes found on" , machine [m ], " \n " )
206+ }
207+ }
157208 }
158- system(paste0(" rm " , filename0 , " *" ))
159209 }
160210
161211
@@ -205,16 +255,19 @@ runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.Globa
205255 pack ,
206256 paste0(" setwd('~/" , filename [m ], " _folder')" ),
207257 " rm(list = ls())" ,
258+ if (! is.null(walltime )) {
259+ paste0(" Sys.setenv(R_TIMEOUT='" , walltime , " ')" )
260+ },
208261 compile.line ,
209262 paste0(" load('" , filename0 , " .RData')" ),
210- # "do.call(loadDLL, lapply(lsdMod(c('parfn', 'prdfn', 'obsfn', 'objfn')), get))",
211263 " files <- list.files(pattern = '.so')" ,
212264 " for (f in files) dyn.load(f)" ,
213- # ".oldobjects <- ls()",
214265 paste0(" .node <- " , m ),
215- paste0(" .runbgOutput <- try(" , as.character(expr ), " )" ),
216- # ".newobjects <- ls()",
217-
266+ if (! is.null(walltime )) {
267+ paste0(" .runbgOutput <- try(tools::pskill(Sys.getpid(), tools::SIGALRM, " , walltime , " ); " , as.character(expr ), " )" )
268+ } else {
269+ paste0(" .runbgOutput <- try(" , as.character(expr ), " )" )
270+ },
218271 paste0(" save(" , output ," , file = '" , filename [m ], " _result.RData')" ),
219272 sep = " \n "
220273 ))
0 commit comments