diff --git a/docs/pycti/pycti.rst b/docs/pycti/pycti.rst index aebab54f..c53a2683 100644 --- a/docs/pycti/pycti.rst +++ b/docs/pycti/pycti.rst @@ -75,7 +75,7 @@ Classes Undocumented. - :py:class:`ThreatActor`: - Undocumented. + Main ThreatActor class for OpenCTI - :py:class:`IntrusionSet`: Undocumented. diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index b7805e36..270122cd 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -7,6 +7,8 @@ import json import logging +from typing import Union + from pycti.api.opencti_api_connector import OpenCTIApiConnector from pycti.api.opencti_api_job import OpenCTIApiJob from pycti.utils.constants import ObservableTypes @@ -346,15 +348,12 @@ def not_empty(self, value): else: return False - def process_multiple(self, data, with_pagination=False): + def process_multiple(self, data: dict, with_pagination=False) -> Union[dict, list]: """processes data returned by the OpenCTI API with multiple entities :param data: data to process - :type data: - :param with_pagination: whether to use pagination with the API, defaults to False - :type with_pagination: bool, optional - :return: returns either a dict or list with the processes entities - :rtype: list or dict + :param with_pagination: whether to use pagination with the API + :returns: returns either a dict or list with the processes entities """ if with_pagination: @@ -383,13 +382,11 @@ def process_multiple(self, data, with_pagination=False): result["pagination"] = data["pageInfo"] return result - def process_multiple_ids(self, data): + def process_multiple_ids(self, data) -> list: """processes data returned by the OpenCTI API with multiple ids :param data: data to process - :type data: :return: returns a list of ids - :rtype: list """ result = [] diff --git a/pycti/entities/opencti_threat_actor.py b/pycti/entities/opencti_threat_actor.py index 4bf1bf1e..de2f3f45 100644 --- a/pycti/entities/opencti_threat_actor.py +++ b/pycti/entities/opencti_threat_actor.py @@ -1,12 +1,23 @@ # coding: utf-8 import json + +from typing import Union + from pycti.utils.constants import CustomProperties from pycti.utils.opencti_stix2 import SPEC_VERSION class ThreatActor: + """Main ThreatActor class for OpenCTI + + :param opencti: instance of :py:class:`~pycti.api.opencti_api_client.OpenCTIApiClient` + """ + def __init__(self, opencti): + """Create an instance of ThreatActor + """ + self.opencti = opencti self.properties = """ id @@ -27,7 +38,7 @@ def __init__(self, opencti): first_seen last_seen created - modified + modified created_at updated_at createdByRef { @@ -48,7 +59,7 @@ def __init__(self, opencti): relation { id } - } + } markingDefinitions { edges { node { @@ -98,20 +109,25 @@ def __init__(self, opencti): id } } - } + } """ - """ - List Threat-Actor objects + def list(self, **kwargs) -> dict: + """List Threat-Actor objects - :param filters: the filters to apply - :param search: the search keyword - :param first: return the first n rows from the after ID (or the beginning if not set) - :param after: ID of the first row for pagination - :return List of Threat-Actor objects - """ + The list method accepts the following \**kwargs: + + :param dict filters: (optional) the filters to apply + :param str search: (optional) a search keyword to apply for the listing + :param int first: (optional) return the first n rows from the `after` ID + or the beginning if not set + :param str after: (optional) OpenCTI object ID of the first row for pagination + :param str orderBy: (optional) the field to order the response on + :param bool orderMode: (optional) either "`asc`" or "`desc`" + :param bool getAll: (optional) switch to return the first 500 entries + :param bool withPagination: (optional) switch to use pagination + """ - def list(self, **kwargs): filters = kwargs.get("filters", None) search = kwargs.get("search", None) first = kwargs.get("first", 500) @@ -163,15 +179,20 @@ def list(self, **kwargs): result["data"]["threatActors"], with_pagination ) - """ - Read a Threat-Actor object - - :param id: the id of the Threat-Actor - :param filters: the filters to apply if no id provided - :return Threat-Actor object - """ + def read(self, **kwargs) -> Union[dict, None]: + """Read a Threat-Actor object + + read can be either used with a known OpenCTI entity `id` or by using a + valid filter to search and return a single Threat-Actor entity or None. + + The list method accepts the following \**kwargs. + + Note: either `id` or `filters` is required. + + :param str id: the id of the Threat-Actor + :param dict filters: the filters to apply if no id provided + """ - def read(self, **kwargs): id = kwargs.get("id", None) filters = kwargs.get("filters", None) custom_attributes = kwargs.get("customAttributes", None) @@ -206,14 +227,10 @@ def read(self, **kwargs): ) return None - """ - Create a Threat-Actor object - - :param name: the name of the Threat-Actor - :return Threat-Actor object - """ + def _create_raw(self, **kwargs): + """Create a Threat-Actor object + """ - def create_raw(self, **kwargs): name = kwargs.get("name", None) description = kwargs.get("description", None) alias = kwargs.get("alias", None) @@ -280,14 +297,47 @@ def create_raw(self, **kwargs): "[opencti_threat_actor] Missing parameters: name and description", ) - """ - Create a Threat-Actor object only if it not exists, update it on request + def create(self, **kwargs): + """Create a Threat-Actor object - :param name: the name of the Threat-Actor - :return Threat-Actor object - """ + The Threat-Actor entity will only be created if it doesn't exists + By setting `update` to `True` it acts like an upsert and updates + fields of an existing Threat-Actor entity. - def create(self, **kwargs): + The create method accepts the following \**kwargs. + + Note: `name` and `description` or `stix_id_key` is required. + + :param str id: (optional) OpenCTI `id` for the Threat-Actor + :param str name: the name of the Threat-Actor + :param str description: descriptive text + :param str stix_id_key: stix2 id reference for the Threat-Actor entity + :param list alias: (optional) list of alias names for the Threat-Actor + :param str first_seen: (optional) date in OpenCTI date format + :param str last_seen: (optional) date in OpenCTI date format + :param str goal: (optional) describe the actors goal in text + :param str sophistication: (optional) describe the actors + sophistication in text + :param str resource_level: (optional) describe the actors + resource_level in text + :param str primary_motivation: (optional) describe the actors + primary_motivation in text + :param str secondary_motivation: (optional) describe the actors + secondary_motivation in text + :param str personal_motivation: (optional) describe the actors + personal_motivation in text + :param str created: (optional) date in OpenCTI date format + :param str modified: (optional) date in OpenCTI date format + :param str createdByRef: (optional) id of the organization that + created the knowledge + :param list markingDefinitions: (optional) list of OpenCTI marking + definition ids + :param tags: TODO (optional) + :param bool update: (optional) choose to updated an existing + Threat-Actor entity, default `False` + """ + + id = kwargs.get("id", None) name = kwargs.get("name", None) description = kwargs.get("description", None) alias = kwargs.get("alias", None) @@ -299,7 +349,6 @@ def create(self, **kwargs): primary_motivation = kwargs.get("primary_motivation", None) secondary_motivation = kwargs.get("secondary_motivation", None) personal_motivation = kwargs.get("personal_motivation", None) - id = kwargs.get("id", None) stix_id_key = kwargs.get("stix_id_key", None) created = kwargs.get("created", None) modified = kwargs.get("modified", None) @@ -311,13 +360,13 @@ def create(self, **kwargs): id entity_type name - description + description alias createdByRef { node { id } - } + } ... on ThreatActor { first_seen last_seen @@ -439,7 +488,7 @@ def create(self, **kwargs): object_result["personal_motivation"] = personal_motivation return object_result else: - return self.create_raw( + return self._create_raw( name=name, description=description, alias=alias, @@ -460,14 +509,18 @@ def create(self, **kwargs): tags=tags, ) - """ - Export an Threat-Actor object in STIX2 - - :param id: the id of the Threat-Actor - :return Threat-Actor object - """ - def to_stix2(self, **kwargs): + """Returns a Stix2 object for a Threat-Actor id + + Takes either an `id` or a Threat-Actor python object via `entity` and + returns a stix2 representation of it. + + The to_stix2 method accepts the following \**kwargs. + + :param id: (optional) `id` of the Threat-Actor you want to convert to stix2 + :param mode: (optional) either `simple` or `full`, default: `simple` + :param entity: (optional) Threat-Actor object to convert + """ id = kwargs.get("id", None) mode = kwargs.get("mode", "simple") max_marking_definition_entity = kwargs.get(