1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
|
// SPDX-FileCopyrightText: 2026 Stefan Majewsky <majewsky@gmx.net>
// SPDX-License-Identifier: Apache-2.0
package oblast
import (
"context"
"database/sql"
"fmt"
"reflect"
)
// PrepareThreshold is a tuning parameter for the strategy used by all methods of [Store] operating on batches of records provided by the caller
// (specifically, [Store.Insert], [Store.Update] and [Store.Delete]).
//
// For large amounts of records, it is obviously advantageous to build a prepared statement for the one query that will be used repeatedly on all of them.
// However, building a prepared statement is associated with some amount of bookkeeping on the level of the database/sql library.
// When operating on individual records or small amounts of records at a time (that is, in OLTP rather than OLAP workloads), this overhead becomes a measurable performance burden.
//
// This tuning parameter defines the minimum number of records that will justify maintaining a prepared statement.
// Our benchmarking with the mattn/go-sqlite3 driver (and last checked with Go 1.26.2 on x86_64) indicates that this becomes a worthwhile investment at 8 or more records, so this is our default.
// If your benchmarking indicates a different tradeoff depending on your choice of Go version or SQL driver, you may adjust this variable accordingly.
var PrepareThreshold int = 8
// preparedStatement behaves like sql.Stmt, but only uses *sql.Stmt when it is useful (see explanation above).
type preparedStatement struct {
db Handle
query string
stmt *sql.Stmt // nil for input sizes below PrepareThreshold
}
// prepare behaves like [Handle.Prepare].
func prepare(ctx context.Context, db Handle, query, operation string, inputSize int) (preparedStatement, error) {
if query == "" {
return preparedStatement{}, fmt.Errorf("cannot execute %s() because query could not be autogenerated", operation)
}
if inputSize < PrepareThreshold {
return preparedStatement{db, query, nil}, nil
}
stmt, err := db.PrepareContext(ctx, query)
if err != nil {
return preparedStatement{}, fmt.Errorf("during Prepare(): %w", err)
}
return preparedStatement{db, query, stmt}, nil
}
// Close behaves like [sql.Stmt.Close].
func (s preparedStatement) Close() error {
if s.stmt == nil {
return nil
}
return s.stmt.Close()
}
// ExecContext behaves like [sql.Stmt.ExecContext].
func (s preparedStatement) ExecContext(ctx context.Context, args ...any) (sql.Result, error) {
if s.stmt == nil {
return s.db.ExecContext(ctx, s.query, args...)
}
return s.stmt.ExecContext(ctx, args...)
}
// QueryRow behaves like [sql.Stmt.QueryRowContext].
func (s preparedStatement) QueryRowContext(ctx context.Context, args ...any) *sql.Row {
if s.stmt == nil {
return s.db.QueryRowContext(ctx, s.query, args...)
}
return s.stmt.QueryRowContext(ctx, args...)
}
// Insert executes an SQL INSERT statement for each of the provided records.
//
// Fields that are declared with the "auto" tag will not be written into the DB,
// and instead their value (as auto-generated by the DB on insert) will be placed in the record.
//
// Returns an error if [NewStore] was called without the [TableNameIs] option, which is required to generate a query for this method.
//
// Returns an error if any of the `records` has a non-zero value in any column marked as `db:",auto"`.
// Records that already exist in the database should be handled with [Store.Update] instead.
func (s Store[R]) Insert(ctx context.Context, db Handle, records ...*R) error {
// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
// Any expression that does not depend on type R should be factored out into a reusable function.
stmt, err := prepare(ctx, db, s.plan.Insert.Query, "Insert", len(records))
if err != nil {
return err
}
return s.insertUsing(ctx, stmt, db, records)
}
func (s Store[R]) insertUsing(ctx context.Context, stmt preparedStatement, db Handle, records []*R) error {
// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
// Any expression that does not depend on type R should be factored out into a reusable function.
var (
argumentIndexes = s.plan.Insert.ArgumentIndexes
argumentSlots = make([]any, len(argumentIndexes))
scanIndexes = s.plan.Insert.ScanIndexes
scanSlots = make([]any, len(scanIndexes))
)
for idx, r := range records {
v := reflect.ValueOf(r).Elem()
err := insertRecord(ctx, v, idx, stmt, argumentIndexes, argumentSlots, scanIndexes, scanSlots)
if err != nil {
return newIOError(err, "Stmt.Close", stmt.Close())
}
}
return newIOError(nil, "Stmt.Close", stmt.Close())
}
func insertRecord(ctx context.Context, v reflect.Value, recordIndex int, stmt preparedStatement, argumentIndexes [][]int, argumentSlots []any, scanIndexes [][]int, scanSlots []any) error {
for idx, index := range argumentIndexes {
argumentSlots[idx] = v.FieldByIndex(index).Interface()
}
for idx, index := range scanIndexes {
f := v.FieldByIndex(index)
if !f.IsZero() {
return fmt.Errorf(`refusing to INSERT record with idx = %d that already has non-zero values in its "auto" columns`, recordIndex)
}
scanSlots[idx] = f.Addr().Interface()
}
var err error
if len(scanSlots) == 0 {
_, err = stmt.ExecContext(ctx, argumentSlots...)
} else {
// TODO: using QueryRow for inserting is extremely expensive because database/sql allocates a Rows instance under the hood; other libraries are doing better by limiting themselves to ExecContext() + LastInsertId()
err = stmt.QueryRowContext(ctx, argumentSlots...).Scan(scanSlots...)
}
if err != nil {
return fmt.Errorf("while inserting record with idx = %d: %w", recordIndex, err)
}
return nil
}
// Update executes an SQL UPDATE statement for each of the provided records, updating all non-primary-key columns with the values in the records.
// Returns [MissingRecordError] if any of the records does not exist in the database, that is, if for any of the records, the database contains no row with the same primary key values.
//
// Returns an error if [NewStore] was called without the [TableNameIs] or [PrimaryKeyIs] options, which are both required to generate a query for this method.
func (s Store[R]) Update(ctx context.Context, db Handle, records ...R) error {
// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
// Any expression that does not depend on type R should be factored out into a reusable function.
stmt, err := prepare(ctx, db, s.plan.Update.Query, "Update", len(records))
if err != nil {
return err
}
var (
argumentIndexes = s.plan.Update.ArgumentIndexes
argumentSlots = make([]any, len(argumentIndexes))
)
for idx := range records {
v := reflect.ValueOf(&records[idx]).Elem()
rowsAffected, err := updateRecord(ctx, v, idx, stmt, argumentIndexes, argumentSlots)
if err == nil && rowsAffected == 0 {
err = MissingRecordError[R]{records[idx], s.plan}
}
if err != nil {
return newIOError(err, "Stmt.Close", stmt.Close())
}
}
return newIOError(nil, "Stmt.Close", stmt.Close())
}
func updateRecord(ctx context.Context, v reflect.Value, recordIndex int, stmt preparedStatement, argumentIndexes [][]int, argumentSlots []any) (int64, error) {
for idx, index := range argumentIndexes {
argumentSlots[idx] = v.FieldByIndex(index).Interface()
}
result, err := stmt.ExecContext(ctx, argumentSlots...)
if err != nil {
return 0, fmt.Errorf("while updating record with idx = %d: %w", recordIndex, err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("during RowsAffected() for record with idx = %d: %w", recordIndex, err)
}
return rowsAffected, nil
}
// Delete executes an SQL DELETE statement for each of the provided records, using their primary keys to locate the respective table rows.
//
// Returns an error if [NewStore] was called without the [TableNameIs] or [PrimaryKeyIs] options, which are both required to generate a query for this method.
func (s Store[R]) Delete(ctx context.Context, db Handle, records ...R) error {
// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
// Any expression that does not depend on type R should be factored out into a reusable function.
stmt, err := prepare(ctx, db, s.plan.Delete.Query, "Delete", len(records))
if err != nil {
return err
}
var (
argumentIndexes = s.plan.Delete.ArgumentIndexes
argumentSlots = make([]any, len(argumentIndexes))
)
for idx := range records {
v := reflect.ValueOf(&records[idx]).Elem()
err := deleteRecord(ctx, v, idx, stmt, argumentIndexes, argumentSlots)
if err != nil {
return newIOError(err, "Stmt.Close", stmt.Close())
}
}
return newIOError(nil, "Stmt.Close", stmt.Close())
}
func deleteRecord(ctx context.Context, v reflect.Value, recordIndex int, stmt preparedStatement, argumentIndexes [][]int, argumentSlots []any) error {
for idx, index := range argumentIndexes {
argumentSlots[idx] = v.FieldByIndex(index).Interface()
}
_, err := stmt.ExecContext(ctx, argumentSlots...)
if err != nil {
return fmt.Errorf("while deleting record with idx = %d: %w", recordIndex, err)
}
return nil
}
// Upsert executes either an SQL INSERT or UPDATE statement for each of the provided records,
// based on whether the record already exists in the DB or not.
//
// - For record types that have fields declared with the "auto" tag, INSERT is chosen iff those fields hold zero values.
// Returns an error if only some of the respective fields hold zero values while others don't.
// Returns an error if [NewStore] was called without the [TableNameIs] or [PrimaryKeyIs] options, which are both required to generate the respective queries for this method.
// - For record types that do not have fields declared with the "auto" tag, an INSERT ... ON CONFLICT statement is used.
// Returns an error if [NewStore] was called without the [TableNameIs] option, which is required to generate a query for this method.
func (s Store[R]) Upsert(ctx context.Context, db Handle, records ...*R) error {
// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
// Any expression that does not depend on type R should be factored out into a reusable function.
if len(s.plan.AutoColumnNames) == 0 {
stmt, err := prepare(ctx, db, s.plan.Upsert.Query, "Upsert", len(records))
if err != nil {
return err
}
return s.insertUsing(ctx, stmt, db, records)
}
// TODO: respect PrepareThreshold (or not? may be too much bookkeeping overhead for not a whole lot of benefit)
insertStmt, err := prepare(ctx, db, s.plan.Insert.Query, "Insert", 0)
if err != nil {
return err
}
updateStmt, err := prepare(ctx, db, s.plan.Update.Query, "Update", 0)
if err != nil {
return err
}
var (
insertArgumentIndexes = s.plan.Insert.ArgumentIndexes
insertArgumentSlots = make([]any, len(insertArgumentIndexes))
insertScanIndexes = s.plan.Insert.ScanIndexes
insertScanSlots = make([]any, len(insertScanIndexes))
updateArgumentIndexes = s.plan.Update.ArgumentIndexes
updateArgumentSlots = make([]any, len(updateArgumentIndexes))
)
for idx, r := range records {
v := reflect.ValueOf(r).Elem()
isInsert, err := upsertDecideStrategy(v, idx, insertScanIndexes)
if err != nil {
return err
}
if isInsert {
err = insertRecord(ctx, v, idx, insertStmt, insertArgumentIndexes, insertArgumentSlots, insertScanIndexes, insertScanSlots)
} else {
var rowsAffected int64
rowsAffected, err = updateRecord(ctx, v, idx, updateStmt, updateArgumentIndexes, updateArgumentSlots)
if err == nil && rowsAffected == 0 {
err = MissingRecordError[R]{*r, s.plan}
}
}
if err != nil {
err = newIOError(err, "InsertStmt.Close", insertStmt.Close())
err = newIOError(err, "UpdateStmt.Close", updateStmt.Close())
return err
}
}
err = newIOError(err, "InsertStmt.Close", insertStmt.Close())
err = newIOError(err, "UpdateStmt.Close", updateStmt.Close())
return err
}
func upsertDecideStrategy(v reflect.Value, recordIndex int, scanIndexes [][]int) (isInsert bool, err error) {
var isUpdate bool
for _, index := range scanIndexes {
if v.FieldByIndex(index).IsZero() {
isInsert = true
} else {
isUpdate = true
}
}
if isInsert && isUpdate {
return false, fmt.Errorf(`cannot decide whether to INSERT or UPDATE record with idx = %d: some "auto" columns are zero, others are not`, recordIndex)
}
return isInsert, nil
}
|