Skip to content

Commit

Permalink
fix: event keys (#951)
Browse files Browse the repository at this point in the history
* feat: insert unique event keys

* fix: list query

* feat: bulk

* chore: gen
  • Loading branch information
grutt authored Oct 10, 2024
1 parent 9555813 commit 3340ec8
Show file tree
Hide file tree
Showing 11 changed files with 8,290 additions and 6,041 deletions.
14,103 changes: 8,093 additions & 6,010 deletions pkg/repository/prisma/db/db_gen.go

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions pkg/repository/prisma/dbsqlc/events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ SELECT
FROM
events;

-- name: CreateEventKeys :exec
INSERT INTO "EventKey" (
"key",
"tenantId"
)
SELECT
unnest(@keys::text[]) AS "key",
unnest(@tenantIds::uuid[]) AS "tenantId"
ON CONFLICT ("key", "tenantId") DO NOTHING;

-- name: ListEventKeys :many
SELECT
"key"
FROM
"EventKey"
WHERE
"tenantId" = @tenantId::uuid
ORDER BY "key" ASC;

-- name: CreateEvent :one
INSERT INTO "Event" (
"id",
Expand Down
51 changes: 51 additions & 0 deletions pkg/repository/prisma/dbsqlc/events.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ CREATE TABLE "Event" (
CONSTRAINT "Event_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "EventKey" (
"key" TEXT NOT NULL,
"tenantId" UUID NOT NULL,

CONSTRAINT "EventKey_pkey" PRIMARY KEY ("key")
);

-- CreateTable
CREATE TABLE "GetGroupKeyRun" (
"id" UUID NOT NULL,
Expand Down Expand Up @@ -968,6 +976,9 @@ CREATE INDEX "Event_tenantId_createdAt_idx" ON "Event"("tenantId" ASC, "createdA
-- CreateIndex
CREATE INDEX "Event_tenantId_idx" ON "Event"("tenantId" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey"("key" ASC, "tenantId" ASC);

-- CreateIndex
CREATE INDEX "GetGroupKeyRun_createdAt_idx" ON "GetGroupKeyRun"("createdAt" ASC);

Expand Down
107 changes: 77 additions & 30 deletions pkg/repository/prisma/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -167,30 +168,15 @@ func (r *eventAPIRepository) ListEvents(ctx context.Context, tenantId string, op
}

func (r *eventAPIRepository) ListEventKeys(tenantId string) ([]string, error) {
var rows []struct {
Key string `json:"key"`
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err := r.client.Prisma.QueryRaw(
`
SELECT DISTINCT ON("Event"."key") "Event"."key"
FROM "Event"
WHERE
"Event"."tenantId"::text = $1
`,
tenantId,
).Exec(context.Background(), &rows)
keys, err := r.queries.ListEventKeys(ctx, r.pool, sqlchelpers.UUIDFromStr(tenantId))

if err != nil {
return nil, err
}

keys := make([]string, len(rows))

for i, row := range rows {
keys[i] = row.Key
}

return keys, nil
}

Expand Down Expand Up @@ -224,13 +210,14 @@ func (r *eventAPIRepository) ListEventsById(tenantId string, ids []string) ([]db
}

type eventEngineRepository struct {
pool *pgxpool.Pool
v validator.Validator
queries *dbsqlc.Queries
l *zerolog.Logger
m *metered.Metered
bulkCreateBuffer *TenantBufferManager[*repository.CreateEventOpts, *dbsqlc.Event]
callbacks []repository.Callback[*dbsqlc.Event]
pool *pgxpool.Pool
v validator.Validator
queries *dbsqlc.Queries
l *zerolog.Logger
m *metered.Metered
bulkCreateBuffer *TenantBufferManager[*repository.CreateEventOpts, *dbsqlc.Event]
callbacks []repository.Callback[*dbsqlc.Event]
createEventKeyCache *lru.Cache[string, bool]
}

func (r *eventEngineRepository) cleanup() error {
Expand All @@ -240,12 +227,15 @@ func (r *eventEngineRepository) cleanup() error {
func NewEventEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered) (repository.EventEngineRepository, func() error, error) {
queries := dbsqlc.New()

createEventKeyCache, _ := lru.New[string, bool](2000) // nolint: errcheck - this only returns an error if the size is less than 0

e := eventEngineRepository{
pool: pool,
v: v,
queries: queries,
l: l,
m: m,
pool: pool,
v: v,
queries: queries,
l: l,
m: m,
createEventKeyCache: createEventKeyCache,
}
err := e.startBufferLoop()

Expand All @@ -264,6 +254,44 @@ func (r *eventEngineRepository) GetEventForEngine(ctx context.Context, tenantId,
return r.queries.GetEventForEngine(ctx, r.pool, sqlchelpers.UUIDFromStr(id))
}

func (r *eventEngineRepository) createEventKeys(ctx context.Context, tx pgx.Tx, keys map[string]struct {
key string
tenantId string
}) error {

eventKeys := make([]string, 0)
eventKeysTenantIds := make([]pgtype.UUID, 0)

for _, eventKey := range keys {
cacheKey := fmt.Sprintf("%s-%s", eventKey.tenantId, eventKey.key)

// if the key is already in the cache, skip it
if _, ok := r.createEventKeyCache.Get(cacheKey); ok {
continue
}

r.l.Debug().Msgf("creating event key %s for tenant %s", eventKey.key, eventKey.tenantId)
eventKeys = append(eventKeys, eventKey.key)
eventKeysTenantIds = append(eventKeysTenantIds, sqlchelpers.UUIDFromStr(eventKey.tenantId))
}

err := r.queries.CreateEventKeys(ctx, tx, dbsqlc.CreateEventKeysParams{
Tenantids: eventKeysTenantIds,
Keys: eventKeys,
})

if err != nil {
return err
}

// add to cache
for i := range eventKeys {
r.createEventKeyCache.Add(fmt.Sprintf("%s-%s", sqlchelpers.UUIDToStr(eventKeysTenantIds[i]), eventKeys[i]), true)
}

return nil
}

func (r *eventEngineRepository) CreateEvent(ctx context.Context, opts *repository.CreateEventOpts) (*dbsqlc.Event, error) {
return metered.MakeMetered(ctx, r.m, dbsqlc.LimitResourceEVENT, opts.TenantId, 1, func() (*string, *dbsqlc.Event, error) {

Expand Down Expand Up @@ -335,6 +363,11 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
params := make([]dbsqlc.CreateEventsParams, len(opts.Events))
ids := make([]pgtype.UUID, len(opts.Events))

uniqueEventKeys := make(map[string]struct {
key string
tenantId string
})

for i, event := range opts.Events {
eventId := uuid.New().String()

Expand All @@ -350,6 +383,14 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
params[i].ReplayedFromId = sqlchelpers.UUIDFromStr(*event.ReplayedEvent)
}

uniqueEventKeys[fmt.Sprintf("%s-%s", event.TenantId, event.Key)] = struct {
key string
tenantId string
}{
key: event.Key,
tenantId: event.TenantId,
}

ids[i] = sqlchelpers.UUIDFromStr(eventId)
}

Expand All @@ -362,6 +403,12 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos

defer deferRollback(ctx, r.l, tx.Rollback)

err = r.createEventKeys(ctx, tx, uniqueEventKeys)

if err != nil {
return nil, nil, fmt.Errorf("could not create event keys: %w", err)
}
// create events
insertCount, err := r.queries.CreateEvents(
ctx,
tx,
Expand Down
10 changes: 10 additions & 0 deletions prisma/migrations/20241008124029_v0_49_2/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- CreateTable
CREATE TABLE "EventKey" (
"key" TEXT NOT NULL,
"tenantId" UUID NOT NULL,

CONSTRAINT "EventKey_pkey" PRIMARY KEY ("key")
);

-- CreateIndex
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey"("key", "tenantId");
7 changes: 7 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ model APIToken {
webhookWorkers WebhookWorker[]
}

model EventKey {
key String @id
tenantId String @db.Uuid
@@unique([key, tenantId])
}

// Event represents an event in the database.
model Event {
// base fields
Expand Down
4 changes: 4 additions & 0 deletions sql/migrations/20241008124038_v0.49.2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Create "EventKey" table
CREATE TABLE "EventKey" ("key" text NOT NULL, "tenantId" uuid NOT NULL, PRIMARY KEY ("key"));
-- Create index "EventKey_key_tenantId_key" to table: "EventKey"
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey" ("key", "tenantId");
3 changes: 2 additions & 1 deletion sql/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:MdUIwxqQIsS9pvLCRU1mB2nai1wTMJ7Vjl3FRjCr6uw=
h1:4CduYOxaYNUq+L7PFTvWFiMncxTz5b43B6rgB/ryJUA=
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
Expand Down Expand Up @@ -64,3 +64,4 @@ h1:MdUIwxqQIsS9pvLCRU1mB2nai1wTMJ7Vjl3FRjCr6uw=
20240930202706_v0.48.1.sql h1:CcgVHTRA4c9u5rQvSFFy/R9AdiSdTapqfpyUs0KGAf0=
20240930233257_v0.49.0.sql h1:B+JMbME62DxaCnesydvQXPg+ZNB0kB/V8gSclh1VdY4=
20241004122206_v0.49.1.sql h1:Fas5TXOp4a2g+y5sGBJG9wTaVL/WCaVJ9+ZlASN9Md4=
20241008124038_v0.49.2.sql h1:YT40sN8Wtqh21emrzDMZIcvcOkipw+4fdwIoBpF+Dek=
11 changes: 11 additions & 0 deletions sql/schema/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ CREATE TABLE "Event" (
CONSTRAINT "Event_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "EventKey" (
"key" TEXT NOT NULL,
"tenantId" UUID NOT NULL,

CONSTRAINT "EventKey_pkey" PRIMARY KEY ("key")
);

-- CreateTable
CREATE TABLE "GetGroupKeyRun" (
"id" UUID NOT NULL,
Expand Down Expand Up @@ -968,6 +976,9 @@ CREATE INDEX "Event_tenantId_createdAt_idx" ON "Event"("tenantId" ASC, "createdA
-- CreateIndex
CREATE INDEX "Event_tenantId_idx" ON "Event"("tenantId" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey"("key" ASC, "tenantId" ASC);

-- CreateIndex
CREATE INDEX "GetGroupKeyRun_createdAt_idx" ON "GetGroupKeyRun"("createdAt" ASC);

Expand Down

0 comments on commit 3340ec8

Please sign in to comment.