// Source file: db.go
/*
Copyright 2025 eatmoreapple
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package juice provides a robust and thread-safe database connection management system.
// It supports multiple database drivers and connection pooling with configurable parameters.
package juice
import (
"database/sql"
"errors"
"fmt"
"github.com/go-juicedev/juice/driver"
"sync"
"sync/atomic"
"time"
)
// Source holds the settings for one named database: which driver to use, how
// to reach the database, and how its connection pool is sized and aged.
// Every field is handed to the driver package when the connection is opened.
type Source struct {
// Driver is the driver name; it is looked up with driver.Get and also
// passed to driver.Connect.
Driver string
// DSN is the data source name passed to driver.Connect.
DSN string
// MaxIdleConns is forwarded via driver.ConnectWithMaxIdleConnNum.
MaxIdleConns int
// MaxOpenConns is forwarded via driver.ConnectWithMaxOpenConnNum.
MaxOpenConns int
// ConnMaxLifetime is forwarded via driver.ConnectWithMaxConnLifetime.
ConnMaxLifetime time.Duration
// ConnMaxIdleTime is forwarded via driver.ConnectWithMaxIdleConnLifetime.
ConnMaxIdleTime time.Duration
}
// conn pairs an opened database handle with the driver it was opened through.
// Values of this type are what DBManager stores in its connection cache.
type conn struct {
// db is the opened database handle; it is nil until setup has succeeded.
db *sql.DB
// drv is the driver associated with db; it is nil until setup has succeeded.
drv driver.Driver
// once is reserved for guarding the one-time initialization of db and drv.
once sync.Once
}
// DBManager keeps a set of named database sources and the connections opened
// from them. Sources are registered with Add, connections are opened on demand
// by Get, and Close shuts everything down.
//
// The zero value is usable: Add allocates the sources map when it is nil.
type DBManager struct {
conns sync.Map // source name -> *conn for opened connections
sources map[string]Source // source name -> registered configuration
mu sync.RWMutex // guards sources and names
closed atomic.Bool // true once Close has run
names []string // source names in the order Add registered them (never re-sorted)
}
// Sentinel errors reported by DBManager. ErrSourceExists and ErrSourceNotFound
// are returned wrapped (with the source name appended), so match them with
// errors.Is rather than ==.
var (
// ErrDBManagerClosed is returned by Get and Add once the manager has been closed.
ErrDBManagerClosed = errors.New("manager is closed")
// ErrSourceExists is wrapped by Add when the name is already registered.
ErrSourceExists = errors.New("source already exists")
// ErrSourceNotFound is wrapped by Get when no source is registered under the name.
ErrSourceNotFound = errors.New("source not found")
)
// Get returns the database handle and driver for the source registered under
// name, opening the connection on first use.
//
// It returns ErrDBManagerClosed once the manager has been closed. This also
// applies to sources that were connected before Close was called, so callers
// are never handed a *sql.DB that has already been closed. It returns an error
// wrapping ErrSourceNotFound when no source is registered under name. Errors
// from opening the connection are returned as reported by connect.
func (m *DBManager) Get(name string) (*sql.DB, driver.Driver, error) {
	// Check the closed flag before consulting the cache: a cached entry may
	// refer to a handle that Close has already closed.
	if m.closed.Load() {
		return nil, nil, ErrDBManagerClosed
	}

	// Fast path: a connection for this source is already cached.
	if cached, ok := m.conns.Load(name); ok {
		c := cached.(*conn)
		return c.db, c.drv, nil
	}

	// Slow path: read the source configuration under the read lock. The
	// closed flag is sampled again here because Close may have completed
	// between the first check and acquiring the lock.
	m.mu.RLock()
	closed := m.closed.Load()
	source, exists := m.sources[name]
	m.mu.RUnlock()

	if closed {
		return nil, nil, ErrDBManagerClosed
	}
	if !exists {
		return nil, nil, fmt.Errorf("%w: %s", ErrSourceNotFound, name)
	}
	return m.connect(name, source)
}
// connect opens the connection described by source and caches it under name.
// If a connection for name is already cached, that one is returned instead.
//
// The whole operation runs under the manager's write lock, which gives three
// guarantees:
//   - at most one connection is created per source, because concurrent callers
//     queue on the lock and then find the cached entry;
//   - an entry only becomes visible in m.conns after db and drv are set, so no
//     reader can observe a half-initialised conn and receive nil values
//     together with a nil error;
//   - no connection is created or cached after Close, because Close holds the
//     same lock and the closed flag is re-checked here.
//
// The trade-off is that Add, Registered, Close and first-time lookups of other
// sources wait while driver.Connect runs. Callers must not hold m.mu when
// calling connect.
//
// On failure nothing is cached, so a later call retries. The returned error is
// ErrDBManagerClosed, or wraps the error from driver.Get or driver.Connect.
func (m *DBManager) connect(name string, source Source) (db *sql.DB, drv driver.Driver, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.closed.Load() {
		return nil, nil, ErrDBManagerClosed
	}

	// Another caller may have connected while this one waited for the lock.
	if existing, ok := m.conns.Load(name); ok {
		c := existing.(*conn)
		return c.db, c.drv, nil
	}

	drv, err = driver.Get(source.Driver)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get driver: %w", err)
	}

	db, err = driver.Connect(
		source.Driver,
		source.DSN,
		driver.ConnectWithMaxOpenConnNum(source.MaxOpenConns),
		driver.ConnectWithMaxIdleConnNum(source.MaxIdleConns),
		driver.ConnectWithMaxConnLifetime(source.ConnMaxLifetime),
		driver.ConnectWithMaxIdleConnLifetime(source.ConnMaxIdleTime),
	)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create connection: %w", err)
	}

	// Publish only after both fields are populated.
	m.conns.Store(name, &conn{db: db, drv: drv})
	return db, drv, nil
}
// Add registers source under name so that a later Get(name) can connect to it.
//
// It returns ErrDBManagerClosed when the manager has been closed, and an error
// wrapping ErrSourceExists when name is already registered. The duplicate check
// and the insertion both happen under the manager's write lock.
func (m *DBManager) Add(name string, source Source) error {
	// Lock-free rejection for a manager that is already closed.
	if m.closed.Load() {
		return ErrDBManagerClosed
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Close may have completed while this call waited for the lock.
	if m.closed.Load() {
		return ErrDBManagerClosed
	}

	// Looking up a key in a nil map is valid, so the duplicate check can run
	// before the map is allocated.
	if _, taken := m.sources[name]; taken {
		return fmt.Errorf("%w: %s", ErrSourceExists, name)
	}

	if m.sources == nil {
		m.sources = make(map[string]Source)
	}
	m.sources[name] = source
	m.names = append(m.names, name)
	return nil
}
// Registered returns the names of all sources added to the manager, in the
// order in which they were registered.
//
// The result is a copy: callers may modify it freely without affecting the
// manager. It is nil when no source has been registered.
func (m *DBManager) Registered() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Copy under the lock so the caller never aliases the slice guarded by mu.
	// Appending an empty slice to a nil slice yields nil, which preserves the
	// nil result for an empty manager.
	return append([]string(nil), m.names...)
}
// Close marks the manager as closed and closes every cached database handle.
//
// The closed flag is set before any handle is closed, so concurrent Get and Add
// calls start failing with ErrDBManagerClosed immediately. Each cache entry is
// removed as it is processed, so closed handles are not left in the cache.
//
// Close is idempotent: only the first call does any work, and later calls
// return nil. Errors from individual handles are combined with errors.Join;
// the result is nil when every handle closed cleanly.
func (m *DBManager) Close() error {
	// Lock-free exit for repeated calls.
	if m.closed.Load() {
		return nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Exactly one caller wins the transition from open to closed.
	if !m.closed.CompareAndSwap(false, true) {
		return nil
	}

	var errs []error
	m.conns.Range(func(key, value any) bool {
		m.conns.Delete(key)

		// An entry can exist with a nil db while its connection is still being
		// set up, or after a failed attempt; there is nothing to close then.
		c, ok := value.(*conn)
		if !ok || c.db == nil {
			return true
		}
		if err := c.db.Close(); err != nil {
			errs = append(errs, fmt.Errorf("failed to close %v: %w", key, err))
		}
		return true
	})

	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}
// newDBManagerFromConfiguration builds a DBManager holding one Source for each
// environment yielded by cfg.Environments().Iter().
//
// The environment's connection lifetime values are interpreted as a number of
// seconds. No connection is opened here; sources are only registered through
// Add. The first registration error aborts construction and is returned
// wrapped together with the name of the offending source.
func newDBManagerFromConfiguration(cfg IConfiguration) (*DBManager, error) {
	manager := &DBManager{sources: make(map[string]Source)}

	for name, env := range cfg.Environments().Iter() {
		src := Source{
			Driver:          env.Driver,
			DSN:             env.DataSource,
			MaxOpenConns:    env.MaxOpenConnNum,
			MaxIdleConns:    env.MaxIdleConnNum,
			ConnMaxLifetime: time.Duration(env.MaxConnLifetime) * time.Second,
			ConnMaxIdleTime: time.Duration(env.MaxIdleConnLifetime) * time.Second,
		}
		if err := manager.Add(name, src); err != nil {
			return nil, fmt.Errorf("failed to add source %s: %w", name, err)
		}
	}

	return manager, nil
}