Skip to content

Commit

Permalink
fix bug 552 by ensuring that ALL db cre links are from Higher->Lower …
Browse files Browse the repository at this point in the history
…and of type either contains or related, no more lower->higher of type partof since this is prone to bugs, part of is now an api construct
  • Loading branch information
northdpole committed Sep 14, 2024
1 parent 77ae860 commit 9c781e9
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 77 deletions.
32 changes: 24 additions & 8 deletions application/cmd/cre_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node:
then map the one who doesn't to the CRE
if both don't map to anything, just add them in the db as unlinked nodes
"""
if not node:
raise ValueError("node is None")
if not node or not issubclass(node.__class__, defs.Node):
raise ValueError(f"node is None or not of type Node, node: {node}")

linked_node = collection.add_node(node)
if node.embeddings:
Expand Down Expand Up @@ -118,12 +118,28 @@ def register_cre(cre: defs.CRE, collection: db.Node_collection) -> Tuple[db.CRE,
for link in cre.links:
if type(link.document) == defs.CRE:
logger.info(f"{link.document.id} {link.ltype} {cre.id}")
lower_cre, _ = register_cre(link.document, collection)
collection.add_internal_link(
higher=dbcre,
lower=lower_cre,
type=link.ltype,
)
other_cre, _ = register_cre(link.document, collection)

if link.ltype == defs.LinkTypes.Contains:
collection.add_internal_link(
higher=dbcre,
lower=other_cre,
type=link.ltype,
)
elif link.ltype == defs.LinkTypes.PartOf:
collection.add_internal_link(
higher=other_cre,
lower=dbcre,
type=defs.LinkTypes.Contains,
)
elif link.ltype == defs.LinkTypes.Related:
collection.add_internal_link(
higher=other_cre,
lower=dbcre,
type=defs.LinkTypes.Related,
)
else:
raise ValueError(f"Unknown link type {link.ltype}")
else:
collection.add_link(
cre=dbcre,
Expand Down
122 changes: 72 additions & 50 deletions application/database/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,63 +1168,85 @@ def get_CREs(
dbcres = query.all()
if not dbcres:
logger.warning(
"CRE %s:%s:%s does not exist in the db"
% (external_id, name, description)
f"CRE {external_id}:{name}:{description} does not exist in the db"
)
return []

# TODO figure a way to return both the Node
# and the link_type for that link
for dbcre in dbcres:
cre = CREfromDB(dbcre)
linked_nodes = self.session.query(Links).filter(Links.cre == dbcre.id).all()
for ls in linked_nodes:
nd = self.session.query(Node).filter(Node.id == ls.node).first()
if not include_only or (include_only and nd.name in include_only):
n = nodeFromDB(nd)
if not cre.link_exists(n):
cre.add_link(
cre_defs.Link(
document=n, ltype=cre_defs.LinkTypes.from_str(ls.type)
)
)
# TODO figure the query to merge the following two
internal_links = (
self.session.query(InternalLinks)
.filter(
sqla.or_(
InternalLinks.cre == dbcre.id, InternalLinks.group == dbcre.id
)
)
.all()
for matching_cre in dbcres:
cre = CREfromDB(matching_cre)
cre.links = self.__make_cre_links(
cre=matching_cre, include_only_nodes=include_only
)
for il in internal_links:
q = self.session.query(CRE)

res: CRE
ltype = cre_defs.LinkTypes.from_str(il.type)

if il.cre == dbcre.id: # if we are a CRE in this relationship
res = q.filter(
CRE.id == il.group
).first() # get the group in order to add the link
# if this CRE is the lower level cre the relationship will be tagged "Contains"
# in that case the implicit relationship is "Is Part Of"
# otherwise the relationship will be "Related" and we don't need to do anything
if ltype == cre_defs.LinkTypes.Contains:
# important, this is the only implicit link we have for now
ltype = cre_defs.LinkTypes.PartOf
elif ltype == cre_defs.LinkTypes.PartOf:
ltype = cre_defs.LinkTypes.Contains
elif il.group == dbcre.id:
res = q.filter(CRE.id == il.cre).first()
ltype = cre_defs.LinkTypes.from_str(il.type)
c = CREfromDB(res)
if not cre.link_exists(c):
cre.add_link(cre_defs.Link(document=c, ltype=ltype))
cre.links.extend(self.__make_cre_internal_links(cre=matching_cre))
cres.append(cre)
return cres

def __make_cre_internal_links(self, cre: CRE) -> List[cre_defs.Link]:
links = []
internal_links = (
self.session.query(InternalLinks)
.filter(
sqla.or_(InternalLinks.cre == cre.id, InternalLinks.group == cre.id)
)
.all()
)

if len(internal_links) == 0:
logger.debug(
f"CRE {cre.name}:{cre.external_id}:{cre.id} has no internal links"
)

for internal_link in internal_links:
linked_cre_query = self.session.query(CRE)
link_type = cre_defs.LinkTypes.from_str(internal_link.type)

if (
internal_link.cre == cre.id
): # if we are the lower cre in this relationship, we need to flip the "Contains" linktypes
linked_cre = linked_cre_query.filter(
CRE.id == internal_link.group
).first() # get the higher cre so we can add the link
if link_type == cre_defs.LinkTypes.Contains:
links.append(
cre_defs.Link(
ltype=cre_defs.LinkTypes.PartOf,
document=CREfromDB(linked_cre),
)
)
elif (
link_type == cre_defs.LinkTypes.Related
): # if it's not a "Contains" link, it's a "Related" link
links.append(
cre_defs.Link(ltype=link_type, document=CREfromDB(linked_cre))
)
else:
raise ValueError(
f"Received an unexpected link type {link_type} in internal link between {linked_cre.name} and {cre.name}"
)
else: # if we are are the higher cre then we don't need to do anything, relationship types are always "higher"->"lower"
linked_cre = linked_cre_query.filter(
CRE.id == internal_link.cre
).first()
links.append(
cre_defs.Link(ltype=link_type, document=CREfromDB(linked_cre))
)
return links

def __make_cre_links(
self, cre: CRE, include_only_nodes: List[str]
) -> List[cre_defs.Link]:
links = []
for link in self.session.query(Links).filter(Links.cre == cre.id).all():
node = self.session.query(Node).filter(Node.id == link.node).first()
if node and (not include_only_nodes or node.name in include_only_nodes):
links.append(
cre_defs.Link(
ltype=cre_defs.LinkTypes.from_str(link.type),
document=nodeFromDB(node),
)
)
return links

def export(self, dir: str = None, dry_run: bool = False) -> List[cre_defs.Document]:
"""Exports the database to a CRE file collection on disk"""
docs: Dict[str, cre_defs.Document] = {}
Expand Down
73 changes: 72 additions & 1 deletion application/tests/cre_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ def test_register_standard_with_groupped_cre_links(self) -> None:
) # 2 cres in the db

def test_register_cre(self) -> None:
self.maxDiff = None

self.collection = self.collection.with_graph()

standard = defs.Standard(
name="ASVS",
section="SESSION-MGT-TOKEN-DIRECTIVES-DISCRETE-HANDLING",
Expand All @@ -205,9 +209,32 @@ def test_register_cre(self) -> None:
tags=["CREt1", "CREt2"],
metadata={"tags": ["CREl1", "CREl2"]},
)

cre_lower = defs.CRE(
id="100-101",
description="CREdesc lower",
name="CREname lower",
links=[defs.Link(document=cre.shallow_copy(), ltype=defs.LinkTypes.PartOf)],
)

cre_higher = defs.CRE(
id="100-099",
description="CREdesc higher",
name="CREname higher",
links=[
defs.Link(document=cre.shallow_copy(), ltype=defs.LinkTypes.Contains)
],
)
cre_equal = defs.CRE(
id="100-102",
description="CREdesc equal",
name="CREname equal",
links=[
defs.Link(document=cre.shallow_copy(), ltype=defs.LinkTypes.Related)
],
)
c, _ = main.register_cre(cre, self.collection)
self.assertEqual(c.name, cre.name)

self.assertEqual(c.external_id, cre.id)
self.assertEqual(
len(self.collection.session.query(db.CRE).all()), 1
Expand All @@ -216,6 +243,50 @@ def test_register_cre(self) -> None:
len(self.collection.session.query(db.Node).all()), 2
) # 2 nodes in the db

# hierarchy register tests
main.register_cre(cre_lower, self.collection)
main.register_cre(cre_higher, self.collection)
main.register_cre(cre_equal, self.collection)

c_higher = self.collection.get_CREs(cre_higher.id)[0]
c_lower = self.collection.get_CREs(cre_lower.id)[0]
c_equal = self.collection.get_CREs(cre_equal.id)[0]
c = self.collection.get_CREs(cre.id)[0]

self.assertCountEqual(
c_higher.links,
[defs.Link(document=c.shallow_copy(), ltype=defs.LinkTypes.Contains)],
)
self.assertCountEqual(
c.links,
[
defs.Link(document=standard),
defs.Link(document=tool),
defs.Link(
document=c_lower.shallow_copy(), ltype=defs.LinkTypes.Contains
),
defs.Link(
document=c_higher.shallow_copy(), ltype=defs.LinkTypes.PartOf
),
defs.Link(
document=c_equal.shallow_copy(), ltype=defs.LinkTypes.Related
),
],
)

self.assertCountEqual(
c_lower.links,
[defs.Link(document=c.shallow_copy(), ltype=defs.LinkTypes.PartOf)],
)
self.assertCountEqual(
c_higher.links,
[defs.Link(document=c.shallow_copy(), ltype=defs.LinkTypes.Contains)],
)
self.assertCountEqual(
c_equal.links,
[defs.Link(document=c.shallow_copy(), ltype=defs.LinkTypes.Related)],
)

def test_parse_file(self) -> None:
file: List[Dict[str, Any]] = [
{
Expand Down
Loading

0 comments on commit 9c781e9

Please sign in to comment.