/ components / egress / pkg / dnsproxy / upstream.go
upstream.go
  1  // Copyright 2026 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  package dnsproxy
 16  
 17  import (
 18  	"context"
 19  	"os"
 20  	"strconv"
 21  	"strings"
 22  	"sync"
 23  	"time"
 24  
 25  	"github.com/miekg/dns"
 26  
 27  	"github.com/alibaba/opensandbox/internal/safego"
 28  
 29  	"github.com/alibaba/opensandbox/egress/pkg/constants"
 30  	"github.com/alibaba/opensandbox/egress/pkg/log"
 31  )
 32  
 33  const defaultUpstreamProbeInterval = 30 * time.Second
 34  
 35  func upstreamProbeIntervalFromEnv() time.Duration {
 36  	s := strings.TrimSpace(os.Getenv(constants.EnvDNSUpstreamProbeIntervalSec))
 37  	if s == "" {
 38  		return defaultUpstreamProbeInterval
 39  	}
 40  	n, err := strconv.Atoi(s)
 41  	if err != nil || n < 1 {
 42  		return defaultUpstreamProbeInterval
 43  	}
 44  	if n > 3600 {
 45  		n = 3600
 46  	}
 47  	return time.Duration(n) * time.Second
 48  }
 49  
 50  // upstreamProbeFromEnv returns the DNS question used for upstream liveness checks.
 51  // Default is root IN NS (primers/recursors answer without resolving a public TLD).
 52  // Set OPENSANDBOX_EGRESS_DNS_UPSTREAM_PROBE to an FQDN that your resolvers always
 53  // answer (e.g. split-horizon internal name) when the default is inappropriate.
 54  func upstreamProbeFromEnv() (name string, qtype uint16) {
 55  	raw := strings.TrimSpace(os.Getenv(constants.EnvDNSUpstreamProbe))
 56  	if raw == "" || raw == "." {
 57  		return ".", dns.TypeNS
 58  	}
 59  	return dns.Fqdn(raw), dns.TypeA
 60  }
 61  
 62  func (p *Proxy) runUpstreamProbes(ctx context.Context) {
 63  	p.probeUpstreams()
 64  
 65  	t := time.NewTicker(p.upstreamProbeInterval)
 66  	defer t.Stop()
 67  	for {
 68  		select {
 69  		case <-ctx.Done():
 70  			return
 71  		case <-t.C:
 72  			p.probeUpstreams()
 73  		}
 74  	}
 75  }
 76  
 77  // forwardUpstreams returns the ordered list used for DNS forwarding: last healthy probe
 78  // results when non-empty; otherwise the configured chain (e.g. before first probe).
 79  func (p *Proxy) forwardUpstreams() []string {
 80  	p.upstreamMu.RLock()
 81  	active := p.activeUpstreams
 82  	p.upstreamMu.RUnlock()
 83  	if len(active) > 0 {
 84  		return active
 85  	}
 86  	return p.upstreams
 87  }
 88  
 89  // probeUpstreams checks each configured resolver in parallel and refreshes activeUpstreams.
 90  // If every probe fails, the full upstream list is kept so forwarding still attempts all resolvers.
 91  func (p *Proxy) probeUpstreams() {
 92  	all := p.upstreams
 93  	if len(all) == 0 {
 94  		return
 95  	}
 96  
 97  	timeout := probeExchangeTimeout(p.upstreamExchangeTimeout)
 98  	healthy := make([]bool, len(all))
 99  	var wg sync.WaitGroup
100  	for i := range all {
101  		wg.Add(1)
102  		idx := i
103  		addr := all[i]
104  		safego.Go(func() {
105  			defer wg.Done()
106  			healthy[idx] = p.probeOneUpstream(addr, timeout)
107  		})
108  	}
109  	wg.Wait()
110  
111  	var active []string
112  	for i := range all {
113  		if healthy[i] {
114  			active = append(active, all[i])
115  		}
116  	}
117  	if len(active) == 0 {
118  		log.Warnf("[dns] all upstream probes failed; using full upstream list for forwarding")
119  		active = append([]string(nil), all...)
120  	}
121  
122  	p.upstreamMu.Lock()
123  	p.activeUpstreams = active
124  	p.upstreamMu.Unlock()
125  }
126  
// probeExchangeTimeout derives the per-probe exchange timeout from the
// configured upstream exchange timeout. A positive timeout is reused as-is,
// capped at 2s. A zero or negative timeout (none configured) yields 2s.
func probeExchangeTimeout(upstreamTimeout time.Duration) time.Duration {
	const probeCap = 2 * time.Second
	if upstreamTimeout > 0 && upstreamTimeout <= probeCap {
		return upstreamTimeout
	}
	return probeCap
}
137  
138  func (p *Proxy) probeOneUpstream(addr string, timeout time.Duration) bool {
139  	m := new(dns.Msg)
140  	m.SetQuestion(p.upstreamProbeName, p.upstreamProbeQType)
141  	m.RecursionDesired = true
142  
143  	c := &dns.Client{
144  		Timeout: timeout,
145  		Dialer:  p.dialerForUpstream(addr),
146  	}
147  	resp, _, err := c.Exchange(m, addr)
148  	if err != nil {
149  		log.Errorf("[dns] upstream probe %s failed: %v", addr, err)
150  		return false
151  	}
152  	return resp != nil
153  }