"""
django_adtools/discover_dc.py
This script discovers for domain controllers in domain
REQUIREMENTS:
pip install dnspython
"""
__author__ = "shmakovpn <shmakovpn@yandex.ru>"
__date__ = "2020-03-04"
from typing import List, Optional, Pattern
import re
import dns.resolver
import socket
import logging
#: Pattern to match IPv4 addresses
re_ip: Pattern = re.compile(
r'^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$'
)
#: this __package__ logger
logger: logging.Logger = logging.getLogger(__package__)
[docs]class DCHostname:
"""
Hostname of the Domain Controller
:param dc_hostname: a hostname or an ip address of the Domain Controller
:type dc_hostname: str
:param dc_priority:
:type dc_priority: int
:param dc_port:
:type dc_port: int
:param dns_resolver:
:type dns_resolver: dns.resolver.Resolver, optional
"""
def __init__(self,
dc_hostname: str,
dc_priority: int,
dc_port: int,
dns_resolver: dns.resolver.Resolver,
):
self.dc_hostname: str = dc_hostname
self.dc_priority: int = dc_priority
self.dc_port: int = dc_port
self.dns_resolver: dns.resolver.Resolver = dns_resolver
self.dc_ip: str = ''
[docs] def dc_ping(self) -> bool:
"""
Checks that this domain controller host is available
:return: True if this domain controller host is available
:rtype: bool
"""
if not self.dc_ip and re_ip.search(self.dc_hostname):
self.dc_ip = self.dc_hostname
if self.dc_ip:
dc_ips: List[str] = [self.dc_ip] # ip addresses of Domain Controllers
else:
dc_ips: List[str] = [] # ip addresses of Domain controllers need to be resolved using self.dc_hostname
if not dc_ips:
# resolve hostname
try:
dns_answer: dns.resolver.Answer = self.dns_resolver.query(self.dc_hostname)
answers: List[dns.rdtypes.IN.A.A] = list(dns_answer)
dc_ips: List[str] = [answer.address for answer in answers]
except dns.exception.DNSException as e:
raise dns.exception.DNSException(e)
sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
for dc_ip in dc_ips:
try:
if sock.connect_ex((dc_ip, self.dc_port,)) == 0:
self.dc_ip = dc_ip
sock.close()
return True
except socket.gaierror:
pass
sock.close()
logger.error(f'{__package__} DCHostname.ping failed no available controllers in dc_ips={dc_ips}')
return False
def __str__(self):
return f"DCHostname" + \
f"(dc_hostname='{self.dc_hostname}', dc_priority='{self.dc_priority}', dc_port='{self.dc_port}'," + \
f"dc_ip={self.dc_ip if self.dc_ip else 'None'})"
[docs]class DCList:
"""
List of domain controllers
:param domain: A name of a domain to discover, e.g. **example.com**
:type domain: str
:param role: A role of server to discover, defaults to **dc**
:type role: str
:param record_type: A type of DNS record to discover, defaults to **SRV**
:type record_type: str
:param nameservers: A list of nameservers, defaults to **None** (Warning: **None** does not work in Windows)
:type nameservers: list of str
:param port: A port number used in DNS requests, defaults to 53
:type port: int
"""
def __init__(
self,
domain: str,
role: str = 'dc',
record_type: str = 'SRV',
nameservers: List[str] = None,
port: int = 53,
):
self.domain: str = domain
self.role: str = role
self.record_type: str = record_type
self.dns_resolver: dns.resolver.Resolver = dns.resolver.get_default_resolver()
if nameservers:
logger.info(f'{__package__} DCList init nameservers is "{nameservers}"')
self.dns_resolver.nameservers = nameservers
self.dns_resolver.port = port
[docs] def get_dns_query_string(self) -> str:
"""
Creates a dns query string to discover Domain Controllers
:return: dns query string
:rtype: str
"""
return '_ldap._tcp.%s._msdcs.%s' % (self.role, self.domain,)
[docs] def get_dc_list(self) -> List[DCHostname]:
"""
Returns a list of domain controllers sorted by priority
Note: this function does not check either a domain controller is available or not
:return: a list of domain controllers' host names from DNS request sorted by priority
:rtype: list of DCHostname
"""
try:
dns_answer: dns.resolver.Answer = self.dns_resolver.query(
self.get_dns_query_string(),
self.record_type,
raise_on_no_answer=True,
)
except dns.exception.DNSException as e:
raise dns.exception.DNSException(e)
answers: List[dns.rdtypes.IN.SRV.SRV] = list(dns_answer)
answers.sort(key=lambda x: x.priority)
return [DCHostname(
dc_hostname='.'.join([x.decode() for x in answer.target.labels[:-1]]),
dc_priority=answer.priority,
dc_port=answer.port,
dns_resolver=self.dns_resolver
) for answer in answers]
[docs] def get_available_dc_ip(self) -> str:
"""
Returns a hostname of an available domain controller or empty string
:return: a hostname of an available domain controller or empty string
:rtype: str
"""
dc_hostnames: List[DCHostname] = self.get_dc_list()
for dc_hostname in dc_hostnames:
if dc_hostname.dc_ping():
return dc_hostname.dc_ip
logger.error(f'{__package__} DCList.get_available_dc_ip() no available dc_ip')
return ''