Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netaddr #737

Merged
merged 9 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions hades_logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,10 @@ def fetch_logs(
) -> t.Iterator[RadiusLogEntry]:
"""Fetch the auth logs of the given port

:param ipaddr nasipaddress: The IP address of the NAS
:param str nasportid: The port identifier (e.g. `C12`) of the
NAS port
:param nasipaddress: The IP address of the NAS.
:param nasportid: The port identifier (e.g. `C12`) of the NAS port

:returns: the result of the task (see
``get_port_auth_attempts`` in hades)
:returns: the result of the task (see ``get_port_auth_attempts`` in hades)
:rtype: iterable (generator if :param:`reduced`)

:raises HadesTimeout: raised when no response arrives in the time window
Expand Down
19 changes: 10 additions & 9 deletions pycroft/helpers/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
import typing as t

import ipaddr
import netaddr
# Byte represented by 2 hexadecimal digits
from mac_vendor_lookup import MacLookup

Expand Down Expand Up @@ -100,14 +100,15 @@ def port_name_sort_key(port_name: str) -> int:
1024 * ord(letter.group(0) if letter else chr(ord("a") - 1)))


def reverse_pointer(ip_address: ipaddr.IPv4Address | ipaddr.IPv6Address) -> str:
if isinstance(ip_address, ipaddr.IPv4Address):
reversed_octets = reversed(ip_address.exploded.split('.'))
return '.'.join(reversed_octets) + '.in-addr.arpa'
elif isinstance(ip_address, ipaddr.IPv6Address):
reversed_chars = reversed(ip_address.exploded.replace(':', ''))
return '.'.join(reversed_chars) + '.ip6.arpa'
raise TypeError()
def reverse_pointer(ip_address: netaddr.IPAddress) -> str:
import warnings

warnings.warn(
"Omit helper function and use `IPAddress.reverse_dns()` instead.",
DeprecationWarning,
stacklevel=2,
)
return ip_address.reverse_dns


def get_interface_manufacturer(mac: str) -> str | None:
Expand Down
18 changes: 6 additions & 12 deletions pycroft/lib/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""
import typing as t

import ipaddr
import netaddr
from sqlalchemy import select
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -43,14 +43,8 @@ def change_mac(interface: Interface, mac: str, processor: User) -> Interface:
return interface


def generate_hostname(ip_address: ipaddr.IPv4Address) -> str:
numeric_ip = int(ip_address)
return "x{:02x}{:02x}{:02x}{:02x}".format(
(numeric_ip >> 0x18) & 0xFF,
(numeric_ip >> 0x10) & 0xFF,
(numeric_ip >> 0x08) & 0xFF,
(numeric_ip >> 0x00) & 0xFF,
)
def generate_hostname(ip_address: netaddr.IPAddress) -> str:
return f"x{int(ip_address):08x}"


@with_transaction
Expand Down Expand Up @@ -117,7 +111,7 @@ def interface_create(
host: Host,
name: str,
mac: str,
ips: t.Iterable[ipaddr.IPv4Address] | None,
ips: t.Iterable[netaddr.IPAddress] | None,
processor: User,
) -> Interface:
interface = Interface(host=host, mac=mac, name=name)
Expand Down Expand Up @@ -159,7 +153,7 @@ def interface_edit(
interface: Interface,
name: str,
mac: str,
ips: t.Iterable[ipaddr._BaseIP],
ips: t.Iterable[netaddr.IPAddress],
processor: User,
) -> None:
message = "Edited interface ({}, {}) of host '{}'.".format(
Expand Down Expand Up @@ -201,7 +195,7 @@ def interface_edit(
session.add(IP(interface=interface, address=ip,
subnet=subnet))
ips_changed = True
new_ips.add(ipaddr.IPAddress(ip))
new_ips.add(netaddr.IPAddress(ip))

if ips_changed:
message += " New IPs: {}.".format(', '.join(str(ip) for ip in
Expand Down
7 changes: 3 additions & 4 deletions pycroft/lib/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
"""
import typing as t

import ipaddr
from ipaddr import IPAddress
from netaddr import IPAddress
from sqlalchemy.orm import Session

from pycroft.helpers.i18n import deferred_gettext
Expand Down Expand Up @@ -198,14 +197,14 @@ def edit_switch(
.format(switch.host.name, str(switch.management_ip), management_ip)
log_room_event(message.to_json(), processor, switch.host.room)

switch.management_ip = ipaddr.IPAddress(management_ip)
switch.management_ip = IPAddress(management_ip)
session.add(switch)


def create_switch(
session: Session,
name: str,
management_ip: ipaddr.IPv4Address,
management_ip: IPAddress,
room: Room,
processor: User,
) -> Switch:
Expand Down
70 changes: 13 additions & 57 deletions pycroft/lib/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
pycroft.lib.net
~~~~~~~~~~~~~~~
"""
import sys
import typing as t
from itertools import islice

import ipaddr
from ipaddr import IPv4Address, IPv6Address, IPv4Network, IPv6Network
import netaddr
from sqlalchemy import func, and_, cast
from sqlalchemy.orm import Session

Expand All @@ -31,37 +28,11 @@ def __init__(self) -> None:
super().__init__("MAC address already exists")


def get_subnet_unused_ips(subnet: Subnet) -> t.Iterator[IPv4Address]:
reserved_bottom = subnet.reserved_addresses_bottom or 0
reserved_top = subnet.reserved_addresses_top or 0
used_ips = frozenset(ip.address for ip in subnet.ips)
unreserved = islice(
subnet.address.iterhosts(), reserved_bottom,
# Stop argument must be None or an integer: 0 <= x <= sys.maxsize.
# IPv6 subnets can exceed this boundary on 32 bit python builds.
min(subnet.address.numhosts - reserved_top - 2, sys.maxsize))
return (ip for ip in unreserved if ip not in used_ips)


def get_unused_ips(
subnets: t.Iterable[Subnet],
) -> dict[Subnet, t.Iterator[IPv4Address]]:
return {subnet: get_subnet_unused_ips(subnet) for subnet in subnets}


def get_free_ip(subnets: t.Iterable[Subnet]) -> tuple[IPv4Address, Subnet]:
unused = get_unused_ips(subnets)

for subnet, ips in unused.items():
try:
ip = next(ips)

if ip is not None and subnet is not None:
return ip, subnet
except StopIteration:
continue

raise SubnetFullException()
def get_free_ip(subnets: t.Iterable[Subnet]) -> tuple[netaddr.IPAddress, Subnet]:
try:
return next((ip, subnet) for subnet in subnets for ip in subnet.unused_ips_iter())
except StopIteration:
raise SubnetFullException from None


#TODO: Implement this in the model
Expand All @@ -77,12 +48,12 @@ def get_subnets_for_room(room: Room) -> list[Subnet]:


def calculate_max_ips(subnet: Subnet) -> int:
max_ips = subnet.address.numhosts - 2
if subnet.reserved_addresses_bottom:
max_ips -= subnet.reserved_addresses_bottom
if subnet.reserved_addresses_top:
max_ips -= subnet.reserved_addresses_top
return max_ips
import warnings

warnings.warn(
"Use `Subnet.usable_size` instead of calculate_max_ips", DeprecationWarning, stacklevel=2
)
return subnet.usable_size


class SubnetUsage(t.NamedTuple):
Expand Down Expand Up @@ -115,21 +86,6 @@ def get_subnets_with_usage() -> list[tuple[Subnet, SubnetUsage]]:
]


def ptr_name(
network: IPv4Network | IPv6Network, ip_address: IPv4Address | IPv6Address
) -> str:
hostbits = network.max_prefixlen - network.prefixlen
if isinstance(ip_address, IPv4Address):
num_octets = min((hostbits + 7 // 8), 1)
reversed_octets = reversed(ip_address.exploded.split('.'))
return '.'.join(islice(reversed_octets, num_octets))
elif isinstance(ip_address, IPv6Address):
num_chars = min((hostbits + 3 // 4), 1)
reversed_chars = reversed(ip_address.exploded.replace(':', ''))
return '.'.join(islice(reversed_chars, num_chars))
raise TypeError()


def delete_ip(session: Session, ip: ipaddr._BaseIP) -> None:
def delete_ip(session: Session, ip: netaddr.IPAddress) -> None:
# TODO use proper `delete` statement
session.delete(IP.q.filter_by(address=ip).first())
6 changes: 3 additions & 3 deletions pycroft/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import re
import typing as t

import ipaddr
import netaddr
from sqlalchemy import String
from sqlalchemy.orm import (
declared_attr,
Expand Down Expand Up @@ -51,8 +51,8 @@ class ModelBase(DeclarativeBase, metaclass=_ModelMeta):
str255: String(255),
# does not work yet: see https://github.com/sqlalchemy/sqlalchemy/issues/9175
utc.DateTimeTz: pycroft_sqla_types.DateTimeTz,
ipaddr._BaseIP: IPAddress,
ipaddr._BaseNet: IPNetwork,
netaddr.IPAddress: IPAddress,
netaddr.IPNetwork: IPNetwork,
mac_address: MACAddress,
}

Expand Down
6 changes: 3 additions & 3 deletions pycroft/model/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations
import typing as t

import ipaddr
import netaddr
from sqlalchemy import ForeignKey, event, UniqueConstraint, Column
from sqlalchemy.orm import relationship, validates, Mapped, mapped_column
from sqlalchemy.schema import Table
Expand Down Expand Up @@ -74,7 +74,7 @@ class Switch(ModelBase):
ForeignKey(Host.id), primary_key=True, index=True
)
host: Mapped[Host] = relationship(back_populates="switch")
management_ip: Mapped[ipaddr._BaseIP]
management_ip: Mapped[netaddr.IPAddress]

# backrefs
ports: Mapped[list[SwitchPort]] = relationship(
Expand Down Expand Up @@ -174,7 +174,7 @@ def __str__(self):


class IP(IntegerIdModel):
address: Mapped[ipaddr._BaseIP] = mapped_column(unique=True)
address: Mapped[netaddr.IPAddress] = mapped_column(unique=True)
interface_id: Mapped[int] = mapped_column(
ForeignKey(Interface.id, ondelete="CASCADE")
)
Expand Down
40 changes: 37 additions & 3 deletions pycroft/model/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations
import typing as t

import ipaddr
import netaddr
from sqlalchemy import CheckConstraint, ForeignKey, between, event, sql
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy.schema import AddConstraint
Expand Down Expand Up @@ -39,8 +39,8 @@ class VLAN(IntegerIdModel):


class Subnet(IntegerIdModel):
address: Mapped[ipaddr._BaseNet]
gateway: Mapped[ipaddr._BaseIP | None]
address: Mapped[netaddr.IPNetwork]
gateway: Mapped[netaddr.IPAddress | None]
reserved_addresses_bottom: Mapped[int] = mapped_column(server_default=sql.text("0"))
reserved_addresses_top: Mapped[int] = mapped_column(server_default=sql.text("0"))
description: Mapped[str50 | None]
Expand All @@ -55,6 +55,40 @@ class Subnet(IntegerIdModel):
)
# /backrefs

@property
def reserved_ipset(self) -> netaddr.IPSet:
res_bottom = self.reserved_addresses_bottom or 0
res_top = self.reserved_addresses_top or 0
# takes care of host- and broadcast domains plus edge-cases (e.g. /32)
first_usable, last_usable = self.address._usable_range()
return netaddr.IPSet(
[
netaddr.IPRange(self.address[0], first_usable + res_bottom),
netaddr.IPRange(last_usable - res_top, self.address[-1]),
]
)

def reserved_ip_ranges_iter(self) -> t.Iterator[netaddr.IPRange]:
return self.reserved_ipset.iter_ipranges()

@property
def usable_ip_range(self) -> netaddr.IPRange | None:
"""All IPs in this subnet which are not reserved."""
usable = netaddr.IPSet(self.address) - self.reserved_ipset
assert usable.iscontiguous(), f"Complement of reserved ranges in {self} is not contiguous"
return usable.iprange()

@property
def usable_size(self) -> int:
"""The number of IPs in this subnet which are not reserved."""
return self.usable_ip_range.size if self.usable_ip_range else 0

def unused_ips_iter(self) -> t.Iterator[netaddr.IPAddress]:
if not self.usable_ip_range:
return iter(())
used_ips = frozenset(ip.address for ip in self.ips)
return (ip for ip in self.usable_ip_range if ip not in used_ips)


# Ensure that the gateway is contained in the subnet
constraint = CheckConstraint(Subnet.gateway.op('<<')(Subnet.address))
Expand Down
10 changes: 5 additions & 5 deletions pycroft/model/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from numbers import Number
from typing import Any

import ipaddr
import netaddr
from psycopg2._range import DateTimeTZRange
from sqlalchemy import String, TypeDecorator, Integer, DateTime, literal
from sqlalchemy.dialects.postgresql import MACADDR, INET, Range
Expand Down Expand Up @@ -49,13 +49,13 @@ class IPAddress(_IPType):

def python_type(self):
""""""
return ipaddr._BaseIP
return netaddr.IPAddress

def process_result_value(self, value, dialect):
""""""
if value is None:
return value
return ipaddr.IPAddress(value)
return netaddr.IPAddress(value)


class IPNetwork(_IPType):
Expand All @@ -64,13 +64,13 @@ class IPNetwork(_IPType):

def python_type(self):
""""""
return ipaddr._BaseNet
return netaddr.IPNetwork

def process_result_value(self, value, dialect):
""""""
if value is None:
return value
return ipaddr.IPNetwork(value)
return netaddr.IPNetwork(value)


class MACAddress(TypeDecorator):
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dependencies = [
"Flask-RESTful ~= 0.3.7",
"Flask-WTF ~= 1.1.1",
"GitPython ~= 3.1.43",
"ipaddr ~= 2.2.0",
"netaddr ~= 1.3.0",
"Jinja2 ~= 3.1.4",
"jsonschema ~= 3.2.0",
"ldap3 ~= 2.5.1", # only needed for ldap caching
Expand Down Expand Up @@ -90,6 +90,7 @@ dev = [
"sphinxcontrib-httpdomain ~= 1.8.0",
"sphinx-paramlinks ~= 0.6.0",
"types-jsonschema ~= 4.3.0",
"types-netaddr ~= 1.3.0",
"types-passlib ~= 1.7.7",
"watchdog ~= 2.3.1",
]
Expand Down
Loading
Loading