From 9e0aa9dc6a9146ab3ff65019b79c2c5d03c2725f Mon Sep 17 00:00:00 2001 From: Radek Vyhnal Date: Wed, 16 Oct 2024 18:05:45 +0200 Subject: [PATCH] add field registrar_abusemail --- convey/types.py | 4 +++ convey/whois.py | 89 ++++++++++++++++++++++++++++++++++++------------ requirements.txt | 1 + 3 files changed, 72 insertions(+), 22 deletions(-) diff --git a/convey/types.py b/convey/types.py index df4e406..fec4607 100644 --- a/convey/types.py +++ b/convey/types.py @@ -295,6 +295,7 @@ def get_method(start: Type, target: Type): raise LookupError whois = Type("whois", TypeGroup.whois, "ask whois servers", is_private=True) + whoisdomain = Type("whoisdomain", TypeGroup.whois, "ask whois servers for domain", is_private=True) web = Type("web", TypeGroup.web, "scrape web contents", is_private=True) external = Type("external", TypeGroup.custom, from_message="from a method in your .py file") @@ -304,6 +305,7 @@ def get_method(start: Type, target: Type): reg_m = Type("reg_m", TypeGroup.custom, from_message="match from a regular expression") netname = Type("netname", TypeGroup.whois) country = Type("country", TypeGroup.whois) + registrar_abusemail = Type("registrar_abusemail", TypeGroup.whois, "Abuse e-mail contact from whois") abusemail = Type("abusemail", TypeGroup.whois, "Abuse e-mail contact from whois") prefix = Type("prefix", TypeGroup.whois) # XX rename to 'inetnum'? to 'range'? csirt_contact = Type("csirt_contact", TypeGroup.whois, @@ -510,12 +512,14 @@ def _get_methods(): "FIELDS") else Checker.hostname_ip, # (t.url, t.ip): Whois.url2ip, (t.ip, t.whois): Whois, + (t.hostname, t.whoisdomain): lambda x: Whois(ip=None, hostname=x), # (t.asn, t.whois): Whois, # XX can be easily allowed, however Whois object will huff there is no IP prefix range (t.cidr, t.ip): Checker.cidr_ips if Config.get("multiple_cidr_ip", "FIELDS") else lambda x: str(ipaddress.ip_interface(x).ip), (t.whois, t.prefix): lambda x: str(x.get[0]), (t.whois, t.asn): lambda x: x.get[3], (t.whois, t.abusemail): lambda x: x.get[6], + (t.whoisdomain, t.registrar_abusemail): lambda x: x.get[8], (t.whois, t.country): lambda x: x.get[5], (t.whois, t.netname): lambda x: x.get[4], (t.whois, t.csirt_contact): diff --git a/convey/whois.py b/convey/whois.py index bc673ef..2ec1b43 100644 --- a/convey/whois.py +++ b/convey/whois.py @@ -6,6 +6,7 @@ from time import time, sleep from netaddr import IPRange, IPNetwork +from tldextract import tldextract from .contacts import Contacts from .config import Config, subprocess_env @@ -76,27 +77,36 @@ def init(cls, stats, ranges, ip_seen, csvstats, slow_mode=False, unknown_mode=Fa # ["whois.ripe.net -r", "whois.arin.net", "whois.lacnic.net", "whois.apnic.net", "whois.afrinic.net"]): # Whois.servers[name] = val - def __init__(self, ip): + def __init__(self, ip, hostname=None): """ self.get stores tuple: prefix, location, mail, asn, netname, country, ttl """ self.ip = ip + self.hostname = hostname self.whois_response = [] - prefix = self.cache_load() # try load prefix from earlier WHOIS responses - if prefix: - if (self.ttl != -1 and self.get[7] + self.ttl < time()) or (Whois.unknown_mode and not self.get[6]): - # the TTL is too old, we cannot guarantee IP stayed in the same prefix, let's get rid of the old results - # OR we are in unknown_mode which means we want abusemail. If not here, maybe another IP claimed - # a range superset without abuse e-mail. Delete this possible superset - # We do not have to call now `self.get = None; del self.ip_seen[ip]` if there is no need to be thread safe, - # these lines will be called at the function end. - del self.ranges[prefix] + + if self.hostname: + if len(self.hostname.split(".")) > 2: + self.hostname_registerable = self.to_registerable(self.hostname) else: - self.count_stats() - return + self.hostname_registerable = self.hostname + + if self.ip: + prefix = self.cache_load() # try load prefix from earlier WHOIS responses + if prefix: + if (self.ttl != -1 and self.get[7] + self.ttl < time()) or (Whois.unknown_mode and not self.get[6]): + # the TTL is too old, we cannot guarantee IP stayed in the same prefix, let's get rid of the old results + # OR we are in unknown_mode which means we want abusemail. If not here, maybe another IP claimed + # a range superset without abuse e-mail. Delete this possible superset + # We do not have to call now `self.get = None; del self.ip_seen[ip]` if there is no need to be thread safe, + # these lines will be called at the function end. + del self.ranges[prefix] + else: + self.count_stats() + return if self.see: - print(f"Whois {ip}... ", end="", flush=True) + print(f"Whois {self.ip or self.hostname_registerable}... ", end="", flush=True) if Whois.slow_mode: if self.see: print("waiting 7 seconds... ", end="", flush=True) @@ -105,7 +115,7 @@ def __init__(self, ip): if self.see: print(get[2] or "no incident contact.") prefix = get[0] - if not prefix: + if not prefix and self.ip: logger.info(f"No prefix found for IP {ip}") prefix = IPRange(0, 0) # make key consistent when saving into cache self.ip_seen[ip] = prefix @@ -202,14 +212,14 @@ def _match_response(self, patterns, last_word=False): :param patterns: pattern string or list of strings :param last_word: returns only the last word of whole matched expression else last group (ex: the one in parentheses) + :return: + """ # , take_nth=None, group=None # :param take_nth: if available, return n-th result instead of the first available # I.E. `whois 131.72.138.234 | grep ountr` returns three countries: UY, CL, CL. # ARIN registry informs us that this IP is a LACNIC resource and prints out LACNIC address in UY. # However, CL is the country the IP is hosted in. # :param group: returned group - default: last group is returned (ex: the one in parentheses) - :return: - """ if type(patterns) is str: patterns = [patterns] @@ -350,6 +360,7 @@ def analyze(self): asn = self._match_response(r'\norigin(.*)\d+', last_word=True) netname = self._match_response([r'netname:\s*([^\s]*)', r'network:network-name:\s*([^\s]*)']) + registrar_ab = self.get_registrar_abusemail() ab = self.get_abusemail() if Whois.unknown_mode and not ab: ab = self.resolve_unknown_mail() @@ -362,7 +373,7 @@ def analyze(self): else: get1 = "local" get2 = ab - return prefix, get1, get2, asn, netname, country, ab, int(time()) + return prefix, get1, get2, asn, netname, country, ab, int(time()), registrar_ab def _load_country_from_addresses(self): # let's try to find country in the non-standardised address field @@ -373,7 +384,9 @@ def _load_country_from_addresses(self): return c return "" - reAbuse = re.compile(r'[a-z0-9._%+-]{1,64}@(?:[a-z0-9-]{1,63}\.){1,125}[a-z]{2,63}') + # email regex + email_regex = r"[a-z0-9._%+-]{1,64}@(?:[a-z0-9-]{1,63}\.){1,125}[a-z]{2,63}" + reAbuse = re.compile(email_regex) def get_abusemail(self): """ Loads abusemail from last whois response OR from whois json api. """ @@ -384,16 +397,48 @@ def get_abusemail(self): ])) return match.group(0) if match else "" + def get_registrar_abusemail(self): + """Loads registrar's abusemail from last whois response OR from whois json api.""" + + # reg_lines = re.findall(r'^.*registrar.*$', self.whois_response[0]) + # for rl in reg_lines: + # if 'abuse' in rl and '@' in rl: + # abusemail = re.search(self.email_regex, rl) + # if abusemail: + # return abusemail.group(0) + # return "" + + match = re.search(r".*(?=.*\babuse\b).*(?=.*\b@\b).*(?=.*\bregistrar\b).*(?:.*\b(email|contact)\b)?.*", self.whois_response[0]) + + if match: + abusemail = re.search(self.email_regex, match.group(0)) + if abusemail: + return abusemail.group(0) + else: + return "" + + def to_registerable(self, url: str) -> str: + """Compares the given url with a public list of registerable domains + + ex: website.xyz.com.br -> xyz.com.br + + Note: if a nonregisterable url is given (e.g.: com.br) the same is returned + """ + + return tldextract.extract(url).registered_domain + regRe = re.compile(r"using server (.*)\.") def _exec(self, server, server_url=None): - """ Query whois server """ + """Query whois server""" + target = self.hostname_registerable if self.hostname else self.ip + if server == "general": - cmd = ["whois", "--verbose", self.ip] + cmd = ["whois", "--verbose", target] else: if not server_url: server_url = Whois.servers[server] - cmd = ["whois", "--verbose", "-h", server_url, "--", self.ip] + cmd = ["whois", "--verbose", "-h", server_url, "--", target] self.last_server = None # check what registry whois asks - may use a strange LIR that returns non-senses try: # in case wrong env is set to whois, we get `147.32.106.205` country NL and not CZ @@ -404,7 +449,7 @@ def _exec(self, server, server_url=None): except UnicodeDecodeError: # ip address 94.230.155.109 had this string 'Jan Krivsky Hl\xc3\x83\x83\xc3\x82\xc2\xa1dkov' and everything failed self.whois_response = [] - logger.warning("Whois response for IP {} on server {} cannot be parsed.".format(self.ip, server)) + logger.warning("Whois response for IP {} on server {} cannot be parsed.".format(target, server)) except TypeError: # could not resolve host self.whois_response = [] except FileNotFoundError: diff --git a/requirements.txt b/requirements.txt index a74ffa4..e01ab5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,3 +21,4 @@ pythondialog requests tabulate xlrd +tldextract \ No newline at end of file