Skip to content

Commit

Permalink
Introduce "Segments Tree Tracker"
Browse files Browse the repository at this point in the history
Currently the flattener — and more specifically "NameTracker" — is not hierarchy-aware. To illustrate, let's examine this case:
Pattern: `{ "context": { "user_id": [1] } }`
Event: `{ "context": { "user_id": 1 }, "payload": { /* lots of fields, large payload */ } }`

In this case the flattener will read "context" and then "user_id"; all the other values (strings, objects, and arrays) will be skipped, but will still get traversed.
By replacing the "NameTracker" with "SegmentsTreeTracker" which is aware of the hierarchy, we can:
* Know that once we read "user_id" we can stop consuming the event and exit
* Cache the paths — so `pathForChild` is no longer needed, and we reduce allocations.

```
name                                         old time/op    new time/op    delta
CityLots-10                                    4.42µs ± 0%    4.28µs ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattener_ContextFields-10                9.06µs ± 0%    0.21µs ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattener_MiddleNestedField-10            10.9µs ± 0%     1.8µs ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattener_LastField-10                    9.65µs ± 0%    9.68µs ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_ContextFields-10        9.42µs ± 0%    0.50µs ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_MiddleNestedField-10    11.0µs ± 0%     2.0µs ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_LastField-10            9.81µs ± 0%    9.87µs ± 0%   ~     (p=1.000 n=1+1)

name                                         old alloc/op   new alloc/op   delta
CityLots-10                                      985B ± 0%      832B ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattener_ContextFields-10                 48.0B ± 0%      0.0B        ~     (p=1.000 n=1+1)
_JsonFlattener_MiddleNestedField-10             64.0B ± 0%      0.0B        ~     (p=1.000 n=1+1)
_JsonFlattener_LastField-10                     32.0B ± 0%      0.0B        ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_ContextFields-10          232B ± 0%      184B ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_MiddleNestedField-10      240B ± 0%       48B ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_LastField-10             80.0B ± 0%     48.0B ± 0%   ~     (p=1.000 n=1+1)

name                                         old allocs/op  new allocs/op  delta
CityLots-10                                      38.0 ± 0%      31.0 ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattener_ContextFields-10                  3.00 ± 0%      0.00        ~     (p=1.000 n=1+1)
_JsonFlattener_MiddleNestedField-10              4.00 ± 0%      0.00        ~     (p=1.000 n=1+1)
_JsonFlattener_LastField-10                      2.00 ± 0%      0.00        ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_ContextFields-10          9.00 ± 0%      6.00 ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_MiddleNestedField-10      9.00 ± 0%      3.00 ± 0%   ~     (p=1.000 n=1+1)
_JsonFlattner_Evaluate_LastField-10              5.00 ± 0%      3.00 ± 0%   ~     (p=1.000 n=1+1)
```

Signed-off-by: Yosi Attias <[email protected]>
  • Loading branch information
yosiat committed Nov 26, 2022
1 parent b04e6d5 commit 61eabea
Show file tree
Hide file tree
Showing 23 changed files with 511 additions and 202 deletions.
4 changes: 2 additions & 2 deletions anything_but_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestParseAnythingButPattern(t *testing.T) {
}

for i, good := range goods {
fields, _, err := patternFromJSON([]byte(good))
fields, err := patternFromJSON([]byte(good))
if err != nil {
t.Errorf("parse anything-but i=%d: "+err.Error(), i)
}
Expand All @@ -139,7 +139,7 @@ func TestParseAnythingButPattern(t *testing.T) {
}

for _, bad := range bads {
_, _, err := patternFromJSON([]byte(bad))
_, err := patternFromJSON([]byte(bad))
if err == nil {
t.Errorf(`accepted anything-but "%s"`, bad)
}
Expand Down
29 changes: 14 additions & 15 deletions arrays_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,29 @@ const bands = `{
}`

func TestArrayCorrectness(t *testing.T) {
// only pattern3 should match
pattern1 := `{"bands": { "members": { "given": [ "Mick" ], "surname": [ "Strummer" ] } } }`
pattern2 := `{"bands": { "members": { "given": [ "Wata" ], "role": [ "drums" ] } } }`
pattern3 := `{"bands": { "members": { "given": [ "Wata" ], "role": [ "guitar" ] } } }`
// only wataGuiterPattern should match
mickStrummerPattern := `{"bands": { "members": { "given": [ "Mick" ], "surname": [ "Strummer" ] } } }`
wataDrumsPattern := `{"bands": { "members": { "given": [ "Wata" ], "role": [ "drums" ] } } }`
wataGuiterPattern := `{"bands": { "members": { "given": [ "Wata" ], "role": [ "guitar" ] } } }`

m := newCoreMatcher()
err := m.addPattern("Mick strummer", pattern1)
if err != nil {
t.Error(err.Error())
if err := m.addPattern("Mick strummer", mickStrummerPattern); err != nil {
t.Errorf("Failed adding pattern: %s: %s", mickStrummerPattern, err)
}
err = m.addPattern("Wata drums", pattern2)
if err != nil {
t.Error(err.Error())

if err := m.addPattern("Wata drums", wataDrumsPattern); err != nil {
t.Errorf("Failed adding pattern: %s: %s", wataDrumsPattern, err)
}
err = m.addPattern("Wata guitar", pattern3)
if err != nil {
t.Error(err.Error())
if err := m.addPattern("Wata guitar", wataGuiterPattern); err != nil {
t.Errorf("Failed adding pattern: %s: %s", wataGuiterPattern, err)
}

matches, err := m.matchesForJSONEvent([]byte(bands))
if err != nil {
t.Error(err.Error())
t.Errorf("Failed 'matchesForJSONEvent': %s", err)
}

if len(matches) != 1 || matches[0].(string) != "Wata guitar" {
t.Error("Matches across array boundaries")
t.Errorf("Expected to get a single of 'Wata guiter', but got %d matches: %+v", len(matches), matches)
}
}
2 changes: 1 addition & 1 deletion benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestBigShellStyle(t *testing.T) {
before := time.Now()
fj := newJSONFlattener()
for _, line := range lines {
fields, err := fj.Flatten(line, m)
fields, err := fj.Flatten(line, m.getSegmentsTreeTracker())
if err != nil {
t.Error("Flatten: " + err.Error())
}
Expand Down
32 changes: 13 additions & 19 deletions core_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ type coreMatcher struct {

// coreFields groups the updateable fields in coreMatcher.
// state is the start of the automaton.
// namesUsed is a map of field names that are used in any of the patterns that this automaton encodes. Typically,
// segmentsTree is a tree of segments that are used in any of the patterns that this automaton encodes.Typically,
// patterns only consider a subset of the fields in an incoming data object, and there is no reason to consider
// fields that do not appear in patterns when using the automaton for matching.
// fakeField is used when the flattener for an event returns no fields, because it could still match if
// there were patterns with "exists":false. So in this case we run one fake field through the matcher
// which will cause it to notice that any "exists":false patterns should match.
type coreFields struct {
state *fieldMatcher
namesUsed map[string]bool
state *fieldMatcher
segmentsTree *segmentsTree
}

func newCoreMatcher() *coreMatcher {
Expand All @@ -49,8 +46,8 @@ func newCoreMatcher() *coreMatcher {
// user-supplied path-name because it's not valid in UTF-8
m := coreMatcher{}
m.updateable.Store(&coreFields{
state: newFieldMatcher(),
namesUsed: make(map[string]bool),
state: newFieldMatcher(),
segmentsTree: newSegmentsIndex(),
})
return &m
}
Expand All @@ -62,7 +59,7 @@ func (m *coreMatcher) start() *coreFields {
// addPattern - the patternBytes is a JSON object. The X is what the matcher returns to indicate that the
// provided pattern has been matched. In many applications it might be a string which is the pattern's name.
func (m *coreMatcher) addPattern(x X, patternJSON string) error {
patternFields, patternNamesUsed, err := patternFromJSON([]byte(patternJSON))
patternFields, err := patternFromJSON([]byte(patternJSON))
if err != nil {
return err
}
Expand All @@ -75,15 +72,13 @@ func (m *coreMatcher) addPattern(x X, patternJSON string) error {

// we build up the new coreMatcher state in freshStart so we can atomically switch it in once complete
freshStart := &coreFields{}
freshStart.namesUsed = make(map[string]bool)
current := m.start()
freshStart.segmentsTree = current.segmentsTree.copy()
freshStart.state = current.state

for k := range current.namesUsed {
freshStart.namesUsed[k] = true
}
for used := range patternNamesUsed {
freshStart.namesUsed[used] = true
// Add paths to the segments tree index.
for _, field := range patternFields {
freshStart.segmentsTree.add(field.path)
}

// now we add each of the name/value pairs in fields slice to the automaton, starting with the start state -
Expand Down Expand Up @@ -132,7 +127,7 @@ func (m *coreMatcher) deletePatterns(_ X) error {
// This is a leftover from previous times, is only used by tests, but it's used by a *lot*
// so removing it would require a lot of tedious work
func (m *coreMatcher) matchesForJSONEvent(event []byte) ([]X, error) {
fields, err := newJSONFlattener().Flatten(event, m)
fields, err := newJSONFlattener().Flatten(event, m.getSegmentsTreeTracker())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,7 +246,6 @@ func noArrayTrailConflict(from []ArrayPos, to []ArrayPos) bool {
return true
}

func (m *coreMatcher) IsNameUsed(label []byte) bool {
_, ok := m.start().namesUsed[string(label)]
return ok
func (m *coreMatcher) getSegmentsTreeTracker() SegmentsTreeTracker {
return m.start().segmentsTree
}
9 changes: 0 additions & 9 deletions core_matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,6 @@ func TestSimpleaddPattern(t *testing.T) {
if err != nil {
t.Error(err.Error())
}
if len(m.start().namesUsed) != 2 {
t.Errorf("nameUsed = %d", len(m.start().namesUsed))
}
if !m.IsNameUsed([]byte("a")) {
t.Error("'a' not showing as used")
}
if !m.IsNameUsed([]byte("b")) {
t.Error("'b' not showing as used")
}
s0 := m.start().state
if len(s0.fields().transitions) != 1 {
t.Errorf("s0 trans len %d", len(s0.fields().transitions))
Expand Down
4 changes: 2 additions & 2 deletions escaping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func TestReadMemberName(t *testing.T) {
j := `{"😀💋😺": 1, "x\u0078\ud83d\udc8by": "2"}`
m := fakeMatcher("😀💋😺", `xx💋y`)
f := newJSONFlattener()
fields, err := f.Flatten([]byte(j), m)
fields, err := f.Flatten([]byte(j), m.getSegmentsTreeTracker())
if err != nil {
t.Error("TRMN: " + err.Error())
}
Expand All @@ -26,7 +26,7 @@ func TestReadMemberName(t *testing.T) {
func TestStringValuesWithEscapes(t *testing.T) {
j := `{"a": "x\u0078\ud83d\udc8by", "b": "\ud83d\ude00\ud83d\udc8b\ud83d\ude3a"}`
f := newJSONFlattener()
fields, err := f.Flatten([]byte(j), fakeMatcher("a", "b"))
fields, err := f.Flatten([]byte(j), fakeMatcher("a", "b").getSegmentsTreeTracker())
if err != nil {
t.Error("TSVWE: " + err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type fakeFlattener struct {
r []quamina.Field
}

func (f *fakeFlattener) Flatten(_ []byte, _ quamina.NameTracker) ([]quamina.Field, error) {
func (f *fakeFlattener) Flatten(_ []byte, _ quamina.SegmentsTreeTracker) ([]quamina.Field, error) {
return f.r, nil
}

Expand Down
Loading

0 comments on commit 61eabea

Please sign in to comment.