Add collection-level lock #7

Merged
merged 6 commits into from
Jun 24, 2021
52 changes: 19 additions & 33 deletions README.md
Expand Up @@ -275,7 +275,7 @@ players.Query(func(txn *column.Txn) error {
return true
})

// No error, txn.Commit() will be called
// No error, transaction will be committed
return nil
})
```
Expand All @@ -290,25 +290,11 @@ players.Query(func(txn *column.Txn) error {
return true
})

// Returns an error, txn.Rollback() will be called
// Returns an error, transaction will be rolled back
return fmt.Errorf("bug")
})
```

You can (but probably won't need to) call `Commit()` or `Rollback()` manually, as many times as required. This can be handy for partial updates, but calling them too often will hurt your application's performance.

```go
// Range over all of the players and update their balance (successfully)
players.Query(func(txn *column.Txn) error {
txn.Range("balance", func(v column.Cursor) bool {
v.Update(10.0) // Update the "balance" to 10.0
return true
})

txn.Commit() // Manually commit all of the changes
return nil // This will call txn.Commit() again, but will be a no-op
})
```

## Streaming Changes

Expand Down Expand Up @@ -423,50 +409,50 @@ func main(){

## Benchmarks

The benchmarks below were run on a collection of *500 items* containing a dozen columns. Feel free to explore the benchmarks but I strongly recommend testing it on your actual dataset.
The benchmarks below were run on a collection of **100,000 items** containing a dozen columns. Feel free to explore the benchmarks but I strongly recommend testing it on your actual dataset.

```
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkCollection/insert-8 5013795 239.9 ns/op 27 B/op 0 allocs/op
BenchmarkCollection/fetch-8 23730796 50.63 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 234990 4743 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/count-8 7965873 152.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 1512513 799.9 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-at-8 5409420 224.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 196626 6099 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 2006052 594.9 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 1889685 643.2 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/insert-8 5545016 216.8 ns/op 18 B/op 0 allocs/op
BenchmarkCollection/fetch-8 27272726 43.61 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 648 1844623 ns/op 147 B/op 0 allocs/op
BenchmarkCollection/count-8 1000000 1107 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 10000 102549 ns/op 9 B/op 0 allocs/op
BenchmarkCollection/update-at-8 4316584 280.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 826 1379693 ns/op 53068 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 7059126 169.1 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 196734 6294 ns/op 0 B/op 0 allocs/op
```
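
Because the collection size changed from 500 to 100,000 items, the whole-collection figures are easier to compare once divided by the number of items. The snippet below is a small worked calculation, assuming (this is not confirmed by the diff) that `scan` and `update-all` each visit every item once per operation. The helper `perItem` is a hypothetical name used only for this illustration.

```go
package main

import "fmt"

// perItem returns the average cost per item, in nanoseconds, of an
// operation that took nsPerOp nanoseconds to process `items` elements.
func perItem(nsPerOp float64, items int) float64 {
	return nsPerOp / float64(items)
}

func main() {
	const items = 100000 // collection size stated in the README

	// ns/op figures copied from the benchmark output above.
	results := []struct {
		name    string
		nsPerOp float64
	}{
		{"scan", 1844623},
		{"update-all", 1379693},
	}

	for _, r := range results {
		fmt.Printf("%-10s %.2f ns/item\n", r.name, perItem(r.nsPerOp, items))
	}
}
```

Point operations such as `fetch` or `update-at` touch a single item, so their ns/op figures can be read directly.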

To test larger collections, I added a small example (see the `examples` folder) and ran it with **20 million rows** inserted. Each entry has **12 columns and 4 indexes** that need to be calculated, and the example runs a few queries and scans over them.

```
running insert of 20000000 rows...
-> insert took 52.8255618s
-> insert took 38.6921853s

running full scan of age >= 30...
-> result = 10200000
-> full scan took 176.01008ms
-> full scan took 171.712196ms

running full scan of class == "rogue"...
-> result = 7160000
-> full scan took 196.153362ms
-> full scan took 199.24443ms

running indexed query of human mages...
-> result = 1360000
-> indexed query took 581.158µs
-> indexed query took 574µs

running indexed query of human female mages...
-> result = 640000
-> indexed query took 753.122µs
-> indexed query took 747.148µs

running update of balance of everyone...
-> updated 20000000 rows
-> update took 301.888912ms
-> update took 317.528908ms

running update of age of mages...
-> updated 6040000 rows
-> update took 93.835876ms
-> update took 98.655836ms
```

## Contributing
Expand Down
16 changes: 7 additions & 9 deletions collection.go
Expand Up @@ -25,11 +25,11 @@ const (

// Collection represents a collection of objects in a columnar format
type Collection struct {
lock sync.RWMutex // The lock for fill list
count uint64 // The current count of elements
lock sync.RWMutex // The global lock for both fill-list & transactions
cols columns // The map of columns
fill bitmap.Bitmap // The fill-list
size int // The initial size for new columns
count int // The current count of elements
writer commit.Writer // The commit writer
cancel context.CancelFunc // The cancellation function for the context
}
Expand Down Expand Up @@ -80,10 +80,11 @@ func NewCollection(opts ...Options) *Collection {

// next finds the next free index in the collection, atomically.
func (c *Collection) next() uint32 {
atomic.AddUint64(&c.count, 1)

c.lock.Lock()
idx := c.findFreeIndex()
c.fill.Set(idx)
c.count++
c.lock.Unlock()
return idx
}
Expand Down Expand Up @@ -152,10 +153,7 @@ func (c *Collection) DeleteAt(idx uint32) (deleted bool) {

// Count returns the total number of elements in the collection.
func (c *Collection) Count() (count int) {
c.lock.RLock()
count = c.count
c.lock.RUnlock()
return
return int(atomic.LoadUint64(&c.count))
}
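
The change above appears to keep the element counter outside the collection lock: `next()` bumps it with `atomic.AddUint64` and `Count()` reads it with `atomic.LoadUint64`, so counting never waits on a writer. Below is a minimal, self-contained sketch of that pattern. The `registry` type, its map-based fill-list and `newRegistry` are hypothetical stand-ins, not the library's types.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// registry is a hypothetical stand-in for Collection.
type registry struct {
	count uint64          // kept first so it is 64-bit aligned on 32-bit platforms
	lock  sync.RWMutex    // single lock guarding the fill map
	fill  map[uint32]bool // simplified stand-in for the fill-list bitmap
}

func newRegistry() *registry {
	return &registry{fill: make(map[uint32]bool)}
}

// next reserves the lowest free index. The counter is bumped atomically,
// while the fill-list is modified under the write lock.
func (r *registry) next() uint32 {
	atomic.AddUint64(&r.count, 1)

	r.lock.Lock()
	defer r.lock.Unlock()
	idx := uint32(0)
	for r.fill[idx] {
		idx++
	}
	r.fill[idx] = true
	return idx
}

// Count reads the counter without taking the lock.
func (r *registry) Count() int {
	return int(atomic.LoadUint64(&r.count))
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.next()
		}()
	}
	wg.Wait()
	fmt.Println(r.Count())
}
```

Placing the `uint64` counter first in the struct matters on 32-bit platforms, where `sync/atomic` requires 64-bit alignment for 64-bit operands and only the first word of an allocated struct is guaranteed to have it.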

// CreateColumnsOf registers a set of columns that are present in the target object.
Expand Down Expand Up @@ -252,14 +250,14 @@ func (c *Collection) Query(fn func(txn *Txn) error) error {

// Execute the query and keep the error for later
if err := fn(txn); err != nil {
txn.Rollback()
txn.rollback()
releaseTxn(txn)
return err
}

// Now that the iteration has finished, we can range over the pending action
// queue and apply all of the actions that were requested by the Selector.
txn.Commit()
txn.commit()
releaseTxn(txn)
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions collection_test.go
Expand Up @@ -20,15 +20,15 @@ import (
)

/*
BenchmarkCollection/insert-8 5069828 236.8 ns/op 18 B/op 0 allocs/op
BenchmarkCollection/fetch-8 24749718 52.37 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 654 1863383 ns/op 153 B/op 0 allocs/op
BenchmarkCollection/count-8 973424 1104 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 9499 125914 ns/op 3 B/op 0 allocs/op
BenchmarkCollection/update-at-8 3583278 337.1 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 781 1378816 ns/op 74813 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 725270 1612 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 169120 6833 ns/op 1 B/op 0 allocs/op
BenchmarkCollection/insert-8 5545016 216.8 ns/op 18 B/op 0 allocs/op
BenchmarkCollection/fetch-8 27272726 43.61 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 648 1844623 ns/op 147 B/op 0 allocs/op
BenchmarkCollection/count-8 1000000 1107 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 10000 102549 ns/op 9 B/op 0 allocs/op
BenchmarkCollection/update-at-8 4316584 280.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 826 1379693 ns/op 53068 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 7059126 169.1 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 196734 6294 ns/op 0 B/op 0 allocs/op
*/
func BenchmarkCollection(b *testing.B) {
amount := 100000
Expand Down
81 changes: 7 additions & 74 deletions column.go
Expand Up @@ -7,7 +7,6 @@ package column

import (
"reflect"
"sync"

"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"
Expand Down Expand Up @@ -117,7 +116,6 @@ func ForKind(kind reflect.Kind) Column {

// column represents a column wrapper that synchronizes operations
type column struct {
sync.RWMutex
Column
kind columnType // The type of the column
name string // The name of the column
Expand All @@ -142,27 +140,6 @@ func (c *column) IsTextual() bool {
return (c.kind & typeTextual) == typeTextual
}

// Intersect performs a logical and operation and updates the destination bitmap.
func (c *column) Intersect(dst *bitmap.Bitmap) {
c.RLock()
dst.And(*c.Index())
c.RUnlock()
}

// Difference performs a logical and not operation and updates the destination bitmap.
func (c *column) Difference(dst *bitmap.Bitmap) {
c.RLock()
dst.AndNot(*c.Index())
c.RUnlock()
}

// Union performs a logical or operation and updates the destination bitmap.
func (c *column) Union(dst *bitmap.Bitmap) {
c.RLock()
dst.Or(*c.Index())
c.RUnlock()
}

// Update performs a series of updates at once
func (c *column) Update(updates []commit.Update, growUntil uint32) {
c.Column.Grow(growUntil)
Expand All @@ -171,91 +148,47 @@ func (c *column) Update(updates []commit.Update, growUntil uint32) {

// Delete deletes a set of items from the column.
func (c *column) Delete(items *bitmap.Bitmap) {
c.Lock()
c.Column.Delete(items)
c.Unlock()
}

// Contains checks whether the column has a value at a specified index.
func (c *column) Contains(idx uint32) (exists bool) {
c.RLock()
exists = c.Column.Contains(idx)
c.RUnlock()
return
}

// Value retrieves a value at a specified index
func (c *column) Value(idx uint32) (v interface{}, ok bool) {
c.RLock()
v, ok = c.loadValue(idx)
c.RUnlock()
v, ok = c.Column.Value(idx)
return
}

// String retrieves a string value at a specified index
func (c *column) String(idx uint32) (v string, ok bool) {
c.RLock()
v, ok = c.loadString(idx)
c.RUnlock()
return
}

// Float64 retrieves a float64 value at a specified index
func (c *column) Float64(idx uint32) (v float64, ok bool) {
c.RLock()
v, ok = c.loadFloat64(idx)
c.RUnlock()
return
}

// Int64 retrieves an int64 value at a specified index
func (c *column) Int64(idx uint32) (v int64, ok bool) {
c.RLock()
v, ok = c.loadInt64(idx)
c.RUnlock()
return
}

// Uint64 retrieves an uint64 value at a specified index
func (c *column) Uint64(idx uint32) (v uint64, ok bool) {
c.RLock()
v, ok = c.loadUint64(idx)
c.RUnlock()
return
}

// loadValue (unlocked) retrieves a value at a specified index
func (c *column) loadValue(idx uint32) (v interface{}, ok bool) {
v, ok = c.Column.Value(idx)
return
}

// loadString (unlocked) retrieves a string value at a specified index
func (c *column) loadString(idx uint32) (v string, ok bool) {
if column, contains := c.Column.(Textual); contains {
v, ok = column.LoadString(idx)
}
return
}

// loadFloat64 (unlocked) retrieves a float64 value at a specified index
func (c *column) loadFloat64(idx uint32) (v float64, ok bool) {
// Float64 retrieves a float64 value at a specified index
func (c *column) Float64(idx uint32) (v float64, ok bool) {
if n, contains := c.Column.(Numeric); contains {
v, ok = n.LoadFloat64(idx)
}
return
}

// loadInt64 (unlocked) retrieves an int64 value at a specified index
func (c *column) loadInt64(idx uint32) (v int64, ok bool) {
// Int64 retrieves an int64 value at a specified index
func (c *column) Int64(idx uint32) (v int64, ok bool) {
if n, contains := c.Column.(Numeric); contains {
v, ok = n.LoadInt64(idx)
}
return
}

// loadUint64 (unlocked) retrieves an uint64 value at a specified index
func (c *column) loadUint64(idx uint32) (v uint64, ok bool) {
// Uint64 retrieves an uint64 value at a specified index
func (c *column) Uint64(idx uint32) (v uint64, ok bool) {
if n, contains := c.Column.(Numeric); contains {
v, ok = n.LoadUint64(idx)
}
Expand Down
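
The typed accessors in `column.go` rely on a type assertion against an optional interface: if the wrapped column also implements `Numeric`, the call is forwarded, otherwise the zero value and `false` are returned. The following is a self-contained sketch of that pattern. The interfaces are cut down to one method each, and `floats`, `flags` and `wrapper` are hypothetical types made up for the illustration.

```go
package main

import "fmt"

// Column is a simplified version of the base column interface.
type Column interface {
	Value(idx uint32) (interface{}, bool)
}

// Numeric is an optional capability some columns provide.
type Numeric interface {
	Column
	LoadFloat64(idx uint32) (float64, bool)
}

// floats is a hypothetical numeric column.
type floats []float64

func (f floats) Value(idx uint32) (interface{}, bool) {
	if int(idx) >= len(f) {
		return nil, false
	}
	return f[idx], true
}

func (f floats) LoadFloat64(idx uint32) (float64, bool) {
	if int(idx) >= len(f) {
		return 0, false
	}
	return f[idx], true
}

// flags is a hypothetical non-numeric column.
type flags []bool

func (f flags) Value(idx uint32) (interface{}, bool) {
	if int(idx) >= len(f) {
		return nil, false
	}
	return f[idx], true
}

// wrapper embeds a Column, like the unexported column type in the diff.
type wrapper struct {
	Column
}

// Float64 forwards to LoadFloat64 only when the column is Numeric.
// The assertion result is named `contains` so that it does not shadow
// the named return value `ok`.
func (w wrapper) Float64(idx uint32) (v float64, ok bool) {
	if n, contains := w.Column.(Numeric); contains {
		v, ok = n.LoadFloat64(idx)
	}
	return
}

func main() {
	num := wrapper{Column: floats{1.5, 2.5}}
	flg := wrapper{Column: flags{true}}

	fmt.Println(num.Float64(1)) // numeric column: value is forwarded
	fmt.Println(flg.Float64(0)) // non-numeric column: zero value and false
}
```

Using a distinct name for the assertion result is more than style: writing `if n, ok := ...; ok` would declare a new `ok` scoped to the `if`, and the assignment inside the block would never reach the function's named result.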
3 changes: 0 additions & 3 deletions column_test.go
Expand Up @@ -78,13 +78,10 @@ func testColumn(t *testing.T, column Column, value interface{}) {
assert.Equal(t, value, v)
assert.True(t, ok)

fmt.Printf("%v\n", column.Index())

// Delete the value and update again
column.Delete(&bitmap.Bitmap{0xffffffffffffffff})
_, ok = column.Value(9)
assert.False(t, ok)
fmt.Printf("%v\n", column.Index())
column.Update([]commit.Update{{
Type: commit.Put,
Index: 9,
Expand Down
11 changes: 1 addition & 10 deletions examples/million/main.go
Expand Up @@ -13,7 +13,7 @@ import (
)

func main() {
amount, runs := 1000000, 50
amount, runs := 20000000, 50
players := column.NewCollection(column.Options{
Capacity: amount,
})
Expand Down Expand Up @@ -68,9 +68,6 @@ func main() {
return txn.Range("balance", func(v column.Cursor) bool {
updates++
v.Update(1000.0)
if updates%10000 == 0 {
txn.Commit() // Avoid big transaction to reduce memory used
}
return true
})
})
Expand All @@ -84,9 +81,6 @@ func main() {
return txn.With("mage").Range("age", func(v column.Cursor) bool {
updates++
v.Update(99.0)
if updates%10000 == 0 {
txn.Commit() // Avoid big transaction to reduce memory used
}
return true
})
})
Expand All @@ -100,9 +94,6 @@ func main() {
return txn.With("male").Range("name", func(v column.Cursor) bool {
updates++
v.Update("Sir " + v.String())
if updates%10000 == 0 {
txn.Commit() // Avoid big transaction to reduce memory used
}
return true
})
})
Expand Down
1 change: 0 additions & 1 deletion go.mod
Expand Up @@ -3,7 +3,6 @@ module github.com/kelindar/column
go 1.16

require (
github.com/cheekybits/genny v1.0.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kelindar/bitmap v1.0.8
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
@@ -1,5 +1,3 @@
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down