Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen4: Add Limit clause support #7312

Merged
merged 5 commits into from
Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func init() {
vindexes.Register("costly", newCostlyIndex)
}

const samePlanMarker = "Gen4 plan same as above\n"

func TestPlan(t *testing.T) {
vschemaWrapper := &vschemaWrapper{
v: loadSchema(t, "schema_test.json"),
Expand Down Expand Up @@ -418,9 +420,6 @@ func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper, c
if tcase.output2ndPlanner == "" {
empty = true
}
if tcase.output2ndPlanner == "{\n}\n" {
tcase.output2ndPlanner = tcase.output
}

vschema.version = V4
out, err := getPlanOutput(tcase, vschema)
Expand All @@ -444,7 +443,7 @@ func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper, c
}

if tcase.output == out {
expected.WriteString("{\n}\n")
expected.WriteString(samePlanMarker)
} else {
expected.WriteString(out)
}
Expand Down Expand Up @@ -555,7 +554,9 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) {
if err != nil && err != io.EOF {
panic(fmt.Sprintf("error reading file %s line# %d: %s", name, lineno, err.Error()))
}
if len(binput) > 0 && (binput[0] == '"' || binput[0] == '{') {
if len(binput) > 0 && string(binput) == samePlanMarker {
output2Planner = output
} else if len(binput) > 0 && (binput[0] == '"' || binput[0] == '{') {
output2Planner = append(output2Planner, binput...)
for {
l, err := r.ReadBytes('\n')
Expand All @@ -573,9 +574,6 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) {
break
}
}
if string(output2Planner) == "{\n}" {
output2Planner = output
}
}

testCaseIterator <- testCase{
Expand Down
54 changes: 35 additions & 19 deletions go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,63 @@ func newBuildSelectPlan(sel *sqlparser.Select, vschema ContextVSchema) (engine.P
return nil, err
}

plan, err = planLimit(sel.Limit, plan)
if err != nil {
return nil, err
}

if err := plan.WireupV4(semTable); err != nil {
return nil, err
}
return plan.Primitive(), nil
}

func planLimit(limit *sqlparser.Limit, plan logicalPlan) (logicalPlan, error) {
if limit == nil {
return plan, nil
}
rb, ok := plan.(*route)
if ok && rb.isSingleShard() {
rb.SetLimit(limit)
return plan, nil
}

lPlan, err := createLimit(plan, limit)
if err != nil {
return nil, err
}

// visit does not modify the plan.
_, err = visit(lPlan, setUpperLimit)
if err != nil {
return nil, err
}
return lPlan, nil
}

func planProjections(sel *sqlparser.Select, plan logicalPlan, semTable *semantics.SemTable) error {
rb, ok := plan.(*route)
if ok {
ast := rb.Select.(*sqlparser.Select)
ast.Distinct = sel.Distinct
ast.GroupBy = sel.GroupBy
ast.OrderBy = sel.OrderBy
ast.Limit = sel.Limit
ast.SelectExprs = sel.SelectExprs
ast.Comments = sel.Comments
} else {
var projections []*sqlparser.AliasedExpr

// TODO real horizon planning to be done
for _, expr := range sel.SelectExprs {
switch e := expr.(type) {
case *sqlparser.AliasedExpr:
projections = append(projections, e)
if _, err := pushProjection(e, plan, semTable); err != nil {
return err
}
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not yet supported %T", e)
}
}

if _, err := pushProjection(projections, plan, semTable); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -433,27 +458,18 @@ func transformToLogicalPlan(tree joinTree, semTable *semantics.SemTable) (logica
func transformJoinPlan(n *joinPlan, semTable *semantics.SemTable) (*joinV4, error) {
lhsColList := extractColumnsNeededFromLHS(n, semTable, n.lhs.tables())

var lhsColExpr []*sqlparser.AliasedExpr
for _, col := range lhsColList {
lhsColExpr = append(lhsColExpr, &sqlparser.AliasedExpr{
Expr: col,
})
}

lhs, err := transformToLogicalPlan(n.lhs, semTable)
if err != nil {
return nil, err
}
offset, err := pushProjection(lhsColExpr, lhs, semTable)
if err != nil {
return nil, err
}

vars := map[string]int{}

for _, col := range lhsColList {
offset, err := pushProjection(&sqlparser.AliasedExpr{Expr: col}, lhs, semTable)
if err != nil {
return nil, err
}
vars[col.CompliantName("")] = offset
offset++
}

rhs, err := transformToLogicalPlan(n.rhs, semTable)
Expand Down
55 changes: 17 additions & 38 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,55 +62,34 @@ func buildSelectPlan(query string) func(sqlparser.Statement, ContextVSchema) (en
}
}

func pushProjection(expr []*sqlparser.AliasedExpr, plan logicalPlan, semTable *semantics.SemTable) (firstOffset int, err error) {
func pushProjection(expr *sqlparser.AliasedExpr, plan logicalPlan, semTable *semantics.SemTable) (firstOffset int, err error) {
switch node := plan.(type) {
case *route:
sel := node.Select.(*sqlparser.Select)
offset := len(sel.SelectExprs)
for _, e := range expr {
sel.SelectExprs = append(sel.SelectExprs, e)
}
sel.SelectExprs = append(sel.SelectExprs, expr)
return offset, nil
case *joinV4:
cols := make([]int, len(expr))
var lhs, rhs []*sqlparser.AliasedExpr
lhsSolves := node.Left.ContainsTables()
rhsSolves := node.Right.ContainsTables()
for i, e := range expr {
deps := semTable.Dependencies(e.Expr)
switch {
case deps.IsSolvedBy(lhsSolves):
lhs = append(lhs, e)
cols[i] = -1
case deps.IsSolvedBy(rhsSolves):
rhs = append(rhs, e)
cols[i] = 1
default:
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown dependencies for %s", sqlparser.String(e.Expr))
deps := semTable.Dependencies(expr.Expr)
switch {
case deps.IsSolvedBy(lhsSolves):
offset, err := pushProjection(expr, node.Left, semTable)
if err != nil {
return 0, err
}
}
lOffset, err := pushProjection(lhs, node.Left, semTable)
if err != nil {
return 0, err
}
rOffset, err := pushProjection(rhs, node.Right, semTable)
if err != nil {
return 0, err
}
rOffset++
lOffset = -(lOffset + 1)
for i, col := range cols {
if col == -1 {
cols[i] = lOffset
lOffset--
} else {
cols[i] = rOffset
rOffset++
node.Cols = append(node.Cols, -(offset + 1))
case deps.IsSolvedBy(rhsSolves):
offset, err := pushProjection(expr, node.Right, semTable)
if err != nil {
return 0, err
}
node.Cols = append(node.Cols, offset+1)
default:
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown dependencies for %s", sqlparser.String(expr))
}
node.Cols = cols
return 0, nil

return len(node.Cols) - 1, nil
default:
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not yet supported %T", node)
}
Expand Down
42 changes: 14 additions & 28 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"Table": "unsharded"
}
}
{
}
Gen4 plan same as above

# Aggregate on unique sharded
"select count(*), col from user where id = 1"
Expand All @@ -41,8 +40,7 @@
"Vindex": "user_index"
}
}
{
}
Gen4 plan same as above

# Aggregate detection (non-aggregate function)
"select fun(1), col from user"
Expand All @@ -61,8 +59,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# select distinct with unique vindex for scatter route.
"select distinct col1, id from user"
Expand All @@ -81,8 +78,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# distinct and group by together for single route.
"select distinct col1, id from user group by col1"
Expand All @@ -101,8 +97,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# scatter group by a text column
"select count(*), a, textcol1, b from user group by a, textcol1, b"
Expand Down Expand Up @@ -352,8 +347,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# group by a unique vindex and other column should use a simple route
"select id, col, count(*) from user group by id, col"
Expand All @@ -372,8 +366,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# group by a non-vindex column should use an OrderdAggregate primitive
"select col, count(*) from user group by col"
Expand Down Expand Up @@ -452,8 +445,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# group by a unique vindex where alias from select list is used
"select id as val, 1+count(*) from user group by val"
Expand All @@ -472,8 +464,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# group by a unique vindex where expression is qualified (alias should be ignored)
"select val as id, 1+count(*) from user group by user.id"
Expand All @@ -492,8 +483,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# group by a unique vindex where it should skip non-aliased expressions.
"select *, id, 1+count(*) from user group by id"
Expand All @@ -512,8 +502,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# group by a unique vindex should revert to simple route, and having clause should find the correct symbols.
"select id, count(*) c from user group by id having id=1 and c=10"
Expand Down Expand Up @@ -668,8 +657,7 @@
"Table": "user"
}
}
{
}
Gen4 plan same as above

# count with distinct unique vindex
"select col, count(distinct id) from user group by col"
Expand Down Expand Up @@ -1221,8 +1209,7 @@
"Vindex": "user_index"
}
}
{
}
Gen4 plan same as above

# routing rules for aggregates
"select id, count(*) from route2 group by id"
Expand Down Expand Up @@ -1259,8 +1246,7 @@
"Table": "ref"
}
}
{
}
Gen4 plan same as above

# distinct and aggregate functions missing group by
"select distinct a, count(*) from user"
Expand Down
Loading