|
17 | 17 | package org.apache.spark.sql |
18 | 18 |
|
19 | 19 | import com.sparkutils.testing.ConnectSession |
20 | | -import com.sparkutils.testing.SparkTestUtils.{DEBUG_CONNECT_LOGS_SYS, FLAT_JVM_OPTION, MAIN_CLASSPATH, booleanEnvOrProp, classPathJars, connectServerJars, testClassPaths} |
| 20 | +import com.sparkutils.testing.SparkTestUtils.{DEBUG_CONNECT_LOGS_SYS, FLAT_JVM_OPTION, MAIN_CLASSPATH, booleanEnvOrProp, classPathJars, connectServerJars, stringEnvOrProp, testClassPaths} |
21 | 21 | import com.sparkutils.testing.TestUtilsEnvironment.{onDatabricksFS, onFabricOrSynapse} |
22 | 22 | import org.apache.spark.{SparkBuildInfo, sql} |
23 | 23 | import org.apache.spark.sql.connect.SparkSession |
@@ -288,56 +288,61 @@ object SparkConnectServerUtils { |
288 | 288 | // classic non-shared databricks setup, should also be the case for a normal submitted job on OSS / Fabric |
289 | 289 | val localOnly = booleanEnvOrProp("SPARKUTILS_TESTING_USE_LOCAL_CONNECT") |
290 | 290 |
|
| 291 | + // despite the function name, we may want to test against remote servers |
| 292 | + val connectURL = stringEnvOrProp("SPARK_REMOTE") |
| 293 | + |
291 | 294 | Some( |
| 295 | + if (connectURL ne null) |
| 296 | + ExistingSession(SparkSession.builder.config(clientConfig).getOrCreate()) |
| 297 | + else |
| 298 | + if (spawnConnect) { |
| 299 | + // if there is a forced local connect, e.g. running 4.0.0 full shades on a later Fabric 1.4 that doesn't force a |
| 300 | + // connect setup, we have a way out |
| 301 | + if (localOnly) |
| 302 | + ExistingSession(SparkSession.builder.config("spark.api.mode", "connect").getOrCreate()) |
| 303 | + else |
| 304 | + new ConnectSession { |
| 305 | + |
| 306 | + val filter = |
| 307 | + serverConfig.get(DEBUG_CONNECT_LOGS_SYS).exists { v => |
| 308 | + System.setProperty(DEBUG_CONNECT_LOGS_SYS, v) |
| 309 | + true |
| 310 | + } |
292 | 311 |
|
293 | | - if (spawnConnect) { |
294 | | - // if there is a forced local connect, e.g. running 4.0.0 full shades on a later Fabric 1.4 that doesn't force a |
295 | | - // connect setup, we have a way out |
296 | | - if (localOnly) |
297 | | - ExistingSession(SparkSession.builder.config("spark.api.mode", "connect").getOrCreate()) |
298 | | - else |
299 | | - new ConnectSession { |
300 | | - |
301 | | - val filter = |
302 | | - serverConfig.get(DEBUG_CONNECT_LOGS_SYS).exists { v => |
303 | | - System.setProperty(DEBUG_CONNECT_LOGS_SYS, v) |
304 | | - true |
| 312 | + val utils = SparkConnectServerUtils( |
| 313 | + if (filter) |
| 314 | + serverConfig - DEBUG_CONNECT_LOGS_SYS |
| 315 | + else |
| 316 | + serverConfig |
| 317 | + ) |
| 318 | + |
| 319 | + val th = System.getProperty("spark.test.home") |
| 320 | + if (th eq null) { |
| 321 | + new File("./testing_connect_tmp").mkdirs() |
| 322 | + System.setProperty("spark.test.home","./testing_connect_tmp") |
305 | 323 | } |
306 | 324 |
|
307 | | - val utils = SparkConnectServerUtils( |
308 | | - if (filter) |
309 | | - serverConfig - DEBUG_CONNECT_LOGS_SYS |
310 | | - else |
311 | | - serverConfig |
312 | | - ) |
313 | | - |
314 | | - val th = System.getProperty("spark.test.home") |
315 | | - if (th eq null) { |
316 | | - new File("./testing_connect_tmp").mkdirs() |
317 | | - System.setProperty("spark.test.home","./testing_connect_tmp") |
318 | | - } |
319 | | - |
320 | 325 |
|
321 | | - utils.start() |
| 326 | + utils.start() |
322 | 327 |
|
323 | | - private def createSparkSession: SparkSession = SparkConnectServerUtils.createSparkSession(utils.port, clientConfig) |
| 328 | + private def createSparkSession: SparkSession = SparkConnectServerUtils.createSparkSession(utils.port, clientConfig) |
324 | 329 |
|
325 | | - private var _sparkSession: SparkSession = createSparkSession |
| 330 | + private var _sparkSession: SparkSession = createSparkSession |
326 | 331 |
|
327 | | - override def sparkSession: sql.SparkSession = _sparkSession |
| 332 | + override def sparkSession: sql.SparkSession = _sparkSession |
328 | 333 |
|
329 | | - override def stopServer(): Unit = utils.stop() |
| 334 | + override def stopServer(): Unit = utils.stop() |
330 | 335 |
|
331 | | - override def resetSession(): Unit = |
332 | | - if (!(onFabricOrSynapse(_sparkSession) || onDatabricksFS)) { |
333 | | - if (sparkSession.isUsable) { |
334 | | - sparkSession.stop() |
335 | | - } |
336 | | - _sparkSession = createSparkSession |
337 | | - } // else leave as is, no reset to do |
338 | | - } |
339 | | - } else |
340 | | - ExistingSession(SparkSession.active) |
| 336 | + override def resetSession(): Unit = |
| 337 | + if (!(onFabricOrSynapse(_sparkSession) || onDatabricksFS)) { |
| 338 | + if (sparkSession.isUsable) { |
| 339 | + sparkSession.stop() |
| 340 | + } |
| 341 | + _sparkSession = createSparkSession |
| 342 | + } // else leave as is, no reset to do |
| 343 | + } |
| 344 | + } else |
| 345 | + ExistingSession(SparkSession.active) |
341 | 346 | ) |
342 | 347 | } |
343 | 348 |
|
|
0 commit comments