From 5b46a5d17b48d382699f0eb6bb7c7a82bfb97ad7 Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Jun 2024 17:38:55 +0200 Subject: [PATCH 1/5] DTB migrations --- .../90249a322ae1_add_value_descriptipn.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 src/core/migrations/versions/90249a322ae1_add_value_descriptipn.py diff --git a/src/core/migrations/versions/90249a322ae1_add_value_descriptipn.py b/src/core/migrations/versions/90249a322ae1_add_value_descriptipn.py new file mode 100644 index 000000000..c175a0dcd --- /dev/null +++ b/src/core/migrations/versions/90249a322ae1_add_value_descriptipn.py @@ -0,0 +1,32 @@ +"""Add value description to report item attribute table + +Revision ID: 90249a322ae1 +Revises: 57d784d699d9 +Create Date: 2024-02-25 19:00:13.825003 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.engine.reflection import Inspector + + +# revision identifiers, used by Alembic. +revision = "90249a322ae1" +down_revision = "57d784d699d9" +branch_labels = None +depends_on = None + + +def upgrade(): + conn = op.get_bind() + inspector = Inspector.from_engine(conn) + if "value_description" not in [column["name"] for column in inspector.get_columns("report_item_attribute")]: + op.add_column("report_item_attribute", sa.Column("value_description", sa.String(), nullable=True)) + + +def downgrade(): + conn = op.get_bind() + inspector = Inspector.from_engine(conn) + if "value_description" in [column["name"] for column in inspector.get_columns("report_item_attribute")]: + op.drop_column("report_item_attribute", "value_description") From fec77158d9f5ecebe5475aab5d8f7d0c54cc0302 Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Jun 2024 17:40:22 +0200 Subject: [PATCH 2/5] frontend modifications --- .../src/components/analyze/NewReportItem.vue | 7 +- .../src/components/common/EnumSelector.vue | 203 ++++++++---------- .../common/attribute/AttributeCWE.vue | 5 + .../common/attribute/attributes_mixin.js | 25 ++- 4 files changed, 121 insertions(+), 119 deletions(-) diff --git a/src/gui/src/components/analyze/NewReportItem.vue b/src/gui/src/components/analyze/NewReportItem.vue index 80782b3a9..465d21219 100644 --- a/src/gui/src/components/analyze/NewReportItem.vue +++ b/src/gui/src/components/analyze/NewReportItem.vue @@ -434,6 +434,7 @@ export default { for (let k = 0; k < this.attribute_groups[i].attribute_group_items[j].values.length; k++) { let value = this.attribute_groups[i].attribute_group_items[j].values[k].value + let value_description = this.attribute_groups[i].attribute_group_items[j].values[k].value_description if (this.attribute_groups[i].attribute_group_items[j].attribute_group_item.attribute.type === 'CPE') { value = value.replace("*", "%") } else if (this.attribute_groups[i].attribute_group_items[j].attribute_group_item.attribute.type === 'BOOLEAN') { @@ -448,6 +449,7 @@ export default { this.report_item.attributes.push({ id: -1, value: value, + value_description: value_description, attribute_group_item_id: this.attribute_groups[i].attribute_group_items[j].attribute_group_item.id }) } @@ -649,6 +651,7 @@ export default { if (data.attributes[k].attribute_group_item_id === this.selected_type.attribute_groups[i].attribute_group_items[j].id) { let value = data.attributes[k].value + let value_description = data.attributes[k].value_description if (this.selected_type.attribute_groups[i].attribute_group_items[j].attribute.type === 'CPE') { value = value.replace("%", "*") } else if (this.selected_type.attribute_groups[i].attribute_group_items[j].attribute.type === 'BOOLEAN') { @@ -664,6 +667,7 @@ export default { id: data.attributes[k].id, index: values.length, value: value, + value_description: value_description, binary_mime_type: data.attributes[k].binary_mime_type, binary_size: data.attributes[k].binary_size, binary_description: data.attributes[k].binary_description, @@ -690,6 +694,7 @@ export default { id: data.remote_report_items[l].attributes[k].id, index: values.length, value: value, + value_description: value_description, last_updated: data.remote_report_items[l].attributes[k].last_updated, binary_mime_type: data.remote_report_items[l].attributes[k].binary_mime_type, binary_size: data.remote_report_items[l].attributes[k].binary_size, @@ -858,4 +863,4 @@ export default { this.$root.$off('report-item-updated', this.report_item_updated); } } - \ No newline at end of file + diff --git a/src/gui/src/components/common/EnumSelector.vue b/src/gui/src/components/common/EnumSelector.vue index 4815466e1..d5b8496d0 100644 --- a/src/gui/src/components/common/EnumSelector.vue +++ b/src/gui/src/components/common/EnumSelector.vue @@ -9,44 +9,27 @@ {{ UI.ICON.CLOSE }} - {{$t('attribute.select_enum')}} + {{ $t('attribute.select_enum') }} - + @@ -59,92 +42,96 @@ \ No newline at end of file +} + diff --git a/src/gui/src/components/common/attribute/AttributeCWE.vue b/src/gui/src/components/common/attribute/AttributeCWE.vue index 52ab971d4..fb93649b2 100644 --- a/src/gui/src/components/common/attribute/AttributeCWE.vue +++ b/src/gui/src/components/common/attribute/AttributeCWE.vue @@ -17,6 +17,11 @@ :class="getLockedStyle(index)" :disabled="values[index].locked || !canModify" @fo="firstOne(index)"> + + diff --git a/src/gui/src/components/common/attribute/attributes_mixin.js b/src/gui/src/components/common/attribute/attributes_mixin.js index 588636a84..f648b284d 100644 --- a/src/gui/src/components/common/attribute/attributes_mixin.js +++ b/src/gui/src/components/common/attribute/attributes_mixin.js @@ -1,4 +1,4 @@ -import {getReportItemData, holdLockReportItem, lockReportItem, unlockReportItem, updateReportItem} from "@/api/analyze"; +import { getReportItemData, holdLockReportItem, lockReportItem, unlockReportItem, updateReportItem } from "@/api/analyze"; import AuthMixin from "@/services/auth/auth_mixin"; import Permissions from "@/services/auth/permissions"; @@ -35,6 +35,7 @@ var AttributesMixin = { methods: { enumSelected(data) { this.values[data.index].value = data.value + this.values[data.index].value_description = data.value_description this.onEdit(data.index) }, @@ -54,7 +55,7 @@ var AttributesMixin = { index: this.values.length, value: "", last_updated: data.attribute_last_updated, - user: {name: data.attribute_user} + user: { name: data.attribute_user } }) }) }) @@ -104,7 +105,7 @@ var AttributesMixin = { onFocus(field_index) { if (this.edit === true) { - lockReportItem(this.report_item_id, {'field_id': this.values[field_index].id}).then(() => { + lockReportItem(this.report_item_id, { 'field_id': this.values[field_index].id }).then(() => { }) } //window.console.debug("onFocus") @@ -115,7 +116,7 @@ var AttributesMixin = { this.onEdit(field_index) - unlockReportItem(this.report_item_id, {'field_id': this.values[field_index].id}).then(() => { + unlockReportItem(this.report_item_id, { 'field_id': this.values[field_index].id }).then(() => { }) } }, @@ -126,7 +127,7 @@ var AttributesMixin = { clearTimeout(this.key_timeout); let self = this; this.key_timeout = setTimeout(function () { - holdLockReportItem(self.report_item_id, {'field_id': self.values[field_index].id}).then(() => { + holdLockReportItem(self.report_item_id, { 'field_id': self.values[field_index].id }).then(() => { }) }, 1000); } @@ -140,6 +141,7 @@ var AttributesMixin = { data.attribute_id = this.values[field_index].id let value = this.values[field_index].value + let value_description = this.values[field_index].value_description if (this.attribute_group.attribute.type === 'CPE') { value = value.replace("*", "%") } else if (this.attribute_group.attribute.type === 'BOOLEAN') { @@ -150,11 +152,12 @@ var AttributesMixin = { } } data.attribute_value = value + data.value_description = value_description updateReportItem(this.report_item_id, data).then((update_response) => { getReportItemData(this.report_item_id, update_response.data).then((response) => { this.values[field_index].last_updated = response.data.attribute_last_updated - this.values[field_index].user = {name: response.data.attribute_user} + this.values[field_index].user = { name: response.data.attribute_user } }) }) } @@ -201,8 +204,9 @@ var AttributesMixin = { value = value === "true"; } this.values[i].value = value + this.values[i].value_description = data.value_description this.values[i].last_updated = data.attribute_last_updated - this.values[i].user = {name: data.attribute_user} + this.values[i].user = { name: data.attribute_user } break } } @@ -215,14 +219,15 @@ var AttributesMixin = { id: data.attribute_id, index: this.values.length, value: data.attribute_value, + value_description: data.value_description, binary_mime_type: data.binary_mime_type, binary_size: data.binary_size, binary_description: data.binary_description, last_updated: data.attribute_last_updated, - user: {name: data.attribute_user} + user: { name: data.attribute_user } }) if (this.attribute_group.attribute.type === 'ATTACHMENT') { - this.addFile(this.values[this.values.length-1]) + this.addFile(this.values[this.values.length - 1]) } }) } @@ -260,4 +265,4 @@ var AttributesMixin = { } }; -export default AttributesMixin \ No newline at end of file +export default AttributesMixin From 1ecc477440977b508e68e25e6ac3a97dc2da51c4 Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Jun 2024 20:57:51 +0200 Subject: [PATCH 3/5] modified model and schema --- src/core/model/report_item.py | 687 +++++++++++++++++++----- src/shared/shared/schema/report_item.py | 215 ++++++++ 2 files changed, 757 insertions(+), 145 deletions(-) diff --git a/src/core/model/report_item.py b/src/core/model/report_item.py index b1bf3fd10..4bd749159 100644 --- a/src/core/model/report_item.py +++ b/src/core/model/report_item.py @@ -1,4 +1,24 @@ -from marshmallow import post_load +"""This module contains the ReportItem class and its associated schema. + +The ReportItem class represents a report item, which is a component of a larger report. It contains attributes such as ID, UUID, title, +created timestamp, and more. The class also includes methods for finding report item attributes by ID. + +The module also defines several schemas for creating and validating report items and their attributes. + +Classes: + - ReportItem: A class representing a report item. + - ReportItemAttribute: A class representing an attribute of a report item. + - ReportItemRemoteReportItem: A class representing the relationship between a report item and a remote report item. + +Schemas: + - NewReportItemSchema: Schema for creating a new report item. + - NewReportItemAttributeSchema: Schema for creating a new report item attribute. + +Relationships: + - ReportItem has a many-to-one relationship with User and ReportItemType. + - ReportItem has a many-to-many relationship with NewsItemAggregate and ReportItem. +""" + from datetime import datetime import uuid as uuid_generator from sqlalchemy import orm, or_, func, text, and_ @@ -14,19 +34,76 @@ from shared.schema.acl_entry import ItemType from shared.schema.attribute import AttributeType from shared.schema.news_item import NewsItemAggregateIdSchema, NewsItemAggregateSchema -from shared.schema.report_item import ReportItemAttributeBaseSchema, ReportItemBaseSchema, ReportItemIdSchema, RemoteReportItemSchema, ReportItemRemoteSchema, ReportItemSchema, ReportItemPresentationSchema +from shared.schema.report_item import ( + ReportItemAttributeBaseSchema, + ReportItemBaseSchema, + ReportItemIdSchema, + RemoteReportItemSchema, + ReportItemRemoteSchema, + ReportItemSchema, + ReportItemPresentationSchema, +) class NewReportItemAttributeSchema(ReportItemAttributeBaseSchema): + """Schema for creating a new report item attribute. + + This schema is used to validate and deserialize data for creating a new report item attribute. + + Arguments: + ReportItemAttributeBaseSchema -- The base schema for report item attributes. + + Returns: + An instance of the ReportItemAttribute class. + """ @post_load def make_report_item_attribute(self, data, **kwargs): + """Create a report item attribute. + + This method takes in data and creates a ReportItemAttribute object. + + Arguments: + data (dict): A dictionary containing the data for the report item attribute. + + Returns: + ReportItemAttribute: The created report item attribute object. + """ return ReportItemAttribute(**data) class ReportItemAttribute(db.Model): + """A class representing an attribute of a report item. + + Attributes: + id (int): The unique identifier of the attribute. + value (str): The value of the attribute. + value_description (str): The description of the attribute value. + binary_mime_type (str): The MIME type of the binary data, if applicable. + binary_data (bytes): The binary data associated with the attribute. + binary_size (int): The size of the binary data in bytes. + binary_description (str): The description of the binary data. + created (datetime): The timestamp of when the attribute was created. + last_updated (datetime): The timestamp of when the attribute was last updated. + version (int): The version number of the attribute. + current (bool): Indicates whether the attribute is the current version. + attribute_group_item_id (int): The ID of the attribute group item that the attribute belongs to. + attribute_group_item (AttributeGroupItem): The attribute group item that the attribute belongs to. + attribute_group_item_title (str): The title of the attribute group item. + report_item_id (int): The ID of the report item that the attribute belongs to. + report_item (ReportItem): The report item that the attribute belongs to. + user_id (int): The ID of the user who created the attribute. + user (User): The user who created the attribute. + + Methods: + __init__: Initializes a new instance of the ReportItemAttribute class. + find: Finds a report item attribute by its ID. + + """ + id = db.Column(db.Integer, primary_key=True) value = db.Column(db.String(), nullable=False) + value_description = db.Column(db.String()) binary_mime_type = db.Column(db.String()) binary_data = orm.deferred(db.Column(db.LargeBinary)) binary_size = db.Column(db.Integer) @@ -37,20 +114,42 @@ class ReportItemAttribute(db.Model): version = db.Column(db.Integer, default=1) current = db.Column(db.Boolean, default=True) - attribute_group_item_id = db.Column(db.Integer, db.ForeignKey('attribute_group_item.id')) + attribute_group_item_id = db.Column(db.Integer, db.ForeignKey("attribute_group_item.id")) attribute_group_item = db.relationship("AttributeGroupItem", viewonly=True) attribute_group_item_title = db.Column(db.String) - report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id'), nullable=True) + report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id"), nullable=True) report_item = db.relationship("ReportItem") - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) user = db.relationship("User") - def __init__(self, id, value, binary_mime_type, binary_size, binary_description, attribute_group_item_id, - attribute_group_item_title): + def __init__( + self, + id, + value, + value_description, + binary_mime_type, + binary_size, + binary_description, + attribute_group_item_id, + attribute_group_item_title, + ): + """Initialize a ReportItem object. + + Arguments: + id (int): The ID of the report item. + value (str): The value of the report item. + value_description (str): The description of the value. + binary_mime_type (str): The MIME type of the binary data. + binary_size (int): The size of the binary data. + binary_description (str): The description of the binary data. + attribute_group_item_id (int): The ID of the attribute group item. + attribute_group_item_title (str): The title of the attribute group item. + """ self.id = None self.value = value + self.value_description = value_description self.binary_mime_type = binary_mime_type self.binary_size = binary_size self.binary_description = binary_description @@ -59,26 +158,83 @@ def __init__(self, id, value, binary_mime_type, binary_size, binary_description, @classmethod def find(cls, attribute_id): + """Find a report item attribute by its ID. + + Args: + attribute_id (int): The ID of the attribute to find. + + Returns: + ReportItemAttribute: The report item attribute with the specified ID, or None if not found. + + """ report_item_attribute = cls.query.get(attribute_id) return report_item_attribute class NewReportItemSchema(ReportItemBaseSchema): + """Schema for creating a new report item. + + This schema defines the structure and validation rules for creating a new report item. + + Arguments: + ReportItemBaseSchema -- The base schema for report items. + + Returns: + An instance of the NewReportItemSchema class. + """ + news_item_aggregates = fields.Nested(NewsItemAggregateIdSchema, many=True, missing=[]) remote_report_items = fields.Nested(ReportItemIdSchema, many=True, missing=[]) attributes = fields.Nested(NewReportItemAttributeSchema, many=True) @post_load def make(self, data, **kwargs): + """Create a new ReportItem object. + + Arguments: + data (dict): A dictionary containing the data for the ReportItem. + + Returns: + ReportItem: A new ReportItem object. + """ return ReportItem(**data) class ReportItemRemoteReportItem(db.Model): - report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id'), primary_key=True) - remote_report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id'), primary_key=True) + """A class representing the relationship between a report item and a remote report item. + + Arguments: + db -- The database object used for defining the model. + """ + + report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id"), primary_key=True) + remote_report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id"), primary_key=True) class ReportItem(db.Model): + """A class representing a report item. + + Attributes: + id (int): The unique identifier of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item title. + created (datetime): The datetime when the report item was created. + last_updated (datetime): The datetime when the report item was last updated. + completed (bool): Indicates whether the report item is completed or not. + user_id (int): The ID of the user associated with the report item. + user (User): The user associated with the report item. + remote_user (str): The remote user associated with the report item. + report_item_type_id (int): The ID of the report item type associated with the report item. + report_item_type (ReportItemType): The report item type associated with the report item. + news_item_aggregates (list): The list of news item aggregates associated with the report item. + remote_report_items (list): The list of remote report items associated with the report item. + attributes (list): The list of attributes associated with the report item. + report_item_cpes (list): The list of report item CPES associated with the report item. + subtitle (str): The subtitle of the report item. + tag (str): The tag of the report item. + """ + id = db.Column(db.Integer, primary_key=True) uuid = db.Column(db.String(64)) @@ -89,26 +245,40 @@ class ReportItem(db.Model): last_updated = db.Column(db.DateTime, default=datetime.now) completed = db.Column(db.Boolean, default=False) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) user = db.relationship("User", viewonly=True) remote_user = db.Column(db.String()) - report_item_type_id = db.Column(db.Integer, db.ForeignKey('report_item_type.id'), nullable=True) + report_item_type_id = db.Column(db.Integer, db.ForeignKey("report_item_type.id"), nullable=True) report_item_type = db.relationship("ReportItemType", viewonly=True) - news_item_aggregates = db.relationship("NewsItemAggregate", secondary='report_item_news_item_aggregate') + news_item_aggregates = db.relationship("NewsItemAggregate", secondary="report_item_news_item_aggregate") - remote_report_items = db.relationship("ReportItem", secondary='report_item_remote_report_item', - primaryjoin=ReportItemRemoteReportItem.report_item_id == id, - secondaryjoin=ReportItemRemoteReportItem.remote_report_item_id == id) + remote_report_items = db.relationship( + "ReportItem", + secondary="report_item_remote_report_item", + primaryjoin=ReportItemRemoteReportItem.report_item_id == id, + secondaryjoin=ReportItemRemoteReportItem.remote_report_item_id == id, + ) - attributes = db.relationship('ReportItemAttribute', back_populates="report_item", cascade="all, delete-orphan") + attributes = db.relationship("ReportItemAttribute", back_populates="report_item", cascade="all, delete-orphan") report_item_cpes = db.relationship("ReportItemCpe", cascade="all, delete-orphan") - def __init__(self, id, uuid, title, title_prefix, report_item_type_id, news_item_aggregates, remote_report_items, - attributes, completed): - + def __init__(self, id, uuid, title, title_prefix, report_item_type_id, news_item_aggregates, remote_report_items, attributes, completed): + """Initialize a new instance of the ReportItem class. + + Arguments: + id (int): The ID of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item's title. + report_item_type_id (int): The ID of the report item type. + news_item_aggregates (list): A list of news item aggregates associated with the report item. + remote_report_items (list): A list of remote report items associated with the report item. + attributes (dict): A dictionary of attributes for the report item. + completed (bool): Indicates whether the report item is completed or not. + """ self.id = id if uuid is None: @@ -135,35 +305,75 @@ def __init__(self, id, uuid, title, title_prefix, report_item_type_id, news_item @orm.reconstructor def reconstruct(self): + """Reconstructs the report item. + + This method clears the subtitle, sets the tag to "mdi-file-table-outline", + and sorts the attributes based on the attribute group index, attribute group item index, and attribute ID. + """ self.subtitle = "" self.tag = "mdi-file-table-outline" self.attributes.sort(key=lambda obj: (obj.attribute_group_item.attribute_group.index, obj.attribute_group_item.index, obj.id)) @classmethod def count_all(cls, is_completed): + """Count the number of report items based on completion status. + + Arguments: + is_completed (bool): A flag indicating whether to count completed or incomplete report items. + Returns: + int: The count of report items matching the completion status. + """ return cls.query.filter_by(completed=is_completed).count() @classmethod def find(cls, report_item_id): + """Find a report item by its ID. + + Arguments: + report_item_id (int): The ID of the report item. + Returns: + ReportItem: The report item with the specified ID. + """ report_item = cls.query.get(report_item_id) return report_item @classmethod def find_by_uuid(cls, report_item_uuid): + """Find a report item by its UUID. + + Arguments: + report_item_uuid (str): The UUID of the report item. + Returns: + ReportItem: The report item with the specified UUID. + """ report_item = cls.query.filter_by(uuid=report_item_uuid) return report_item @classmethod def allowed_with_acl(cls, report_item_id, user, see, access, modify): - - query = db.session.query(ReportItem.id).distinct().group_by(ReportItem.id).filter( - ReportItem.id == report_item_id) - - query = query.outerjoin(ACLEntry, or_(and_(ReportItem.uuid == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM), - and_(cast(ReportItem.report_item_type_id, - sqlalchemy.String) == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE))) + """Check if the user is allowed to perform actions on a report item based on ACL. + + Arguments: + report_item_id (int): The ID of the report item. + user (User): The user object. + see (bool): Whether the user can see the report item. + access (bool): Whether the user can access the report item. + modify (bool): Whether the user can modify the report item. + Returns: + bool: True if the user is allowed, False otherwise. + """ + query = db.session.query(ReportItem.id).distinct().group_by(ReportItem.id).filter(ReportItem.id == report_item_id) + + query = query.outerjoin( + ACLEntry, + or_( + and_(ReportItem.uuid == ACLEntry.item_id, ACLEntry.item_type == ItemType.REPORT_ITEM), + and_( + cast(ReportItem.report_item_type_id, sqlalchemy.String) == ACLEntry.item_id, + ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE, + ), + ), + ) query = ACLEntry.apply_query(query, user, see, access, modify) @@ -171,14 +381,29 @@ def allowed_with_acl(cls, report_item_id, user, see, access, modify): @classmethod def get_for_sync(cls, last_synced, report_item_types): + """Retrieve report items for synchronization. + + This method retrieves report items that have been updated since the last synchronization time, + and belong to the specified report item types. + Args: + last_synced (datetime): The last synchronization time. + report_item_types (list): A list of report item types. + Returns: + tuple: A tuple containing two elements: + - items (list): A list of report items that need to be synchronized. + - last_sync_time (datetime): The current synchronization time. + """ report_item_type_ids = set() for report_item_type in report_item_types: report_item_type_ids.add(report_item_type.id) last_sync_time = datetime.now() - query = cls.query.filter(ReportItem.last_updated >= last_synced, ReportItem.last_updated <= last_sync_time, - ReportItem.report_item_type_id.in_(report_item_type_ids)) + query = cls.query.filter( + ReportItem.last_updated >= last_synced, + ReportItem.last_updated <= last_sync_time, + ReportItem.report_item_type_id.in_(report_item_type_ids), + ) report_items = query.all() @@ -193,70 +418,108 @@ def get_for_sync(cls, last_synced, report_item_types): @classmethod def get(cls, group, filter, offset, limit, user): - + """Retrieve report items based on specified criteria. + + Arguments: + group (str): The remote user group. + filter (dict): The filter criteria. + offset (int): The offset for pagination. + limit (int): The limit for pagination. + user (str): The user performing the query. + Returns: + tuple: A tuple containing the list of report items and the total count. + """ if group: query = cls.query.filter(ReportItem.remote_user == group) else: - query = db.session.query(ReportItem, func.count().filter(ACLEntry.id > 0).label("acls"), - func.count().filter(ACLEntry.access == True).label("access"), - func.count().filter(ACLEntry.modify == True).label("modify")).distinct().group_by( - ReportItem.id) - - query = query.filter(ReportItem.remote_user == None) - - query = query.outerjoin(ACLEntry, or_(and_(ReportItem.uuid == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM), - and_(cast(ReportItem.report_item_type_id, - sqlalchemy.String) == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE))) + query = ( + db.session.query( + ReportItem, + func.count().filter(ACLEntry.id > 0).label("acls"), + func.count().filter(ACLEntry.access.is_(True)).label("access"), + func.count().filter(ACLEntry.modify.is_(True)).label("modify"), + ) + .distinct() + .group_by(ReportItem.id) + ) + + query = query.filter(ReportItem.remote_user.is_(None)) + + query = query.outerjoin( + ACLEntry, + or_( + and_(ReportItem.uuid == ACLEntry.item_id, ACLEntry.item_type == ItemType.REPORT_ITEM), + and_( + cast(ReportItem.report_item_type_id, sqlalchemy.String) == ACLEntry.item_id, + ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE, + ), + ), + ) query = ACLEntry.apply_query(query, user, True, False, False) - if 'search' in filter and filter['search'] != '': - search_string = '%' + filter['search'].lower() + '%' - query = query.join(ReportItemAttribute, ReportItem.id == ReportItemAttribute.report_item_id).filter(or_( - func.lower(ReportItemAttribute.value).like(search_string), - func.lower(ReportItem.title).like(search_string), - func.lower(ReportItem.title_prefix).like(search_string))) + search_string = filter.get("search", "").lower() + if search_string: + search_string = f"%{search_string}%" + query = query.join(ReportItemAttribute, ReportItem.id == ReportItemAttribute.report_item_id).filter( + or_( + func.lower(ReportItemAttribute.value).like(search_string), + func.lower(ReportItem.title).like(search_string), + func.lower(ReportItem.title_prefix).like(search_string), + ) + ) - if 'completed' in filter and filter['completed'].lower() == "true": - query = query.filter(ReportItem.completed == True) + if filter.get("completed", "").lower() == "true": + query = query.filter(ReportItem.completed) - if 'incompleted' in filter and filter['incompleted'].lower() == "true": - query = query.filter(ReportItem.completed == False) + if filter.get("incompleted", "").lower() == "true": + query = query.filter(ReportItem.completed.is_(False)) - if 'range' in filter and filter['range'] != 'ALL': + if filter.get("range", "ALL") != "ALL": date_limit = datetime.now() - if filter['range'] == 'TODAY': + if filter["range"] == "TODAY": date_limit = date_limit.replace(hour=0, minute=0, second=0, microsecond=0) - if filter['range'] == 'WEEK': - date_limit = date_limit.replace(day=date_limit.day - date_limit.weekday(), hour=0, minute=0, second=0, - microsecond=0) + if filter["range"] == "WEEK": + date_limit = date_limit.replace(day=date_limit.day - date_limit.weekday(), hour=0, minute=0, second=0, microsecond=0) - if filter['range'] == 'MONTH': + if filter["range"] == "MONTH": date_limit = date_limit.replace(day=1, hour=0, minute=0, second=0, microsecond=0) query = query.filter(ReportItem.created >= date_limit) - if 'sort' in filter: - if filter['sort'] == 'DATE_DESC': + if filter.get("sort"): + if filter["sort"] == "DATE_DESC": query = query.order_by(db.desc(ReportItem.created)) - elif filter['sort'] == 'DATE_ASC': + elif filter["sort"] == "DATE_ASC": query = query.order_by(db.asc(ReportItem.created)) return query.offset(offset).limit(limit).all(), query.count() @classmethod def identical(cls, uuid): + """Check if a report item with the given UUID exists. + + Arguments: + uuid -- The UUID of the report item to check. + Returns: + True if a report item with the given UUID exists, False otherwise. + """ return db.session.query(db.exists().where(ReportItem.uuid == uuid)).scalar() @classmethod def get_by_cpe(cls, cpes): - + """Retrieve report items by Common Platform Enumeration (CPE). + + This method queries the database to retrieve report items that match the provided CPEs. + Arguments: + cpes (list): A list of CPEs to search for. + Returns: + list: A list of report item IDs that match the provided CPEs. + """ if len(cpes) > 0: query_string = "SELECT DISTINCT report_item_id FROM report_item_cpe WHERE value LIKE ANY(:cpes) OR {}" - params = {'cpes': cpes} + params = {"cpes": cpes} inner_query = "" for i in range(len(cpes)): @@ -274,6 +537,18 @@ def get_by_cpe(cls, cpes): @classmethod def get_json(cls, group, filter, offset, limit, user): + """Get the JSON representation of report items. + + This method retrieves report items based on the provided parameters and returns them in JSON format. + Arguments: + group (str): The group parameter. + filter (str): The filter parameter. + offset (int): The offset parameter. + limit (int): The limit parameter. + user (str): The user parameter. + Returns: + dict: A dictionary containing the total count of report items and a list of report items in JSON format. + """ results, count = cls.get(group, filter, offset, limit, user) report_items = [] if group: @@ -292,18 +567,36 @@ def get_json(cls, group, filter, offset, limit, user): report_items.append(report_item) report_items_schema = ReportItemPresentationSchema(many=True) - return {'total_count': count, 'items': report_items_schema.dump(report_items)} + return {"total_count": count, "items": report_items_schema.dump(report_items)} @classmethod def get_detail_json(cls, id): + """Get the detailed JSON representation of a report item. + + Arguments: + cls -- The class object. + id -- The ID of the report item. + Returns: + The detailed JSON representation of the report item. + """ report_item = cls.query.get(id) report_item_schema = ReportItemSchema() return report_item_schema.dump(report_item) @classmethod def get_groups(cls): - result = db.session.query(ReportItem.remote_user).distinct().group_by(ReportItem.remote_user).filter( - ReportItem.remote_user != None).all() + """Get the distinct groups associated with the report items. + + Returns: + list: A list of distinct groups. + """ + result = ( + db.session.query(ReportItem.remote_user) + .distinct() + .group_by(ReportItem.remote_user) + .filter(ReportItem.remote_user.isnot(None)) + .all() + ) groups = set() for row in result: groups.add(row[0]) @@ -312,11 +605,21 @@ def get_groups(cls): @classmethod def add_report_item(cls, report_item_data, user): + """Add a report item to the database. + + This method takes in report_item_data and user as arguments and adds a new report item to the database. + It performs authorization checks to ensure that the user has the necessary permissions to add the report item. + Arguments: + report_item_data (dict): The data for the report item. + user (User): The user adding the report item. + Returns: + tuple: A tuple containing the added report item and the HTTP status code. + """ report_item_schema = NewReportItemSchema() report_item = report_item_schema.load(report_item_data) if not ReportItemType.allowed_with_acl(report_item.report_item_type_id, user, False, False, True): - return 'Unauthorized access to report item type', 401 + return "Unauthorized access to report item type", 401 report_item.user_id = user.id for attribute in report_item.attributes: @@ -331,6 +634,16 @@ def add_report_item(cls, report_item_data, user): @classmethod def add_remote_report_items(cls, report_item_data, remote_node_name): + """Add remote report items to the database. + + This method takes a list of report item data and a remote node name, + and adds the report items to the database. If a report item with the + same UUID already exists, it updates the existing report item with the + new data. + Arguments: + report_item_data (list): A list of report item data. + remote_node_name (str): The name of the remote node. + """ report_item_schema = NewReportItemSchema(many=True) report_items = report_item_schema.load(report_item_data) @@ -350,66 +663,81 @@ def add_remote_report_items(cls, report_item_data, remote_node_name): @classmethod def update_report_item(cls, id, data, user): + """Update a report item with the given data. + + Arguments: + id (int): The ID of the report item. + data (dict): The data to update the report item with. + user (User): The user performing the update. + Returns: + tuple: A tuple containing a boolean indicating whether the report item was modified and the updated data. + """ modified = False new_attribute = None report_item = cls.query.get(id) if report_item is not None: - if 'update' in data: - if 'title' in data: - if report_item.title != data['title']: + if "update" in data: + if "title" in data: + if report_item.title != data["title"]: modified = True - report_item.title = data['title'] - data['title'] = '' + report_item.title = data["title"] + data["title"] = "" - if 'title_prefix' in data: - if report_item.title_prefix != data['title_prefix']: + if "title_prefix" in data: + if report_item.title_prefix != data["title_prefix"]: modified = True - report_item.title_prefix = data['title_prefix'] - data['title_prefix'] = '' + report_item.title_prefix = data["title_prefix"] + data["title_prefix"] = "" - if 'completed' in data: - if report_item.completed != data['completed']: + if "completed" in data: + if report_item.completed != data["completed"]: modified = True - report_item.completed = data['completed'] - data['completed'] = '' + report_item.completed = data["completed"] + data["completed"] = "" - if 'attribute_id' in data: + if "attribute_id" in data: for attribute in report_item.attributes: - # sometime we compare: int & int or int & str - if str(attribute.id) == str(data['attribute_id']): - if attribute.value != data['attribute_value']: + # Compare attribute IDs + if str(attribute.id) == str(data["attribute_id"]): + if attribute.value != data["attribute_value"]: + modified = True + attribute.value = data["attribute_value"] + data["attribute_value"] = "" + attribute.user = user + attribute.last_updated = datetime.now() + if attribute.value_description != data["value_description"]: modified = True - attribute.value = data['attribute_value'] - data['attribute_value'] = '' + attribute.value_description = data["value_description"] + data["value_description"] = "" attribute.user = user attribute.last_updated = datetime.now() break - if 'add' in data: - if 'attribute_id' in data: + if "add" in data: + if "attribute_id" in data: modified = True - new_attribute = ReportItemAttribute(None, "", None, 0, None, data['attribute_group_item_id'], None) + new_attribute = ReportItemAttribute(None, "", None, 0, None, data["attribute_group_item_id"], None) new_attribute.user = user report_item.attributes.append(new_attribute) - if 'aggregate_ids' in data: + if "aggregate_ids" in data: modified = True - for aggregate_id in data['aggregate_ids']: + for aggregate_id in data["aggregate_ids"]: aggregate = NewsItemAggregate.find(aggregate_id) report_item.news_item_aggregates.append(aggregate) - if 'remote_report_item_ids' in data: + if "remote_report_item_ids" in data: modified = True - for remote_report_item_id in data['remote_report_item_ids']: + for remote_report_item_id in data["remote_report_item_ids"]: remote_report_item = ReportItem.find(remote_report_item_id) report_item.remote_report_items.append(remote_report_item) - if 'delete' in data: - if 'attribute_id' in data: + if "delete" in data: + if "attribute_id" in data: attribute_to_delete = None for attribute in report_item.attributes: # sometime we compare: int & int or int & str - if str(attribute.id) == str(data['attribute_id']): + if str(attribute.id) == str(data["attribute_id"]): attribute_to_delete = attribute break @@ -417,10 +745,10 @@ def update_report_item(cls, id, data, user): modified = True report_item.attributes.remove(attribute_to_delete) - if 'aggregate_id' in data: + if "aggregate_id" in data: aggregate_to_delete = None for aggregate in report_item.news_item_aggregates: - if aggregate.id == data['aggregate_id']: + if aggregate.id == data["aggregate_id"]: aggregate_to_delete = aggregate break @@ -428,10 +756,10 @@ def update_report_item(cls, id, data, user): modified = True report_item.news_item_aggregates.remove(aggregate_to_delete) - if 'remote_report_item_id' in data: + if "remote_report_item_id" in data: remote_report_item_to_delete = None for remote_report_item in report_item.remote_report_items: - if remote_report_item.id == data['remote_report_item_id']: + if remote_report_item.id == data["remote_report_item_id"]: remote_report_item_to_delete = remote_report_item break @@ -441,73 +769,99 @@ def update_report_item(cls, id, data, user): if modified: report_item.last_updated = datetime.now() - data['user_id'] = user.id - data['report_item_id'] = int(id) + data["user_id"] = user.id + data["report_item_id"] = int(id) report_item.update_cpes() db.session.commit() if new_attribute is not None: - data['attribute_id'] = new_attribute.id + data["attribute_id"] = new_attribute.id return modified, data @classmethod def get_updated_data(cls, id, data): + """Get the updated data for a report item. + + This method retrieves the updated data for a report item based on the provided ID and data. + Arguments: + cls (class): The class object. + id (int): The ID of the report item. + data (dict): The data containing the updates. + Returns: + dict: The updated data for the report item. + """ report_item = cls.query.get(id) if report_item is not None: - if 'update' in data: - if 'title' in data: - data['title'] = report_item.title + if "update" in data: + if "title" in data: + data["title"] = report_item.title - if 'title_prefix' in data: - data['title_prefix'] = report_item.title_prefix + if "title_prefix" in data: + data["title_prefix"] = report_item.title_prefix - if 'completed' in data: - data['completed'] = report_item.completed + if "completed" in data: + data["completed"] = report_item.completed - if 'attribute_id' in data: + if "attribute_id" in data: for attribute in report_item.attributes: - if str(attribute.id) == data['attribute_id']: - data['attribute_value'] = attribute.value - data['attribute_last_updated'] = attribute.last_updated.strftime('%d.%m.%Y - %H:%M') - data['attribute_user'] = attribute.user.name + if str(attribute.id) == data["attribute_id"]: + data["attribute_value"] = attribute.value + data["attribute_value_description"] = attribute.value_description + data["attribute_last_updated"] = attribute.last_updated.strftime("%d.%m.%Y - %H:%M") + data["attribute_user"] = attribute.user.name break - if 'add' in data: - if 'aggregate_ids' in data: + if "add" in data: + if "aggregate_ids" in data: schema = NewsItemAggregateSchema() - data['news_item_aggregates'] = [] - for aggregate_id in data['aggregate_ids']: + data["news_item_aggregates"] = [] + for aggregate_id in data["aggregate_ids"]: aggregate = NewsItemAggregate.find(aggregate_id) - data['news_item_aggregates'].append(schema.dump(aggregate)) + data["news_item_aggregates"].append(schema.dump(aggregate)) - if 'remote_report_item_ids' in data: + if "remote_report_item_ids" in data: schema = RemoteReportItemSchema() - data['remote_report_items'] = [] - for remote_report_item_id in data['remote_report_item_ids']: + data["remote_report_items"] = [] + for remote_report_item_id in data["remote_report_item_ids"]: remote_report_item = ReportItem.find(remote_report_item_id) - data['remote_report_items'].append(schema.dump(remote_report_item)) + data["remote_report_items"].append(schema.dump(remote_report_item)) - if 'attribute_id' in data: + if "attribute_id" in data: for attribute in report_item.attributes: - if str(attribute.id) == data['attribute_id']: - data['attribute_value'] = attribute.value - data['binary_mime_type'] = attribute.binary_mime_type - data['binary_size'] = attribute.binary_size - data['binary_description'] = attribute.binary_description - data['attribute_last_updated'] = attribute.last_updated.strftime('%d.%m.%Y - %H:%M') - data['attribute_user'] = attribute.user.name + if str(attribute.id) == data["attribute_id"]: + data["attribute_value"] = attribute.value + data["attribute_value_description"] = attribute.value_description + data["binary_mime_type"] = attribute.binary_mime_type + data["binary_size"] = attribute.binary_size + data["binary_description"] = attribute.binary_description + data["attribute_last_updated"] = attribute.last_updated.strftime("%d.%m.%Y - %H:%M") + data["attribute_user"] = attribute.user.name break return data @classmethod def add_attachment(cls, id, attribute_group_item_id, user, file, description): + """Add an attachment to a report item. + + Arguments: + id (int): The ID of the report item. + attribute_group_item_id (int): The ID of the attribute group item. + user (User): The user who is adding the attachment. + file (FileStorage): The file to be attached. + description (str): The description of the attachment. + Returns: + dict: A dictionary containing information about the attachment. + - "add" (bool): True if the attachment was added successfully. + - "user_id" (int): The ID of the user who added the attachment. + - "report_item_id" (int): The ID of the report item. + - "attribute_id" (int): The ID of the newly created attribute. + """ report_item = cls.query.get(id) file_data = file.read() - new_attribute = ReportItemAttribute(None, file.filename, file.mimetype, len(file_data), description, - attribute_group_item_id, None) + new_attribute = ReportItemAttribute(None, file.filename, file.mimetype, len(file_data), description, attribute_group_item_id, None) new_attribute.user = user new_attribute.binary_data = file_data report_item.attributes.append(new_attribute) @@ -515,10 +869,10 @@ def add_attachment(cls, id, attribute_group_item_id, user, file, description): report_item.last_updated = datetime.now() data = dict() - data['add'] = True - data['user_id'] = user.id - data['report_item_id'] = int(id) - data['attribute_id'] = new_attribute.id + data["add"] = True + data["user_id"] = user.id + data["report_item_id"] = int(id) + data["attribute_id"] = new_attribute.id db.session.commit() @@ -526,6 +880,20 @@ def add_attachment(cls, id, attribute_group_item_id, user, file, description): @classmethod def remove_attachment(cls, id, attribute_id, user): + """Remove an attachment from a report item. + + Arguments: + cls (ReportItem): The class object. + id (int): The ID of the report item. + attribute_id (int): The ID of the attribute to be removed. + user (User): The user performing the action. + Returns: + dict: A dictionary containing information about the deletion. + - delete (bool): Indicates whether the attribute was successfully deleted. + - user_id (int): The ID of the user performing the action. + - report_item_id (int): The ID of the report item. + - attribute_id (int): The ID of the attribute that was deleted. + """ report_item = cls.query.get(id) attribute_to_delete = None for attribute in report_item.attributes: @@ -539,10 +907,10 @@ def remove_attachment(cls, id, attribute_id, user): report_item.last_updated = datetime.now() data = dict() - data['delete'] = True - data['user_id'] = user.id - data['report_item_id'] = int(id) - data['attribute_id'] = attribute_id + data["delete"] = True + data["user_id"] = user.id + data["report_item_id"] = int(id) + data["attribute_id"] = attribute_id db.session.commit() @@ -550,13 +918,27 @@ def remove_attachment(cls, id, attribute_id, user): @classmethod def delete_report_item(cls, id): + """Delete a report item by its ID. + + Arguments: + id (int): The ID of the report item to be deleted. + Returns: + tuple: A tuple containing the status message and the HTTP status code. + The status message is "success" if the report item was deleted successfully. + The HTTP status code is 200 if the report item was deleted successfully. + """ report_item = cls.query.get(id) if report_item is not None: db.session.delete(report_item) db.session.commit() - return 'success', 200 + return "success", 200 def update_cpes(self): + """Update the list of CPES for the report item. + + This method clears the existing list of CPES and populates it with new CPES + based on the attributes of the report item. Only attributes of type CPE are considered. + """ self.report_item_cpes = [] if self.completed is True: for attribute in self.attributes: @@ -566,10 +948,25 @@ def update_cpes(self): class ReportItemCpe(db.Model): + """A class representing a CPE (Common Platform Enumeration) report item. + + Attributes: + id (int): The unique identifier of the report item. + value (str): The value of the report item. + report_item_id (int): The foreign key referencing the parent report item. + Args: + db (object): The database object. + """ + id = db.Column(db.Integer, primary_key=True) value = db.Column(db.String()) - report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id')) + report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id")) def __init__(self, value): + """Initialize a ReportItemCpe object. + + Args: + value (any): The value of the report item. + """ self.id = None self.value = value diff --git a/src/shared/shared/schema/report_item.py b/src/shared/shared/schema/report_item.py index fb6723b47..502012a7e 100644 --- a/src/shared/shared/schema/report_item.py +++ b/src/shared/shared/schema/report_item.py @@ -1,3 +1,24 @@ +"""This module contains schemas and classes for representing report items and their attributes. + +The module includes the following classes: +- ReportItemAttributeBaseSchema: Schema for representing a report item attribute. +- ReportItemAttributeSchema: Schema for representing a report item attribute with additional fields. +- ReportItemAttribute: Class representing an attribute of a report item. +- ReportItemBaseSchema: Schema for the base report item. +- RemoteReportItemSchema: Schema for remote report items. +- ReportItemSchema: Schema for serializing and deserializing ReportItem objects. +- ReportItemAttributeRemoteSchema: Schema for representing a remote attribute of a report item. +- ReportItemRemoteSchema: Schema for representing a remote report item. +- ReportItemPresentationSchema: Schema for presenting a report item. +- ReportItem: Class representing a report item. + +The module also imports schemas from other modules: +- PresentationSchema: Schema for presentation. +- NewsItemAggregateSchema: Schema for representing news item aggregates. +- ACLEntryStatusSchema: Schema for the ACL entry status. +- UserSchemaBase: Schema for representing user data. +""" + from marshmallow import Schema, fields, post_load, EXCLUDE from shared.schema.presentation import PresentationSchema @@ -7,11 +28,27 @@ class ReportItemAttributeBaseSchema(Schema): + """Schema for representing a report item attribute. + + Attributes: + id (int): The ID of the attribute. + value (str): The value of the attribute. + value_description (str): The description of the attribute value. + binary_mime_type (str): The MIME type of the binary attribute. + binary_size (int): The size of the binary attribute. + binary_description (str): The description of the binary attribute. + attribute_group_item_title (str): The title of the attribute group item. + attribute_group_item_id (int): The ID of the attribute group item. + """ + class Meta: + """Meta class for configuring the behavior of the ReportItemAttributeBase schema.""" + unknown = EXCLUDE id = fields.Int(load_default=None, allow_none=True) value = fields.Str() + value_description = fields.Str(load_default=None, allow_none=True) binary_mime_type = fields.Str(load_default=None, allow_none=True) binary_size = fields.Int(load_default=0) binary_description = fields.Str(load_default=None, allow_none=True) @@ -20,6 +57,17 @@ class Meta: class ReportItemAttributeSchema(ReportItemAttributeBaseSchema): + """Schema for representing a report item attribute. + + This schema defines the structure and validation rules for a report item attribute. + + Arguments: + ReportItemAttributeBaseSchema -- The base schema for a report item attribute. + + Returns: + An instance of the ReportItemAttributeSchema class. + """ + created = fields.DateTime("%d.%m.%Y - %H:%M") last_updated = fields.DateTime("%d.%m.%Y - %H:%M") version = fields.Int() @@ -28,10 +76,39 @@ class ReportItemAttributeSchema(ReportItemAttributeBaseSchema): @post_load def make(self, data, **kwargs): + """Create a new ReportItemAttribute object. + + This method takes in data and creates a new ReportItemAttribute object using the provided data. + + Arguments: + data (dict): A dictionary containing the data for creating the ReportItemAttribute object. + **kwargs: Additional keyword arguments. + + Returns: + ReportItemAttribute: The newly created ReportItemAttribute object. + """ return ReportItemAttribute(**data) class ReportItemAttribute: + """Represents an attribute of a report item. + + Attributes: + id (int): The ID of the attribute. + value (str): The value of the attribute. + value_description (str): The description of the attribute value. + binary_mime_type (str): The MIME type of the binary data associated with the attribute. + binary_size (int): The size of the binary data associated with the attribute. + binary_description (str): The description of the binary data associated with the attribute. + attribute_group_item_id (int): The ID of the attribute group item. + attribute_group_item_title (str): The title of the attribute group item. + created (datetime): The date and time when the attribute was created. + last_updated (datetime): The date and time when the attribute was last updated. + version (int): The version of the attribute. + current (bool): Indicates whether the attribute is the current version. + user (str): The user who created or last updated the attribute. + """ + def __init__( self, id, @@ -47,6 +124,7 @@ def __init__( current, user, ): + """Initialize a new instance of the ReportItem class.""" self.id = id self.value = value self.created = created @@ -62,7 +140,22 @@ def __init__( class ReportItemBaseSchema(Schema): + """Schema for the base report item. + + Attributes: + id (int): The ID of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item title. + created (DateTime): The date and time when the report item was created. + last_updated (DateTime): The date and time when the report item was last updated. + completed (bool): Indicates whether the report item is completed or not. + report_item_type_id (int): The ID of the report item type. + """ + class Meta: + """Meta class for configuring the behavior of the ReportItem schema.""" + unknown = EXCLUDE id = fields.Int(load_default=None, allow_none=True) @@ -76,11 +169,41 @@ class Meta: class RemoteReportItemSchema(ReportItemBaseSchema, PresentationSchema): + """Schema for remote report items. + + This schema represents the structure and validation rules for remote report items. + + Arguments: + ReportItemBaseSchema -- Base schema for report items. + PresentationSchema -- Schema for presentation. + + Attributes: + remote_user (str): The remote user associated with the report item. + attributes (list): List of nested report item attributes. + """ + remote_user = fields.Str(allow_none=True) attributes = fields.Nested(ReportItemAttributeSchema, many=True) class ReportItemSchema(ReportItemBaseSchema): + """Schema for serializing and deserializing ReportItem objects. + + Inherits from ReportItemBaseSchema. + + Attributes: + news_item_aggregates (List[NewsItemAggregateSchema]): List of nested NewsItemAggregateSchema objects. + remote_report_items (List[RemoteReportItemSchema]): List of nested RemoteReportItemSchema objects. + attributes (List[ReportItemAttributeSchema]): List of nested ReportItemAttributeSchema objects. + remote_user (str): Remote user associated with the report item. + + Methods: + make(data, **kwargs): Post-load method to create a ReportItem object from deserialized data. + + Returns: + ReportItemSchema: An instance of the ReportItemSchema class. + """ + news_item_aggregates = fields.Nested(NewsItemAggregateSchema, many=True) remote_report_items = fields.Nested(RemoteReportItemSchema, many=True) attributes = fields.Nested(ReportItemAttributeSchema, many=True) @@ -88,15 +211,38 @@ class ReportItemSchema(ReportItemBaseSchema): @post_load def make(self, data, **kwargs): + """Create a new ReportItem object. + + This method takes in data and creates a new ReportItem object using the provided data. + + Arguments: + data (dict): A dictionary containing the data for the ReportItem. + + Returns: + ReportItem: A new ReportItem object. + """ return ReportItem(**data) class ReportItemAttributeRemoteSchema(Schema): + """A schema for representing a remote attribute of a report item. + + Attributes: + attribute_group_item_title (str): The title of the attribute group item. + value (str): The value of the attribute. + """ + attribute_group_item_title = fields.Str() value = fields.Str() class ReportItemRemoteSchema(Schema): + """A schema for representing a remote report item. + + Arguments: + Schema -- The base schema class. + """ + uuid = fields.Str(allow_none=True) title = fields.Str() title_prefix = fields.Str() @@ -105,10 +251,41 @@ class ReportItemRemoteSchema(Schema): class ReportItemPresentationSchema(ReportItemBaseSchema, ACLEntryStatusSchema, PresentationSchema): + """Schema for presenting a report item. + + This schema inherits from the ReportItemBaseSchema, ACLEntryStatusSchema, and PresentationSchema classes. + + Arguments: + ReportItemBaseSchema -- Schema for the base report item. + ACLEntryStatusSchema -- Schema for the ACL entry status. + PresentationSchema -- Schema for the presentation. + + Attributes: + remote_user -- String field representing the remote user. Allows None as a value. + """ + remote_user = fields.Str(allow_none=True) class ReportItem: + """ + Represents a report item. + + Attributes: + id (int): The ID of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item title. + created (datetime): The date and time when the report item was created. + last_updated (datetime): The date and time when the report item was last updated. + completed (bool): Indicates whether the report item is completed or not. + report_item_type_id (int): The ID of the report item type. + news_item_aggregates (list): A list of news item aggregates associated with the report item. + remote_report_items (list): A list of remote report items associated with the report item. + attributes (dict): Additional attributes of the report item. + remote_user (str): The remote user associated with the report item. + """ + def __init__( self, id, @@ -124,6 +301,7 @@ def __init__( attributes, remote_user, ): + """Initialize a ReportItem object.""" self.id = id self.uuid = uuid self.title = title @@ -139,16 +317,53 @@ def __init__( class ReportItemIdSchema(Schema): + """Schema for Report Item ID. + + This schema defines the structure for the Report Item ID. + + Arguments: + Schema -- The base schema class. + + Returns: + An instance of the ReportItemId class. + """ + class Meta: + """Meta class for configuring the behavior of the ReportItemId schema.""" + unknown = EXCLUDE id = fields.Int() @post_load def make(self, data, **kwargs): + """Create a new ReportItemId object. + + This method takes in data and returns a new ReportItemId object. + + Arguments: + data (dict): The data used to create the ReportItemId object. + + Returns: + ReportItemId: A new ReportItemId object. + """ return ReportItemId(**data) class ReportItemId: + """A class representing the ID of a report item. + + Args: + id (int): The ID of the report item. + + Attributes: + id (int): The ID of the report item. + """ + def __init__(self, id): + """Initialize a ReportItem object. + + Args: + id (int): The ID of the report item. + """ self.id = id From 16ebe738640334e45d6b5abdbfacb923dbba380a Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Jun 2024 20:57:51 +0200 Subject: [PATCH 4/5] modified model and schema --- src/core/model/report_item.py | 687 +++++++++++++++++++----- src/shared/shared/schema/report_item.py | 217 ++++++++ 2 files changed, 759 insertions(+), 145 deletions(-) diff --git a/src/core/model/report_item.py b/src/core/model/report_item.py index b1bf3fd10..4bd749159 100644 --- a/src/core/model/report_item.py +++ b/src/core/model/report_item.py @@ -1,4 +1,24 @@ -from marshmallow import post_load +"""This module contains the ReportItem class and its associated schema. + +The ReportItem class represents a report item, which is a component of a larger report. It contains attributes such as ID, UUID, title, +created timestamp, and more. The class also includes methods for finding report item attributes by ID. + +The module also defines several schemas for creating and validating report items and their attributes. + +Classes: + - ReportItem: A class representing a report item. + - ReportItemAttribute: A class representing an attribute of a report item. + - ReportItemRemoteReportItem: A class representing the relationship between a report item and a remote report item. + +Schemas: + - NewReportItemSchema: Schema for creating a new report item. + - NewReportItemAttributeSchema: Schema for creating a new report item attribute. + +Relationships: + - ReportItem has a many-to-one relationship with User and ReportItemType. + - ReportItem has a many-to-many relationship with NewsItemAggregate and ReportItem. +""" + from datetime import datetime import uuid as uuid_generator from sqlalchemy import orm, or_, func, text, and_ @@ -14,19 +34,76 @@ from shared.schema.acl_entry import ItemType from shared.schema.attribute import AttributeType from shared.schema.news_item import NewsItemAggregateIdSchema, NewsItemAggregateSchema -from shared.schema.report_item import ReportItemAttributeBaseSchema, ReportItemBaseSchema, ReportItemIdSchema, RemoteReportItemSchema, ReportItemRemoteSchema, ReportItemSchema, ReportItemPresentationSchema +from shared.schema.report_item import ( + ReportItemAttributeBaseSchema, + ReportItemBaseSchema, + ReportItemIdSchema, + RemoteReportItemSchema, + ReportItemRemoteSchema, + ReportItemSchema, + ReportItemPresentationSchema, +) class NewReportItemAttributeSchema(ReportItemAttributeBaseSchema): + """Schema for creating a new report item attribute. + + This schema is used to validate and deserialize data for creating a new report item attribute. + + Arguments: + ReportItemAttributeBaseSchema -- The base schema for report item attributes. + + Returns: + An instance of the ReportItemAttribute class. + """ @post_load def make_report_item_attribute(self, data, **kwargs): + """Create a report item attribute. + + This method takes in data and creates a ReportItemAttribute object. + + Arguments: + data (dict): A dictionary containing the data for the report item attribute. + + Returns: + ReportItemAttribute: The created report item attribute object. + """ return ReportItemAttribute(**data) class ReportItemAttribute(db.Model): + """A class representing an attribute of a report item. + + Attributes: + id (int): The unique identifier of the attribute. + value (str): The value of the attribute. + value_description (str): The description of the attribute value. + binary_mime_type (str): The MIME type of the binary data, if applicable. + binary_data (bytes): The binary data associated with the attribute. + binary_size (int): The size of the binary data in bytes. + binary_description (str): The description of the binary data. + created (datetime): The timestamp of when the attribute was created. + last_updated (datetime): The timestamp of when the attribute was last updated. + version (int): The version number of the attribute. + current (bool): Indicates whether the attribute is the current version. + attribute_group_item_id (int): The ID of the attribute group item that the attribute belongs to. + attribute_group_item (AttributeGroupItem): The attribute group item that the attribute belongs to. + attribute_group_item_title (str): The title of the attribute group item. + report_item_id (int): The ID of the report item that the attribute belongs to. + report_item (ReportItem): The report item that the attribute belongs to. + user_id (int): The ID of the user who created the attribute. + user (User): The user who created the attribute. + + Methods: + __init__: Initializes a new instance of the ReportItemAttribute class. + find: Finds a report item attribute by its ID. + + """ + id = db.Column(db.Integer, primary_key=True) value = db.Column(db.String(), nullable=False) + value_description = db.Column(db.String()) binary_mime_type = db.Column(db.String()) binary_data = orm.deferred(db.Column(db.LargeBinary)) binary_size = db.Column(db.Integer) @@ -37,20 +114,42 @@ class ReportItemAttribute(db.Model): version = db.Column(db.Integer, default=1) current = db.Column(db.Boolean, default=True) - attribute_group_item_id = db.Column(db.Integer, db.ForeignKey('attribute_group_item.id')) + attribute_group_item_id = db.Column(db.Integer, db.ForeignKey("attribute_group_item.id")) attribute_group_item = db.relationship("AttributeGroupItem", viewonly=True) attribute_group_item_title = db.Column(db.String) - report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id'), nullable=True) + report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id"), nullable=True) report_item = db.relationship("ReportItem") - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) user = db.relationship("User") - def __init__(self, id, value, binary_mime_type, binary_size, binary_description, attribute_group_item_id, - attribute_group_item_title): + def __init__( + self, + id, + value, + value_description, + binary_mime_type, + binary_size, + binary_description, + attribute_group_item_id, + attribute_group_item_title, + ): + """Initialize a ReportItem object. + + Arguments: + id (int): The ID of the report item. + value (str): The value of the report item. + value_description (str): The description of the value. + binary_mime_type (str): The MIME type of the binary data. + binary_size (int): The size of the binary data. + binary_description (str): The description of the binary data. + attribute_group_item_id (int): The ID of the attribute group item. + attribute_group_item_title (str): The title of the attribute group item. + """ self.id = None self.value = value + self.value_description = value_description self.binary_mime_type = binary_mime_type self.binary_size = binary_size self.binary_description = binary_description @@ -59,26 +158,83 @@ def __init__(self, id, value, binary_mime_type, binary_size, binary_description, @classmethod def find(cls, attribute_id): + """Find a report item attribute by its ID. + + Args: + attribute_id (int): The ID of the attribute to find. + + Returns: + ReportItemAttribute: The report item attribute with the specified ID, or None if not found. + + """ report_item_attribute = cls.query.get(attribute_id) return report_item_attribute class NewReportItemSchema(ReportItemBaseSchema): + """Schema for creating a new report item. + + This schema defines the structure and validation rules for creating a new report item. + + Arguments: + ReportItemBaseSchema -- The base schema for report items. + + Returns: + An instance of the NewReportItemSchema class. + """ + news_item_aggregates = fields.Nested(NewsItemAggregateIdSchema, many=True, missing=[]) remote_report_items = fields.Nested(ReportItemIdSchema, many=True, missing=[]) attributes = fields.Nested(NewReportItemAttributeSchema, many=True) @post_load def make(self, data, **kwargs): + """Create a new ReportItem object. + + Arguments: + data (dict): A dictionary containing the data for the ReportItem. + + Returns: + ReportItem: A new ReportItem object. + """ return ReportItem(**data) class ReportItemRemoteReportItem(db.Model): - report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id'), primary_key=True) - remote_report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id'), primary_key=True) + """A class representing the relationship between a report item and a remote report item. + + Arguments: + db -- The database object used for defining the model. + """ + + report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id"), primary_key=True) + remote_report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id"), primary_key=True) class ReportItem(db.Model): + """A class representing a report item. + + Attributes: + id (int): The unique identifier of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item title. + created (datetime): The datetime when the report item was created. + last_updated (datetime): The datetime when the report item was last updated. + completed (bool): Indicates whether the report item is completed or not. + user_id (int): The ID of the user associated with the report item. + user (User): The user associated with the report item. + remote_user (str): The remote user associated with the report item. + report_item_type_id (int): The ID of the report item type associated with the report item. + report_item_type (ReportItemType): The report item type associated with the report item. + news_item_aggregates (list): The list of news item aggregates associated with the report item. + remote_report_items (list): The list of remote report items associated with the report item. + attributes (list): The list of attributes associated with the report item. + report_item_cpes (list): The list of report item CPES associated with the report item. + subtitle (str): The subtitle of the report item. + tag (str): The tag of the report item. + """ + id = db.Column(db.Integer, primary_key=True) uuid = db.Column(db.String(64)) @@ -89,26 +245,40 @@ class ReportItem(db.Model): last_updated = db.Column(db.DateTime, default=datetime.now) completed = db.Column(db.Boolean, default=False) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) user = db.relationship("User", viewonly=True) remote_user = db.Column(db.String()) - report_item_type_id = db.Column(db.Integer, db.ForeignKey('report_item_type.id'), nullable=True) + report_item_type_id = db.Column(db.Integer, db.ForeignKey("report_item_type.id"), nullable=True) report_item_type = db.relationship("ReportItemType", viewonly=True) - news_item_aggregates = db.relationship("NewsItemAggregate", secondary='report_item_news_item_aggregate') + news_item_aggregates = db.relationship("NewsItemAggregate", secondary="report_item_news_item_aggregate") - remote_report_items = db.relationship("ReportItem", secondary='report_item_remote_report_item', - primaryjoin=ReportItemRemoteReportItem.report_item_id == id, - secondaryjoin=ReportItemRemoteReportItem.remote_report_item_id == id) + remote_report_items = db.relationship( + "ReportItem", + secondary="report_item_remote_report_item", + primaryjoin=ReportItemRemoteReportItem.report_item_id == id, + secondaryjoin=ReportItemRemoteReportItem.remote_report_item_id == id, + ) - attributes = db.relationship('ReportItemAttribute', back_populates="report_item", cascade="all, delete-orphan") + attributes = db.relationship("ReportItemAttribute", back_populates="report_item", cascade="all, delete-orphan") report_item_cpes = db.relationship("ReportItemCpe", cascade="all, delete-orphan") - def __init__(self, id, uuid, title, title_prefix, report_item_type_id, news_item_aggregates, remote_report_items, - attributes, completed): - + def __init__(self, id, uuid, title, title_prefix, report_item_type_id, news_item_aggregates, remote_report_items, attributes, completed): + """Initialize a new instance of the ReportItem class. + + Arguments: + id (int): The ID of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item's title. + report_item_type_id (int): The ID of the report item type. + news_item_aggregates (list): A list of news item aggregates associated with the report item. + remote_report_items (list): A list of remote report items associated with the report item. + attributes (dict): A dictionary of attributes for the report item. + completed (bool): Indicates whether the report item is completed or not. + """ self.id = id if uuid is None: @@ -135,35 +305,75 @@ def __init__(self, id, uuid, title, title_prefix, report_item_type_id, news_item @orm.reconstructor def reconstruct(self): + """Reconstructs the report item. + + This method clears the subtitle, sets the tag to "mdi-file-table-outline", + and sorts the attributes based on the attribute group index, attribute group item index, and attribute ID. + """ self.subtitle = "" self.tag = "mdi-file-table-outline" self.attributes.sort(key=lambda obj: (obj.attribute_group_item.attribute_group.index, obj.attribute_group_item.index, obj.id)) @classmethod def count_all(cls, is_completed): + """Count the number of report items based on completion status. + + Arguments: + is_completed (bool): A flag indicating whether to count completed or incomplete report items. + Returns: + int: The count of report items matching the completion status. + """ return cls.query.filter_by(completed=is_completed).count() @classmethod def find(cls, report_item_id): + """Find a report item by its ID. + + Arguments: + report_item_id (int): The ID of the report item. + Returns: + ReportItem: The report item with the specified ID. + """ report_item = cls.query.get(report_item_id) return report_item @classmethod def find_by_uuid(cls, report_item_uuid): + """Find a report item by its UUID. + + Arguments: + report_item_uuid (str): The UUID of the report item. + Returns: + ReportItem: The report item with the specified UUID. + """ report_item = cls.query.filter_by(uuid=report_item_uuid) return report_item @classmethod def allowed_with_acl(cls, report_item_id, user, see, access, modify): - - query = db.session.query(ReportItem.id).distinct().group_by(ReportItem.id).filter( - ReportItem.id == report_item_id) - - query = query.outerjoin(ACLEntry, or_(and_(ReportItem.uuid == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM), - and_(cast(ReportItem.report_item_type_id, - sqlalchemy.String) == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE))) + """Check if the user is allowed to perform actions on a report item based on ACL. + + Arguments: + report_item_id (int): The ID of the report item. + user (User): The user object. + see (bool): Whether the user can see the report item. + access (bool): Whether the user can access the report item. + modify (bool): Whether the user can modify the report item. + Returns: + bool: True if the user is allowed, False otherwise. + """ + query = db.session.query(ReportItem.id).distinct().group_by(ReportItem.id).filter(ReportItem.id == report_item_id) + + query = query.outerjoin( + ACLEntry, + or_( + and_(ReportItem.uuid == ACLEntry.item_id, ACLEntry.item_type == ItemType.REPORT_ITEM), + and_( + cast(ReportItem.report_item_type_id, sqlalchemy.String) == ACLEntry.item_id, + ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE, + ), + ), + ) query = ACLEntry.apply_query(query, user, see, access, modify) @@ -171,14 +381,29 @@ def allowed_with_acl(cls, report_item_id, user, see, access, modify): @classmethod def get_for_sync(cls, last_synced, report_item_types): + """Retrieve report items for synchronization. + + This method retrieves report items that have been updated since the last synchronization time, + and belong to the specified report item types. + Args: + last_synced (datetime): The last synchronization time. + report_item_types (list): A list of report item types. + Returns: + tuple: A tuple containing two elements: + - items (list): A list of report items that need to be synchronized. + - last_sync_time (datetime): The current synchronization time. + """ report_item_type_ids = set() for report_item_type in report_item_types: report_item_type_ids.add(report_item_type.id) last_sync_time = datetime.now() - query = cls.query.filter(ReportItem.last_updated >= last_synced, ReportItem.last_updated <= last_sync_time, - ReportItem.report_item_type_id.in_(report_item_type_ids)) + query = cls.query.filter( + ReportItem.last_updated >= last_synced, + ReportItem.last_updated <= last_sync_time, + ReportItem.report_item_type_id.in_(report_item_type_ids), + ) report_items = query.all() @@ -193,70 +418,108 @@ def get_for_sync(cls, last_synced, report_item_types): @classmethod def get(cls, group, filter, offset, limit, user): - + """Retrieve report items based on specified criteria. + + Arguments: + group (str): The remote user group. + filter (dict): The filter criteria. + offset (int): The offset for pagination. + limit (int): The limit for pagination. + user (str): The user performing the query. + Returns: + tuple: A tuple containing the list of report items and the total count. + """ if group: query = cls.query.filter(ReportItem.remote_user == group) else: - query = db.session.query(ReportItem, func.count().filter(ACLEntry.id > 0).label("acls"), - func.count().filter(ACLEntry.access == True).label("access"), - func.count().filter(ACLEntry.modify == True).label("modify")).distinct().group_by( - ReportItem.id) - - query = query.filter(ReportItem.remote_user == None) - - query = query.outerjoin(ACLEntry, or_(and_(ReportItem.uuid == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM), - and_(cast(ReportItem.report_item_type_id, - sqlalchemy.String) == ACLEntry.item_id, - ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE))) + query = ( + db.session.query( + ReportItem, + func.count().filter(ACLEntry.id > 0).label("acls"), + func.count().filter(ACLEntry.access.is_(True)).label("access"), + func.count().filter(ACLEntry.modify.is_(True)).label("modify"), + ) + .distinct() + .group_by(ReportItem.id) + ) + + query = query.filter(ReportItem.remote_user.is_(None)) + + query = query.outerjoin( + ACLEntry, + or_( + and_(ReportItem.uuid == ACLEntry.item_id, ACLEntry.item_type == ItemType.REPORT_ITEM), + and_( + cast(ReportItem.report_item_type_id, sqlalchemy.String) == ACLEntry.item_id, + ACLEntry.item_type == ItemType.REPORT_ITEM_TYPE, + ), + ), + ) query = ACLEntry.apply_query(query, user, True, False, False) - if 'search' in filter and filter['search'] != '': - search_string = '%' + filter['search'].lower() + '%' - query = query.join(ReportItemAttribute, ReportItem.id == ReportItemAttribute.report_item_id).filter(or_( - func.lower(ReportItemAttribute.value).like(search_string), - func.lower(ReportItem.title).like(search_string), - func.lower(ReportItem.title_prefix).like(search_string))) + search_string = filter.get("search", "").lower() + if search_string: + search_string = f"%{search_string}%" + query = query.join(ReportItemAttribute, ReportItem.id == ReportItemAttribute.report_item_id).filter( + or_( + func.lower(ReportItemAttribute.value).like(search_string), + func.lower(ReportItem.title).like(search_string), + func.lower(ReportItem.title_prefix).like(search_string), + ) + ) - if 'completed' in filter and filter['completed'].lower() == "true": - query = query.filter(ReportItem.completed == True) + if filter.get("completed", "").lower() == "true": + query = query.filter(ReportItem.completed) - if 'incompleted' in filter and filter['incompleted'].lower() == "true": - query = query.filter(ReportItem.completed == False) + if filter.get("incompleted", "").lower() == "true": + query = query.filter(ReportItem.completed.is_(False)) - if 'range' in filter and filter['range'] != 'ALL': + if filter.get("range", "ALL") != "ALL": date_limit = datetime.now() - if filter['range'] == 'TODAY': + if filter["range"] == "TODAY": date_limit = date_limit.replace(hour=0, minute=0, second=0, microsecond=0) - if filter['range'] == 'WEEK': - date_limit = date_limit.replace(day=date_limit.day - date_limit.weekday(), hour=0, minute=0, second=0, - microsecond=0) + if filter["range"] == "WEEK": + date_limit = date_limit.replace(day=date_limit.day - date_limit.weekday(), hour=0, minute=0, second=0, microsecond=0) - if filter['range'] == 'MONTH': + if filter["range"] == "MONTH": date_limit = date_limit.replace(day=1, hour=0, minute=0, second=0, microsecond=0) query = query.filter(ReportItem.created >= date_limit) - if 'sort' in filter: - if filter['sort'] == 'DATE_DESC': + if filter.get("sort"): + if filter["sort"] == "DATE_DESC": query = query.order_by(db.desc(ReportItem.created)) - elif filter['sort'] == 'DATE_ASC': + elif filter["sort"] == "DATE_ASC": query = query.order_by(db.asc(ReportItem.created)) return query.offset(offset).limit(limit).all(), query.count() @classmethod def identical(cls, uuid): + """Check if a report item with the given UUID exists. + + Arguments: + uuid -- The UUID of the report item to check. + Returns: + True if a report item with the given UUID exists, False otherwise. + """ return db.session.query(db.exists().where(ReportItem.uuid == uuid)).scalar() @classmethod def get_by_cpe(cls, cpes): - + """Retrieve report items by Common Platform Enumeration (CPE). + + This method queries the database to retrieve report items that match the provided CPEs. + Arguments: + cpes (list): A list of CPEs to search for. + Returns: + list: A list of report item IDs that match the provided CPEs. + """ if len(cpes) > 0: query_string = "SELECT DISTINCT report_item_id FROM report_item_cpe WHERE value LIKE ANY(:cpes) OR {}" - params = {'cpes': cpes} + params = {"cpes": cpes} inner_query = "" for i in range(len(cpes)): @@ -274,6 +537,18 @@ def get_by_cpe(cls, cpes): @classmethod def get_json(cls, group, filter, offset, limit, user): + """Get the JSON representation of report items. + + This method retrieves report items based on the provided parameters and returns them in JSON format. + Arguments: + group (str): The group parameter. + filter (str): The filter parameter. + offset (int): The offset parameter. + limit (int): The limit parameter. + user (str): The user parameter. + Returns: + dict: A dictionary containing the total count of report items and a list of report items in JSON format. + """ results, count = cls.get(group, filter, offset, limit, user) report_items = [] if group: @@ -292,18 +567,36 @@ def get_json(cls, group, filter, offset, limit, user): report_items.append(report_item) report_items_schema = ReportItemPresentationSchema(many=True) - return {'total_count': count, 'items': report_items_schema.dump(report_items)} + return {"total_count": count, "items": report_items_schema.dump(report_items)} @classmethod def get_detail_json(cls, id): + """Get the detailed JSON representation of a report item. + + Arguments: + cls -- The class object. + id -- The ID of the report item. + Returns: + The detailed JSON representation of the report item. + """ report_item = cls.query.get(id) report_item_schema = ReportItemSchema() return report_item_schema.dump(report_item) @classmethod def get_groups(cls): - result = db.session.query(ReportItem.remote_user).distinct().group_by(ReportItem.remote_user).filter( - ReportItem.remote_user != None).all() + """Get the distinct groups associated with the report items. + + Returns: + list: A list of distinct groups. + """ + result = ( + db.session.query(ReportItem.remote_user) + .distinct() + .group_by(ReportItem.remote_user) + .filter(ReportItem.remote_user.isnot(None)) + .all() + ) groups = set() for row in result: groups.add(row[0]) @@ -312,11 +605,21 @@ def get_groups(cls): @classmethod def add_report_item(cls, report_item_data, user): + """Add a report item to the database. + + This method takes in report_item_data and user as arguments and adds a new report item to the database. + It performs authorization checks to ensure that the user has the necessary permissions to add the report item. + Arguments: + report_item_data (dict): The data for the report item. + user (User): The user adding the report item. + Returns: + tuple: A tuple containing the added report item and the HTTP status code. + """ report_item_schema = NewReportItemSchema() report_item = report_item_schema.load(report_item_data) if not ReportItemType.allowed_with_acl(report_item.report_item_type_id, user, False, False, True): - return 'Unauthorized access to report item type', 401 + return "Unauthorized access to report item type", 401 report_item.user_id = user.id for attribute in report_item.attributes: @@ -331,6 +634,16 @@ def add_report_item(cls, report_item_data, user): @classmethod def add_remote_report_items(cls, report_item_data, remote_node_name): + """Add remote report items to the database. + + This method takes a list of report item data and a remote node name, + and adds the report items to the database. If a report item with the + same UUID already exists, it updates the existing report item with the + new data. + Arguments: + report_item_data (list): A list of report item data. + remote_node_name (str): The name of the remote node. + """ report_item_schema = NewReportItemSchema(many=True) report_items = report_item_schema.load(report_item_data) @@ -350,66 +663,81 @@ def add_remote_report_items(cls, report_item_data, remote_node_name): @classmethod def update_report_item(cls, id, data, user): + """Update a report item with the given data. + + Arguments: + id (int): The ID of the report item. + data (dict): The data to update the report item with. + user (User): The user performing the update. + Returns: + tuple: A tuple containing a boolean indicating whether the report item was modified and the updated data. + """ modified = False new_attribute = None report_item = cls.query.get(id) if report_item is not None: - if 'update' in data: - if 'title' in data: - if report_item.title != data['title']: + if "update" in data: + if "title" in data: + if report_item.title != data["title"]: modified = True - report_item.title = data['title'] - data['title'] = '' + report_item.title = data["title"] + data["title"] = "" - if 'title_prefix' in data: - if report_item.title_prefix != data['title_prefix']: + if "title_prefix" in data: + if report_item.title_prefix != data["title_prefix"]: modified = True - report_item.title_prefix = data['title_prefix'] - data['title_prefix'] = '' + report_item.title_prefix = data["title_prefix"] + data["title_prefix"] = "" - if 'completed' in data: - if report_item.completed != data['completed']: + if "completed" in data: + if report_item.completed != data["completed"]: modified = True - report_item.completed = data['completed'] - data['completed'] = '' + report_item.completed = data["completed"] + data["completed"] = "" - if 'attribute_id' in data: + if "attribute_id" in data: for attribute in report_item.attributes: - # sometime we compare: int & int or int & str - if str(attribute.id) == str(data['attribute_id']): - if attribute.value != data['attribute_value']: + # Compare attribute IDs + if str(attribute.id) == str(data["attribute_id"]): + if attribute.value != data["attribute_value"]: + modified = True + attribute.value = data["attribute_value"] + data["attribute_value"] = "" + attribute.user = user + attribute.last_updated = datetime.now() + if attribute.value_description != data["value_description"]: modified = True - attribute.value = data['attribute_value'] - data['attribute_value'] = '' + attribute.value_description = data["value_description"] + data["value_description"] = "" attribute.user = user attribute.last_updated = datetime.now() break - if 'add' in data: - if 'attribute_id' in data: + if "add" in data: + if "attribute_id" in data: modified = True - new_attribute = ReportItemAttribute(None, "", None, 0, None, data['attribute_group_item_id'], None) + new_attribute = ReportItemAttribute(None, "", None, 0, None, data["attribute_group_item_id"], None) new_attribute.user = user report_item.attributes.append(new_attribute) - if 'aggregate_ids' in data: + if "aggregate_ids" in data: modified = True - for aggregate_id in data['aggregate_ids']: + for aggregate_id in data["aggregate_ids"]: aggregate = NewsItemAggregate.find(aggregate_id) report_item.news_item_aggregates.append(aggregate) - if 'remote_report_item_ids' in data: + if "remote_report_item_ids" in data: modified = True - for remote_report_item_id in data['remote_report_item_ids']: + for remote_report_item_id in data["remote_report_item_ids"]: remote_report_item = ReportItem.find(remote_report_item_id) report_item.remote_report_items.append(remote_report_item) - if 'delete' in data: - if 'attribute_id' in data: + if "delete" in data: + if "attribute_id" in data: attribute_to_delete = None for attribute in report_item.attributes: # sometime we compare: int & int or int & str - if str(attribute.id) == str(data['attribute_id']): + if str(attribute.id) == str(data["attribute_id"]): attribute_to_delete = attribute break @@ -417,10 +745,10 @@ def update_report_item(cls, id, data, user): modified = True report_item.attributes.remove(attribute_to_delete) - if 'aggregate_id' in data: + if "aggregate_id" in data: aggregate_to_delete = None for aggregate in report_item.news_item_aggregates: - if aggregate.id == data['aggregate_id']: + if aggregate.id == data["aggregate_id"]: aggregate_to_delete = aggregate break @@ -428,10 +756,10 @@ def update_report_item(cls, id, data, user): modified = True report_item.news_item_aggregates.remove(aggregate_to_delete) - if 'remote_report_item_id' in data: + if "remote_report_item_id" in data: remote_report_item_to_delete = None for remote_report_item in report_item.remote_report_items: - if remote_report_item.id == data['remote_report_item_id']: + if remote_report_item.id == data["remote_report_item_id"]: remote_report_item_to_delete = remote_report_item break @@ -441,73 +769,99 @@ def update_report_item(cls, id, data, user): if modified: report_item.last_updated = datetime.now() - data['user_id'] = user.id - data['report_item_id'] = int(id) + data["user_id"] = user.id + data["report_item_id"] = int(id) report_item.update_cpes() db.session.commit() if new_attribute is not None: - data['attribute_id'] = new_attribute.id + data["attribute_id"] = new_attribute.id return modified, data @classmethod def get_updated_data(cls, id, data): + """Get the updated data for a report item. + + This method retrieves the updated data for a report item based on the provided ID and data. + Arguments: + cls (class): The class object. + id (int): The ID of the report item. + data (dict): The data containing the updates. + Returns: + dict: The updated data for the report item. + """ report_item = cls.query.get(id) if report_item is not None: - if 'update' in data: - if 'title' in data: - data['title'] = report_item.title + if "update" in data: + if "title" in data: + data["title"] = report_item.title - if 'title_prefix' in data: - data['title_prefix'] = report_item.title_prefix + if "title_prefix" in data: + data["title_prefix"] = report_item.title_prefix - if 'completed' in data: - data['completed'] = report_item.completed + if "completed" in data: + data["completed"] = report_item.completed - if 'attribute_id' in data: + if "attribute_id" in data: for attribute in report_item.attributes: - if str(attribute.id) == data['attribute_id']: - data['attribute_value'] = attribute.value - data['attribute_last_updated'] = attribute.last_updated.strftime('%d.%m.%Y - %H:%M') - data['attribute_user'] = attribute.user.name + if str(attribute.id) == data["attribute_id"]: + data["attribute_value"] = attribute.value + data["attribute_value_description"] = attribute.value_description + data["attribute_last_updated"] = attribute.last_updated.strftime("%d.%m.%Y - %H:%M") + data["attribute_user"] = attribute.user.name break - if 'add' in data: - if 'aggregate_ids' in data: + if "add" in data: + if "aggregate_ids" in data: schema = NewsItemAggregateSchema() - data['news_item_aggregates'] = [] - for aggregate_id in data['aggregate_ids']: + data["news_item_aggregates"] = [] + for aggregate_id in data["aggregate_ids"]: aggregate = NewsItemAggregate.find(aggregate_id) - data['news_item_aggregates'].append(schema.dump(aggregate)) + data["news_item_aggregates"].append(schema.dump(aggregate)) - if 'remote_report_item_ids' in data: + if "remote_report_item_ids" in data: schema = RemoteReportItemSchema() - data['remote_report_items'] = [] - for remote_report_item_id in data['remote_report_item_ids']: + data["remote_report_items"] = [] + for remote_report_item_id in data["remote_report_item_ids"]: remote_report_item = ReportItem.find(remote_report_item_id) - data['remote_report_items'].append(schema.dump(remote_report_item)) + data["remote_report_items"].append(schema.dump(remote_report_item)) - if 'attribute_id' in data: + if "attribute_id" in data: for attribute in report_item.attributes: - if str(attribute.id) == data['attribute_id']: - data['attribute_value'] = attribute.value - data['binary_mime_type'] = attribute.binary_mime_type - data['binary_size'] = attribute.binary_size - data['binary_description'] = attribute.binary_description - data['attribute_last_updated'] = attribute.last_updated.strftime('%d.%m.%Y - %H:%M') - data['attribute_user'] = attribute.user.name + if str(attribute.id) == data["attribute_id"]: + data["attribute_value"] = attribute.value + data["attribute_value_description"] = attribute.value_description + data["binary_mime_type"] = attribute.binary_mime_type + data["binary_size"] = attribute.binary_size + data["binary_description"] = attribute.binary_description + data["attribute_last_updated"] = attribute.last_updated.strftime("%d.%m.%Y - %H:%M") + data["attribute_user"] = attribute.user.name break return data @classmethod def add_attachment(cls, id, attribute_group_item_id, user, file, description): + """Add an attachment to a report item. + + Arguments: + id (int): The ID of the report item. + attribute_group_item_id (int): The ID of the attribute group item. + user (User): The user who is adding the attachment. + file (FileStorage): The file to be attached. + description (str): The description of the attachment. + Returns: + dict: A dictionary containing information about the attachment. + - "add" (bool): True if the attachment was added successfully. + - "user_id" (int): The ID of the user who added the attachment. + - "report_item_id" (int): The ID of the report item. + - "attribute_id" (int): The ID of the newly created attribute. + """ report_item = cls.query.get(id) file_data = file.read() - new_attribute = ReportItemAttribute(None, file.filename, file.mimetype, len(file_data), description, - attribute_group_item_id, None) + new_attribute = ReportItemAttribute(None, file.filename, file.mimetype, len(file_data), description, attribute_group_item_id, None) new_attribute.user = user new_attribute.binary_data = file_data report_item.attributes.append(new_attribute) @@ -515,10 +869,10 @@ def add_attachment(cls, id, attribute_group_item_id, user, file, description): report_item.last_updated = datetime.now() data = dict() - data['add'] = True - data['user_id'] = user.id - data['report_item_id'] = int(id) - data['attribute_id'] = new_attribute.id + data["add"] = True + data["user_id"] = user.id + data["report_item_id"] = int(id) + data["attribute_id"] = new_attribute.id db.session.commit() @@ -526,6 +880,20 @@ def add_attachment(cls, id, attribute_group_item_id, user, file, description): @classmethod def remove_attachment(cls, id, attribute_id, user): + """Remove an attachment from a report item. + + Arguments: + cls (ReportItem): The class object. + id (int): The ID of the report item. + attribute_id (int): The ID of the attribute to be removed. + user (User): The user performing the action. + Returns: + dict: A dictionary containing information about the deletion. + - delete (bool): Indicates whether the attribute was successfully deleted. + - user_id (int): The ID of the user performing the action. + - report_item_id (int): The ID of the report item. + - attribute_id (int): The ID of the attribute that was deleted. + """ report_item = cls.query.get(id) attribute_to_delete = None for attribute in report_item.attributes: @@ -539,10 +907,10 @@ def remove_attachment(cls, id, attribute_id, user): report_item.last_updated = datetime.now() data = dict() - data['delete'] = True - data['user_id'] = user.id - data['report_item_id'] = int(id) - data['attribute_id'] = attribute_id + data["delete"] = True + data["user_id"] = user.id + data["report_item_id"] = int(id) + data["attribute_id"] = attribute_id db.session.commit() @@ -550,13 +918,27 @@ def remove_attachment(cls, id, attribute_id, user): @classmethod def delete_report_item(cls, id): + """Delete a report item by its ID. + + Arguments: + id (int): The ID of the report item to be deleted. + Returns: + tuple: A tuple containing the status message and the HTTP status code. + The status message is "success" if the report item was deleted successfully. + The HTTP status code is 200 if the report item was deleted successfully. + """ report_item = cls.query.get(id) if report_item is not None: db.session.delete(report_item) db.session.commit() - return 'success', 200 + return "success", 200 def update_cpes(self): + """Update the list of CPES for the report item. + + This method clears the existing list of CPES and populates it with new CPES + based on the attributes of the report item. Only attributes of type CPE are considered. + """ self.report_item_cpes = [] if self.completed is True: for attribute in self.attributes: @@ -566,10 +948,25 @@ def update_cpes(self): class ReportItemCpe(db.Model): + """A class representing a CPE (Common Platform Enumeration) report item. + + Attributes: + id (int): The unique identifier of the report item. + value (str): The value of the report item. + report_item_id (int): The foreign key referencing the parent report item. + Args: + db (object): The database object. + """ + id = db.Column(db.Integer, primary_key=True) value = db.Column(db.String()) - report_item_id = db.Column(db.Integer, db.ForeignKey('report_item.id')) + report_item_id = db.Column(db.Integer, db.ForeignKey("report_item.id")) def __init__(self, value): + """Initialize a ReportItemCpe object. + + Args: + value (any): The value of the report item. + """ self.id = None self.value = value diff --git a/src/shared/shared/schema/report_item.py b/src/shared/shared/schema/report_item.py index fb6723b47..9dd0b97d9 100644 --- a/src/shared/shared/schema/report_item.py +++ b/src/shared/shared/schema/report_item.py @@ -1,3 +1,24 @@ +"""This module contains schemas and classes for representing report items and their attributes. + +The module includes the following classes: +- ReportItemAttributeBaseSchema: Schema for representing a report item attribute. +- ReportItemAttributeSchema: Schema for representing a report item attribute with additional fields. +- ReportItemAttribute: Class representing an attribute of a report item. +- ReportItemBaseSchema: Schema for the base report item. +- RemoteReportItemSchema: Schema for remote report items. +- ReportItemSchema: Schema for serializing and deserializing ReportItem objects. +- ReportItemAttributeRemoteSchema: Schema for representing a remote attribute of a report item. +- ReportItemRemoteSchema: Schema for representing a remote report item. +- ReportItemPresentationSchema: Schema for presenting a report item. +- ReportItem: Class representing a report item. + +The module also imports schemas from other modules: +- PresentationSchema: Schema for presentation. +- NewsItemAggregateSchema: Schema for representing news item aggregates. +- ACLEntryStatusSchema: Schema for the ACL entry status. +- UserSchemaBase: Schema for representing user data. +""" + from marshmallow import Schema, fields, post_load, EXCLUDE from shared.schema.presentation import PresentationSchema @@ -7,11 +28,27 @@ class ReportItemAttributeBaseSchema(Schema): + """Schema for representing a report item attribute. + + Attributes: + id (int): The ID of the attribute. + value (str): The value of the attribute. + value_description (str): The description of the attribute value. + binary_mime_type (str): The MIME type of the binary attribute. + binary_size (int): The size of the binary attribute. + binary_description (str): The description of the binary attribute. + attribute_group_item_title (str): The title of the attribute group item. + attribute_group_item_id (int): The ID of the attribute group item. + """ + class Meta: + """Meta class for configuring the behavior of the ReportItemAttributeBase schema.""" + unknown = EXCLUDE id = fields.Int(load_default=None, allow_none=True) value = fields.Str() + value_description = fields.Str(load_default=None, allow_none=True) binary_mime_type = fields.Str(load_default=None, allow_none=True) binary_size = fields.Int(load_default=0) binary_description = fields.Str(load_default=None, allow_none=True) @@ -20,6 +57,17 @@ class Meta: class ReportItemAttributeSchema(ReportItemAttributeBaseSchema): + """Schema for representing a report item attribute. + + This schema defines the structure and validation rules for a report item attribute. + + Arguments: + ReportItemAttributeBaseSchema -- The base schema for a report item attribute. + + Returns: + An instance of the ReportItemAttributeSchema class. + """ + created = fields.DateTime("%d.%m.%Y - %H:%M") last_updated = fields.DateTime("%d.%m.%Y - %H:%M") version = fields.Int() @@ -28,14 +76,44 @@ class ReportItemAttributeSchema(ReportItemAttributeBaseSchema): @post_load def make(self, data, **kwargs): + """Create a new ReportItemAttribute object. + + This method takes in data and creates a new ReportItemAttribute object using the provided data. + + Arguments: + data (dict): A dictionary containing the data for creating the ReportItemAttribute object. + **kwargs: Additional keyword arguments. + + Returns: + ReportItemAttribute: The newly created ReportItemAttribute object. + """ return ReportItemAttribute(**data) class ReportItemAttribute: + """Represents an attribute of a report item. + + Attributes: + id (int): The ID of the attribute. + value (str): The value of the attribute. + value_description (str): The description of the attribute value. + binary_mime_type (str): The MIME type of the binary data associated with the attribute. + binary_size (int): The size of the binary data associated with the attribute. + binary_description (str): The description of the binary data associated with the attribute. + attribute_group_item_id (int): The ID of the attribute group item. + attribute_group_item_title (str): The title of the attribute group item. + created (datetime): The date and time when the attribute was created. + last_updated (datetime): The date and time when the attribute was last updated. + version (int): The version of the attribute. + current (bool): Indicates whether the attribute is the current version. + user (str): The user who created or last updated the attribute. + """ + def __init__( self, id, value, + value_description, binary_mime_type, binary_size, binary_description, @@ -47,8 +125,10 @@ def __init__( current, user, ): + """Initialize a new instance of the ReportItem class.""" self.id = id self.value = value + self.value_description = value_description self.created = created self.last_updated = last_updated self.version = version @@ -62,7 +142,22 @@ def __init__( class ReportItemBaseSchema(Schema): + """Schema for the base report item. + + Attributes: + id (int): The ID of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item title. + created (DateTime): The date and time when the report item was created. + last_updated (DateTime): The date and time when the report item was last updated. + completed (bool): Indicates whether the report item is completed or not. + report_item_type_id (int): The ID of the report item type. + """ + class Meta: + """Meta class for configuring the behavior of the ReportItem schema.""" + unknown = EXCLUDE id = fields.Int(load_default=None, allow_none=True) @@ -76,11 +171,41 @@ class Meta: class RemoteReportItemSchema(ReportItemBaseSchema, PresentationSchema): + """Schema for remote report items. + + This schema represents the structure and validation rules for remote report items. + + Arguments: + ReportItemBaseSchema -- Base schema for report items. + PresentationSchema -- Schema for presentation. + + Attributes: + remote_user (str): The remote user associated with the report item. + attributes (list): List of nested report item attributes. + """ + remote_user = fields.Str(allow_none=True) attributes = fields.Nested(ReportItemAttributeSchema, many=True) class ReportItemSchema(ReportItemBaseSchema): + """Schema for serializing and deserializing ReportItem objects. + + Inherits from ReportItemBaseSchema. + + Attributes: + news_item_aggregates (List[NewsItemAggregateSchema]): List of nested NewsItemAggregateSchema objects. + remote_report_items (List[RemoteReportItemSchema]): List of nested RemoteReportItemSchema objects. + attributes (List[ReportItemAttributeSchema]): List of nested ReportItemAttributeSchema objects. + remote_user (str): Remote user associated with the report item. + + Methods: + make(data, **kwargs): Post-load method to create a ReportItem object from deserialized data. + + Returns: + ReportItemSchema: An instance of the ReportItemSchema class. + """ + news_item_aggregates = fields.Nested(NewsItemAggregateSchema, many=True) remote_report_items = fields.Nested(RemoteReportItemSchema, many=True) attributes = fields.Nested(ReportItemAttributeSchema, many=True) @@ -88,15 +213,38 @@ class ReportItemSchema(ReportItemBaseSchema): @post_load def make(self, data, **kwargs): + """Create a new ReportItem object. + + This method takes in data and creates a new ReportItem object using the provided data. + + Arguments: + data (dict): A dictionary containing the data for the ReportItem. + + Returns: + ReportItem: A new ReportItem object. + """ return ReportItem(**data) class ReportItemAttributeRemoteSchema(Schema): + """A schema for representing a remote attribute of a report item. + + Attributes: + attribute_group_item_title (str): The title of the attribute group item. + value (str): The value of the attribute. + """ + attribute_group_item_title = fields.Str() value = fields.Str() class ReportItemRemoteSchema(Schema): + """A schema for representing a remote report item. + + Arguments: + Schema -- The base schema class. + """ + uuid = fields.Str(allow_none=True) title = fields.Str() title_prefix = fields.Str() @@ -105,10 +253,41 @@ class ReportItemRemoteSchema(Schema): class ReportItemPresentationSchema(ReportItemBaseSchema, ACLEntryStatusSchema, PresentationSchema): + """Schema for presenting a report item. + + This schema inherits from the ReportItemBaseSchema, ACLEntryStatusSchema, and PresentationSchema classes. + + Arguments: + ReportItemBaseSchema -- Schema for the base report item. + ACLEntryStatusSchema -- Schema for the ACL entry status. + PresentationSchema -- Schema for the presentation. + + Attributes: + remote_user -- String field representing the remote user. Allows None as a value. + """ + remote_user = fields.Str(allow_none=True) class ReportItem: + """ + Represents a report item. + + Attributes: + id (int): The ID of the report item. + uuid (str): The UUID of the report item. + title (str): The title of the report item. + title_prefix (str): The prefix of the report item title. + created (datetime): The date and time when the report item was created. + last_updated (datetime): The date and time when the report item was last updated. + completed (bool): Indicates whether the report item is completed or not. + report_item_type_id (int): The ID of the report item type. + news_item_aggregates (list): A list of news item aggregates associated with the report item. + remote_report_items (list): A list of remote report items associated with the report item. + attributes (dict): Additional attributes of the report item. + remote_user (str): The remote user associated with the report item. + """ + def __init__( self, id, @@ -124,6 +303,7 @@ def __init__( attributes, remote_user, ): + """Initialize a ReportItem object.""" self.id = id self.uuid = uuid self.title = title @@ -139,16 +319,53 @@ def __init__( class ReportItemIdSchema(Schema): + """Schema for Report Item ID. + + This schema defines the structure for the Report Item ID. + + Arguments: + Schema -- The base schema class. + + Returns: + An instance of the ReportItemId class. + """ + class Meta: + """Meta class for configuring the behavior of the ReportItemId schema.""" + unknown = EXCLUDE id = fields.Int() @post_load def make(self, data, **kwargs): + """Create a new ReportItemId object. + + This method takes in data and returns a new ReportItemId object. + + Arguments: + data (dict): The data used to create the ReportItemId object. + + Returns: + ReportItemId: A new ReportItemId object. + """ return ReportItemId(**data) class ReportItemId: + """A class representing the ID of a report item. + + Args: + id (int): The ID of the report item. + + Attributes: + id (int): The ID of the report item. + """ + def __init__(self, id): + """Initialize a ReportItem object. + + Args: + id (int): The ID of the report item. + """ self.id = id From beedecbd477998f765dcf9fda206bb16d7cf04ac Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Jun 2024 21:11:09 +0200 Subject: [PATCH 5/5] update presenter --- src/presenters/presenters/base_presenter.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/presenters/presenters/base_presenter.py b/src/presenters/presenters/base_presenter.py index b56d1bf06..b932c08f8 100644 --- a/src/presenters/presenters/base_presenter.py +++ b/src/presenters/presenters/base_presenter.py @@ -107,12 +107,15 @@ def __init__(self, report_item, report_types, attribute_map): attr_type = attribute_map[attribute_group_item_id] attr_key = attr_type.title.lower().replace(" ", "_") - max_occurrence = attr_type.max_occurrence - value_to_add = ( - attribute_group_item[0].value - if max_occurrence == 1 and attribute_group_item - else [attribute.value for attribute in attribute_group_item] - ) + if attr_key.startswith("cwe"): + value_to_add = {attribute.value: attribute.value_description for attribute in attribute_group_item} + else: + max_occurrence = attr_type.max_occurrence + value_to_add = ( + attribute_group_item[0].value + if max_occurrence == 1 and attribute_group_item + else [attribute.value for attribute in attribute_group_item] + ) how_many_with_the_same_name = len(attribute_groups[attr_key]) if how_many_with_the_same_name == 1: