Skip to content

Commit 134b129

Browse files
committed
Support legacy daemons (< v0.14.0) in ServerPoll mode
- Add supports_full_server_poll() method to check daemon version - Skip polling for legacy daemons (they don't have /api/status, /api/poll, etc.) - Make API key optional in post_to_daemon and discovery request methods - Allow subscriber to send discovery commands without auth for legacy daemons - Add info-level logging for HubSpot org sync on startup Legacy daemons stay alive via their own heartbeat calls and can still receive discovery initiate/cancel commands without authentication.
1 parent b24d27d commit 134b129

File tree

6 files changed

+122
-56
lines changed

6 files changed

+122
-56
lines changed

backend/src/bin/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,11 +286,14 @@ async fn main() -> anyhow::Result<()> {
286286

287287
// Sync existing organizations to HubSpot if configured
288288
if let Some(hubspot_service) = state.services.hubspot_service.clone() {
289+
tracing::info!(target: LOG_TARGET, " Spawning HubSpot organization sync task");
289290
tokio::spawn(async move {
290291
if let Err(e) = hubspot_service.sync_existing_organizations().await {
291292
tracing::error!(target: LOG_TARGET, error = %e, "Failed to sync existing organizations to HubSpot");
292293
}
293294
});
295+
} else {
296+
tracing::info!(target: LOG_TARGET, " HubSpot service not configured, skipping org sync");
294297
}
295298

296299
// Configuration summary

backend/src/server/daemons/impl/base.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,23 @@ impl Daemon {
7777
&& self.base.network_id == other.base.network_id
7878
&& self.base.host_id == other.base.host_id
7979
}
80+
81+
/// Check if daemon supports full ServerPoll mode (v0.14.0+).
82+
///
83+
/// Legacy daemons (< v0.14.0) only support `/api/discovery/initiate` and
84+
/// `/api/discovery/cancel` endpoints without authentication.
85+
/// They don't support the newer endpoints: `/api/status`, `/api/poll`,
86+
/// `/api/first-contact`, `/api/discovery/entities-created`.
87+
///
88+
/// Returns `false` for daemons without a version (assume legacy).
89+
pub fn supports_full_server_poll(&self) -> bool {
90+
const SERVER_POLL_VERSION: Version = Version::new(0, 14, 0);
91+
self.base
92+
.version
93+
.as_ref()
94+
.map(|v| v >= &SERVER_POLL_VERSION)
95+
.unwrap_or(false)
96+
}
8097
}
8198

8299
impl Display for Daemon {

backend/src/server/daemons/service.rs

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -224,29 +224,34 @@ impl DaemonService {
224224
.await
225225
}
226226

227-
/// Send POST request to daemon with auth and retry.
227+
/// Send POST request to daemon with optional auth and retry.
228228
/// Uses exponential backoff: 5 retries, 5-30s delays.
229229
/// Returns `Option<T>` - `Some(data)` if response contains data, `None` otherwise.
230230
/// For endpoints that don't return data, use `::<serde_json::Value>` and ignore result.
231+
///
232+
/// If `api_key` is `None`, the request is sent without an Authorization header.
233+
/// This is used for legacy daemons (< v0.14.0) that don't require authentication.
231234
async fn post_to_daemon<T: serde::de::DeserializeOwned>(
232235
&self,
233236
daemon: &Daemon,
234-
api_key: &str,
237+
api_key: Option<&str>,
235238
path: &str,
236239
body: &impl serde::Serialize,
237240
) -> Result<Option<T>> {
238241
let url = format!("{}{}", daemon.base.url, path);
239242
let daemon_id = daemon.id;
240243
let body_json = serde_json::to_value(body)?;
244+
let api_key_owned = api_key.map(|s| s.to_owned());
241245

242246
(|| async {
243-
let response = self
244-
.client
245-
.post(&url)
246-
.header("Authorization", format!("Bearer {}", api_key))
247-
.json(&body_json)
248-
.send()
249-
.await?;
247+
let mut request = self.client.post(&url).json(&body_json);
248+
249+
// Only add auth header if API key provided (v0.14.0+ daemons)
250+
if let Some(ref key) = api_key_owned {
251+
request = request.header("Authorization", format!("Bearer {}", key));
252+
}
253+
254+
let response = request.send().await?;
250255

251256
if !response.status().is_success() {
252257
anyhow::bail!("POST {} failed: HTTP {}", path, response.status());
@@ -317,7 +322,7 @@ impl DaemonService {
317322
let _: Option<serde_json::Value> = self
318323
.post_to_daemon(
319324
daemon,
320-
api_key,
325+
Some(api_key),
321326
"/api/discovery/entities-created",
322327
&created_entities,
323328
)
@@ -333,11 +338,14 @@ impl DaemonService {
333338
Ok(())
334339
}
335340

336-
/// Send discovery request to daemon (HTTP only, no event publishing)
341+
/// Send discovery request to daemon (HTTP only, no event publishing).
342+
///
343+
/// If `api_key` is `None`, the request is sent without authentication.
344+
/// This is used for legacy daemons (< v0.14.0) that don't require auth.
337345
pub async fn send_discovery_request_to_daemon(
338346
&self,
339347
daemon: &Daemon,
340-
api_key: &str,
348+
api_key: Option<&str>,
341349
request: DaemonDiscoveryRequest,
342350
) -> Result<(), Error> {
343351
tracing::info!(
@@ -359,11 +367,14 @@ impl DaemonService {
359367
Ok(())
360368
}
361369

362-
/// Send discovery cancellation to daemon (HTTP only, no event publishing)
370+
/// Send discovery cancellation to daemon (HTTP only, no event publishing).
371+
///
372+
/// If `api_key` is `None`, the request is sent without authentication.
373+
/// This is used for legacy daemons (< v0.14.0) that don't require auth.
363374
pub async fn send_discovery_cancellation_to_daemon(
364375
&self,
365376
daemon: &Daemon,
366-
api_key: &str,
377+
api_key: Option<&str>,
367378
session_id: Uuid,
368379
) -> Result<(), Error> {
369380
let _: Option<serde_json::Value> = self
@@ -394,7 +405,7 @@ impl DaemonService {
394405
server_capabilities,
395406
};
396407

397-
self.post_to_daemon(daemon, api_key, "/api/first-contact", &request)
408+
self.post_to_daemon(daemon, Some(api_key), "/api/first-contact", &request)
398409
.await?
399410
.ok_or_else(|| anyhow::anyhow!("First contact response missing daemon status"))
400411
}
@@ -1067,7 +1078,23 @@ impl DaemonService {
10671078
/// Poll a single daemon for status and discovery data.
10681079
/// Uses backon for retry with exponential backoff.
10691080
/// Marks daemon unreachable after UNREACHABLE_THRESHOLD failures.
1081+
///
1082+
/// Legacy daemons (< v0.14.0) are skipped entirely - they don't support
1083+
/// the polling endpoints (/api/status, /api/poll, /api/first-contact,
1084+
/// /api/discovery/entities-created). Legacy daemons stay alive via their
1085+
/// own heartbeat calls to the server's backward-compat endpoint.
10701086
async fn poll_daemon(&self, daemon: &Daemon) -> Result<()> {
1087+
// Skip polling for legacy daemons - they don't have the new endpoints
1088+
if !daemon.supports_full_server_poll() {
1089+
tracing::debug!(
1090+
daemon_id = %daemon.id,
1091+
daemon_name = %daemon.base.name,
1092+
version = ?daemon.base.version,
1093+
"Skipping poll for legacy daemon (< v0.14.0) - polling endpoints not supported"
1094+
);
1095+
return Ok(());
1096+
}
1097+
10711098
tracing::debug!(
10721099
daemon_id = %daemon.id,
10731100
daemon_name = %daemon.base.name,
@@ -1296,7 +1323,7 @@ impl DaemonService {
12961323
discovery_type: work.discovery_type,
12971324
};
12981325
if let Err(e) = self
1299-
.send_discovery_request_to_daemon(daemon, &api_key, request)
1326+
.send_discovery_request_to_daemon(daemon, Some(&api_key), request)
13001327
.await
13011328
{
13021329
tracing::warn!(

backend/src/server/daemons/subscriber.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,28 @@ impl EventSubscriber for DaemonService {
4545
continue;
4646
}
4747

48-
// Get the API key for this daemon
49-
let api_key = match self.get_daemon_api_key(&daemon).await {
50-
Ok(key) => key,
51-
Err(e) => {
52-
tracing::error!(
53-
error = ?e,
54-
daemon_id = %discovery_event.daemon_id,
55-
"Failed to get API key for daemon, skipping event"
56-
);
57-
continue;
48+
// Get the API key - optional for legacy daemons (< v0.14.0)
49+
// Legacy daemons only have /api/discovery/initiate and /api/discovery/cancel
50+
// and they don't require authentication
51+
let api_key = if daemon.supports_full_server_poll() {
52+
match self.get_daemon_api_key(&daemon).await {
53+
Ok(key) => Some(key),
54+
Err(e) => {
55+
tracing::error!(
56+
error = ?e,
57+
daemon_id = %discovery_event.daemon_id,
58+
"Failed to get API key for daemon, skipping event"
59+
);
60+
continue;
61+
}
5862
}
63+
} else {
64+
tracing::debug!(
65+
daemon_id = %discovery_event.daemon_id,
66+
version = ?daemon.base.version,
67+
"Legacy daemon (< v0.14.0) - sending discovery command without auth"
68+
);
69+
None
5970
};
6071

6172
match discovery_event.phase {
@@ -72,7 +83,7 @@ impl EventSubscriber for DaemonService {
7283
};
7384

7485
if let Err(e) = self
75-
.send_discovery_request_to_daemon(&daemon, &api_key, request)
86+
.send_discovery_request_to_daemon(&daemon, api_key.as_deref(), request)
7687
.await
7788
{
7889
tracing::error!(
@@ -93,7 +104,7 @@ impl EventSubscriber for DaemonService {
93104
if let Err(e) = self
94105
.send_discovery_cancellation_to_daemon(
95106
&daemon,
96-
&api_key,
107+
api_key.as_deref(),
97108
discovery_event.session_id,
98109
)
99110
.await

backend/src/server/hubspot/service.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,8 @@ impl HubSpotService {
586586
/// Sync all organizations that don't have a HubSpot company ID.
587587
/// Called on server startup.
588588
pub async fn sync_existing_organizations(&self) -> Result<()> {
589+
tracing::info!("Starting HubSpot organization sync check");
590+
589591
let filter = StorableFilter::<Organization>::new_without_hubspot_company_id();
590592
let orgs = self.organization_service.get_all(filter).await?;
591593

@@ -600,15 +602,22 @@ impl HubSpotService {
600602
);
601603

602604
for org in orgs {
605+
tracing::info!(
606+
organization_id = %org.id,
607+
org_name = %org.base.name,
608+
"Syncing organization to HubSpot"
609+
);
603610
if let Err(e) = self.sync_organization(org).await {
604611
tracing::error!(
605-
organization_id = %e,
612+
error = %e,
606613
"Failed to sync organization to HubSpot"
607614
);
608615
// Continue with other orgs
609616
}
610617
}
611618

619+
tracing::info!("HubSpot organization sync check complete");
620+
612621
Ok(())
613622
}
614623

backend/tests/integration/main.rs

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -106,38 +106,18 @@ async fn integration_tests() {
106106
println!("✅ ServerPoll daemon provisioned: {}", serverpoll_daemon_id);
107107

108108
// =========================================================================
109-
// Phase 4: API Compatibility Tests
109+
// Phase 4: ServerPoll Discovery
110110
// =========================================================================
111+
// Run discovery BEFORE compat tests to ensure the daemon has clean state.
112+
// Compat tests send fixture requests that can leave stale sessions in the
113+
// server's daemon_sessions map, causing discovery to skip publishing events.
111114
println!("\n============================================================");
112-
println!("Phase 4: API Compatibility Tests");
113-
println!("============================================================");
114-
115-
compat::run_compat_tests(
116-
daemon.id, // Use DaemonPoll daemon ID (like original)
117-
network.id,
118-
organization.id,
119-
user.id,
120-
&serverpoll_api_key,
121-
)
122-
.await
123-
.expect("Compatibility tests failed");
124-
125-
// =========================================================================
126-
// Phase 5: Full Integration Verification
127-
// =========================================================================
128-
println!("\n============================================================");
129-
println!("Phase 5: Full Integration Verification");
115+
println!("Phase 4: ServerPoll Discovery");
130116
println!("============================================================\n");
131117

132-
// Clear DB data and in-memory session state
118+
// Clear any leftover discovery data from previous test runs
133119
clear_discovery_data().expect("Failed to clear discovery data");
134120

135-
// Cancel any stale sessions from compat tests that may still be in memory.
136-
// This prevents the daemon from processing old fixture sessions instead of the new one.
137-
compat::cancel_server_discovery_sessions(&client)
138-
.await
139-
.expect("Failed to clear stale discovery sessions");
140-
141121
// Trigger discovery for the ServerPoll daemon and get the session_id
142122
let session_id = discovery::trigger_discovery(&client, serverpoll_daemon_id, network.id)
143123
.await
@@ -168,7 +148,26 @@ async fn integration_tests() {
168148
.await
169149
.expect("Failed to create group");
170150

171-
println!("\n✅ ServerPoll integration flow completed!");
151+
println!("\n✅ ServerPoll discovery completed!");
152+
153+
// =========================================================================
154+
// Phase 5: API Compatibility Tests
155+
// =========================================================================
156+
// Runs after discovery so any stale sessions left by fixture playback
157+
// don't affect the real discovery flow.
158+
println!("\n============================================================");
159+
println!("Phase 5: API Compatibility Tests");
160+
println!("============================================================");
161+
162+
compat::run_compat_tests(
163+
daemon.id, // Use DaemonPoll daemon ID (like original)
164+
network.id,
165+
organization.id,
166+
user.id,
167+
&serverpoll_api_key,
168+
)
169+
.await
170+
.expect("Compatibility tests failed");
172171

173172
// =========================================================================
174173
// Phase 6: CRUD Endpoint Tests

0 commit comments

Comments
 (0)