/ kubernetes / test / e2e_task / task_e2e_test.go
task_e2e_test.go
  1  // Copyright 2025 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  package e2e_task
 16  
 17  import (
 18  	"context"
 19  	"fmt"
 20  	"os"
 21  	"os/exec"
 22  	"time"
 23  
 24  	. "github.com/onsi/ginkgo/v2"
 25  	. "github.com/onsi/gomega"
 26  
 27  	api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor"
 28  )
 29  
 30  const (
 31  	ImageName         = "task-executor-e2e"
 32  	TargetContainer   = "task-e2e-target"
 33  	ExecutorContainer = "task-e2e-executor"
 34  	VolumeName        = "task-e2e-vol"
 35  	HostPort          = "5758"
 36  )
 37  
 38  var _ = Describe("Task Executor E2E", Ordered, func() {
 39  	var client *api.Client
 40  
 41  	BeforeAll(func() {
 42  		// Check docker
 43  		_, err := exec.LookPath("docker")
 44  		Expect(err).NotTo(HaveOccurred(), "Docker not found, skipping E2E test")
 45  
 46  		By("Building image")
 47  		cmd := exec.Command("docker", "build",
 48  			"--build-arg", "PACKAGE=cmd/task-executor/main.go",
 49  			"-t", ImageName, "-f", "../../Dockerfile", "../../")
 50  		cmd.Stdout = os.Stdout
 51  		cmd.Stderr = os.Stderr
 52  		Expect(cmd.Run()).To(Succeed())
 53  
 54  		By("Cleaning up previous runs")
 55  		exec.Command("docker", "rm", "-f", TargetContainer, ExecutorContainer).Run()
 56  		exec.Command("docker", "volume", "rm", VolumeName).Run()
 57  
 58  		By("Creating shared volume")
 59  		Expect(exec.Command("docker", "volume", "create", VolumeName).Run()).To(Succeed())
 60  
 61  		By("Starting target container")
 62  		targetCmd := exec.Command("docker", "run", "-d", "--name", TargetContainer,
 63  			"-v", fmt.Sprintf("%s:/tmp/tasks", VolumeName),
 64  			"-e", "SANDBOX_MAIN_CONTAINER=main",
 65  			"-e", "TARGET_VAR=hello-from-target",
 66  			"golang:1.24", "sleep", "infinity")
 67  		targetCmd.Stdout = os.Stdout
 68  		targetCmd.Stderr = os.Stderr
 69  		Expect(targetCmd.Run()).To(Succeed())
 70  
 71  		By("Starting executor container in Sidecar Mode")
 72  		execCmd := exec.Command("docker", "run", "-d", "--name", ExecutorContainer,
 73  			"-v", fmt.Sprintf("%s:/tmp/tasks", VolumeName),
 74  			"--privileged",
 75  			"-u", "0",
 76  			"--pid=container:"+TargetContainer,
 77  			"-p", HostPort+":5758",
 78  			ImageName,
 79  			"-enable-sidecar-mode=true",
 80  			"-main-container-name=main",
 81  			"-data-dir=/tmp/tasks")
 82  		execCmd.Stdout = os.Stdout
 83  		execCmd.Stderr = os.Stderr
 84  		Expect(execCmd.Run()).To(Succeed())
 85  
 86  		By("Waiting for executor to be ready")
 87  		client = api.NewClient(fmt.Sprintf("http://127.0.0.1:%s", HostPort))
 88  		Eventually(func() error {
 89  			_, err := client.Get(context.Background())
 90  			return err
 91  		}, 10*time.Second, 500*time.Millisecond).Should(Succeed(), "Executor failed to become ready")
 92  	})
 93  
 94  	AfterAll(func() {
 95  		By("Cleaning up containers")
 96  		if CurrentSpecReport().Failed() {
 97  			By("Dumping logs")
 98  			out, _ := exec.Command("docker", "logs", ExecutorContainer).CombinedOutput()
 99  			fmt.Printf("Executor Logs:\n%s\n", string(out))
100  		}
101  		exec.Command("docker", "rm", "-f", TargetContainer, ExecutorContainer).Run()
102  		exec.Command("docker", "volume", "rm", VolumeName).Run()
103  	})
104  
105  	Context("When creating a short-lived task", func() {
106  		taskName := "e2e-test-1"
107  
108  		It("should run and succeed", func() {
109  			By("Creating task")
110  			task := &api.Task{
111  				Name: taskName,
112  				Process: &api.Process{
113  					Command: []string{"sleep", "2"},
114  				},
115  			}
116  			_, err := client.Set(context.Background(), task)
117  			Expect(err).NotTo(HaveOccurred())
118  
119  			By("Waiting for task to succeed")
120  			Eventually(func(g Gomega) {
121  				got, err := client.Get(context.Background())
122  				g.Expect(err).NotTo(HaveOccurred())
123  				g.Expect(got).NotTo(BeNil())
124  				g.Expect(got.Name).To(Equal(taskName))
125  
126  				// Verify state
127  				if got.ProcessStatus != nil && got.ProcessStatus.Terminated != nil {
128  					g.Expect(got.ProcessStatus.Terminated.ExitCode).To(BeZero())
129  					g.Expect(got.ProcessStatus.Terminated.Reason).To(Equal("Succeeded"))
130  				} else {
131  					// Fail if not terminated yet (so Eventually retries)
132  					g.Expect(got.ProcessStatus).NotTo(BeNil(), "Task ProcessStatus is nil")
133  					g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
134  				}
135  			}, 10*time.Second, 1*time.Second).Should(Succeed())
136  		})
137  
138  		It("should be deletable", func() {
139  			By("Deleting task")
140  			_, err := client.Set(context.Background(), nil)
141  			Expect(err).NotTo(HaveOccurred())
142  
143  			By("Verifying deletion")
144  			Eventually(func() *api.Task {
145  				got, _ := client.Get(context.Background())
146  				return got
147  			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
148  		})
149  	})
150  
151  	Context("When creating a task checking environment variables", func() {
152  		taskName := "e2e-env-test"
153  
154  		It("should inherit environment variables from target container", func() {
155  			By("Creating task running 'env'")
156  			task := &api.Task{
157  				Name: taskName,
158  				Process: &api.Process{
159  					Command: []string{"env"},
160  				},
161  			}
162  			_, err := client.Set(context.Background(), task)
163  			Expect(err).NotTo(HaveOccurred())
164  
165  			By("Waiting for task to succeed")
166  			Eventually(func(g Gomega) {
167  				got, err := client.Get(context.Background())
168  				g.Expect(err).NotTo(HaveOccurred())
169  				g.Expect(got).NotTo(BeNil())
170  				g.Expect(got.Name).To(Equal(taskName))
171  				g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil())
172  				g.Expect(got.ProcessStatus.Terminated.ExitCode).To(BeZero())
173  			}, 10*time.Second, 1*time.Second).Should(Succeed())
174  
175  			By("Verifying stdout contains target container env")
176  			// Read stdout.log from the executor container (which shares the volume)
177  			out, err := exec.Command("docker", "exec", ExecutorContainer, "cat", fmt.Sprintf("/tmp/tasks/%s/stdout.log", taskName)).CombinedOutput()
178  			Expect(err).NotTo(HaveOccurred(), "Failed to read stdout.log: %s", string(out))
179  
180  			outputStr := string(out)
181  			Expect(outputStr).To(ContainSubstring("TARGET_VAR=hello-from-target"), "Task environment should inherit from target container")
182  		})
183  
184  		It("should be deletable", func() {
185  			By("Deleting task")
186  			_, err := client.Set(context.Background(), nil)
187  			Expect(err).NotTo(HaveOccurred())
188  
189  			By("Verifying deletion")
190  			Eventually(func() *api.Task {
191  				got, _ := client.Get(context.Background())
192  				return got
193  			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
194  		})
195  	})
196  
197  	Context("When creating a task with timeout", func() {
198  		taskName := "e2e-timeout-test"
199  
200  		It("should timeout and be terminated", func() {
201  			By("Creating task with 5 second timeout that runs for 30 seconds")
202  			timeoutSec := int64(5)
203  			task := &api.Task{
204  				Name: taskName,
205  				Process: &api.Process{
206  					Command:        []string{"sleep", "30"},
207  					TimeoutSeconds: &timeoutSec,
208  				},
209  			}
210  			_, err := client.Set(context.Background(), task)
211  			Expect(err).NotTo(HaveOccurred())
212  
213  			By("Waiting for task to be terminated (within 15 seconds)")
214  			// After timeout detection, Stop is called and the process is killed.
215  			// Once Stop completes, the exit file is written and state becomes Failed.
216  			Eventually(func(g Gomega) {
217  				got, err := client.Get(context.Background())
218  				g.Expect(err).NotTo(HaveOccurred())
219  				g.Expect(got).NotTo(BeNil())
220  				g.Expect(got.Name).To(Equal(taskName))
221  
222  				// Should be Terminated with exit code 137 (SIGKILL) or 143 (SIGTERM)
223  				// sleep responds to SIGTERM quickly, so we usually get 143
224  				// The state will be "Failed" after exit file is written
225  				if got.ProcessStatus != nil && got.ProcessStatus.Terminated != nil {
226  					g.Expect(got.ProcessStatus.Terminated.ExitCode).To(SatisfyAny(
227  						Equal(int32(137)), // SIGKILL
228  						Equal(int32(143)), // SIGTERM
229  					))
230  				} else {
231  					// Fail if not terminated yet
232  					g.Expect(got.ProcessStatus).NotTo(BeNil(), "Task ProcessStatus is nil")
233  					g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
234  				}
235  			}, 15*time.Second, 1*time.Second).Should(Succeed())
236  
237  			By("Verifying the task was terminated")
238  			got, err := client.Get(context.Background())
239  			Expect(err).NotTo(HaveOccurred())
240  			Expect(got.ProcessStatus.Terminated).NotTo(BeNil())
241  			Expect(got.ProcessStatus.Terminated.ExitCode).To(SatisfyAny(
242  				Equal(int32(137)), // SIGKILL
243  				Equal(int32(143)), // SIGTERM
244  			))
245  			// State could be "Failed" (after exit file written) or "Timeout" (during stop)
246  			Expect(got.ProcessStatus.Terminated.Reason).To(SatisfyAny(
247  				Equal("Failed"),
248  				Equal("TaskTimeout"),
249  			))
250  		})
251  
252  		It("should be deletable after timeout", func() {
253  			By("Deleting task")
254  			_, err := client.Set(context.Background(), nil)
255  			Expect(err).NotTo(HaveOccurred())
256  
257  			By("Verifying deletion")
258  			Eventually(func() *api.Task {
259  				got, _ := client.Get(context.Background())
260  				return got
261  			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
262  		})
263  	})
264  
265  	Context("When creating a task that completes before timeout", func() {
266  		taskName := "e2e-no-timeout-test"
267  
268  		It("should succeed without timeout", func() {
269  			By("Creating task with 60 second timeout that completes in 2 seconds")
270  			timeoutSec := int64(60)
271  			task := &api.Task{
272  				Name: taskName,
273  				Process: &api.Process{
274  					Command:        []string{"sleep", "2"},
275  					TimeoutSeconds: &timeoutSec,
276  				},
277  			}
278  			_, err := client.Set(context.Background(), task)
279  			Expect(err).NotTo(HaveOccurred())
280  
281  			By("Waiting for task to succeed")
282  			Eventually(func(g Gomega) {
283  				got, err := client.Get(context.Background())
284  				g.Expect(err).NotTo(HaveOccurred())
285  				g.Expect(got).NotTo(BeNil())
286  				g.Expect(got.Name).To(Equal(taskName))
287  
288  				// Should succeed with exit code 0
289  				if got.ProcessStatus != nil && got.ProcessStatus.Terminated != nil {
290  					g.Expect(got.ProcessStatus.Terminated.ExitCode).To(BeZero())
291  					g.Expect(got.ProcessStatus.Terminated.Reason).To(Equal("Succeeded"))
292  				} else {
293  					g.Expect(got.ProcessStatus).NotTo(BeNil(), "Task ProcessStatus is nil")
294  					g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
295  				}
296  			}, 10*time.Second, 1*time.Second).Should(Succeed())
297  		})
298  
299  		It("should be deletable", func() {
300  			By("Deleting task")
301  			_, err := client.Set(context.Background(), nil)
302  			Expect(err).NotTo(HaveOccurred())
303  
304  			By("Verifying deletion")
305  			Eventually(func() *api.Task {
306  				got, _ := client.Get(context.Background())
307  				return got
308  			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
309  		})
310  	})
311  })