/ internal / memory / client.go
client.go
  1  package memory
  2  
  3  import (
  4  	"bytes"
  5  	"context"
  6  	"crypto/rand"
  7  	"encoding/hex"
  8  	"encoding/json"
  9  	"errors"
 10  	"fmt"
 11  	"io"
 12  	"net"
 13  	"net/http"
 14  	"time"
 15  )
 16  
 17  type ctxKey int
 18  
 19  const requestIDKey ctxKey = 0
 20  
 21  // WithRequestID stamps an X-Request-ID onto ctx so the client propagates it.
 22  // Used by callers that already have a correlation ID (e.g. agent-loop run id).
 23  func WithRequestID(ctx context.Context, id string) context.Context {
 24  	return context.WithValue(ctx, requestIDKey, id)
 25  }
 26  
 27  func requestIDFrom(ctx context.Context) string {
 28  	if v, ok := ctx.Value(requestIDKey).(string); ok && v != "" {
 29  		return v
 30  	}
 31  	var b [6]byte
 32  	_, _ = rand.Read(b[:])
 33  	return "req-" + hex.EncodeToString(b[:])
 34  }
 35  
 36  // Client is the UDS HTTP client for the Kocoro Cloud memory sidecar.
 37  // Per-request timeout applies to the whole exchange; dial uses a 2s
 38  // ctx-cancelable Dialer so a canceled context returns immediately.
 39  type Client struct {
 40  	socket string
 41  	httpc  *http.Client
 42  }
 43  
 44  func NewClient(socket string, timeout time.Duration) *Client {
 45  	tr := &http.Transport{
 46  		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
 47  			d := net.Dialer{Timeout: 2 * time.Second}
 48  			return d.DialContext(ctx, "unix", socket)
 49  		},
 50  		DisableKeepAlives: false,
 51  		MaxIdleConns:      4,
 52  	}
 53  	return &Client{socket: socket, httpc: &http.Client{Transport: tr, Timeout: timeout}}
 54  }
 55  
 56  func (c *Client) do(ctx context.Context, method, path string, body any, out any) (int, string, error) {
 57  	rid := requestIDFrom(ctx)
 58  	var rdr io.Reader
 59  	if body != nil {
 60  		b, err := json.Marshal(body)
 61  		if err != nil {
 62  			return 0, rid, fmt.Errorf("%w: marshal: %v", ErrTransport, err)
 63  		}
 64  		rdr = bytes.NewReader(b)
 65  	}
 66  	req, err := http.NewRequestWithContext(ctx, method, "http://unix"+path, rdr)
 67  	if err != nil {
 68  		return 0, rid, fmt.Errorf("%w: build request: %v", ErrTransport, err)
 69  	}
 70  	req.Header.Set("X-Request-ID", rid)
 71  	if body != nil {
 72  		req.Header.Set("Content-Type", "application/json")
 73  	}
 74  	resp, err := c.httpc.Do(req)
 75  	if err != nil {
 76  		return 0, rid, fmt.Errorf("%w: %v", ErrTransport, err)
 77  	}
 78  	defer resp.Body.Close()
 79  	if out != nil {
 80  		if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
 81  			return resp.StatusCode, rid, fmt.Errorf("%w: decode: %v", ErrTransport, err)
 82  		}
 83  	}
 84  	return resp.StatusCode, rid, nil
 85  }
 86  
 87  // Query returns (env, class, err). Contract:
 88  //   - class is the authoritative branch for the caller. Tool fallback / retry /
 89  //     surface-to-LLM logic should read class only.
 90  //   - err is non-nil for transport-level failures (refused, timeout, EOF, ctx
 91  //     cancel) AND for response-body decode failures on 200. In both cases
 92  //     class == ClassUnavailable and env == nil. The caller may log err for
 93  //     diagnostics but must not branch on it.
 94  //   - For sidecar-reported envelope errors (any non-200), err == nil, env
 95  //     != nil, class is decided by errclass.ClassifyHTTP.
 96  func (c *Client) Query(ctx context.Context, intent QueryIntent) (*ResponseEnvelope, ErrorClass, error) {
 97  	rid := requestIDFrom(ctx)
 98  	req := QueryRequest{Intent: intent, RequestID: &rid}
 99  	var env ResponseEnvelope
100  	status, _, err := c.do(WithRequestID(ctx, rid), http.MethodPost, "/query", req, &env)
101  	if err != nil {
102  		if errors.Is(err, ErrTransport) {
103  			return nil, ClassUnavailable, err
104  		}
105  		return nil, ClassUnavailable, err
106  	}
107  	return &env, ClassifyHTTP(status, &env), nil
108  }
109  
110  func (c *Client) Reload(ctx context.Context) (*ReloadResponse, error) {
111  	var r ReloadResponse
112  	status, _, err := c.do(ctx, http.MethodPost, "/bundle/reload", struct{}{}, &r)
113  	if err != nil {
114  		return nil, err
115  	}
116  	if status == 409 {
117  		return &r, fmt.Errorf("reload_in_progress")
118  	}
119  	return &r, nil
120  }
121  
122  func (c *Client) Health(ctx context.Context) (*HealthPayload, error) {
123  	var h HealthPayload
124  	_, _, err := c.do(ctx, http.MethodGet, "/health", nil, &h)
125  	if err != nil {
126  		return nil, err
127  	}
128  	return &h, nil
129  }