diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b50fc00..1e00a48 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: [3.8, 3.9, "3.10", 3.11, 3.12, 3.13] + python-version: [3.9, "3.10", 3.11, 3.12, 3.13] name: Python ${{ matrix.python-version }} on ${{ matrix.platform }} runs-on: ${{ matrix.platform }} diff --git a/HISTORY.rst b/HISTORY.rst index 5e38cb8..1ba7797 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,9 +3,22 @@ History ------- -2.12.0 +3.0.0 +++++++++++++++++++ +* BREAKING CHANGE: The ``minfraud.model.*`` classes have been refactored to + simplify them and make them more flexible. They are no longer subclass + NamedTuple and are now standard Python classes. This also means the + classes are no longer immutable. For most users, these differences should + not impact their integration. +* BREAKING CHANGE: Model attributes that were formerly tuples are now lists. +* BREAKING CHANGE: The deprecated `is_high_risk` attribute on + `resp.ip_address.country` has been removed. +* IMPORTANT: Python 3.9 or greater is required. If you are using an older + version, please use an earlier release. +* Added ``to_dict`` methods to the model classes. These return a dict version + of the object that is suitable for serialization. It recursively calls + ``to_dict`` or the equivalent on all objects contained within the object. * The minFraud Factors subscores have been deprecated. They will be removed in March 2025. Please see `our release notes `_ for more information. diff --git a/README.rst b/README.rst index 144761d..b2251da 100644 --- a/README.rst +++ b/README.rst @@ -307,7 +307,7 @@ For asynchronous reporting: Requirements ------------ -Python 3.8 or greater is required. Older versions are not supported. +Python 3.9 or greater is required. Older versions are not supported. Versioning ---------- @@ -327,6 +327,6 @@ for assistance. Copyright and License --------------------- -This software is Copyright © 2015-2024 by MaxMind, Inc. +This software is Copyright © 2015-2025 by MaxMind, Inc. This is free software, licensed under the Apache License, Version 2.0. diff --git a/docs/conf.py b/docs/conf.py index d093582..3e9793e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -46,7 +46,7 @@ # General information about the project. project = "minfraud" -copyright = "2015-2024, MaxMind, Inc" +copyright = "2015-2025, MaxMind, Inc" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the diff --git a/docs/index.rst b/docs/index.rst index 1f73485..a17b799 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,6 +31,6 @@ Indices and tables * :ref:`modindex` * :ref:`search` -:copyright: © 2015-2024 by MaxMind, Inc. +:copyright: © 2015-2025 by MaxMind, Inc. :license: Apache License, Version 2.0 diff --git a/minfraud/models.py b/minfraud/models.py index 094ea0a..e4082a6 100644 --- a/minfraud/models.py +++ b/minfraud/models.py @@ -6,62 +6,42 @@ """ -# pylint:disable=too-many-lines -from collections import namedtuple -from functools import update_wrapper -from typing import Any, Dict, List, Optional, Tuple +# pylint:disable=too-many-lines,too-many-instance-attributes,too-many-locals +from typing import Dict, List, Optional, Sequence +from geoip2.mixins import SimpleEquality import geoip2.models import geoip2.records -# Using a factory decorator rather than a metaclass as supporting -# metaclasses on Python 2 and 3 is more painful (although we could use -# six, I suppose). Using a closure rather than a class-based decorator as -# class based decorators don't work right with `update_wrapper`, -# causing help(class) to not work correctly. -def _inflate_to_namedtuple(orig_cls): - keys = sorted(orig_cls._fields.keys()) - fields = orig_cls._fields - name = orig_cls.__name__ - orig_cls.__name__ += "Super" - ntup = namedtuple(name, keys) - ntup.__name__ = name + "NamedTuple" - ntup.__new__.__defaults__ = (None,) * len(keys) - new_cls = type( - name, (ntup, orig_cls), {"__slots__": (), "__doc__": orig_cls.__doc__} - ) - update_wrapper(_inflate_to_namedtuple, new_cls) - orig_new = new_cls.__new__ - - # wipe out original namedtuple field docs as they aren't useful - # for attr in fields: - # getattr(cls, attr).__func__.__doc__ = None - - def new(cls, *args, **kwargs): - """Create new instance.""" - if (args and kwargs) or len(args) > 1: - raise ValueError( - "Only provide a single (dict) positional argument" - " or use keyword arguments. Do not use both." - ) - if args: - values = args[0] if args[0] else {} - - for field, default in fields.items(): - if callable(default): - kwargs[field] = default(values.get(field)) - else: - kwargs[field] = values.get(field, default) - - return orig_new(cls, **kwargs) - - new_cls.__new__ = staticmethod(new) - return new_cls - - -@_inflate_to_namedtuple -class IPRiskReason: +class _Serializable(SimpleEquality): + def to_dict(self): + """Returns a dict of the object suitable for serialization""" + result = {} + for key, value in self.__dict__.items(): + if hasattr(value, "to_dict") and callable(value.to_dict): + if d := value.to_dict(): + result[key] = d + elif hasattr(value, "raw"): + # geoip2 uses "raw" for historical reasons + if d := value.raw: + result[key] = d + elif isinstance(value, list): + ls = [] + for e in value: + if hasattr(e, "to_dict") and callable(e.to_dict): + if e := e.to_dict(): + ls.append(e) + elif e is not None: + ls.append(e) + if ls: + result[key] = ls + elif value is not None: + result[key] = value + return result + + +class IPRiskReason(_Serializable): """Reason for the IP risk. This class provides both a machine-readable code and a human-readable @@ -102,19 +82,9 @@ class IPRiskReason: code: Optional[str] reason: Optional[str] - __slots__ = () - _fields = { - "code": None, - "reason": None, - } - - -def _create_ip_risk_reasons( - reasons: Optional[List[Dict[str, str]]] -) -> Tuple[IPRiskReason, ...]: - if not reasons: - return () - return tuple(IPRiskReason(x) for x in reasons) # type: ignore + def __init__(self, code: Optional[str] = None, reason: Optional[str] = None): + self.code = code + self.reason = reason class GeoIP2Location(geoip2.records.Location): @@ -146,34 +116,6 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) -class GeoIP2Country(geoip2.records.Country): - """Country information for the IP address. - - In addition to the attributes provided by ``geoip2.records.Country``, - this class provides: - - .. attribute:: is_high_risk - - This is true if the IP country is high risk. - - :type: bool | None - - .. deprecated:: 1.8.0 - Deprecated effective August 29, 2019. - - Parent: - - """ - - __doc__ += geoip2.records.Country.__doc__ # type: ignore - - is_high_risk: bool - - def __init__(self, *args, **kwargs) -> None: - self.is_high_risk = kwargs.get("is_high_risk", False) - super().__init__(*args, **kwargs) - - class IPAddress(geoip2.models.Insights): """Model for minFraud and GeoIP2 data about the IP address. @@ -252,34 +194,37 @@ class IPAddress(geoip2.models.Insights): """ - country: GeoIP2Country location: GeoIP2Location risk: Optional[float] - risk_reasons: Tuple[IPRiskReason, ...] - - def __init__(self, ip_address: Dict[str, Any]) -> None: - if ip_address is None: - ip_address = {} - locales = ip_address.get("_locales") - if "_locales" in ip_address: - del ip_address["_locales"] - super().__init__(ip_address, locales=locales) - self.country = GeoIP2Country(locales, **ip_address.get("country", {})) - self.location = GeoIP2Location(**ip_address.get("location", {})) - self.risk = ip_address.get("risk", None) - self.risk_reasons = _create_ip_risk_reasons(ip_address.get("risk_reasons")) - self._finalized = True - - # Unfortunately the GeoIP2 models are not immutable, only the records. This - # corrects that for minFraud - def __setattr__(self, name: str, value: Any) -> None: - if hasattr(self, "_finalized") and self._finalized: - raise AttributeError("can't set attribute") - super().__setattr__(name, value) - - -@_inflate_to_namedtuple -class ScoreIPAddress: + risk_reasons: List[IPRiskReason] + + def __init__( + self, + locales: Optional[Sequence[str]], + *, + country: Optional[Dict] = None, + location: Optional[Dict] = None, + risk: Optional[float] = None, + risk_reasons: Optional[List[Dict]] = None, + **kwargs, + ) -> None: + # For raw attribute + if country is not None: + kwargs["country"] = country + if location is not None: + kwargs["location"] = location + if risk is not None: + kwargs["risk"] = risk + if risk_reasons is not None: + kwargs["risk_reasons"] = risk_reasons + + super().__init__(kwargs, locales=list(locales or [])) + self.location = GeoIP2Location(**(location or {})) + self.risk = risk + self.risk_reasons = [IPRiskReason(**x) for x in risk_reasons or []] + + +class ScoreIPAddress(_Serializable): """Information about the IP address for minFraud Score. .. attribute:: risk @@ -292,14 +237,11 @@ class ScoreIPAddress: risk: Optional[float] - __slots__ = () - _fields = { - "risk": None, - } + def __init__(self, *, risk: Optional[float] = None, **_): + self.risk = risk -@_inflate_to_namedtuple -class Issuer: +class Issuer(_Serializable): """Information about the credit card issuer. .. attribute:: name @@ -342,17 +284,22 @@ class Issuer: phone_number: Optional[str] matches_provided_phone_number: Optional[bool] - __slots__ = () - _fields = { - "name": None, - "matches_provided_name": None, - "phone_number": None, - "matches_provided_phone_number": None, - } - - -@_inflate_to_namedtuple -class Device: + def __init__( + self, + *, + name: Optional[str] = None, + matches_provided_name: Optional[bool] = None, + phone_number: Optional[str] = None, + matches_provided_phone_number: Optional[bool] = None, + **_, + ): + self.name = name + self.matches_provided_name = matches_provided_name + self.phone_number = phone_number + self.matches_provided_phone_number = matches_provided_phone_number + + +class Device(_Serializable): """Information about the device associated with the IP address. In order to receive device output from minFraud Insights or minFraud @@ -396,17 +343,23 @@ class Device: last_seen: Optional[str] local_time: Optional[str] - __slots__ = () - _fields = { - "confidence": None, - "id": None, - "last_seen": None, - "local_time": None, - } - - -@_inflate_to_namedtuple -class Disposition: + def __init__( + self, + *, + confidence: Optional[float] = None, + # pylint:disable=redefined-builtin + id: Optional[str] = None, + last_seen: Optional[str] = None, + local_time: Optional[str] = None, + **_, + ): + self.confidence = confidence + self.id = id + self.last_seen = last_seen + self.local_time = local_time + + +class Disposition(_Serializable): """Information about disposition for the request as set by custom rules. In order to receive a disposition, you must be use the minFraud custom @@ -442,16 +395,20 @@ class Disposition: reason: Optional[str] rule_label: Optional[str] - __slots__ = () - _fields = { - "action": None, - "reason": None, - "rule_label": None, - } + def __init__( + self, + *, + action: Optional[str] = None, + reason: Optional[str] = None, + rule_label: Optional[str] = None, + **_, + ): + self.action = action + self.reason = reason + self.rule_label = rule_label -@_inflate_to_namedtuple -class EmailDomain: +class EmailDomain(_Serializable): """Information about the email domain passed in the request. .. attribute:: first_seen @@ -466,14 +423,11 @@ class EmailDomain: first_seen: Optional[str] - __slots__ = () - _fields = { - "first_seen": None, - } + def __init__(self, *, first_seen: Optional[str] = None, **_): + self.first_seen = first_seen -@_inflate_to_namedtuple -class Email: +class Email(_Serializable): """Information about the email address passed in the request. .. attribute:: domain @@ -521,18 +475,22 @@ class Email: is_free: Optional[bool] is_high_risk: Optional[bool] - __slots__ = () - _fields = { - "domain": EmailDomain, - "first_seen": None, - "is_disposable": None, - "is_free": None, - "is_high_risk": None, - } - - -@_inflate_to_namedtuple -class CreditCard: + def __init__( + self, + domain: Optional[Dict] = None, + first_seen: Optional[str] = None, + is_disposable: Optional[bool] = None, + is_free: Optional[bool] = None, + is_high_risk: Optional[bool] = None, + ): + self.domain = EmailDomain(**(domain or {})) + self.first_seen = first_seen + self.is_disposable = is_disposable + self.is_free = is_free + self.is_high_risk = is_high_risk + + +class CreditCard(_Serializable): """Information about the credit card based on the issuer ID number. .. attribute:: country @@ -604,21 +562,29 @@ class CreditCard: is_virtual: Optional[bool] type: Optional[str] - __slots__ = () - _fields = { - "issuer": Issuer, - "country": None, - "brand": None, - "is_business": None, - "is_issued_in_billing_address_country": None, - "is_prepaid": None, - "is_virtual": None, - "type": None, - } - - -@_inflate_to_namedtuple -class BillingAddress: + def __init__( + self, + issuer: Optional[Dict] = None, + country: Optional[str] = None, + brand: Optional[str] = None, + is_business: Optional[bool] = None, + is_issued_in_billing_address_country: Optional[bool] = None, + is_prepaid: Optional[bool] = None, + is_virtual: Optional[bool] = None, + # pylint:disable=redefined-builtin + type: Optional[str] = None, + ): + self.issuer = Issuer(**(issuer or {})) + self.country = country + self.brand = brand + self.is_business = is_business + self.is_issued_in_billing_address_country = is_issued_in_billing_address_country + self.is_prepaid = is_prepaid + self.is_virtual = is_virtual + self.type = type + + +class BillingAddress(_Serializable): """Information about the billing address. .. attribute:: distance_to_ip_location @@ -667,18 +633,24 @@ class BillingAddress: distance_to_ip_location: Optional[int] is_in_ip_country: Optional[bool] - __slots__ = () - _fields = { - "is_postal_in_city": None, - "latitude": None, - "longitude": None, - "distance_to_ip_location": None, - "is_in_ip_country": None, - } - - -@_inflate_to_namedtuple -class ShippingAddress: + def __init__( + self, + *, + is_postal_in_city: Optional[bool] = None, + latitude: Optional[float] = None, + longitude: Optional[float] = None, + distance_to_ip_location: Optional[int] = None, + is_in_ip_country: Optional[bool] = None, + **_, + ): + self.is_postal_in_city = is_postal_in_city + self.latitude = latitude + self.longitude = longitude + self.distance_to_ip_location = distance_to_ip_location + self.is_in_ip_country = is_in_ip_country + + +class ShippingAddress(_Serializable): """Information about the shipping address. .. attribute:: distance_to_ip_location @@ -746,20 +718,28 @@ class ShippingAddress: is_high_risk: Optional[bool] distance_to_billing_address: Optional[int] - __slots__ = () - _fields = { - "is_postal_in_city": None, - "latitude": None, - "longitude": None, - "distance_to_ip_location": None, - "is_in_ip_country": None, - "is_high_risk": None, - "distance_to_billing_address": None, - } - - -@_inflate_to_namedtuple -class Phone: + def __init__( + self, + *, + is_postal_in_city: Optional[bool] = None, + latitude: Optional[float] = None, + longitude: Optional[float] = None, + distance_to_ip_location: Optional[int] = None, + is_in_ip_country: Optional[bool] = None, + is_high_risk: Optional[bool] = None, + distance_to_billing_address: Optional[int] = None, + **_, + ): + self.is_postal_in_city = is_postal_in_city + self.latitude = latitude + self.longitude = longitude + self.distance_to_ip_location = distance_to_ip_location + self.is_in_ip_country = is_in_ip_country + self.is_high_risk = is_high_risk + self.distance_to_billing_address = distance_to_billing_address + + +class Phone(_Serializable): """Information about the billing or shipping phone number. .. attribute:: country @@ -801,17 +781,22 @@ class Phone: network_operator: Optional[str] number_type: Optional[str] - __slots__ = () - _fields = { - "country": None, - "is_voip": None, - "network_operator": None, - "number_type": None, - } - - -@_inflate_to_namedtuple -class ServiceWarning: + def __init__( + self, + *, + country: Optional[str] = None, + is_voip: Optional[bool] = None, + network_operator: Optional[str] = None, + number_type: Optional[str] = None, + **_, + ): + self.country = country + self.is_voip = is_voip + self.network_operator = network_operator + self.number_type = number_type + + +class ServiceWarning(_Serializable): """Warning from the web service. .. attribute:: code @@ -845,22 +830,20 @@ class ServiceWarning: warning: Optional[str] input_pointer: Optional[str] - __slots__ = () - _fields = { - "code": None, - "warning": None, - "input_pointer": None, - } - + def __init__( + self, + *, + code: Optional[str] = None, + warning: Optional[str] = None, + input_pointer: Optional[str] = None, + **_, + ): + self.code = code + self.warning = warning + self.input_pointer = input_pointer -def _create_warnings(warnings: List[Dict[str, str]]) -> Tuple[ServiceWarning, ...]: - if not warnings: - return () - return tuple(ServiceWarning(x) for x in warnings) # type: ignore - -@_inflate_to_namedtuple -class Subscores: +class Subscores(_Serializable): """Risk factor scores used in calculating the overall risk score. .. deprecated:: 2.12.0 @@ -1053,33 +1036,58 @@ class Subscores: shipping_address_distance_to_ip_location: Optional[float] time_of_day: Optional[float] - __slots__ = () - _fields = { - "avs_result": None, - "billing_address": None, - "billing_address_distance_to_ip_location": None, - "browser": None, - "chargeback": None, - "country": None, - "country_mismatch": None, - "cvv_result": None, - "device": None, - "email_address": None, - "email_domain": None, - "email_local_part": None, - "email_tenure": None, - "ip_tenure": None, - "issuer_id_number": None, - "order_amount": None, - "phone_number": None, - "shipping_address": None, - "shipping_address_distance_to_ip_location": None, - "time_of_day": None, - } - - -@_inflate_to_namedtuple -class Reason: + def __init__( + self, + *, + avs_result: Optional[float] = None, + billing_address: Optional[float] = None, + billing_address_distance_to_ip_location: Optional[float] = None, + browser: Optional[float] = None, + chargeback: Optional[float] = None, + country: Optional[float] = None, + country_mismatch: Optional[float] = None, + cvv_result: Optional[float] = None, + device: Optional[float] = None, + email_address: Optional[float] = None, + email_domain: Optional[float] = None, + email_local_part: Optional[float] = None, + email_tenure: Optional[float] = None, + ip_tenure: Optional[float] = None, + issuer_id_number: Optional[float] = None, + order_amount: Optional[float] = None, + phone_number: Optional[float] = None, + shipping_address: Optional[float] = None, + shipping_address_distance_to_ip_location: Optional[float] = None, + time_of_day: Optional[float] = None, + **_, + ): + self.avs_result = avs_result + self.billing_address = billing_address + self.billing_address_distance_to_ip_location = ( + billing_address_distance_to_ip_location + ) + self.browser = browser + self.chargeback = chargeback + self.country = country + self.country_mismatch = country_mismatch + self.cvv_result = cvv_result + self.device = device + self.email_address = email_address + self.email_domain = email_domain + self.email_local_part = email_local_part + self.email_tenure = email_tenure + self.ip_tenure = ip_tenure + self.issuer_id_number = issuer_id_number + self.order_amount = order_amount + self.phone_number = phone_number + self.shipping_address = shipping_address + self.shipping_address_distance_to_ip_location = ( + shipping_address_distance_to_ip_location + ) + self.time_of_day = time_of_day + + +class Reason(_Serializable): """The risk score reason for the multiplier. This class provides both a machine-readable code and a human-readable @@ -1165,21 +1173,14 @@ class Reason: code: Optional[str] reason: Optional[str] - __slots__ = () - _fields = { - "code": None, - "reason": None, - } - - -def _create_reasons(reasons: Optional[List[Dict[str, str]]]) -> Tuple[Reason, ...]: - if not reasons: - return () - return tuple(Reason(x) for x in reasons) # type: ignore + def __init__( + self, *, code: Optional[str] = None, reason: Optional[str] = None, **_ + ): + self.code = code + self.reason = reason -@_inflate_to_namedtuple -class RiskScoreReason: +class RiskScoreReason(_Serializable): """The risk score multiplier and the reasons for that multiplier. .. attribute:: multiplier @@ -1201,25 +1202,20 @@ class RiskScoreReason: """ multiplier: float - reasons: Tuple[Reason, ...] - - __slots__ = () - _fields = { - "multiplier": None, - "reasons": _create_reasons, - } - + reasons: List[Reason] -def _create_risk_score_reasons( - risk_score_reasons: Optional[List[Dict[str, str]]] -) -> Tuple[RiskScoreReason, ...]: - if not risk_score_reasons: - return () - return tuple(RiskScoreReason(x) for x in risk_score_reasons) # type: ignore + def __init__( + self, + *, + multiplier: float, + reasons: Optional[List] = None, + **_, + ): + self.multiplier = multiplier + self.reasons = [Reason(**x) for x in reasons or []] -@_inflate_to_namedtuple -class Factors: +class Factors(_Serializable): """Model for Factors response. .. attribute:: id @@ -1361,32 +1357,53 @@ class Factors: shipping_address: ShippingAddress shipping_phone: Phone subscores: Subscores - warnings: Tuple[ServiceWarning, ...] - risk_score_reasons: Tuple[RiskScoreReason, ...] - - __slots__ = () - _fields = { - "billing_address": BillingAddress, - "billing_phone": Phone, - "credit_card": CreditCard, - "disposition": Disposition, - "funds_remaining": None, - "device": Device, - "email": Email, - "id": None, - "ip_address": IPAddress, - "queries_remaining": None, - "risk_score": None, - "shipping_address": ShippingAddress, - "shipping_phone": Phone, - "subscores": Subscores, - "warnings": _create_warnings, - "risk_score_reasons": _create_risk_score_reasons, - } - - -@_inflate_to_namedtuple -class Insights: + warnings: List[ServiceWarning] + risk_score_reasons: List[RiskScoreReason] + + def __init__( + self, + locales: Sequence[str], + *, + billing_address: Optional[Dict] = None, + billing_phone: Optional[Dict] = None, + credit_card: Optional[Dict] = None, + disposition: Optional[Dict] = None, + funds_remaining: float, + device: Optional[Dict] = None, + email: Optional[Dict] = None, + # pylint:disable=redefined-builtin + id: str, + ip_address: Optional[Dict] = None, + queries_remaining: int, + risk_score: float, + shipping_address: Optional[Dict] = None, + shipping_phone: Optional[Dict] = None, + subscores: Optional[Dict] = None, + warnings: Optional[List[Dict]] = None, + risk_score_reasons: Optional[List[Dict]] = None, + **_, + ): + self.billing_address = BillingAddress(**(billing_address or {})) + self.billing_phone = Phone(**(billing_phone or {})) + self.credit_card = CreditCard(**(credit_card or {})) + self.disposition = Disposition(**(disposition or {})) + self.funds_remaining = funds_remaining + self.device = Device(**(device or {})) + self.email = Email(**(email or {})) + self.id = id + self.ip_address = IPAddress(locales, **(ip_address or {})) + self.queries_remaining = queries_remaining + self.risk_score = risk_score + self.shipping_address = ShippingAddress(**(shipping_address or {})) + self.shipping_phone = Phone(**(shipping_phone or {})) + self.subscores = Subscores(**(subscores or {})) + self.warnings = [ServiceWarning(**x) for x in warnings or []] + self.risk_score_reasons = [ + RiskScoreReason(**x) for x in risk_score_reasons or [] + ] + + +class Insights(_Serializable): """Model for Insights response. .. attribute:: id @@ -1507,29 +1524,46 @@ class Insights: risk_score: float shipping_address: ShippingAddress shipping_phone: Phone - warnings: Tuple[ServiceWarning, ...] - - __slots__ = () - _fields = { - "billing_address": BillingAddress, - "billing_phone": Phone, - "credit_card": CreditCard, - "device": Device, - "disposition": Disposition, - "email": Email, - "funds_remaining": None, - "id": None, - "ip_address": IPAddress, - "queries_remaining": None, - "risk_score": None, - "shipping_address": ShippingAddress, - "shipping_phone": Phone, - "warnings": _create_warnings, - } - - -@_inflate_to_namedtuple -class Score: + warnings: List[ServiceWarning] + + def __init__( + self, + locales: Sequence[str], + *, + billing_address: Optional[Dict] = None, + billing_phone: Optional[Dict] = None, + credit_card: Optional[Dict] = None, + device: Optional[Dict] = None, + disposition: Optional[Dict] = None, + email: Optional[Dict] = None, + funds_remaining: float, + # pylint:disable=redefined-builtin + id: str, + ip_address: Optional[Dict] = None, + queries_remaining: int, + risk_score: float, + shipping_address: Optional[Dict] = None, + shipping_phone: Optional[Dict] = None, + warnings: Optional[List[Dict]] = None, + **_, + ): + self.billing_address = BillingAddress(**(billing_address or {})) + self.billing_phone = Phone(**(billing_phone or {})) + self.credit_card = CreditCard(**(credit_card or {})) + self.device = Device(**(device or {})) + self.disposition = Disposition(**(disposition or {})) + self.email = Email(**(email or {})) + self.funds_remaining = funds_remaining + self.id = id + self.ip_address = IPAddress(locales, **(ip_address or {})) + self.queries_remaining = queries_remaining + self.risk_score = risk_score + self.shipping_address = ShippingAddress(**(shipping_address or {})) + self.shipping_phone = Phone(**(shipping_phone or {})) + self.warnings = [ServiceWarning(**x) for x in warnings or []] + + +class Score(_Serializable): """Model for Score response. .. attribute:: id @@ -1593,15 +1627,25 @@ class Score: ip_address: ScoreIPAddress queries_remaining: int risk_score: float - warnings: Tuple[ServiceWarning, ...] - - __slots__ = () - _fields = { - "disposition": Disposition, - "funds_remaining": None, - "id": None, - "ip_address": ScoreIPAddress, - "queries_remaining": None, - "risk_score": None, - "warnings": _create_warnings, - } + warnings: List[ServiceWarning] + + def __init__( + self, + *, + disposition: Optional[Dict] = None, + funds_remaining: float, + # pylint:disable=redefined-builtin + id: str, + ip_address: Optional[Dict] = None, + queries_remaining: int, + risk_score: float, + warnings: Optional[List[Dict]] = None, + **_, + ): + self.disposition = Disposition(**(disposition or {})) + self.funds_remaining = funds_remaining + self.id = id + self.ip_address = ScoreIPAddress(**(ip_address or {})) + self.queries_remaining = queries_remaining + self.risk_score = risk_score + self.warnings = [ServiceWarning(**x) for x in warnings or []] diff --git a/minfraud/webservice.py b/minfraud/webservice.py index 69d7be7..7e6bcb0 100644 --- a/minfraud/webservice.py +++ b/minfraud/webservice.py @@ -7,7 +7,8 @@ """ import json -from typing import Any, cast, Dict, Optional, Tuple, Type, Union +from functools import partial +from typing import Any, Callable, cast, Dict, Optional, Sequence, Union import aiohttp import aiohttp.http @@ -39,7 +40,7 @@ class BaseClient: _account_id: str _license_key: str - _locales: Tuple[str, ...] + _locales: Sequence[str] _timeout: float _score_uri: str @@ -52,7 +53,7 @@ def __init__( account_id: int, license_key: str, host: str = "minfraud.maxmind.com", - locales: Tuple[str, ...] = ("en",), + locales: Sequence[str] = ("en",), timeout: float = 60, ) -> None: self._locales = locales @@ -69,7 +70,7 @@ def _handle_success( self, raw_body: str, uri: str, - model_class: Union[Type[Factors], Type[Score], Type[Insights]], + model_class: Callable, ) -> Union[Score, Factors, Insights]: """Handle successful response.""" try: @@ -81,9 +82,7 @@ def _handle_success( 200, uri, ) from ex - if "ip_address" in decoded_body: - decoded_body["ip_address"]["_locales"] = self._locales - return model_class(decoded_body) # type: ignore + return model_class(**decoded_body) # type: ignore def _exception_for_error( self, status: int, content_type: Optional[str], raw_body: str, uri: str @@ -210,7 +209,7 @@ def __init__( account_id: int, license_key: str, host: str = "minfraud.maxmind.com", - locales: Tuple[str, ...] = ("en",), + locales: Sequence[str] = ("en",), timeout: float = 60, proxy: Optional[str] = None, ) -> None: @@ -269,7 +268,7 @@ async def factors( Factors, await self._response_for( self._factors_uri, - Factors, + partial(Factors, self._locales), transaction, validate, hash_email, @@ -308,7 +307,7 @@ async def insights( Insights, await self._response_for( self._insights_uri, - Insights, + partial(Insights, self._locales), transaction, validate, hash_email, @@ -387,7 +386,7 @@ async def report( async def _response_for( self, uri: str, - model_class: Union[Type[Factors], Type[Score], Type[Insights]], + model_class: Callable, request: Dict[str, Any], validate: bool, hash_email: bool, @@ -445,7 +444,7 @@ def __init__( account_id: int, license_key: str, host: str = "minfraud.maxmind.com", - locales: Tuple[str, ...] = ("en",), + locales: Sequence[str] = ("en",), timeout: float = 60, proxy: Optional[str] = None, ) -> None: @@ -518,7 +517,7 @@ def factors( Factors, self._response_for( self._factors_uri, - Factors, + partial(Factors, self._locales), transaction, validate, hash_email, @@ -557,7 +556,7 @@ def insights( Insights, self._response_for( self._insights_uri, - Insights, + partial(Insights, self._locales), transaction, validate, hash_email, @@ -634,7 +633,7 @@ def report(self, report: Dict[str, Optional[str]], validate: bool = True) -> Non def _response_for( self, uri: str, - model_class: Union[Type[Factors], Type[Score], Type[Insights]], + model_class: Callable, request: Dict[str, Any], validate: bool, hash_email: bool, diff --git a/pyproject.toml b/pyproject.toml index ae68385..1a70a36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ dependencies = [ "requests>=2.24.0,<3.0.0", "voluptuous", ] -requires-python = ">=3.8" +requires-python = ">=3.9" readme = "README.rst" license = {text = "Apache License 2.0"} classifiers = [ @@ -22,7 +22,6 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", diff --git a/setup.cfg b/setup.cfg index bddf03c..76e975f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,11 +3,10 @@ max-line-length = 88 [tox:tox] -envlist = {py38,py39,py310,py311,py313}-test,py313-{black,lint,flake8,mypy} +envlist = {py39,py310,py311,py313}-test,py313-{black,lint,flake8,mypy} [gh-actions] python = - 3.8: py38 3.9: py39 3.10: py310 3.11: py311 diff --git a/tests/test_models.py b/tests/test_models.py index b42bacb..2cb97ed 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,33 +1,14 @@ -from minfraud.models import * - import unittest +from minfraud.models import * + class TestModels(unittest.TestCase): - def test_model_immutability(self): - """This tests some level of _shallow_ immutability for these classes""" - T = namedtuple("T", ["obj", "attr"]) - models = [ - T(IPRiskReason(), "code"), - T(Issuer(), "name"), - T(CreditCard(), "country"), - T(Device(), "id"), - T(Email(), "is_free"), - T(EmailDomain(), "first_seen"), - T(BillingAddress(), "latitude"), - T(ShippingAddress(), "latitude"), - T(ServiceWarning(), "code"), - T(Insights(), "id"), - T(Score(), "id"), - T(IPAddress({}), "city"), - ] - for model in models: - for attr in (model.attr, "does_not_exist"): - with self.assertRaises(AttributeError, msg=f"{model.obj} - {attr}"): - setattr(model.obj, attr, 5) # type: ignore + def setUp(self): + self.maxDiff = 20_000 def test_billing_address(self): - address = BillingAddress(self.address_dict) + address = BillingAddress(**self.address_dict) self.check_address(address) def test_shipping_address(self): @@ -35,7 +16,7 @@ def test_shipping_address(self): address_dict["is_high_risk"] = False address_dict["distance_to_billing_address"] = 200 - address = ShippingAddress(address_dict) + address = ShippingAddress(**address_dict) self.check_address(address) self.assertEqual(False, address.is_high_risk) self.assertEqual(200, address.distance_to_billing_address) @@ -59,16 +40,14 @@ def check_address(self, address): def test_credit_card(self): cc = CreditCard( - { - "issuer": {"name": "Bank"}, - "brand": "Visa", - "country": "US", - "is_issued_in_billing_address_country": True, - "is_business": True, - "is_prepaid": True, - "is_virtual": True, - "type": "credit", - } + issuer={"name": "Bank"}, + brand="Visa", + country="US", + is_issued_in_billing_address_country=True, + is_business=True, + is_prepaid=True, + is_virtual=True, + type="credit", ) self.assertEqual("Bank", cc.issuer.name) @@ -85,12 +64,10 @@ def test_device(self): last_seen = "2016-06-08T14:16:38Z" local_time = "2016-06-10T14:19:10-08:00" device = Device( - { - "confidence": 99, - "id": id, - "last_seen": last_seen, - "local_time": local_time, - } + confidence=99, + id=id, + last_seen=last_seen, + local_time=local_time, ) self.assertEqual(99, device.confidence) @@ -100,7 +77,9 @@ def test_device(self): def test_disposition(self): disposition = Disposition( - {"action": "accept", "reason": "default", "rule_label": "custom rule label"} + action="accept", + reason="default", + rule_label="custom rule label", ) self.assertEqual("accept", disposition.action) @@ -110,12 +89,10 @@ def test_disposition(self): def test_email(self): first_seen = "2016-01-01" email = Email( - { - "first_seen": first_seen, - "is_disposable": True, - "is_free": True, - "is_high_risk": False, - } + first_seen=first_seen, + is_disposable=True, + is_free=True, + is_high_risk=False, ) self.assertEqual(first_seen, email.first_seen) @@ -126,18 +103,11 @@ def test_email(self): def test_email_domain(self): first_seen = "2016-01-01" domain = EmailDomain( - { - "first_seen": first_seen, - } + first_seen=first_seen, ) self.assertEqual(first_seen, domain.first_seen) - def test_geoip2_country(self): - country = GeoIP2Country(is_high_risk=True, iso_code="US") - self.assertEqual(True, country.is_high_risk) - self.assertEqual("US", country.iso_code) - def test_geoip2_location(self): time = "2015-04-19T12:59:23-01:00" location = GeoIP2Location(local_time=time, latitude=5) @@ -147,42 +117,39 @@ def test_geoip2_location(self): def test_ip_address(self): time = "2015-04-19T12:59:23-01:00" address = IPAddress( - { - "country": { - "is_high_risk": True, - "is_in_european_union": True, - }, - "location": { - "local_time": time, + ["en"], + country={ + "is_in_european_union": True, + }, + location={ + "local_time": time, + }, + risk=99, + risk_reasons=[ + { + "code": "ANONYMOUS_IP", + "reason": "The IP address belongs to an anonymous network. See /ip_address/traits for more details.", }, - "risk": 99, - "risk_reasons": [ - { - "code": "ANONYMOUS_IP", - "reason": "The IP address belongs to an anonymous network. See /ip_address/traits for more details.", - }, - { - "code": "MINFRAUD_NETWORK_ACTIVITY", - "reason": "Suspicious activity has been seen on this IP address across minFraud customers.", - }, - ], - "traits": { - "is_anonymous": True, - "is_anonymous_proxy": True, - "is_anonymous_vpn": True, - "is_hosting_provider": True, - "is_public_proxy": True, - "is_residential_proxy": True, - "is_satellite_provider": True, - "is_tor_exit_node": True, - "mobile_country_code": "310", - "mobile_network_code": "004", + { + "code": "MINFRAUD_NETWORK_ACTIVITY", + "reason": "Suspicious activity has been seen on this IP address across minFraud customers.", }, - } + ], + traits={ + "is_anonymous": True, + "is_anonymous_proxy": True, + "is_anonymous_vpn": True, + "is_hosting_provider": True, + "is_public_proxy": True, + "is_residential_proxy": True, + "is_satellite_provider": True, + "is_tor_exit_node": True, + "mobile_country_code": "310", + "mobile_network_code": "004", + }, ) self.assertEqual(time, address.location.local_time) - self.assertEqual(True, address.country.is_high_risk) self.assertEqual(True, address.country.is_in_european_union) self.assertEqual(99, address.risk) self.assertEqual(True, address.traits.is_anonymous) @@ -195,7 +162,6 @@ def test_ip_address(self): self.assertEqual(True, address.traits.is_tor_exit_node) self.assertEqual("310", address.traits.mobile_country_code) self.assertEqual("004", address.traits.mobile_network_code) - self.assertEqual(True, address.country.is_high_risk) self.assertEqual("ANONYMOUS_IP", address.risk_reasons[0].code) self.assertEqual( @@ -210,20 +176,18 @@ def test_ip_address(self): ) def test_empty_address(self): - address = IPAddress({}) - self.assertEqual((), address.risk_reasons) + address = IPAddress([]) + self.assertEqual([], address.risk_reasons) def test_score_ip_address(self): - address = ScoreIPAddress({"risk": 99}) + address = ScoreIPAddress(risk=99) self.assertEqual(99, address.risk) def test_ip_address_locales(self): loc = IPAddress( - { - "_locales": ["fr"], - "country": {"names": {"fr": "Country"}}, - "city": {"names": {"fr": "City"}}, - } + ["fr"], + country={"names": {"fr": "Country"}}, + city={"names": {"fr": "City"}}, ) self.assertEqual("City", loc.city.name) @@ -233,12 +197,10 @@ def test_issuer(self): phone = "132-342-2131" issuer = Issuer( - { - "name": "Bank", - "matches_provided_name": True, - "phone_number": phone, - "matches_provided_phone_number": True, - } + name="Bank", + matches_provided_name=True, + phone_number=phone, + matches_provided_phone_number=True, ) self.assertEqual("Bank", issuer.name) @@ -248,12 +210,10 @@ def test_issuer(self): def test_phone(self): phone = Phone( - { - "country": "US", - "is_voip": True, - "network_operator": "Verizon/1", - "number_type": "fixed", - } + country="US", + is_voip=True, + network_operator="Verizon/1", + number_type="fixed", ) self.assertEqual("US", phone.country) @@ -265,9 +225,7 @@ def test_warning(self): code = "INVALID_INPUT" msg = "Input invalid" - warning = ServiceWarning( - {"code": code, "warning": msg, "input_pointer": "/first/second"} - ) + warning = ServiceWarning(code=code, warning=msg, input_pointer="/first/second") self.assertEqual(code, warning.code) self.assertEqual(msg, warning.warning) @@ -277,7 +235,7 @@ def test_reason(self): code = "EMAIL_ADDRESS_NEW" msg = "Riskiness of newly-sighted email address" - reason = Reason({"code": code, "reason": msg}) + reason = Reason(code=code, reason=msg) self.assertEqual(code, reason.code) self.assertEqual(msg, reason.reason) @@ -288,7 +246,8 @@ def test_risk_score_reason(self): msg = "Riskiness of newly-sighted email address" reason = RiskScoreReason( - {"multiplier": 0.34, "reasons": [{"code": code, "reason": msg}]} + multiplier=0.34, + reasons=[{"code": code, "reason": msg}], ) self.assertEqual(multiplier, reason.multiplier) @@ -297,16 +256,15 @@ def test_risk_score_reason(self): def test_score(self): id = "b643d445-18b2-4b9d-bad4-c9c4366e402a" - score = Score( - { - "id": id, - "funds_remaining": 10.01, - "queries_remaining": 123, - "risk_score": 0.01, - "ip_address": {"risk": 99}, - "warnings": [{"code": "INVALID_INPUT"}], - } - ) + response = { + "id": id, + "funds_remaining": 10.01, + "queries_remaining": 123, + "risk_score": 0.01, + "ip_address": {"risk": 99}, + "warnings": [{"code": "INVALID_INPUT"}], + } + score = Score(**response) self.assertEqual(id, score.id) self.assertEqual(10.01, score.funds_remaining) @@ -315,15 +273,19 @@ def test_score(self): self.assertEqual("INVALID_INPUT", score.warnings[0].code) self.assertEqual(99, score.ip_address.risk) + self.assertEqual(response, score.to_dict()) + def test_insights(self): response = self.factors_response() + del response["risk_score_reasons"] del response["subscores"] - insights = Insights(response) + insights = Insights(None, **response) self.check_insights_data(insights, response["id"]) + self.assertEqual(response, insights.to_dict()) def test_factors(self): response = self.factors_response() - factors = Factors(response) + factors = Factors(None, **response) self.check_insights_data(factors, response["id"]) self.check_risk_score_reasons_data(factors.risk_score_reasons) self.assertEqual(0.01, factors.subscores.avs_result) @@ -351,6 +313,8 @@ def test_factors(self): ) self.assertEqual(0.17, factors.subscores.time_of_day) + self.assertEqual(response, factors.to_dict()) + def factors_response(self): return { "id": "b643d445-18b2-4b9d-bad4-c9c4366e402a", @@ -427,9 +391,7 @@ def check_insights_data(self, insights, uuid): self.assertEqual(123, insights.queries_remaining) self.assertEqual(0.01, insights.risk_score) self.assertEqual("INVALID_INPUT", insights.warnings[0].code) - self.assertIsInstance( - insights.warnings, tuple, "warnings is a tuple, not a dict" - ) + self.assertIsInstance(insights.warnings, list, "warnings is a list") def check_risk_score_reasons_data(self, reasons): self.assertEqual(1, len(reasons)) diff --git a/tests/test_webservice.py b/tests/test_webservice.py index 7f5a9c9..47fab04 100644 --- a/tests/test_webservice.py +++ b/tests/test_webservice.py @@ -1,6 +1,7 @@ import asyncio import json import os +from functools import partial from io import open from typing import Type, Union from pytest_httpserver import HTTPServer @@ -181,9 +182,10 @@ def has_ip_location(self): def test_200(self): model = self.create_success() response = json.loads(self.response) + cls = self.cls if self.has_ip_location(): - response["ip_address"]["_locales"] = ("en",) - self.assertEqual(self.cls(response), model) + cls = partial(cls, ("en",)) + self.assertEqual(cls(**response), model) if self.has_ip_location(): self.assertEqual("United Kingdom", model.ip_address.country.name) self.assertEqual(True, model.ip_address.traits.is_residential_proxy) @@ -240,9 +242,10 @@ def test_200_with_locales(self): ) model = self.create_success(client=client) response = json.loads(self.response) + cls = self.cls if self.has_ip_location(): - response["ip_address"]["_locales"] = locales - self.assertEqual(self.cls(response), model) + cls = partial(cls, locales) + self.assertEqual(cls(**response), model) if self.has_ip_location(): self.assertEqual("Royaume-Uni", model.ip_address.country.name) self.assertEqual("Londres", model.ip_address.city.name) @@ -251,6 +254,8 @@ def test_200_with_reserved_ip_warning(self): model = self.create_success( """ { + "funds_remaining": 10.00, + "queries_remaining": 1000, "risk_score": 12, "id": "0e52f5ac-7690-4780-a939-173cb13ecd75", "warnings": [ @@ -274,7 +279,7 @@ def test_200_with_no_risk_score_reasons(self): response = json.loads(self.response) del response["risk_score_reasons"] model = self.create_success(text=json.dumps(response)) - self.assertEqual(tuple(), model.risk_score_reasons) + self.assertEqual([], model.risk_score_reasons) def test_200_with_no_body(self): with self.assertRaisesRegex(