Skip to content

Commit

Permalink
IWF-343: Fix concurrent write panic for parallel test (#545)
Browse files Browse the repository at this point in the history
  • Loading branch information
samuel27m authored Feb 13, 2025
1 parent 333e351 commit 6249778
Show file tree
Hide file tree
Showing 30 changed files with 707 additions and 242 deletions.
24 changes: 12 additions & 12 deletions integ/any_command_combination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/indeedeng/iwf/gen/iwfidl"
anycommandconbination "github.com/indeedeng/iwf/integ/workflow/any_command_combination"
anycommandcombination "github.com/indeedeng/iwf/integ/workflow/any_command_combination"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestAnyCommandCombinationWorkflowCadenceContinueAsNew(t *testing.T) {
func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
assertions := assert.New(t)
// start test workflow server
wfHandler := anycommandconbination.NewHandler()
wfHandler := anycommandcombination.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler, t)
defer closeFunc1()

Expand All @@ -74,14 +74,14 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
},
},
})
wfId := anycommandconbination.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
wfId := anycommandcombination.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: anycommandconbination.WorkflowType,
IwfWorkflowType: anycommandcombination.WorkflowType,
WorkflowTimeoutSeconds: 40,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(anycommandconbination.State1),
StartStateId: ptr.Any(anycommandcombination.State1),
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
Expand All @@ -97,13 +97,13 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId1,
SignalChannelName: anycommandcombination.SignalNameAndId1,
SignalValue: &signalValue,
}).Execute()
failTestAtHttpError(err, httpResp, t)
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId1,
SignalChannelName: anycommandcombination.SignalNameAndId1,
SignalValue: &signalValue,
}).Execute()
failTestAtHttpError(err, httpResp, t)
Expand All @@ -114,7 +114,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{
WorkflowId: wfId,
WorkflowStateExecutionId: "S1-1",
TimerCommandId: iwfidl.PtrString(anycommandconbination.TimerId1),
TimerCommandId: iwfidl.PtrString(anycommandcombination.TimerId1),
}).Execute()
failTestAtHttpError(err, httpResp, t)

Expand All @@ -124,7 +124,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
// send first signal for s2
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId1,
SignalChannelName: anycommandcombination.SignalNameAndId1,
SignalValue: &signalValue,
}).Execute()
failTestAtHttpError(err, httpResp, t)
Expand All @@ -140,15 +140,15 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe

httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId3,
SignalChannelName: anycommandcombination.SignalNameAndId3,
SignalValue: &signalValue,
}).Execute()
failTestAtHttpError(err, httpResp, t)

// send 2nd signal for s2
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId2,
SignalChannelName: anycommandcombination.SignalNameAndId2,
SignalValue: &signalValue,
}).Execute()
failTestAtHttpError(err, httpResp, t)
Expand Down Expand Up @@ -176,7 +176,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
"S1_decide": 1,
"S2_start": 2,
"S2_decide": 1,
}, history, "anycommandconbination test fail, %v", history)
}, history, "anycommandcombination test fail, %v", history)

var s1CommandResults iwfidl.CommandResults
var s2CommandResults iwfidl.CommandResults
Expand Down
49 changes: 34 additions & 15 deletions integ/workflow/any_command_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/indeedeng/iwf/service/common/ptr"
"log"
"net/http"
"sync"
"testing"
)

Expand All @@ -30,14 +31,14 @@ const (
)

type handler struct {
invokeHistory map[string]int64
invokeData map[string]interface{}
invokeHistory sync.Map
invokeData sync.Map
}

func NewHandler() common.WorkflowHandler {
return &handler{
invokeHistory: make(map[string]int64),
invokeData: make(map[string]interface{}),
invokeHistory: sync.Map{},
invokeData: sync.Map{},
}
}

Expand All @@ -51,7 +52,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) {
log.Println("received state start request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1)
} else {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1))
}

if req.GetWorkflowStateId() == State1 {
// Proceed after either signal is received
Expand Down Expand Up @@ -95,21 +100,25 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1)
} else {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1))
}

if req.GetWorkflowStateId() == State1 {
signalResults := req.GetCommandResults()
h.invokeData["signalCommandResultsLength"] = len(signalResults.SignalResults)
h.invokeData.Store("signalCommandResultsLength", len(signalResults.SignalResults))

// Trigger signals
h.invokeData["signalChannelName0"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId0"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus0"] = signalResults.SignalResults[0].GetSignalRequestStatus()
h.invokeData.Store("signalChannelName0", signalResults.SignalResults[0].GetSignalChannelName())
h.invokeData.Store("signalCommandId0", signalResults.SignalResults[0].GetCommandId())
h.invokeData.Store("signalStatus0", signalResults.SignalResults[0].GetSignalRequestStatus())

h.invokeData["signalChannelName1"] = signalResults.SignalResults[1].GetSignalChannelName()
h.invokeData["signalCommandId1"] = signalResults.SignalResults[1].GetCommandId()
h.invokeData["signalStatus1"] = signalResults.SignalResults[1].GetSignalRequestStatus()
h.invokeData["signalValue1"] = signalResults.SignalResults[1].GetSignalValue()
h.invokeData.Store("signalChannelName1", signalResults.SignalResults[1].GetSignalChannelName())
h.invokeData.Store("signalCommandId1", signalResults.SignalResults[1].GetCommandId())
h.invokeData.Store("signalStatus1", signalResults.SignalResults[1].GetSignalRequestStatus())
h.invokeData.Store("signalValue1", signalResults.SignalResults[1].GetSignalValue())

// Move to State 2
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
Expand Down Expand Up @@ -141,5 +150,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, h.invokeData
invokeHistory := make(map[string]int64)
h.invokeHistory.Range(func(key, value interface{}) bool {
invokeHistory[key.(string)] = value.(int64)
return true
})
invokeData := make(map[string]interface{})
h.invokeData.Range(func(key, value interface{}) bool {
invokeData[key.(string)] = value
return true
})
return invokeHistory, invokeData
}
37 changes: 28 additions & 9 deletions integ/workflow/any_command_combination/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/indeedeng/iwf/service/common/ptr"
"log"
"net/http"
"sync"
"testing"
"time"
)
Expand All @@ -34,17 +35,17 @@ const (
)

type handler struct {
invokeHistory map[string]int64
invokeData map[string]interface{}
invokeHistory sync.Map
invokeData sync.Map
//we want to confirm that the interpreter workflow activity will fail when the commandId is empty with ANY_COMMAND_COMBINATION_COMPLETED
hasS1RetriedForInvalidCommandId bool
hasS2RetriedForInvalidCommandId bool
}

func NewHandler() common.WorkflowHandler {
return &handler{
invokeHistory: make(map[string]int64),
invokeData: make(map[string]interface{}),
invokeHistory: sync.Map{},
invokeData: sync.Map{},
hasS1RetriedForInvalidCommandId: false,
hasS2RetriedForInvalidCommandId: false,
}
Expand Down Expand Up @@ -99,7 +100,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) {
}

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1)
} else {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1))
}

if req.GetWorkflowStateId() == State1 {
// If the state has already retried an invalid command, proceed on combination completed
Expand Down Expand Up @@ -192,11 +197,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1)
} else {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1))
}

// Trigger signals and move to State 2
if req.GetWorkflowStateId() == State1 {
h.invokeData["s1_commandResults"] = req.GetCommandResults()
h.invokeData.Store("s1_commandResults", req.GetCommandResults())

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
Expand All @@ -210,7 +219,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
return
} else if req.GetWorkflowStateId() == State2 {
// Trigger data and move to completion
h.invokeData["s2_commandResults"] = req.GetCommandResults()
h.invokeData.Store("s2_commandResults", req.GetCommandResults())
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand All @@ -228,5 +237,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, h.invokeData
invokeHistory := make(map[string]int64)
h.invokeHistory.Range(func(key, value interface{}) bool {
invokeHistory[key.(string)] = value.(int64)
return true
})
invokeData := make(map[string]interface{})
h.invokeData.Range(func(key, value interface{}) bool {
invokeData[key.(string)] = value
return true
})
return invokeHistory, invokeData
}
47 changes: 33 additions & 14 deletions integ/workflow/any_timer_signal/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/indeedeng/iwf/service/common/ptr"
"log"
"net/http"
"sync"
"testing"
"time"
)
Expand All @@ -30,14 +31,14 @@ const (
)

type handler struct {
invokeHistory map[string]int64
invokeData map[string]interface{}
invokeHistory sync.Map
invokeData sync.Map
}

func NewHandler() common.WorkflowHandler {
return &handler{
invokeHistory: make(map[string]int64),
invokeData: make(map[string]interface{}),
invokeHistory: sync.Map{},
invokeData: sync.Map{},
}
}

Expand All @@ -51,7 +52,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) {
log.Println("received state start request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_start"); ok {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", value.(int64)+1)
} else {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_start", int64(1))
}

if req.GetWorkflowStateId() == State1 {
var timerCommands []iwfidl.TimerCommand
Expand Down Expand Up @@ -105,24 +110,28 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if value, ok := h.invokeHistory.Load(req.GetWorkflowStateId() + "_decide"); ok {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", value.(int64)+1)
} else {
h.invokeHistory.Store(req.GetWorkflowStateId()+"_decide", int64(1))
}
if req.GetWorkflowStateId() == State1 {
signalResults := req.GetCommandResults()
var movements []iwfidl.StateMovement

context := req.GetContext()
// On first State 1 attempt, trigger signals and stay on the first state
if context.GetStateExecutionId() == State1+"-"+"1" {
h.invokeData["signalChannelName1"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId1"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus1"] = signalResults.SignalResults[0].GetSignalRequestStatus()
h.invokeData.Store("signalChannelName1", signalResults.SignalResults[0].GetSignalChannelName())
h.invokeData.Store("signalCommandId1", signalResults.SignalResults[0].GetCommandId())
h.invokeData.Store("signalStatus1", signalResults.SignalResults[0].GetSignalRequestStatus())
movements = []iwfidl.StateMovement{{StateId: State1}}
} else {
// After the first State 1 attempt, trigger signals and move to next state
h.invokeData["signalChannelName2"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId2"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus2"] = signalResults.SignalResults[0].GetSignalRequestStatus()
h.invokeData["signalValue2"] = signalResults.SignalResults[0].GetSignalValue()
h.invokeData.Store("signalChannelName2", signalResults.SignalResults[0].GetSignalChannelName())
h.invokeData.Store("signalCommandId2", signalResults.SignalResults[0].GetCommandId())
h.invokeData.Store("signalStatus2", signalResults.SignalResults[0].GetSignalRequestStatus())
h.invokeData.Store("signalValue2", signalResults.SignalResults[0].GetSignalValue())
movements = []iwfidl.StateMovement{{StateId: State2}}
}

Expand Down Expand Up @@ -151,5 +160,15 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) {
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, h.invokeData
invokeHistory := make(map[string]int64)
h.invokeHistory.Range(func(key, value interface{}) bool {
invokeHistory[key.(string)] = value.(int64)
return true
})
invokeData := make(map[string]interface{})
h.invokeData.Range(func(key, value interface{}) bool {
invokeData[key.(string)] = value
return true
})
return invokeHistory, invokeData
}
Loading

0 comments on commit 6249778

Please sign in to comment.