Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add field registrar_abusemail #42

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions convey/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def get_method(start: Type, target: Type):
raise LookupError

whois = Type("whois", TypeGroup.whois, "ask whois servers", is_private=True)
whoisdomain = Type("whoisdomain", TypeGroup.whois, "ask whois servers for domain", is_private=True)
web = Type("web", TypeGroup.web, "scrape web contents", is_private=True)

external = Type("external", TypeGroup.custom, from_message="from a method in your .py file")
Expand All @@ -304,6 +305,7 @@ def get_method(start: Type, target: Type):
reg_m = Type("reg_m", TypeGroup.custom, from_message="match from a regular expression")
netname = Type("netname", TypeGroup.whois)
country = Type("country", TypeGroup.whois)
registrar_abusemail = Type("registrar_abusemail", TypeGroup.whois, "Abuse e-mail contact from whois")
abusemail = Type("abusemail", TypeGroup.whois, "Abuse e-mail contact from whois")
prefix = Type("prefix", TypeGroup.whois) # XX rename to 'inetnum'? to 'range'?
csirt_contact = Type("csirt_contact", TypeGroup.whois,
Expand Down Expand Up @@ -510,12 +512,14 @@ def _get_methods():
"FIELDS") else Checker.hostname_ip,
# (t.url, t.ip): Whois.url2ip,
(t.ip, t.whois): Whois,
(t.hostname, t.whoisdomain): lambda x: Whois(ip=None, hostname=x),
# (t.asn, t.whois): Whois, # XX can be easily allowed, however Whois object will huff there is no IP prefix range
(t.cidr, t.ip): Checker.cidr_ips if Config.get("multiple_cidr_ip", "FIELDS") else
lambda x: str(ipaddress.ip_interface(x).ip),
(t.whois, t.prefix): lambda x: str(x.get[0]),
(t.whois, t.asn): lambda x: x.get[3],
(t.whois, t.abusemail): lambda x: x.get[6],
(t.whoisdomain, t.registrar_abusemail): lambda x: x.get[8],
(t.whois, t.country): lambda x: x.get[5],
(t.whois, t.netname): lambda x: x.get[4],
(t.whois, t.csirt_contact):
Expand Down
89 changes: 67 additions & 22 deletions convey/whois.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from time import time, sleep

from netaddr import IPRange, IPNetwork
from tldextract import tldextract

from .contacts import Contacts
from .config import Config, subprocess_env
Expand Down Expand Up @@ -76,27 +77,36 @@ def init(cls, stats, ranges, ip_seen, csvstats, slow_mode=False, unknown_mode=Fa
# ["whois.ripe.net -r", "whois.arin.net", "whois.lacnic.net", "whois.apnic.net", "whois.afrinic.net"]):
# Whois.servers[name] = val

def __init__(self, ip):
def __init__(self, ip, hostname=None):
"""
self.get stores tuple: prefix, location, mail, asn, netname, country, ttl
"""
self.ip = ip
self.hostname = hostname
self.whois_response = []
prefix = self.cache_load() # try load prefix from earlier WHOIS responses
if prefix:
if (self.ttl != -1 and self.get[7] + self.ttl < time()) or (Whois.unknown_mode and not self.get[6]):
# the TTL is too old, we cannot guarantee IP stayed in the same prefix, let's get rid of the old results
# OR we are in unknown_mode which means we want abusemail. If not here, maybe another IP claimed
# a range superset without abuse e-mail. Delete this possible superset
# We do not have to call now `self.get = None; del self.ip_seen[ip]` if there is no need to be thread safe,
# these lines will be called at the function end.
del self.ranges[prefix]

if self.hostname:
if len(self.hostname.split(".")) > 2:
self.hostname_registerable = self.to_registerable(self.hostname)
else:
self.count_stats()
return
self.hostname_registerable = self.hostname

if self.ip:
prefix = self.cache_load() # try load prefix from earlier WHOIS responses
if prefix:
if (self.ttl != -1 and self.get[7] + self.ttl < time()) or (Whois.unknown_mode and not self.get[6]):
# the TTL is too old, we cannot guarantee IP stayed in the same prefix, let's get rid of the old results
# OR we are in unknown_mode which means we want abusemail. If not here, maybe another IP claimed
# a range superset without abuse e-mail. Delete this possible superset
# We do not have to call now `self.get = None; del self.ip_seen[ip]` if there is no need to be thread safe,
# these lines will be called at the function end.
del self.ranges[prefix]
else:
self.count_stats()
return

if self.see:
print(f"Whois {ip}... ", end="", flush=True)
print(f"Whois {self.ip or self.hostname_registerable}... ", end="", flush=True)
if Whois.slow_mode:
if self.see:
print("waiting 7 seconds... ", end="", flush=True)
Expand All @@ -105,7 +115,7 @@ def __init__(self, ip):
if self.see:
print(get[2] or "no incident contact.")
prefix = get[0]
if not prefix:
if not prefix and self.ip:
logger.info(f"No prefix found for IP {ip}")
prefix = IPRange(0, 0) # make key consistent when saving into cache
self.ip_seen[ip] = prefix
Expand Down Expand Up @@ -202,14 +212,14 @@ def _match_response(self, patterns, last_word=False):
:param patterns: pattern string or list of strings
:param last_word: returns only the last word of whole matched expression else last group (ex: the one in parentheses)

:return:
"""
# , take_nth=None, group=None
# :param take_nth: if available, return n-th result instead of the first available
# I.E. `whois 131.72.138.234 | grep ountr` returns three countries: UY, CL, CL.
# ARIN registry informs us that this IP is a LACNIC resource and prints out LACNIC address in UY.
# However, CL is the country the IP is hosted in.
# :param group: returned group - default: last group is returned (ex: the one in parentheses)
:return:
"""
if type(patterns) is str:
patterns = [patterns]

Expand Down Expand Up @@ -350,6 +360,7 @@ def analyze(self):
asn = self._match_response(r'\norigin(.*)\d+', last_word=True)
netname = self._match_response([r'netname:\s*([^\s]*)', r'network:network-name:\s*([^\s]*)'])

registrar_ab = self.get_registrar_abusemail()
ab = self.get_abusemail()
if Whois.unknown_mode and not ab:
ab = self.resolve_unknown_mail()
Expand All @@ -362,7 +373,7 @@ def analyze(self):
else:
get1 = "local"
get2 = ab
return prefix, get1, get2, asn, netname, country, ab, int(time())
return prefix, get1, get2, asn, netname, country, ab, int(time()), registrar_ab

def _load_country_from_addresses(self):
# let's try to find country in the non-standardised address field
Expand All @@ -373,7 +384,9 @@ def _load_country_from_addresses(self):
return c
return ""

reAbuse = re.compile(r'[a-z0-9._%+-]{1,64}@(?:[a-z0-9-]{1,63}\.){1,125}[a-z]{2,63}')
# email regex
email_regex = r"[a-z0-9._%+-]{1,64}@(?:[a-z0-9-]{1,63}\.){1,125}[a-z]{2,63}"
reAbuse = re.compile(email_regex)

def get_abusemail(self):
""" Loads abusemail from last whois response OR from whois json api. """
Expand All @@ -384,16 +397,48 @@ def get_abusemail(self):
]))
return match.group(0) if match else ""

def get_registrar_abusemail(self):
"""Loads registrar's abusemail from last whois response OR from whois json api."""

# reg_lines = re.findall(r'^.*registrar.*$', self.whois_response[0])
# for rl in reg_lines:
# if 'abuse' in rl and '@' in rl:
# abusemail = re.search(self.email_regex, rl)
# if abusemail:
# return abusemail.group(0)
# return ""

match = re.search(r".*(?=.*\babuse\b).*(?=.*\b@\b).*(?=.*\bregistrar\b).*(?:.*\b(email|contact)\b)?.*", self.whois_response[0])

if match:
abusemail = re.search(self.email_regex, match.group(0))
if abusemail:
return abusemail.group(0)
else:
return ""

def to_registerable(self, url: str) -> str:
"""Compares the given url with a public list of registerable domains

ex: website.xyz.com.br -> xyz.com.br

Note: if a nonregisterable url is given (e.g.: com.br) the same is returned
"""

return tldextract.extract(url).registered_domain

regRe = re.compile(r"using server (.*)\.")

def _exec(self, server, server_url=None):
""" Query whois server """
"""Query whois server"""
target = self.hostname_registerable if self.hostname else self.ip

if server == "general":
cmd = ["whois", "--verbose", self.ip]
cmd = ["whois", "--verbose", target]
else:
if not server_url:
server_url = Whois.servers[server]
cmd = ["whois", "--verbose", "-h", server_url, "--", self.ip]
cmd = ["whois", "--verbose", "-h", server_url, "--", target]
self.last_server = None # check what registry whois asks - may use a strange LIR that returns non-senses
try:
# in case wrong env is set to whois, we get `147.32.106.205` country NL and not CZ
Expand All @@ -404,7 +449,7 @@ def _exec(self, server, server_url=None):
except UnicodeDecodeError:
# ip address 94.230.155.109 had this string 'Jan Krivsky Hl\xc3\x83\x83\xc3\x82\xc2\xa1dkov' and everything failed
self.whois_response = []
logger.warning("Whois response for IP {} on server {} cannot be parsed.".format(self.ip, server))
logger.warning("Whois response for IP {} on server {} cannot be parsed.".format(target, server))
except TypeError: # could not resolve host
self.whois_response = []
except FileNotFoundError:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ pythondialog
requests
tabulate
xlrd
tldextract
Loading