Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"io"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -203,6 +205,7 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
}

allRunning := false
graphCompleted := false

for {
select {
Expand Down Expand Up @@ -232,11 +235,19 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
}

if len(failures) > 0 {
hookCtx, hookStop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
runLifecycleHook(hookCtx, wf.Lifecycle.GetOnFailure(), wf, os.Stdout, logger)
hookStop()
setTerminalTitle(workflowTitle(name, subgraph.Nodes))
ringTerminalBell()
return fmt.Errorf("failed tasks: %v", failures)
}

if graphCompleted {
hookCtx, hookStop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
runLifecycleHook(hookCtx, wf.Lifecycle.GetOnSuccess(), wf, os.Stdout, logger)
hookStop()
}
return nil
case event := <-events:
switch x := event.(type) {
Expand Down Expand Up @@ -279,6 +290,7 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB

if len(pendingTasks) == 0 {
logger.Println("✅ exiting because all requested tasks completed and none should be restarted")
graphCompleted = true
setTerminalTitle(workflowTitle(name, subgraph.Nodes))
ringTerminalBell()
cancel()
Expand Down Expand Up @@ -516,13 +528,15 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB

if err != nil {
setNodeStatus(node, "failed", fmt.Sprint(err))
runLifecycleHook(ctx, t.GetOnFailure(), wf, out, logger)
if t.GetRestartPolicy() != "Never" {
restart()
}
return
}

setNodeStatus(node, "succeeded", "")
runLifecycleHook(ctx, t.GetOnSuccess(), wf, out, logger)
if t.GetRestartPolicy() == "Always" {
restart()
}
Expand All @@ -535,3 +549,21 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
}
}
}

// runLifecycleHook runs the named task as a lifecycle hook, logging any errors.
// It is a best-effort operation: if the hook task fails, the error is logged
// but does not affect the triggering task's outcome.
func runLifecycleHook(ctx context.Context, taskName string, wf *types.Workflow, out io.Writer, logger *log.Logger) {
if taskName == "" {
return
}
t, ok := wf.Tasks[taskName]
if !ok {
logger.Printf("lifecycle hook: task %q not found", taskName)
return
}
p := proc.New(taskName, t, logger, types.Spec(*wf))
if err := p.Run(ctx, out, out); err != nil {
logger.Printf("lifecycle hook failed: %v", err)
}
}
25 changes: 25 additions & 0 deletions internal/types/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package types

// Lifecycle describes actions that the system should take in response to lifecycle events.
type Lifecycle struct {
// OnSuccess is the name of the task to run after the task/graph succeeds.
OnSuccess string `json:"onSuccess,omitempty"`
// OnFailure is the name of the task to run after the task/graph fails.
OnFailure string `json:"onFailure,omitempty"`
}

// GetOnSuccess returns the OnSuccess task name, or empty string if the Lifecycle is nil.
func (l *Lifecycle) GetOnSuccess() string {
if l == nil {
return ""
}
return l.OnSuccess
}

// GetOnFailure returns the OnFailure task name, or empty string if the Lifecycle is nil.
func (l *Lifecycle) GetOnFailure() string {
if l == nil {
return ""
}
return l.OnFailure
}
59 changes: 59 additions & 0 deletions internal/types/lifecycle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package types

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLifecycle_GetOnSuccess(t *testing.T) {
t.Run("NilLifecycle", func(t *testing.T) {
var l *Lifecycle
assert.Equal(t, "", l.GetOnSuccess())
})
t.Run("NoOnSuccess", func(t *testing.T) {
l := &Lifecycle{}
assert.Equal(t, "", l.GetOnSuccess())
})
t.Run("WithOnSuccess", func(t *testing.T) {
l := &Lifecycle{OnSuccess: "notify"}
assert.Equal(t, "notify", l.GetOnSuccess())
})
}

func TestLifecycle_GetOnFailure(t *testing.T) {
t.Run("NilLifecycle", func(t *testing.T) {
var l *Lifecycle
assert.Equal(t, "", l.GetOnFailure())
})
t.Run("NoOnFailure", func(t *testing.T) {
l := &Lifecycle{}
assert.Equal(t, "", l.GetOnFailure())
})
t.Run("WithOnFailure", func(t *testing.T) {
l := &Lifecycle{OnFailure: "alert"}
assert.Equal(t, "alert", l.GetOnFailure())
})
}

func TestTask_GetOnSuccess(t *testing.T) {
t.Run("NoLifecycle", func(t *testing.T) {
task := &Task{}
assert.Equal(t, "", task.GetOnSuccess())
})
t.Run("WithOnSuccess", func(t *testing.T) {
task := &Task{Lifecycle: &Lifecycle{OnSuccess: "notify"}}
assert.Equal(t, "notify", task.GetOnSuccess())
})
}

func TestTask_GetOnFailure(t *testing.T) {
t.Run("NoLifecycle", func(t *testing.T) {
task := &Task{}
assert.Equal(t, "", task.GetOnFailure())
})
t.Run("WithOnFailure", func(t *testing.T) {
task := &Task{Lifecycle: &Lifecycle{OnFailure: "alert"}}
assert.Equal(t, "alert", task.GetOnFailure())
})
}
2 changes: 2 additions & 0 deletions internal/types/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Spec struct {
Env EnvVars `json:"env,omitempty"`
// Environment file (e.g. .env) to use
Envfile Envfile `json:"envfile,omitempty"`
// Lifecycle describes actions that the system should take in response to graph-level lifecycle events.
Lifecycle *Lifecycle `json:"lifecycle,omitempty"`
}

func (s *Spec) GetTerminationGracePeriod() time.Duration {
Expand Down
12 changes: 12 additions & 0 deletions internal/types/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type Task struct {
Group string `json:"group,omitempty"`
// Whether this is the default task to run if no task is specified.
Default bool `json:"default,omitempty"`
// Lifecycle describes actions that the system should take in response to task lifecycle events.
Lifecycle *Lifecycle `json:"lifecycle,omitempty"`
}

func (t *Task) GetHostPorts() []uint16 {
Expand Down Expand Up @@ -216,3 +218,13 @@ func (t *Task) GetStalledTimeout() time.Duration {
}
return 30 * time.Second
}

// GetOnSuccess returns the name of the task to run when this task succeeds, or empty string if none.
func (t *Task) GetOnSuccess() string {
return t.Lifecycle.GetOnSuccess()
}

// GetOnFailure returns the name of the task to run when this task fails, or empty string if none.
func (t *Task) GetOnFailure() string {
return t.Lifecycle.GetOnFailure()
}
27 changes: 27 additions & 0 deletions schema/workflow.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@
],
"title": "HostPath"
},
"Lifecycle": {
"properties": {
"onSuccess": {
"type": "string",
"title": "onSuccess",
"description": "OnSuccess is the name of the task to run after the task/graph succeeds."
},
"onFailure": {
"type": "string",
"title": "onFailure",
"description": "OnFailure is the name of the task to run after the task/graph fails."
}
},
"additionalProperties": false,
"type": "object",
"title": "Lifecycle",
"description": "Lifecycle describes actions that the system should take in response to lifecycle events."
},
"Port": {
"properties": {
"containerPort": {
Expand Down Expand Up @@ -298,6 +316,11 @@
"type": "boolean",
"title": "default",
"description": "Whether this is the default task to run if no task is specified."
},
"lifecycle": {
"$ref": "#/$defs/Lifecycle",
"title": "lifecycle",
"description": "Lifecycle describes actions that the system should take in response to task lifecycle events."
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -394,6 +417,10 @@
"envfile": {
"$ref": "#/$defs/Envfile",
"title": "envfile"
},
"lifecycle": {
"$ref": "#/$defs/Lifecycle",
"title": "lifecycle"
}
},
"additionalProperties": false,
Expand Down