/ RNS / Interfaces / util / netinfo.py
netinfo.py
  1  # MIT License
  2  #
  3  # Copyright (c) 2014 Stefan C. Mueller
  4  # Copyright (c) 2025 Mark Qvist
  5  #
  6  # Permission is hereby granted, free of charge, to any person obtaining a copy
  7  # of this software and associated documentation files (the "Software"), to deal
  8  # in the Software without restriction, including without limitation the rights
  9  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 10  # copies of the Software, and to permit persons to whom the Software is
 11  # furnished to do so, subject to the following conditions:
 12  #
 13  # The above copyright notice and this permission notice shall be included in all
 14  # copies or substantial portions of the Software.
 15  #
 16  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 17  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 18  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 19  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 20  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 21  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 22  # SOFTWARE.
 23  
 24  import os
 25  import socket
 26  import ipaddress
 27  import platform
 28  import ctypes.util
 29  import collections
 30  from typing import List, Iterable, Optional, Tuple, Union
 31  
 32  AF_INET6 = socket.AF_INET6.value
 33  AF_INET = socket.AF_INET.value
 34  
 35  def interfaces() -> List[str]:
 36      adapters = get_adapters(include_unconfigured=True)
 37      return [a.name for a in adapters]
 38  
 39  def interface_names_to_indexes() -> dict:
 40      adapters = get_adapters(include_unconfigured=True)
 41      results = {}
 42      for adapter in adapters:
 43          results[adapter.name] = adapter.index
 44      return results
 45  
 46  def interface_name_to_nice_name(ifname) -> str:
 47      try:
 48          adapters = get_adapters(include_unconfigured=True)
 49          for adapter in adapters:
 50              if adapter.name == ifname:
 51                  if hasattr(adapter, "nice_name"):
 52                      return adapter.nice_name
 53  
 54      except: return None
 55      return None
 56  
 57  def ifaddresses(ifname) -> dict:
 58      adapters = get_adapters(include_unconfigured=True)
 59      ifa = {}
 60      for a in adapters:
 61          if a.name == ifname:
 62              ipv4s = []
 63              ipv6s = []
 64              for ip in a.ips:
 65                  t = {}
 66                  if ip.is_IPv4:
 67                      net = ipaddress.ip_network(str(ip.ip)+"/"+str(ip.network_prefix), strict=False)
 68                      t["addr"] = ip.ip
 69                      t["prefix"] = ip.network_prefix
 70                      t["broadcast"] = str(net.broadcast_address)
 71                      ipv4s.append(t)
 72                  if ip.is_IPv6:
 73                      t["addr"] = ip.ip[0]
 74                      ipv6s.append(t)
 75  
 76              if len(ipv4s) > 0: ifa[AF_INET] = ipv4s
 77              if len(ipv6s) > 0: ifa[AF_INET6] = ipv6s
 78  
 79      return ifa
 80  
 81  def get_adapters(include_unconfigured=False):
 82      if os.name == "posix": return _get_adapters_posix(include_unconfigured=include_unconfigured)
 83      elif os.name == "nt":  return _get_adapters_win(include_unconfigured=include_unconfigured)
 84      else: raise RuntimeError(f"Unsupported Operating System: {os.name}")
 85  
 86  class Adapter(object):
 87      def __init__(self, name: str, nice_name: str, ips: List["IP"], index: Optional[int] = None) -> None:
 88          self.name = name
 89          self.nice_name = nice_name
 90          self.ips = ips
 91          self.index = index
 92  
 93      def __repr__(self) -> str:
 94          return "Adapter(name={name}, nice_name={nice_name}, ips={ips}, index={index})".format(
 95              name=repr(self.name), nice_name=repr(self.nice_name), ips=repr(self.ips), index=repr(self.index))
 96  
 97  _IPv4Address = str
 98  _IPv6Address = Tuple[str, int, int]
 99  class IP(object):
100      def __init__(self, ip: Union[_IPv4Address, _IPv6Address], network_prefix: int, nice_name: str) -> None:
101          self.ip = ip
102          self.network_prefix = network_prefix
103          self.nice_name = nice_name
104  
105      @property
106      def is_IPv4(self) -> bool: return not isinstance(self.ip, tuple)
107  
108      @property
109      def is_IPv6(self) -> bool: return isinstance(self.ip, tuple)
110  
111      def __repr__(self) -> str:
112          return "IP(ip={ip}, network_prefix={network_prefix}, nice_name={nice_name})".format(ip=repr(self.ip), network_prefix=repr(self.network_prefix), nice_name=repr(self.nice_name))
113  
# ctypes mirrors of the C socket address structures. Two layouts are declared:
# one with a 16-bit family field, and one (used on Darwin and on systems whose
# platform name contains "BSD") with a one-byte length followed by a one-byte
# family field. The misspelt "familiy" field names are kept because other code
# in this module reads ``sa_familiy``.
if platform.system() != "Darwin" and "BSD" not in platform.system():
    class sockaddr(ctypes.Structure):
        _fields_ = [
            ("sa_familiy", ctypes.c_uint16),
            ("sa_data", ctypes.c_uint8 * 14),
        ]

    class sockaddr_in(ctypes.Structure):
        _fields_ = [
            ("sin_familiy", ctypes.c_uint16),
            ("sin_port", ctypes.c_uint16),
            ("sin_addr", ctypes.c_uint8 * 4),
            ("sin_zero", ctypes.c_uint8 * 8),
        ]

    class sockaddr_in6(ctypes.Structure):
        _fields_ = [
            ("sin6_familiy", ctypes.c_uint16),
            ("sin6_port", ctypes.c_uint16),
            ("sin6_flowinfo", ctypes.c_uint32),
            ("sin6_addr", ctypes.c_uint8 * 16),
            ("sin6_scope_id", ctypes.c_uint32),
        ]

else:
    class sockaddr(ctypes.Structure):  # type: ignore
        _fields_ = [
            ("sa_len", ctypes.c_uint8),
            ("sa_familiy", ctypes.c_uint8),
            ("sa_data", ctypes.c_uint8 * 14),
        ]

    class sockaddr_in(ctypes.Structure):  # type: ignore
        _fields_ = [
            ("sa_len", ctypes.c_uint8),
            ("sa_familiy", ctypes.c_uint8),
            ("sin_port", ctypes.c_uint16),
            ("sin_addr", ctypes.c_uint8 * 4),
            ("sin_zero", ctypes.c_uint8 * 8),
        ]

    class sockaddr_in6(ctypes.Structure):  # type: ignore
        _fields_ = [
            ("sa_len", ctypes.c_uint8),
            ("sa_familiy", ctypes.c_uint8),
            ("sin6_port", ctypes.c_uint16),
            ("sin6_flowinfo", ctypes.c_uint32),
            ("sin6_addr", ctypes.c_uint8 * 16),
            ("sin6_scope_id", ctypes.c_uint32),
        ]
156  
def sockaddr_to_ip(sockaddr_ptr: "ctypes.pointer[sockaddr]") -> Optional[Union[_IPv4Address, _IPv6Address]]:
    """Decode the address held by a ``sockaddr`` pointer.

    :param sockaddr_ptr: Pointer to a ``sockaddr``; may be NULL.
    :returns: The dotted-quad string for ``AF_INET``, an
              ``(address, flowinfo, scope_id)`` tuple for ``AF_INET6``, and
              ``None`` for a NULL pointer or any other address family.
    """
    if not sockaddr_ptr:
        return None

    family = sockaddr_ptr[0].sa_familiy

    if family == socket.AF_INET:
        # Reinterpret the generic sockaddr as its IPv4 layout.
        v4 = ctypes.cast(sockaddr_ptr, ctypes.POINTER(sockaddr_in))[0]
        packed = bytes(bytearray(v4.sin_addr))
        return str(ipaddress.ip_address(packed))

    if family == socket.AF_INET6:
        # Reinterpret the generic sockaddr as its IPv6 layout.
        v6 = ctypes.cast(sockaddr_ptr, ctypes.POINTER(sockaddr_in6))[0]
        packed = bytes(bytearray(v6.sin6_addr))
        return (str(ipaddress.ip_address(packed)), v6.sin6_flowinfo, v6.sin6_scope_id)

    return None
172  
173  
def ipv6_prefixlength(address: ipaddress.IPv6Address) -> int:
    """Return the number of set bits in ``address``.

    Only the low ``address.max_prefixlen`` bits are considered. When
    ``address`` is a netmask, this count is its prefix length.
    """
    considered_bits = int(address) & ((1 << address.max_prefixlen) - 1)
    return bin(considered_bits).count("1")
179  
180  if os.name == "posix":
    # ctypes declaration of one node of the linked list filled in by
    # getifaddrs(). The field list is assigned after the class statement
    # because "ifa_next" has to refer to the ifaddrs class itself.
    # NOTE(review): only these five members are declared here; presumably any
    # further members of the C struct are not needed because nodes are only
    # reached through pointers - confirm against the platform headers.
    class ifaddrs(ctypes.Structure): pass
    ifaddrs._fields_ = [
        ("ifa_next", ctypes.POINTER(ifaddrs)),
        ("ifa_name", ctypes.c_char_p),
        ("ifa_flags", ctypes.c_uint),
        ("ifa_addr", ctypes.POINTER(sockaddr)),
        ("ifa_netmask", ctypes.POINTER(sockaddr)),]

    # Load the library providing getifaddrs()/freeifaddrs(): "socket" when
    # os.uname() reports SunOS, "c" otherwise. use_errno=True lets
    # ctypes.get_errno() return the errno left by a failed call.
    libc = ctypes.CDLL(ctypes.util.find_library("socket" if os.uname()[0] == "SunOS" else "c"), use_errno=True) # type: ignore
190  
191      def _get_adapters_posix(include_unconfigured: bool = False) -> Iterable[Adapter]:
192          addr0 = addr = ctypes.POINTER(ifaddrs)()
193          retval = libc.getifaddrs(ctypes.byref(addr))
194          if retval != 0:
195              eno = ctypes.get_errno()
196              raise OSError(eno, os.strerror(eno))
197  
198          ips = collections.OrderedDict()
199  
200          def add_ip(adapter_name: str, ip: Optional[IP]) -> None:
201              if adapter_name not in ips:
202                  index = None  # type: Optional[int]
203                  try:
204                      index = socket.if_nametoindex(adapter_name) # type: ignore
205                  except (OSError, AttributeError): pass
206                  ips[adapter_name] = Adapter(adapter_name, adapter_name, [], index=index)
207              if ip is not None:
208                  ips[adapter_name].ips.append(ip)
209  
210          while addr:
211              name = addr[0].ifa_name.decode(encoding="UTF-8")
212              ip_addr = sockaddr_to_ip(addr[0].ifa_addr)
213              if ip_addr:
214                  if addr[0].ifa_netmask and not addr[0].ifa_netmask[0].sa_familiy:
215                      addr[0].ifa_netmask[0].sa_familiy = addr[0].ifa_addr[0].sa_familiy
216                  netmask = sockaddr_to_ip(addr[0].ifa_netmask)
217                  if isinstance(netmask, tuple):
218                      netmaskStr = str(netmask[0])
219                      prefixlen = ipv6_prefixlength(ipaddress.IPv6Address(netmaskStr))
220                  else:
221                      assert netmask is not None, f"sockaddr_to_ip({addr[0].ifa_netmask}) returned None"
222                      netmaskStr = str("0.0.0.0/" + netmask)
223                      prefixlen = ipaddress.IPv4Network(netmaskStr).prefixlen
224                  ip = IP(ip_addr, prefixlen, name)
225                  add_ip(name, ip)
226              else:
227                  if include_unconfigured:
228                      add_ip(name, None)
229              addr = addr[0].ifa_next
230  
231          libc.freeifaddrs(addr0)
232          return ips.values()
233  
234  elif os.name == "nt":
235      from ctypes import wintypes
    # Return codes compared against the result of GetAdaptersAddresses below:
    # NO_ERROR means success, ERROR_BUFFER_OVERFLOW triggers a retry with a
    # re-allocated buffer.
    NO_ERROR = 0
    ERROR_BUFFER_OVERFLOW = 111
    # NOTE(review): the three MAX_ADAPTER_* constants are not referenced
    # anywhere in the visible part of this module.
    MAX_ADAPTER_NAME_LENGTH = 256
    MAX_ADAPTER_DESCRIPTION_LENGTH = 128
    MAX_ADAPTER_ADDRESS_LENGTH = 8
    # Address family passed to GetAdaptersAddresses.
    AF_UNSPEC = 0

    # Pointer to a sockaddr plus its length.
    class SOCKET_ADDRESS(ctypes.Structure): _fields_ = [("lpSockaddr", ctypes.POINTER(sockaddr)), ("iSockaddrLength", wintypes.INT)]
    # One node of an adapter's linked list of unicast addresses. The field
    # list is assigned after the class statement because "Next" has to refer
    # to the class itself.
    # NOTE(review): field order and types presumably mirror the Windows
    # structure of the same name - verify against the IP Helper headers.
    class IP_ADAPTER_UNICAST_ADDRESS(ctypes.Structure): pass
    IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
        ("Length", wintypes.ULONG),
        ("Flags", wintypes.DWORD),
        ("Next", ctypes.POINTER(IP_ADAPTER_UNICAST_ADDRESS)),
        ("Address", SOCKET_ADDRESS),
        ("PrefixOrigin", ctypes.c_uint),
        ("SuffixOrigin", ctypes.c_uint),
        ("DadState", ctypes.c_uint),
        ("ValidLifetime", wintypes.ULONG),
        ("PreferredLifetime", wintypes.ULONG),
        ("LeaseLifetime", wintypes.ULONG),
        ("OnLinkPrefixLength", ctypes.c_uint8)]

    # One node of the linked list of adapters; self-referential through
    # "Next", hence the deferred field assignment.
    # NOTE(review): only the members up to FriendlyName are declared;
    # presumably later members of the Windows structure are not needed because
    # nodes are read in place from the result buffer - confirm.
    class IP_ADAPTER_ADDRESSES(ctypes.Structure): pass
    IP_ADAPTER_ADDRESSES._fields_ = [
        ("Length", wintypes.ULONG),
        ("IfIndex", wintypes.DWORD),
        ("Next", ctypes.POINTER(IP_ADAPTER_ADDRESSES)),
        ("AdapterName", ctypes.c_char_p),
        ("FirstUnicastAddress", ctypes.POINTER(IP_ADAPTER_UNICAST_ADDRESS)),
        ("FirstAnycastAddress", ctypes.c_void_p),
        ("FirstMulticastAddress", ctypes.c_void_p),
        ("FirstDnsServerAddress", ctypes.c_void_p),
        ("DnsSuffix", ctypes.c_wchar_p),
        ("Description", ctypes.c_wchar_p),
        ("FriendlyName", ctypes.c_wchar_p)]

    # Library that provides GetAdaptersAddresses.
    iphlpapi = ctypes.windll.LoadLibrary("Iphlpapi") # type: ignore
273  
274      def _enumerate_interfaces_of_adapter_win(nice_name: str, address: IP_ADAPTER_UNICAST_ADDRESS) -> Iterable[IP]:
275          # Iterate through linked list and fill list
276          addresses = [] # type: List[IP_ADAPTER_UNICAST_ADDRESS]
277          while True:
278              addresses.append(address)
279              if not address.Next: break
280              address = address.Next[0]
281  
282          for address in addresses:
283              ip = sockaddr_to_ip(address.Address.lpSockaddr)
284              assert ip is not None, f"sockaddr_to_ip({address.Address.lpSockaddr}) returned None"
285              network_prefix = address.OnLinkPrefixLength
286              yield IP(ip, network_prefix, nice_name)
287  
288      def _get_adapters_win(include_unconfigured: bool = False) -> Iterable[Adapter]:
289          addressbuffersize = wintypes.ULONG(15 * 1024)
290          retval = ERROR_BUFFER_OVERFLOW
291          while retval == ERROR_BUFFER_OVERFLOW:
292              addressbuffer = ctypes.create_string_buffer(addressbuffersize.value)
293              retval = iphlpapi.GetAdaptersAddresses(
294                  wintypes.ULONG(AF_UNSPEC),
295                  wintypes.ULONG(0),
296                  None,
297                  ctypes.byref(addressbuffer),
298                  ctypes.byref(addressbuffersize))
299  
300          if retval != NO_ERROR:
301              raise ctypes.WinError() # type: ignore
302  
303          # Iterate through adapters and fill array
304          address_infos = []  # type: List[IP_ADAPTER_ADDRESSES]
305          address_info = IP_ADAPTER_ADDRESSES.from_buffer(addressbuffer)
306          while True:
307              address_infos.append(address_info)
308              if not address_info.Next: break
309              address_info = address_info.Next[0]
310  
311          # Iterate through unicast addresses
312          result = [] # type: List[Adapter]
313          for adapter_info in address_infos:
314              name = adapter_info.AdapterName.decode()
315              nice_name = adapter_info.Description
316              index = adapter_info.IfIndex
317  
318              if adapter_info.FirstUnicastAddress:
319                  ips = _enumerate_interfaces_of_adapter_win(adapter_info.FriendlyName, adapter_info.FirstUnicastAddress[0])
320                  ips = list(ips)
321                  result.append(Adapter(name, nice_name, ips, index=index))
322              
323              elif include_unconfigured: result.append(Adapter(name, nice_name, [], index=index))
324  
325          return result