-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpool.go
More file actions
215 lines (185 loc) · 4.86 KB
/
pool.go
File metadata and controls
215 lines (185 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package sqlite
import (
"context"
"errors"
"fmt"
"log/slog"
"runtime"
"runtime/pprof"
"runtime/trace"
"sync"
"time"
)
// #include <amalgamation/sqlite3.h>
// #include <stdint.h>
//
// extern char * go_strcpy(_GoString_ st);
// extern void go_free(void*);
// extern int sqlite_BindGoPointer(sqlite3_stmt* stmt, int pos, uintptr_t ptr, const char* name);
import "C"
const MemoryPath = "file::memory:?mode=memory"
var connectionsProfiles = pprof.NewProfile("t.sftw/sqlite/connections")
type Connections struct {
free *Conn // free list
mx sync.Mutex // protects all above
wait sync.Cond
}
// FreeCount returns the number of free connections in the pool
func (c *Connections) FreeCount() int {
c.mx.Lock()
defer c.mx.Unlock()
i := 0
for p := c.free; p != nil; p = p.next {
i++
}
return i
}
type ckey struct{}
type spkey struct{}
type savepoint struct {
name string
top bool
released bool
task *trace.Task
}
var NumThreads = 32
func init() {
// There is a limit to how many concurrent writes we can issue in SQLite at the same time,
// even in WAL mode (single writer). Increasing the number too much would still result in busy contention.
// This takes the same approach as Python's [ThreadPoolExecutor].
//
// [ThreadPoolExecutor]: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
w := runtime.NumCPU() + 4
if w < 32 {
NumThreads = w
}
}
// OpenPool ceates a new connection pool
// TODO(rdo) check if WAL by default is right
func OpenPool(name string, exts ...func(SQLITE3)) (*Connections, error) {
if name == ":memory:" {
return nil, errors.New(`":memory:" does not work with pools, use MemoryPath`)
}
var pool Connections
pool.wait = sync.Cond{L: &pool.mx}
ptr := &pool.free
for w := NumThreads; w > 0; w-- {
conn, err := Open(name, exts...)
if err != nil {
return nil, err
}
if w == 1 {
var mode string
err = conn.Exec(context.Background(), "PRAGMA journal_mode=WAL").ScanOne(&mode)
if err != nil || mode != "wal" {
return nil, fmt.Errorf("cannot set WAL mode (mode=%s): %w", mode, err)
}
}
*ptr = conn
ptr = &conn.next
}
return &pool, nil
}
// Savepoint creates a new [savepoint] in transaction (think about begin).
// If the connection does not exist, it is taken from the pool.
//
// [savepoint]: https://sqlite.org/lang_savepoint.html
func (p *Connections) Savepoint(ctx context.Context) (context.Context, error) {
ctn, ok := ctx.Value(ckey{}).(*Conn)
top := false
if !ok {
ctn = p.take()
top = true
ctn.zombie = time.AfterFunc(30*time.Second, func() {
slog.Warn("zombie connection detected")
})
}
spn := randname()
err := ctn.Exec(ctx, "SAVEPOINT "+spn).Err()
if err != nil {
return ctx, err
}
sp := &savepoint{name: spn, top: top}
ctx = context.WithValue(ctx, ckey{}, ctn)
ctx = context.WithValue(ctx, spkey{}, sp)
ctx, sp.task = trace.NewTask(ctx, "db:sqlite-tx")
return ctx, nil
}
// Close closes all connections in the pool.
// It can be safely called concurrently [Connections.Savepoint], [Connections.Exec] and [Connections.Release]
// but note that calls to [Connections.Savepoint] or [Connections.Exec] that happen after Close might block forever.
// The mechanism to terminate other connections has to be done out of band.
func (p *Connections) Close() error {
var err error
for w := NumThreads; w > 0; w-- {
ctn := p.take()
err = errors.Join(err, ctn.Close())
}
return err
}
func (p *Connections) take() *Conn {
p.mx.Lock()
for p.free == nil {
p.wait.Wait()
}
ctn := p.free
p.free = ctn.next
p.mx.Unlock()
connectionsProfiles.Add(ctn, 2)
return ctn
}
func (p *Connections) Release(ctx context.Context) error {
if r := recover(); r != nil {
panic(r)
}
ctn := ctx.Value(ckey{}).(*Conn)
sp := ctx.Value(spkey{}).(*savepoint)
if sp.released {
panic("savepoint released twice")
}
err := ctn.Exec(ctx, "RELEASE "+sp.name).Err()
if sp.top && err == nil {
ctn.zombie.Stop()
p.put(ctn)
}
sp.released = true
sp.task.End()
return err
}
func (p *Connections) put(ctn *Conn) {
connectionsProfiles.Remove(ctn)
p.mx.Lock()
ctn.next = p.free
p.free = ctn
p.wait.Signal()
p.mx.Unlock()
}
// Rollback rolls back all changes to the current changepoint.
// The rollback will not happen if the savepoint is already released; it is safe to call this from a defer.
func (p *Connections) Rollback(ctx context.Context) error {
ctn := ctx.Value(ckey{}).(*Conn)
sp := ctx.Value(spkey{}).(*savepoint)
if sp.released {
return nil
}
err := ctn.Exec(ctx, "ROLLBACK TO "+sp.name).Err()
if sp.top {
ctn.zombie.Stop()
p.put(ctn)
}
sp.task.End()
return err
}
func (p *Connections) Exec(ctx context.Context, cmd string, args ...any) *Rows {
ctn, ok := ctx.Value(ckey{}).(*Conn)
free := false
if !ok {
ctn = p.take()
free = true
}
rows := ctn.Exec(ctx, cmd, args...)
if free {
rows.final = func() { p.put(ctn) }
}
return rows
}