/ components / execd / pkg / runtime / context.go
context.go
  1  // Copyright 2025 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  package runtime
 16  
 17  import (
 18  	"errors"
 19  	"fmt"
 20  	"net/http"
 21  	"os"
 22  	"path/filepath"
 23  	"strings"
 24  
 25  	"github.com/google/uuid"
 26  	"k8s.io/client-go/util/retry"
 27  
 28  	"github.com/alibaba/opensandbox/execd/pkg/jupyter"
 29  	jupytersession "github.com/alibaba/opensandbox/execd/pkg/jupyter/session"
 30  	"github.com/alibaba/opensandbox/execd/pkg/log"
 31  	"github.com/alibaba/opensandbox/execd/pkg/util/pathutil"
 32  )
 33  
 34  // CreateContext provisions a kernel-backed session and returns its ID.
 35  // Bash language uses Jupyter kernel like other languages; for pipe-based bash sessions use CreateBashSession (session API).
 36  func (c *Controller) CreateContext(req *CreateContextRequest) (string, error) {
 37  	// Create a new Jupyter session.
 38  	var (
 39  		client  *jupyter.Client
 40  		session *jupytersession.Session
 41  		err     error
 42  	)
 43  
 44  	err = retry.OnError(kernelWaitingBackoff, func(err error) bool {
 45  		log.Error("failed to create session, retrying: %v", err)
 46  		return err != nil
 47  	}, func() error {
 48  		client, session, err = c.createJupyterContext(*req)
 49  		return err
 50  	})
 51  	if err != nil {
 52  		return "", err
 53  	}
 54  
 55  	kernel := &jupyterKernel{
 56  		kernelID: session.Kernel.ID,
 57  		client:   client,
 58  		language: req.Language,
 59  	}
 60  	c.storeJupyterKernel(session.ID, kernel)
 61  
 62  	err = c.setWorkingDir(kernel, req)
 63  	if err != nil {
 64  		return "", fmt.Errorf("failed to setup working dir: %w", err)
 65  	}
 66  
 67  	return session.ID, nil
 68  }
 69  
 70  func (c *Controller) DeleteContext(session string) error {
 71  	return c.deleteSessionAndCleanup(session)
 72  }
 73  
 74  func (c *Controller) GetContext(session string) (CodeContext, error) {
 75  	kernel := c.getJupyterKernel(session)
 76  	if kernel == nil {
 77  		return CodeContext{}, ErrContextNotFound
 78  	}
 79  	return CodeContext{
 80  		ID:       session,
 81  		Language: kernel.language,
 82  	}, nil
 83  }
 84  
 85  func (c *Controller) ListContext(language string) ([]CodeContext, error) {
 86  	switch language {
 87  	case Command.String(), BackgroundCommand.String(), SQL.String():
 88  		return nil, fmt.Errorf("unsupported language context operation: %s", language)
 89  	case "":
 90  		return c.listAllContexts()
 91  	default:
 92  		return c.listLanguageContexts(Language(language))
 93  	}
 94  }
 95  
 96  func (c *Controller) DeleteLanguageContext(language Language) error {
 97  	contexts, err := c.listLanguageContexts(language)
 98  	if err != nil {
 99  		return err
100  	}
101  
102  	seen := make(map[string]struct{})
103  	for _, context := range contexts {
104  		if _, ok := seen[context.ID]; ok {
105  			continue
106  		}
107  		seen[context.ID] = struct{}{}
108  
109  		if err := c.deleteSessionAndCleanup(context.ID); err != nil {
110  			return fmt.Errorf("error deleting context %s: %w", context.ID, err)
111  		}
112  	}
113  	return nil
114  }
115  
116  func (c *Controller) deleteSessionAndCleanup(session string) error {
117  	if c.getJupyterKernel(session) == nil {
118  		return ErrContextNotFound
119  	}
120  	if err := c.jupyterClient().DeleteSession(session); err != nil {
121  		return err
122  	}
123  	c.jupyterClientMap.Delete(session)
124  	c.deleteDefaultSessionByID(session)
125  	return nil
126  }
127  
128  func (c *Controller) newContextID() string {
129  	return strings.ReplaceAll(uuid.New().String(), "-", "")
130  }
131  
132  func (c *Controller) newIpynbPath(sessionID, cwd string) (string, error) {
133  	resolvedCwd, err := pathutil.ExpandPath(cwd)
134  	if err != nil {
135  		return "", err
136  	}
137  	if cwd != "" {
138  		err := os.MkdirAll(resolvedCwd, os.ModePerm)
139  		if err != nil {
140  			return "", err
141  		}
142  	}
143  
144  	return filepath.Join(resolvedCwd, fmt.Sprintf("%s.ipynb", sessionID)), nil
145  }
146  
147  // createDefaultLanguageJupyterContext prewarms a session for stateless execution.
148  func (c *Controller) createDefaultLanguageJupyterContext(language Language) error {
149  	if c.getDefaultLanguageSession(language) != "" {
150  		return nil
151  	}
152  
153  	var (
154  		client  *jupyter.Client
155  		session *jupytersession.Session
156  		err     error
157  	)
158  	err = retry.OnError(kernelWaitingBackoff, func(err error) bool {
159  		log.Error("failed to create context, retrying: %v", err)
160  		return err != nil
161  	}, func() error {
162  		client, session, err = c.createJupyterContext(CreateContextRequest{
163  			Language: language,
164  			Cwd:      "",
165  		})
166  		return err
167  	})
168  	if err != nil {
169  		return err
170  	}
171  
172  	c.setDefaultLanguageSession(language, session.ID)
173  	c.jupyterClientMap.Store(session.ID, &jupyterKernel{
174  		kernelID: session.Kernel.ID,
175  		client:   client,
176  		language: language,
177  	})
178  	return nil
179  }
180  
181  // createJupyterContext performs the actual context creation workflow.
182  func (c *Controller) createJupyterContext(request CreateContextRequest) (*jupyter.Client, *jupytersession.Session, error) {
183  	client := c.jupyterClient()
184  
185  	kernel, err := c.searchKernel(client, request.Language)
186  	if err != nil {
187  		return nil, nil, err
188  	}
189  
190  	sessionID := c.newContextID()
191  	ipynb, err := c.newIpynbPath(sessionID, request.Cwd)
192  	if err != nil {
193  		return nil, nil, err
194  	}
195  
196  	jupyterSession, err := client.CreateSession(sessionID, ipynb, kernel)
197  	if err != nil {
198  		return nil, nil, err
199  	}
200  
201  	kernels, err := client.ListKernels()
202  	if err != nil {
203  		return nil, nil, err
204  	}
205  
206  	found := false
207  	for _, k := range kernels {
208  		if k.ID == jupyterSession.Kernel.ID {
209  			found = true
210  			break
211  		}
212  	}
213  	if !found {
214  		return nil, nil, errors.New("kernel not found")
215  	}
216  
217  	return client, jupyterSession, nil
218  }
219  
220  // storeJupyterKernel caches a session -> kernel mapping.
221  func (c *Controller) storeJupyterKernel(sessionID string, kernel *jupyterKernel) {
222  	c.jupyterClientMap.Store(sessionID, kernel)
223  }
224  
225  func (c *Controller) jupyterClient() *jupyter.Client {
226  	httpClient := &http.Client{
227  		Transport: &jupyter.AuthTransport{
228  			Token: c.token,
229  			Base:  http.DefaultTransport,
230  		},
231  	}
232  
233  	return jupyter.NewClient(c.baseURL,
234  		jupyter.WithToken(c.token),
235  		jupyter.WithHTTPClient(httpClient))
236  }
237  
238  func (c *Controller) getDefaultLanguageSession(language Language) string {
239  	if v, ok := c.defaultLanguageSessions.Load(language); ok {
240  		if session, ok := v.(string); ok {
241  			return session
242  		}
243  	}
244  	return ""
245  }
246  
247  func (c *Controller) setDefaultLanguageSession(language Language, sessionID string) {
248  	c.defaultLanguageSessions.Store(language, sessionID)
249  }
250  
251  func (c *Controller) deleteDefaultSessionByID(sessionID string) {
252  	c.defaultLanguageSessions.Range(func(key, value any) bool {
253  		if s, ok := value.(string); ok && s == sessionID {
254  			c.defaultLanguageSessions.Delete(key)
255  		}
256  		return true
257  	})
258  }
259  
260  func (c *Controller) listAllContexts() ([]CodeContext, error) {
261  	contexts := make([]CodeContext, 0)
262  	seen := make(map[string]struct{})
263  
264  	c.jupyterClientMap.Range(func(key, value any) bool {
265  		session, _ := key.(string)
266  		if kernel, ok := value.(*jupyterKernel); ok && kernel != nil {
267  			contexts = append(contexts, CodeContext{ID: session, Language: kernel.language})
268  			seen[session] = struct{}{}
269  		}
270  		return true
271  	})
272  
273  	c.defaultLanguageSessions.Range(func(key, value any) bool {
274  		lang, _ := key.(Language)
275  		session, _ := value.(string)
276  		if session == "" {
277  			return true
278  		}
279  		// Skip if already collected from jupyterClientMap to avoid duplicates.
280  		if _, exists := seen[session]; exists {
281  			return true
282  		}
283  		contexts = append(contexts, CodeContext{ID: session, Language: lang})
284  		return true
285  	})
286  
287  	return contexts, nil
288  }
289  
290  func (c *Controller) listLanguageContexts(language Language) ([]CodeContext, error) {
291  	contexts := make([]CodeContext, 0)
292  	seen := make(map[string]struct{})
293  
294  	c.jupyterClientMap.Range(func(key, value any) bool {
295  		session, _ := key.(string)
296  		if kernel, ok := value.(*jupyterKernel); ok && kernel != nil && kernel.language == language {
297  			contexts = append(contexts, CodeContext{ID: session, Language: language})
298  			seen[session] = struct{}{}
299  		}
300  		return true
301  	})
302  
303  	if defaultContext := c.getDefaultLanguageSession(language); defaultContext != "" {
304  		// Skip if already collected from jupyterClientMap to avoid duplicates.
305  		if _, exists := seen[defaultContext]; !exists {
306  			contexts = append(contexts, CodeContext{ID: defaultContext, Language: language})
307  		}
308  	}
309  
310  	return contexts, nil
311  }