upstream.go
1 // Copyright 2026 Alibaba Group Holding Ltd. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package dnsproxy 16 17 import ( 18 "context" 19 "os" 20 "strconv" 21 "strings" 22 "sync" 23 "time" 24 25 "github.com/miekg/dns" 26 27 "github.com/alibaba/opensandbox/internal/safego" 28 29 "github.com/alibaba/opensandbox/egress/pkg/constants" 30 "github.com/alibaba/opensandbox/egress/pkg/log" 31 ) 32 33 const defaultUpstreamProbeInterval = 30 * time.Second 34 35 func upstreamProbeIntervalFromEnv() time.Duration { 36 s := strings.TrimSpace(os.Getenv(constants.EnvDNSUpstreamProbeIntervalSec)) 37 if s == "" { 38 return defaultUpstreamProbeInterval 39 } 40 n, err := strconv.Atoi(s) 41 if err != nil || n < 1 { 42 return defaultUpstreamProbeInterval 43 } 44 if n > 3600 { 45 n = 3600 46 } 47 return time.Duration(n) * time.Second 48 } 49 50 // upstreamProbeFromEnv returns the DNS question used for upstream liveness checks. 51 // Default is root IN NS (primers/recursors answer without resolving a public TLD). 52 // Set OPENSANDBOX_EGRESS_DNS_UPSTREAM_PROBE to an FQDN that your resolvers always 53 // answer (e.g. split-horizon internal name) when the default is inappropriate. 54 func upstreamProbeFromEnv() (name string, qtype uint16) { 55 raw := strings.TrimSpace(os.Getenv(constants.EnvDNSUpstreamProbe)) 56 if raw == "" || raw == "." { 57 return ".", dns.TypeNS 58 } 59 return dns.Fqdn(raw), dns.TypeA 60 } 61 62 func (p *Proxy) runUpstreamProbes(ctx context.Context) { 63 p.probeUpstreams() 64 65 t := time.NewTicker(p.upstreamProbeInterval) 66 defer t.Stop() 67 for { 68 select { 69 case <-ctx.Done(): 70 return 71 case <-t.C: 72 p.probeUpstreams() 73 } 74 } 75 } 76 77 // forwardUpstreams returns the ordered list used for DNS forwarding: last healthy probe 78 // results when non-empty; otherwise the configured chain (e.g. before first probe). 79 func (p *Proxy) forwardUpstreams() []string { 80 p.upstreamMu.RLock() 81 active := p.activeUpstreams 82 p.upstreamMu.RUnlock() 83 if len(active) > 0 { 84 return active 85 } 86 return p.upstreams 87 } 88 89 // probeUpstreams checks each configured resolver in parallel and refreshes activeUpstreams. 90 // If every probe fails, the full upstream list is kept so forwarding still attempts all resolvers. 91 func (p *Proxy) probeUpstreams() { 92 all := p.upstreams 93 if len(all) == 0 { 94 return 95 } 96 97 timeout := probeExchangeTimeout(p.upstreamExchangeTimeout) 98 healthy := make([]bool, len(all)) 99 var wg sync.WaitGroup 100 for i := range all { 101 wg.Add(1) 102 idx := i 103 addr := all[i] 104 safego.Go(func() { 105 defer wg.Done() 106 healthy[idx] = p.probeOneUpstream(addr, timeout) 107 }) 108 } 109 wg.Wait() 110 111 var active []string 112 for i := range all { 113 if healthy[i] { 114 active = append(active, all[i]) 115 } 116 } 117 if len(active) == 0 { 118 log.Warnf("[dns] all upstream probes failed; using full upstream list for forwarding") 119 active = append([]string(nil), all...) 120 } 121 122 p.upstreamMu.Lock() 123 p.activeUpstreams = active 124 p.upstreamMu.Unlock() 125 } 126 127 func probeExchangeTimeout(upstreamTimeout time.Duration) time.Duration { 128 const maxProbe = 2 * time.Second 129 if upstreamTimeout <= 0 { 130 return maxProbe 131 } 132 if upstreamTimeout > maxProbe { 133 return maxProbe 134 } 135 return upstreamTimeout 136 } 137 138 func (p *Proxy) probeOneUpstream(addr string, timeout time.Duration) bool { 139 m := new(dns.Msg) 140 m.SetQuestion(p.upstreamProbeName, p.upstreamProbeQType) 141 m.RecursionDesired = true 142 143 c := &dns.Client{ 144 Timeout: timeout, 145 Dialer: p.dialerForUpstream(addr), 146 } 147 resp, _, err := c.Exchange(m, addr) 148 if err != nil { 149 log.Errorf("[dns] upstream probe %s failed: %v", addr, err) 150 return false 151 } 152 return resp != nil 153 }