context.go
1 // Copyright 2025 Alibaba Group Holding Ltd. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package runtime 16 17 import ( 18 "errors" 19 "fmt" 20 "net/http" 21 "os" 22 "path/filepath" 23 "strings" 24 25 "github.com/google/uuid" 26 "k8s.io/client-go/util/retry" 27 28 "github.com/alibaba/opensandbox/execd/pkg/jupyter" 29 jupytersession "github.com/alibaba/opensandbox/execd/pkg/jupyter/session" 30 "github.com/alibaba/opensandbox/execd/pkg/log" 31 "github.com/alibaba/opensandbox/execd/pkg/util/pathutil" 32 ) 33 34 // CreateContext provisions a kernel-backed session and returns its ID. 35 // Bash language uses Jupyter kernel like other languages; for pipe-based bash sessions use CreateBashSession (session API). 36 func (c *Controller) CreateContext(req *CreateContextRequest) (string, error) { 37 // Create a new Jupyter session. 38 var ( 39 client *jupyter.Client 40 session *jupytersession.Session 41 err error 42 ) 43 44 err = retry.OnError(kernelWaitingBackoff, func(err error) bool { 45 log.Error("failed to create session, retrying: %v", err) 46 return err != nil 47 }, func() error { 48 client, session, err = c.createJupyterContext(*req) 49 return err 50 }) 51 if err != nil { 52 return "", err 53 } 54 55 kernel := &jupyterKernel{ 56 kernelID: session.Kernel.ID, 57 client: client, 58 language: req.Language, 59 } 60 c.storeJupyterKernel(session.ID, kernel) 61 62 err = c.setWorkingDir(kernel, req) 63 if err != nil { 64 return "", fmt.Errorf("failed to setup working dir: %w", err) 65 } 66 67 return session.ID, nil 68 } 69 70 func (c *Controller) DeleteContext(session string) error { 71 return c.deleteSessionAndCleanup(session) 72 } 73 74 func (c *Controller) GetContext(session string) (CodeContext, error) { 75 kernel := c.getJupyterKernel(session) 76 if kernel == nil { 77 return CodeContext{}, ErrContextNotFound 78 } 79 return CodeContext{ 80 ID: session, 81 Language: kernel.language, 82 }, nil 83 } 84 85 func (c *Controller) ListContext(language string) ([]CodeContext, error) { 86 switch language { 87 case Command.String(), BackgroundCommand.String(), SQL.String(): 88 return nil, fmt.Errorf("unsupported language context operation: %s", language) 89 case "": 90 return c.listAllContexts() 91 default: 92 return c.listLanguageContexts(Language(language)) 93 } 94 } 95 96 func (c *Controller) DeleteLanguageContext(language Language) error { 97 contexts, err := c.listLanguageContexts(language) 98 if err != nil { 99 return err 100 } 101 102 seen := make(map[string]struct{}) 103 for _, context := range contexts { 104 if _, ok := seen[context.ID]; ok { 105 continue 106 } 107 seen[context.ID] = struct{}{} 108 109 if err := c.deleteSessionAndCleanup(context.ID); err != nil { 110 return fmt.Errorf("error deleting context %s: %w", context.ID, err) 111 } 112 } 113 return nil 114 } 115 116 func (c *Controller) deleteSessionAndCleanup(session string) error { 117 if c.getJupyterKernel(session) == nil { 118 return ErrContextNotFound 119 } 120 if err := c.jupyterClient().DeleteSession(session); err != nil { 121 return err 122 } 123 c.jupyterClientMap.Delete(session) 124 c.deleteDefaultSessionByID(session) 125 return nil 126 } 127 128 func (c *Controller) newContextID() string { 129 return strings.ReplaceAll(uuid.New().String(), "-", "") 130 } 131 132 func (c *Controller) newIpynbPath(sessionID, cwd string) (string, error) { 133 resolvedCwd, err := pathutil.ExpandPath(cwd) 134 if err != nil { 135 return "", err 136 } 137 if cwd != "" { 138 err := os.MkdirAll(resolvedCwd, os.ModePerm) 139 if err != nil { 140 return "", err 141 } 142 } 143 144 return filepath.Join(resolvedCwd, fmt.Sprintf("%s.ipynb", sessionID)), nil 145 } 146 147 // createDefaultLanguageJupyterContext prewarms a session for stateless execution. 148 func (c *Controller) createDefaultLanguageJupyterContext(language Language) error { 149 if c.getDefaultLanguageSession(language) != "" { 150 return nil 151 } 152 153 var ( 154 client *jupyter.Client 155 session *jupytersession.Session 156 err error 157 ) 158 err = retry.OnError(kernelWaitingBackoff, func(err error) bool { 159 log.Error("failed to create context, retrying: %v", err) 160 return err != nil 161 }, func() error { 162 client, session, err = c.createJupyterContext(CreateContextRequest{ 163 Language: language, 164 Cwd: "", 165 }) 166 return err 167 }) 168 if err != nil { 169 return err 170 } 171 172 c.setDefaultLanguageSession(language, session.ID) 173 c.jupyterClientMap.Store(session.ID, &jupyterKernel{ 174 kernelID: session.Kernel.ID, 175 client: client, 176 language: language, 177 }) 178 return nil 179 } 180 181 // createJupyterContext performs the actual context creation workflow. 182 func (c *Controller) createJupyterContext(request CreateContextRequest) (*jupyter.Client, *jupytersession.Session, error) { 183 client := c.jupyterClient() 184 185 kernel, err := c.searchKernel(client, request.Language) 186 if err != nil { 187 return nil, nil, err 188 } 189 190 sessionID := c.newContextID() 191 ipynb, err := c.newIpynbPath(sessionID, request.Cwd) 192 if err != nil { 193 return nil, nil, err 194 } 195 196 jupyterSession, err := client.CreateSession(sessionID, ipynb, kernel) 197 if err != nil { 198 return nil, nil, err 199 } 200 201 kernels, err := client.ListKernels() 202 if err != nil { 203 return nil, nil, err 204 } 205 206 found := false 207 for _, k := range kernels { 208 if k.ID == jupyterSession.Kernel.ID { 209 found = true 210 break 211 } 212 } 213 if !found { 214 return nil, nil, errors.New("kernel not found") 215 } 216 217 return client, jupyterSession, nil 218 } 219 220 // storeJupyterKernel caches a session -> kernel mapping. 221 func (c *Controller) storeJupyterKernel(sessionID string, kernel *jupyterKernel) { 222 c.jupyterClientMap.Store(sessionID, kernel) 223 } 224 225 func (c *Controller) jupyterClient() *jupyter.Client { 226 httpClient := &http.Client{ 227 Transport: &jupyter.AuthTransport{ 228 Token: c.token, 229 Base: http.DefaultTransport, 230 }, 231 } 232 233 return jupyter.NewClient(c.baseURL, 234 jupyter.WithToken(c.token), 235 jupyter.WithHTTPClient(httpClient)) 236 } 237 238 func (c *Controller) getDefaultLanguageSession(language Language) string { 239 if v, ok := c.defaultLanguageSessions.Load(language); ok { 240 if session, ok := v.(string); ok { 241 return session 242 } 243 } 244 return "" 245 } 246 247 func (c *Controller) setDefaultLanguageSession(language Language, sessionID string) { 248 c.defaultLanguageSessions.Store(language, sessionID) 249 } 250 251 func (c *Controller) deleteDefaultSessionByID(sessionID string) { 252 c.defaultLanguageSessions.Range(func(key, value any) bool { 253 if s, ok := value.(string); ok && s == sessionID { 254 c.defaultLanguageSessions.Delete(key) 255 } 256 return true 257 }) 258 } 259 260 func (c *Controller) listAllContexts() ([]CodeContext, error) { 261 contexts := make([]CodeContext, 0) 262 seen := make(map[string]struct{}) 263 264 c.jupyterClientMap.Range(func(key, value any) bool { 265 session, _ := key.(string) 266 if kernel, ok := value.(*jupyterKernel); ok && kernel != nil { 267 contexts = append(contexts, CodeContext{ID: session, Language: kernel.language}) 268 seen[session] = struct{}{} 269 } 270 return true 271 }) 272 273 c.defaultLanguageSessions.Range(func(key, value any) bool { 274 lang, _ := key.(Language) 275 session, _ := value.(string) 276 if session == "" { 277 return true 278 } 279 // Skip if already collected from jupyterClientMap to avoid duplicates. 280 if _, exists := seen[session]; exists { 281 return true 282 } 283 contexts = append(contexts, CodeContext{ID: session, Language: lang}) 284 return true 285 }) 286 287 return contexts, nil 288 } 289 290 func (c *Controller) listLanguageContexts(language Language) ([]CodeContext, error) { 291 contexts := make([]CodeContext, 0) 292 seen := make(map[string]struct{}) 293 294 c.jupyterClientMap.Range(func(key, value any) bool { 295 session, _ := key.(string) 296 if kernel, ok := value.(*jupyterKernel); ok && kernel != nil && kernel.language == language { 297 contexts = append(contexts, CodeContext{ID: session, Language: language}) 298 seen[session] = struct{}{} 299 } 300 return true 301 }) 302 303 if defaultContext := c.getDefaultLanguageSession(language); defaultContext != "" { 304 // Skip if already collected from jupyterClientMap to avoid duplicates. 305 if _, exists := seen[defaultContext]; !exists { 306 contexts = append(contexts, CodeContext{ID: defaultContext, Language: language}) 307 } 308 } 309 310 return contexts, nil 311 }