Commit f9efa6e

fix: Fix up all issues from code reviews, especially async lock safety
1 parent 712e58d commit f9efa6e

15 files changed

Lines changed: 347 additions & 311 deletions

CHANGELOG.md

Lines changed: 31 additions & 1 deletion
@@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Changed

 - **Major Rust Code Refactoring (Modularisation)**
-  - Split monolithic `lib.rs` (2,302 lines) into 14 focused, single-responsibility modules
+  - Split monolithic `lib.rs` (2,302 lines) into 13 focused, single-responsibility modules
   - **Module structure by feature area**:
     - `connection.rs` - Connection lifecycle, establishment, and state management
     - `query.rs` - Basic query execution and result handling
@@ -38,6 +38,36 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Fixed

+- **Critical Rust NIF Thread Safety and Scheduler Issues**
+  - **Registry Lock Management**: Fixed all functions to drop registry locks before entering `TOKIO_RUNTIME.block_on()` async blocks
+    - `execute_batch()` and `execute_transactional_batch()` in `batch.rs`: Simplified function signatures, dropped `conn_map` lock before async operations
+    - `declare_cursor()` in `cursor.rs`: Dropped `conn_map` lock before async block
+    - `do_sync()` in `query.rs`: Dropped `conn_map` lock before async block
+    - `savepoint()`, `release_savepoint()`, and `rollback_to_savepoint()` in `savepoint.rs`: Now use `TransactionEntryGuard` pattern to avoid holding `TXN_REGISTRY` lock during async operations
+    - `prepare_statement()` in `statement.rs`: Now clones inner connection Arc and drops client lock before async block, preventing locks from being held across await points
+    - `begin_transaction()` and `begin_transaction_with_behavior()` in `transaction.rs`: Now clone inner connection Arc and drop all locks before async transaction creation, preventing locks from being held across await points
+  - **DirtyIo Scheduler Annotations**: Added `#[rustler::nif(schedule = "DirtyIo")]` to blocking NIFs
+    - `last_insert_rowid()`, `changes()`, and `is_autocommit()` in `metadata.rs`
+    - Prevents blocking the BEAM scheduler during I/O operations
+  - **Atom Naming Consistency**: Renamed `remote_primary` atom to `remote` in `constants.rs` and `decode.rs`
+    - Fixes mismatch between Rust atom (`remote_primary()`) and Elixir convention (`:remote`)
+    - `decode_mode()` now correctly decodes `:remote` atoms from Elixir
+  - **Binary Allocation Error Handling**: Return `:error` atom instead of `nil` when binary allocation fails
+    - Updated `cursor.rs` and `utils.rs` to use `:error` atom for `OwnedBinary::new()` allocation failures
+    - Provides clearer indication of allocation errors in query results
+  - **SQL Identifier Quoting**: Added proper quoting for SQLite identifiers in PRAGMA queries (`utils.rs`)
+    - Table and index names are now properly quoted with double quotes
+    - Internal double quotes are escaped by doubling them
+    - Defensive programming against potential edge cases with special characters in identifiers
+  - **Performance Optimizations**:
+    - **Replication**: `max_write_replication_index()` in `replication.rs` now calls synchronous method directly instead of wrapping in `TOKIO_RUNTIME.block_on()`
+      - Eliminates unnecessary async overhead for synchronous operations
+    - **Connection**: `connect()` in `connection.rs` now uses shared global `TOKIO_RUNTIME` instead of creating a new runtime per connection
+      - Prevents resource exhaustion under high connection rates
+      - Eliminates expensive runtime creation overhead (each runtime spawns multiple threads)
+      - Aligns with pattern used by all other operations in the codebase
+  - **Impact**: Eliminates potential deadlocks, prevents BEAM scheduler blocking, ensures proper Elixir-Rust atom communication, improves error visibility, reduces overhead for replication index queries
+
 - **Constraint Error Message Handling**
   - Enhanced constraint name extraction to support index names in error messages
   - Now correctly extracts custom index names from enhanced error format: `(index: index_name)`
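
The "SQL Identifier Quoting" entry above describes a rule (wrap in double quotes, double any embedded double quote) that can be sketched in a few lines. This is a minimal illustration only: `quote_identifier` and `index_list_pragma` are hypothetical names, and the actual helper in `utils.rs` is not shown in this commit view.

```rust
/// Quote a SQLite identifier: wrap it in double quotes and double any
/// embedded double quote. `quote_identifier` is an illustrative name,
/// not necessarily the helper used in `utils.rs`.
fn quote_identifier(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build a PRAGMA query for a table name that may contain spaces or quotes.
/// (Hypothetical helper, shown only to put the quoting in context.)
fn index_list_pragma(table: &str) -> String {
    format!("PRAGMA index_list({})", quote_identifier(table))
}
```

With this rule, a table named `my table` becomes `"my table"` and a name containing a double quote cannot terminate the identifier early.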

CLAUDE.md

Lines changed: 1 addition & 1 deletion
@@ -1141,7 +1141,7 @@ When working on this codebase:

 ## Summary

-EctoLibSql is a production-ready Ecto adapter for LibSQL/Turso with full Ecto support, three connection modes, advanced features (vector search, encryption, streaming), zero panic risk, lots of tests, and comprehensive documentation.
+EctoLibSql is a production-ready Ecto adapter for LibSQL/Turso with full Ecto support, three connection modes, advanced features (vector search, encryption, streaming), zero panic risk, extensive test coverage, and comprehensive documentation.

 **Key Principle**: Safety first. All Rust code uses proper error handling to protect the BEAM VM. Errors are returned as tuples that can be supervised gracefully.

native/ecto_libsql/src/batch.rs

Lines changed: 97 additions & 100 deletions
@@ -32,59 +32,58 @@ pub fn execute_batch<'a>(
     _mode: Atom,
     _syncx: Atom,
     statements: Vec<Term<'a>>,
-) -> Result<NifResult<Term<'a>>, rustler::Error> {
+) -> NifResult<Term<'a>> {
     let conn_map = safe_lock(&CONNECTION_REGISTRY, "execute_batch conn_map")?;

-    if let Some(client) = conn_map.get(conn_id) {
-        let client = client.clone();
+    let client = conn_map
+        .get(conn_id)
+        .cloned()
+        .ok_or_else(|| rustler::Error::Term(Box::new("Invalid connection ID")))?;

-        // Decode each statement with its arguments
-        let mut batch_stmts: Vec<(String, Vec<Value>)> = Vec::new();
-        for stmt_term in statements {
-            let (query, args): (String, Vec<Term>) = stmt_term.decode().map_err(|e| {
-                rustler::Error::Term(Box::new(format!("Failed to decode statement: {:?}", e)))
-            })?;
+    drop(conn_map); // Release lock before async operation

-            let decoded_args: Vec<Value> = args
-                .into_iter()
-                .map(|t| decode_term_to_value(t))
-                .collect::<Result<_, _>>()
-                .map_err(|e| rustler::Error::Term(Box::new(e)))?;
+    // Decode each statement with its arguments
+    let mut batch_stmts: Vec<(String, Vec<Value>)> = Vec::new();
+    for stmt_term in statements {
+        let (query, args): (String, Vec<Term>) = stmt_term.decode().map_err(|e| {
+            rustler::Error::Term(Box::new(format!("Failed to decode statement: {:?}", e)))
+        })?;

-            batch_stmts.push((query, decoded_args));
-        }
+        let decoded_args: Vec<Value> = args
+            .into_iter()
+            .map(|t| decode_term_to_value(t))
+            .collect::<Result<_, _>>()
+            .map_err(|e| rustler::Error::Term(Box::new(e)))?;

-        let result = TOKIO_RUNTIME.block_on(async {
-            let mut all_results: Vec<Term<'a>> = Vec::new();
-
-            // Execute each statement sequentially
-            for (sql, args) in batch_stmts.iter() {
-                let client_guard = safe_lock_arc(&client, "execute_batch client")?;
-                let conn_guard = safe_lock_arc(&client_guard.client, "execute_batch conn")?;
-
-                match conn_guard.query(sql, args.clone()).await {
-                    Ok(rows) => {
-                        let collected = collect_rows(env, rows)
-                            .await
-                            .map_err(|e| rustler::Error::Term(Box::new(format!("{:?}", e))))?;
-                        all_results.push(collected);
-                    }
-                    Err(e) => {
-                        return Err(rustler::Error::Term(Box::new(format!(
-                            "Batch statement error: {}",
-                            e
-                        ))));
-                    }
+        batch_stmts.push((query, decoded_args));
+    }
+
+    TOKIO_RUNTIME.block_on(async {
+        let mut all_results: Vec<Term<'a>> = Vec::new();
+
+        // Execute each statement sequentially
+        for (sql, args) in batch_stmts.iter() {
+            let client_guard = safe_lock_arc(&client, "execute_batch client")?;
+            let conn_guard = safe_lock_arc(&client_guard.client, "execute_batch conn")?;
+
+            match conn_guard.query(sql, args.clone()).await {
+                Ok(rows) => {
+                    let collected = collect_rows(env, rows)
+                        .await
+                        .map_err(|e| rustler::Error::Term(Box::new(format!("{:?}", e))))?;
+                    all_results.push(collected);
+                }
+                Err(e) => {
+                    return Err(rustler::Error::Term(Box::new(format!(
+                        "Batch statement error: {}",
+                        e
+                    ))));
                 }
             }
+        }

-            Ok(Ok(all_results.encode(env)))
-        });
-
-        return result;
-    } else {
-        Err(rustler::Error::Term(Box::new("Invalid connection ID")))
-    }
+        Ok(all_results.encode(env))
+    })
 }

 /// Execute multiple SQL statements atomically within a transaction.
@@ -111,72 +110,70 @@ pub fn execute_transactional_batch<'a>(
     _mode: Atom,
     _syncx: Atom,
     statements: Vec<Term<'a>>,
-) -> Result<NifResult<Term<'a>>, rustler::Error> {
+) -> NifResult<Term<'a>> {
     let conn_map = safe_lock(&CONNECTION_REGISTRY, "execute_transactional_batch conn_map")?;

-    if let Some(client) = conn_map.get(conn_id) {
-        let client = client.clone();
+    let client = conn_map
+        .get(conn_id)
+        .cloned()
+        .ok_or_else(|| rustler::Error::Term(Box::new("Invalid connection ID")))?;

-        // Decode each statement with its arguments
-        let mut batch_stmts: Vec<(String, Vec<Value>)> = Vec::new();
-        for stmt_term in statements {
-            let (query, args): (String, Vec<Term>) = stmt_term.decode().map_err(|e| {
-                rustler::Error::Term(Box::new(format!("Failed to decode statement: {:?}", e)))
-            })?;
+    drop(conn_map); // Release lock before async operation

-            let decoded_args: Vec<Value> = args
-                .into_iter()
-                .map(|t| decode_term_to_value(t))
-                .collect::<Result<_, _>>()
-                .map_err(|e| rustler::Error::Term(Box::new(e)))?;
+    // Decode each statement with its arguments
+    let mut batch_stmts: Vec<(String, Vec<Value>)> = Vec::new();
+    for stmt_term in statements {
+        let (query, args): (String, Vec<Term>) = stmt_term.decode().map_err(|e| {
+            rustler::Error::Term(Box::new(format!("Failed to decode statement: {:?}", e)))
+        })?;

-            batch_stmts.push((query, decoded_args));
-        }
+        let decoded_args: Vec<Value> = args
+            .into_iter()
+            .map(|t| decode_term_to_value(t))
+            .collect::<Result<_, _>>()
+            .map_err(|e| rustler::Error::Term(Box::new(e)))?;

-        let result = TOKIO_RUNTIME.block_on(async {
-            // Start a transaction
-            let client_guard = safe_lock_arc(&client, "execute_transactional_batch client")?;
-            let conn_guard =
-                safe_lock_arc(&client_guard.client, "execute_transactional_batch conn")?;
-
-            let trx = conn_guard.transaction().await.map_err(|e| {
-                rustler::Error::Term(Box::new(format!("Begin transaction failed: {}", e)))
-            })?;
-
-            let mut all_results: Vec<Term<'a>> = Vec::new();
-
-            // Execute each statement in the transaction
-            for (sql, args) in batch_stmts.iter() {
-                match trx.query(sql, args.clone()).await {
-                    Ok(rows) => {
-                        let collected = collect_rows(env, rows)
-                            .await
-                            .map_err(|e| rustler::Error::Term(Box::new(format!("{:?}", e))))?;
-                        all_results.push(collected);
-                    }
-                    Err(e) => {
-                        // Rollback on error
-                        let _ = trx.rollback().await;
-                        return Err(rustler::Error::Term(Box::new(format!(
-                            "Batch statement error: {}",
-                            e
-                        ))));
-                    }
+        batch_stmts.push((query, decoded_args));
+    }
+
+    TOKIO_RUNTIME.block_on(async {
+        // Start a transaction
+        let client_guard = safe_lock_arc(&client, "execute_transactional_batch client")?;
+        let conn_guard = safe_lock_arc(&client_guard.client, "execute_transactional_batch conn")?;
+
+        let trx = conn_guard.transaction().await.map_err(|e| {
+            rustler::Error::Term(Box::new(format!("Begin transaction failed: {}", e)))
+        })?;
+
+        let mut all_results: Vec<Term<'a>> = Vec::new();
+
+        // Execute each statement in the transaction
+        for (sql, args) in batch_stmts.iter() {
+            match trx.query(sql, args.clone()).await {
+                Ok(rows) => {
+                    let collected = collect_rows(env, rows)
+                        .await
+                        .map_err(|e| rustler::Error::Term(Box::new(format!("{:?}", e))))?;
+                    all_results.push(collected);
+                }
+                Err(e) => {
+                    // Rollback on error
+                    let _ = trx.rollback().await;
+                    return Err(rustler::Error::Term(Box::new(format!(
+                        "Batch statement error: {}",
+                        e
+                    ))));
                 }
             }
+        }

-            // Commit the transaction
-            trx.commit()
-                .await
-                .map_err(|e| rustler::Error::Term(Box::new(format!("Commit failed: {}", e))))?;
-
-            Ok(Ok(all_results.encode(env)))
-        });
+        // Commit the transaction
+        trx.commit()
+            .await
+            .map_err(|e| rustler::Error::Term(Box::new(format!("Commit failed: {}", e))))?;

-        return result;
-    } else {
-        Err(rustler::Error::Term(Box::new("Invalid connection ID")))
-    }
+        Ok(all_results.encode(env))
+    })
 }

 /// Execute multiple SQL statements from a single string (semicolon-separated).
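
The core change in both batch functions is the same: look up the connection, clone its `Arc`, drop the registry guard, and only then start the long-running work. The sketch below reproduces that shape with the standard library only, so it can be run without `rustler`, `tokio`, or libSQL. `Conn`, `Registry`, and `run_batch` are hypothetical stand-ins rather than the crate's types, and the loop stands in for the `TOKIO_RUNTIME.block_on(async { ... })` section.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for a connection entry; the real type wraps a libSQL connection.
struct Conn {
    executed: Vec<String>,
}

/// Stand-in for the global connection registry.
type Registry = Mutex<HashMap<String, Arc<Mutex<Conn>>>>;

/// Look up a connection, release the registry lock, then do the slow work.
/// Returns whether the registry lock was free while the work was in progress.
fn run_batch(registry: &Registry, conn_id: &str, stmts: &[&str]) -> Result<bool, String> {
    let conn_map = registry
        .lock()
        .map_err(|e| format!("registry lock poisoned: {e}"))?;

    // Clone the Arc so the connection outlives the registry guard.
    let conn = conn_map
        .get(conn_id)
        .cloned()
        .ok_or_else(|| "Invalid connection ID".to_string())?;

    drop(conn_map); // Release the registry lock before the long-running section.

    // Stand-in for the async block: only the per-connection lock is held here.
    let mut guard = conn
        .lock()
        .map_err(|e| format!("connection lock poisoned: {e}"))?;
    for sql in stmts {
        guard.executed.push(sql.to_string());
    }

    // Other callers could take the registry lock while this connection is busy.
    let registry_free = registry.try_lock().is_ok();
    Ok(registry_free)
}
```

Without the `drop`, the registry guard would live until the end of the function, so every other NIF call needing the registry would wait for the whole batch to finish.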

native/ecto_libsql/src/connection.rs

Lines changed: 2 additions & 5 deletions
@@ -53,11 +53,8 @@ pub fn connect(opts: Term, mode: Term) -> NifResult<String> {
         .get("encryption_key")
         .and_then(|t| t.decode::<String>().ok());

-    let rt = tokio::runtime::Runtime::new()
-        .map_err(|e| rustler::Error::Term(Box::new(format!("Tokio runtime err {}", e))))?;
-
-    // Wrap the entire connection process with a timeout.
-    rt.block_on(async {
+    // Wrap the entire connection process with a timeout using the global runtime.
+    TOKIO_RUNTIME.block_on(async {
         let timeout = Duration::from_secs(DEFAULT_SYNC_TIMEOUT_SECS);

         tokio::time::timeout(timeout, async {
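
The effect of switching `connect()` to the global runtime can be shown with a standard-library sketch of the "build once, reuse everywhere" pattern. Everything here is an assumption made for illustration: `Runtime` is a stand-in for `tokio::runtime::Runtime`, `std::sync::OnceLock` replaces whatever the crate uses to define `TOKIO_RUNTIME`, and this `connect` is a toy function, not the NIF.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Counts how many times the stand-in runtime has been constructed.
static RUNTIMES_CREATED: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for `tokio::runtime::Runtime`; building the real one spawns worker threads.
struct Runtime;

impl Runtime {
    fn new() -> Self {
        RUNTIMES_CREATED.fetch_add(1, Ordering::SeqCst);
        Runtime
    }

    /// Stand-in for `block_on`: runs the closure on the calling thread.
    fn block_on<T>(&self, work: impl FnOnce() -> T) -> T {
        work()
    }
}

/// Process-wide runtime, built on first use and reused afterwards.
fn shared_runtime() -> &'static Runtime {
    static RT: OnceLock<Runtime> = OnceLock::new();
    RT.get_or_init(Runtime::new)
}

/// Toy `connect`: every call reuses the shared runtime instead of building one.
fn connect(url: &str) -> String {
    shared_runtime().block_on(|| format!("connected:{url}"))
}
```

However many connections are opened, the constructor runs once, which is the property the changelog describes as preventing resource exhaustion under high connection rates.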

native/ecto_libsql/src/constants.rs

Lines changed: 2 additions & 1 deletion
@@ -44,9 +44,10 @@ lazy_static! {
 // Atom declarations for EctoLibSql - used as return values and option identifiers in the NIF interface
 atoms! {
     local,
-    remote_primary,
+    remote,
     remote_replica,
     ok,
+    error,
     conn_id,
     trx_id,
     stmt_id,
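
Why the atom rename matters can be sketched without `rustler` by modelling atom names as plain strings. The `Mode` enum and this `decode_mode` signature are hypothetical (the real function in `decode.rs` is not shown in this view and compares `Atom` values), but the failure mode is the same: if the Rust side only knows `remote_primary`, an Elixir caller sending `:remote` never matches.

```rust
/// Hypothetical connection modes, mirroring the atoms declared above.
#[derive(Debug, PartialEq)]
enum Mode {
    Local,
    Remote,
    RemoteReplica,
}

/// Map the atom name sent from Elixir to a connection mode.
/// Atom names are modelled as strings here for illustration only.
fn decode_mode(atom_name: &str) -> Option<Mode> {
    match atom_name {
        "local" => Some(Mode::Local),
        // Before this commit the Rust atom was `remote_primary`,
        // so `:remote` from Elixir had nothing to match against.
        "remote" => Some(Mode::Remote),
        "remote_replica" => Some(Mode::RemoteReplica),
        _ => None,
    }
}
```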
