1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
// SPDX-FileCopyrightText: 2026 Stefan Majewsky <majewsky@gmx.net>
// SPDX-License-Identifier: Apache-2.0
package oblast
import (
"database/sql"
"fmt"
"reflect"
)
// PrepareThreshold controls how the batch-oriented methods of [Store]
// (namely [Store.Insert], [Store.Update] and [Store.Delete]) execute their query.
//
// When a call covers many records, preparing the query once and reusing the prepared statement for every record is the faster strategy.
// Prepared statements are not free, though: the database/sql library has to do extra bookkeeping for each of them.
// For calls that cover only one or a few records (typical for OLTP as opposed to OLAP workloads), that bookkeeping shows up as measurable overhead.
//
// Calls with fewer records than this value execute the query directly; calls with at least this many records go through a prepared statement.
// The default of 8 comes from benchmarks using the mattn/go-sqlite3 driver (last checked with Go 1.26.2 on x86_64), where preparing starts to pay off at 8 or more records.
// Adjust this variable if benchmarks with your Go version or SQL driver show a different break-even point.
var PrepareThreshold = 8
// preparedStatement behaves like sql.Stmt, but only uses *sql.Stmt when it is useful (see explanation above).
// prepare() builds values of this type with positional composite literals, so the order of the fields is significant.
type preparedStatement struct {
// db executes the query directly whenever stmt is nil.
db Handle
// query is the SQL text that is handed to db on each call when stmt is nil.
query string
stmt *sql.Stmt // nil for input sizes below PrepareThreshold
}
// prepare behaves like [Handle.Prepare], except that it only creates a real prepared statement
// when inputSize reaches PrepareThreshold. For smaller inputs, the returned value just remembers
// db and query, and runs the query directly on each call.
//
// An empty query is rejected with an error naming the calling operation.
// An error from db.Prepare() is wrapped and returned together with a zero preparedStatement.
func prepare(db Handle, query, operation string, inputSize int) (preparedStatement, error) {
	if query == "" {
		return preparedStatement{}, fmt.Errorf("cannot execute %s() because query could not be autogenerated", operation)
	}

	result := preparedStatement{db: db, query: query}
	if inputSize >= PrepareThreshold {
		prepared, err := db.Prepare(query)
		if err != nil {
			return preparedStatement{}, fmt.Errorf("during Prepare(): %w", err)
		}
		result.stmt = prepared
	}
	return result, nil
}
// Close behaves like [sql.Stmt.Close].
// If no *sql.Stmt was created (stmt is nil), there is nothing to release and nil is returned.
func (s preparedStatement) Close() error {
	if s.stmt != nil {
		return s.stmt.Close()
	}
	return nil
}
// Exec behaves like [sql.Stmt.Exec].
// With a prepared statement, the call is forwarded to it; otherwise the stored query is executed directly on db.
func (s preparedStatement) Exec(args ...any) (sql.Result, error) {
	if s.stmt != nil {
		return s.stmt.Exec(args...)
	}
	return s.db.Exec(s.query, args...)
}
// QueryRow behaves like [sql.Stmt.QueryRow].
// With a prepared statement, the call is forwarded to it; otherwise the stored query is run directly on db.
func (s preparedStatement) QueryRow(args ...any) *sql.Row {
	if s.stmt != nil {
		return s.stmt.QueryRow(args...)
	}
	return s.db.QueryRow(s.query, args...)
}
// Insert writes each of the provided records into the database, using one SQL INSERT statement per record.
//
// Fields that are declared with the "auto" tag are not sent to the DB;
// the value that the DB generates for them on insert is written back into the record instead.
//
// Returns an error if [NewStore] was called without the [TableNameIs] option, which is required to generate a query for this method.
//
// Returns an error if any of the `records` has a non-zero value in any column marked as `db:",auto"`.
// Records that already exist in the database should be handled with [Store.Update] instead.
//
// Processing stops at the first record that fails. Once the statement has been prepared, it is closed on every return path.
func (s Store[R]) Insert(db Handle, records ...*R) error {
	// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
	// Any expression that does not depend on type R should be factored out into a reusable function.
	stmt, err := prepare(db, s.plan.Insert.Query, "Insert", len(records))
	if err != nil {
		return err
	}

	// The slot slices are allocated once and refilled for every record.
	argIndexes, scanIndexes := s.plan.Insert.ArgumentIndexes, s.plan.Insert.ScanIndexes
	argSlots := make([]any, len(argIndexes))
	scanSlots := make([]any, len(scanIndexes))

	var failure error
	for idx, record := range records {
		failure = insertRecord(reflect.ValueOf(record).Elem(), idx, stmt, argIndexes, argSlots, scanIndexes, scanSlots)
		if failure != nil {
			break
		}
	}
	return newIOError(failure, "Stmt.Close", stmt.Close())
}
// insertRecord performs the INSERT for one record. It holds the part of [Store.Insert] that does not depend on the record type.
//
// v is the record struct (the dereferenced record pointer) and recordIndex is its position in the caller's batch, used in error messages.
// argumentIndexes lists the struct fields whose values are passed to the query; argumentSlots is a scratch slice of the same length.
// scanIndexes lists the "auto" fields that receive values generated by the DB; scanSlots is a scratch slice of the same length.
//
// Returns an error if v is invalid (which is what dereferencing a nil record pointer yields),
// if any "auto" field already holds a non-zero value, or if the statement fails.
func insertRecord(v reflect.Value, recordIndex int, stmt preparedStatement, argumentIndexes [][]int, argumentSlots []any, scanIndexes [][]int, scanSlots []any) error {
	// reflect.ValueOf((*R)(nil)).Elem() is the zero Value, on which FieldByIndex panics.
	// Report the bad input as an error so the caller can still close its statement.
	if !v.IsValid() {
		return fmt.Errorf("refusing to INSERT record with idx = %d because it is a nil pointer", recordIndex)
	}

	for idx, index := range argumentIndexes {
		argumentSlots[idx] = v.FieldByIndex(index).Interface()
	}
	for idx, index := range scanIndexes {
		f := v.FieldByIndex(index)
		if !f.IsZero() {
			return fmt.Errorf(`refusing to INSERT record with idx = %d that already has non-zero values in its "auto" columns`, recordIndex)
		}
		// Scan() writes the DB-generated value straight into the record's field.
		scanSlots[idx] = f.Addr().Interface()
	}

	var err error
	if len(scanSlots) == 0 {
		// Nothing to read back, so a plain Exec is sufficient.
		_, err = stmt.Exec(argumentSlots...)
	} else {
		err = stmt.QueryRow(argumentSlots...).Scan(scanSlots...)
	}
	if err != nil {
		return fmt.Errorf("while inserting record with idx = %d: %w", recordIndex, err)
	}
	return nil
}
// Update runs one SQL UPDATE statement per provided record, overwriting all non-primary-key columns with the values from that record.
// Returns [MissingRecordError] if the statement for any record affects zero rows, that is, if the database contains no row with the same primary key values.
//
// Returns an error if [NewStore] was called without the [TableNameIs] or [PrimaryKeyIs] options, which are both required to generate a query for this method.
//
// Processing stops at the first record that fails. Once the statement has been prepared, it is closed on every return path.
func (s Store[R]) Update(db Handle, records ...R) error {
	// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
	// Any expression that does not depend on type R should be factored out into a reusable function.
	stmt, err := prepare(db, s.plan.Update.Query, "Update", len(records))
	if err != nil {
		return err
	}

	// The slot slice is allocated once and refilled for every record.
	argIndexes := s.plan.Update.ArgumentIndexes
	argSlots := make([]any, len(argIndexes))

	var failure error
	for idx := range records {
		affected, execErr := updateRecord(reflect.ValueOf(&records[idx]).Elem(), idx, stmt, argIndexes, argSlots)
		switch {
		case execErr != nil:
			failure = execErr
		case affected == 0:
			failure = MissingRecordError[R]{records[idx], s.plan}
		}
		if failure != nil {
			break
		}
	}
	return newIOError(failure, "Stmt.Close", stmt.Close())
}
// updateRecord performs the UPDATE for one record. It holds the part of [Store.Update] that does not depend on the record type.
//
// v is the record struct and recordIndex is its position in the caller's batch, used in error messages.
// argumentIndexes lists the struct fields whose values are passed to the query; argumentSlots is a scratch slice of the same length.
//
// Returns the number of rows that the statement reports as affected.
func updateRecord(v reflect.Value, recordIndex int, stmt preparedStatement, argumentIndexes [][]int, argumentSlots []any) (int64, error) {
	for slot := range argumentIndexes {
		argumentSlots[slot] = v.FieldByIndex(argumentIndexes[slot]).Interface()
	}

	outcome, execErr := stmt.Exec(argumentSlots...)
	if execErr != nil {
		return 0, fmt.Errorf("while updating record with idx = %d: %w", recordIndex, execErr)
	}

	count, countErr := outcome.RowsAffected()
	if countErr != nil {
		return 0, fmt.Errorf("during RowsAffected() for record with idx = %d: %w", recordIndex, countErr)
	}
	return count, nil
}
// Delete runs one SQL DELETE statement per provided record, locating the respective table row through the record's primary key.
//
// Returns an error if [NewStore] was called without the [TableNameIs] or [PrimaryKeyIs] options, which are both required to generate a query for this method.
//
// Processing stops at the first record that fails. Once the statement has been prepared, it is closed on every return path.
func (s Store[R]) Delete(db Handle, records ...R) error {
	// NOTE: This function body should be as short as possible to reduce the binary size after monomorphization.
	// Any expression that does not depend on type R should be factored out into a reusable function.
	stmt, err := prepare(db, s.plan.Delete.Query, "Delete", len(records))
	if err != nil {
		return err
	}

	// The slot slice is allocated once and refilled for every record.
	argIndexes := s.plan.Delete.ArgumentIndexes
	argSlots := make([]any, len(argIndexes))

	var failure error
	for idx := range records {
		failure = deleteRecord(reflect.ValueOf(&records[idx]).Elem(), idx, stmt, argIndexes, argSlots)
		if failure != nil {
			break
		}
	}
	return newIOError(failure, "Stmt.Close", stmt.Close())
}
// deleteRecord performs the DELETE for one record. It holds the part of [Store.Delete] that does not depend on the record type.
//
// v is the record struct and recordIndex is its position in the caller's batch, used in error messages.
// argumentIndexes lists the struct fields whose values are passed to the query; argumentSlots is a scratch slice of the same length.
func deleteRecord(v reflect.Value, recordIndex int, stmt preparedStatement, argumentIndexes [][]int, argumentSlots []any) error {
	for slot := range argumentIndexes {
		argumentSlots[slot] = v.FieldByIndex(argumentIndexes[slot]).Interface()
	}

	if _, execErr := stmt.Exec(argumentSlots...); execErr != nil {
		return fmt.Errorf("while deleting record with idx = %d: %w", recordIndex, execErr)
	}
	return nil
}
|