Skip to content

Commit

Permalink
Breaking check_domains method implementation into several more manage…
Browse files Browse the repository at this point in the history
…able methods that can be also called independently of check_domains. (#93)

Co-authored-by: Ebrahim Aharpour <ebrahim@kevlarr.io>
  • Loading branch information
aharpour and Ebrahim Aharpour authored Jun 27, 2023
1 parent 0e2467a commit 05ae11a
Showing 1 changed file with 116 additions and 74 deletions.
190 changes: 116 additions & 74 deletions checkdmarc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2431,91 +2431,40 @@ def check_domains(domains, parked=False,
for domain in domains:
domain = domain.lower()
logging.debug("Checking: {0}".format(domain))

domain_results = OrderedDict(
[("domain", domain), ("base_domain", get_base_domain(domain)),
("dnssec", None), ("ns", []), ("mx", [])])
domain_results["spf"] = OrderedDict(
[("record", None), ("valid", True), ("dns_lookups", None),
("dns_void_lookups", None)])

domain_results["dnssec"] = test_dnssec(domain,
nameservers=nameservers,
timeout=timeout)
try:
domain_results["ns"] = get_nameservers(
domain,
approved_nameservers=approved_nameservers,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
except DNSException as error:
domain_results["ns"] = OrderedDict([("hostnames", []),
("error", error.__str__())])
try:
domain_results["mx"] = get_mx_hosts(
domain,
skip_tls=skip_tls,
approved_hostnames=approved_mx_hostnames,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
except DNSException as error:
domain_results["mx"] = OrderedDict([("hosts", []),
("error", error.__str__())])
try:
spf_query = query_spf_record(
domain,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
domain_results["spf"]["record"] = spf_query["record"]
domain_results["spf"]["warnings"] = spf_query["warnings"]
parsed_spf = parse_spf_record(domain_results["spf"]["record"],
domain_results["domain"],

domain_results["ns"] = check_ns(domain,
approved_nameservers=approved_nameservers,
nameservers=nameservers,
resolver=resolver, timeout=timeout)

domain_results["mx"] = check_mx(domain,
approved_mx_hostnames=approved_mx_hostnames,
skip_tls=skip_tls,
nameservers=nameservers,
resolver=resolver,
timeout=timeout)

domain_results["spf"] = check_spf(domain,
parked=parked,
nameservers=nameservers,
resolver=resolver,
timeout=timeout)

domain_results["spf"]["dns_lookups"] = parsed_spf[
"dns_lookups"]
domain_results["spf"]["dns_void_lookups"] = parsed_spf[
"dns_void_lookups"]
domain_results["spf"]["parsed"] = parsed_spf["parsed"]
domain_results["spf"]["warnings"] += parsed_spf["warnings"]
except SPFError as error:
domain_results["spf"]["error"] = str(error)
del domain_results["spf"]["dns_lookups"]
domain_results["spf"]["valid"] = False
if hasattr(error, "data") and error.data:
for key in error.data:
domain_results["spf"][key] = error.data[key]

# DMARC
domain_results["dmarc"] = OrderedDict([("record", None),
("valid", True),
("location", None)])
try:
dmarc_query = query_dmarc_record(domain,
nameservers=nameservers,
resolver=resolver,
timeout=timeout)
domain_results["dmarc"]["record"] = dmarc_query["record"]
domain_results["dmarc"]["location"] = dmarc_query["location"]
parsed_dmarc_record = parse_dmarc_record(
dmarc_query["record"],
dmarc_query["location"],
parked=parked,
include_tag_descriptions=include_dmarc_tag_descriptions,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
domain_results["dmarc"]["warnings"] = dmarc_query["warnings"]

domain_results["dmarc"]["tags"] = parsed_dmarc_record["tags"]
domain_results["dmarc"]["warnings"] += parsed_dmarc_record[
"warnings"]
except DMARCError as error:
domain_results["dmarc"]["error"] = str(error)
domain_results["dmarc"]["valid"] = False
if hasattr(error, "data") and error.data:
for key in error.data:
domain_results["dmarc"][key] = error.data[key]
domain_results["dmarc"] = check_dmarc(domain,
parked=parked,
include_dmarc_tag_descriptions=include_dmarc_tag_descriptions,
nameservers=nameservers,
resolver=resolver,
timeout=timeout)

results.append(domain_results)
if wait > 0.0:
logging.debug("Sleeping for {0} seconds".format(wait))
Expand All @@ -2526,6 +2475,99 @@ def check_domains(domains, parked=False,
return results


def check_ns(domain, approved_nameservers=None,
nameservers=None, resolver=None, timeout=2.0):
try:
ns_results = get_nameservers(
domain,
approved_nameservers=approved_nameservers,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
except DNSException as error:
ns_results = OrderedDict([("hostnames", []),
("error", error.__str__())])
return ns_results


def check_mx(domain, approved_mx_hostnames=None, skip_tls=False,
nameservers=None, resolver=None, timeout=2.0):
try:
mx_results = get_mx_hosts(
domain,
skip_tls=skip_tls,
approved_hostnames=approved_mx_hostnames,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
except DNSException as error:
mx_results = OrderedDict([("hosts", []),
("error", error.__str__())])
return mx_results


def check_dmarc(domain, parked=False,
include_dmarc_tag_descriptions=False,
nameservers=None, resolver=None, timeout=2.0):
dmarc_results = OrderedDict([("record", None), ("valid", True), ("location", None)])
try:
dmarc_query = query_dmarc_record(domain,
nameservers=nameservers,
resolver=resolver,
timeout=timeout)
dmarc_results["record"] = dmarc_query["record"]
dmarc_results["location"] = dmarc_query["location"]
parsed_dmarc_record = parse_dmarc_record(
dmarc_query["record"],
dmarc_query["location"],
parked=parked,
include_tag_descriptions=include_dmarc_tag_descriptions,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
dmarc_results["warnings"] = dmarc_query["warnings"]

dmarc_results["tags"] = parsed_dmarc_record["tags"]
dmarc_results["warnings"] += parsed_dmarc_record[
"warnings"]
except DMARCError as error:
dmarc_results["error"] = str(error)
dmarc_results["valid"] = False
if hasattr(error, "data") and error.data:
for key in error.data:
dmarc_results[key] = error.data[key]
return dmarc_results


def check_spf(domain, parked=False, nameservers=None, resolver=None, timeout=2.0):
spf_results = OrderedDict(
[("record", None), ("valid", True), ("dns_lookups", None), ("dns_void_lookups", None)])
try:
spf_query = query_spf_record(
domain,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
spf_results["record"] = spf_query["record"]
spf_results["warnings"] = spf_query["warnings"]
parsed_spf = parse_spf_record(spf_results["record"],
domain,
parked=parked,
nameservers=nameservers,
resolver=resolver,
timeout=timeout)

spf_results["dns_lookups"] = parsed_spf[
"dns_lookups"]
spf_results["dns_void_lookups"] = parsed_spf[
"dns_void_lookups"]
spf_results["parsed"] = parsed_spf["parsed"]
spf_results["warnings"] += parsed_spf["warnings"]
except SPFError as error:
spf_results["error"] = str(error)
del spf_results["dns_lookups"]
spf_results["valid"] = False
if hasattr(error, "data") and error.data:
for key in error.data:
spf_results[key] = error.data[key]
return spf_results

def results_to_json(results):
"""
Converts a dictionary of results to a JSON string
Expand Down

0 comments on commit 05ae11a

Please sign in to comment.