Skip to content

Commit

Permalink
add tests that show that noncollider rule works but loop is broken
Browse files Browse the repository at this point in the history
  • Loading branch information
this-is-sofia committed Feb 8, 2025
1 parent bce48a4 commit 5272604
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 14 deletions.
4 changes: 2 additions & 2 deletions causy/causal_discovery/constraint/orientation_rules/pc.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,14 @@ def process(
# if one edge has an arrowhead at z, orient the other one pointing away from z.
# It cannot be a collider because we have already oriented all unshielded triples that contain colliders.
for z in potential_zs:
print(f"x: {x.name}, y: {y.name}, z: {z}")
z = graph.nodes[z]
print(f"x: {x.name}, y: {y.name}, z: {z.name}")
breakflag = False
if graph.only_directed_edge_exists(x, z) and graph.undirected_edge_exists(
z, y
):
for node in graph.nodes:
if graph.only_directed_edge_exists(graph.nodes[node], y):
if graph.only_directed_edge_exists(graph.nodes[node], y) and not graph.edge_exists(graph.nodes[node], z):
breakflag = True
break
if breakflag is True:
Expand Down
124 changes: 114 additions & 10 deletions tests/test_orientation_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ def test_non_collider_test(self):
self.assertTrue(model.graph.only_directed_edge_exists(x, y))
self.assertTrue(model.graph.only_directed_edge_exists(y, z))

def test_non_collider_test_auto_mpg_graph_after_collider_rule(self):
def test_non_collider_test_auto_mpg_graph_after_collider_rule_noncollider_test(self):
pipeline = [NonColliderTest()]
model = graph_model_factory(
Algorithm(
Expand All @@ -456,24 +456,29 @@ def test_non_collider_test_auto_mpg_graph_after_collider_rule(self):

model.graph.add_edge(mpg, weight, {})
model.graph.add_edge(displacement, cylinders, {})
model.graph.add_edge(acceleration, horsepower, {})
model.graph.add_edge(horsepower, displacement, {})

# collider acceleration -> horsepower <- weight
model.graph.add_directed_edge(acceleration, horsepower, {})
model.graph.add_directed_edge(mpg, horsepower, {})
model.graph.add_directed_edge(weight, horsepower, {})
# collider mpg -> horsepower <- acceleration
model.graph.add_directed_edge(mpg, horsepower, {})
# collider acceleration -> displacement <- weight
model.graph.add_directed_edge(weight, displacement, {})
model.graph.add_directed_edge(acceleration, displacement, {})

model.execute_pipeline_steps()
# test NonColliderTest
self.assertTrue(model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge()))

def test_non_collider_test_auto_mpg_graph(self):
pipeline = [NonColliderTest()]
def test_non_collider_test_auto_mpg_graph_after_collider_rule_whole_loop(self):
pipeline = [*PC_ORIENTATION_RULES]
model = graph_model_factory(
Algorithm(
pipeline_steps=pipeline,
edge_types=[DirectedEdge(), UndirectedEdge()],
name="TestCollider",
name="TestLoopAutoMpgGraph",
)
)()
model.graph = GraphManager()
Expand All @@ -483,19 +488,118 @@ def test_non_collider_test_auto_mpg_graph(self):
cylinders = model.graph.add_node("cylinders", [])
displacement = model.graph.add_node("displacement", [])
weight = model.graph.add_node("weight", [])

model.graph.add_edge(mpg, weight, {})
model.graph.add_edge(weight, displacement, {})
model.graph.add_edge(displacement, cylinders, {})
model.graph.add_edge(weight, horsepower, {})
model.graph.add_edge(acceleration, horsepower, {})
model.graph.add_edge(horsepower, displacement, {})

# collider acceleration -> horsepower <- weight
model.graph.add_directed_edge(acceleration, horsepower, {})
model.graph.add_directed_edge(horsepower, displacement, {})
model.graph.add_directed_edge(weight, horsepower, {})
# collider mpg -> horsepower <- acceleration
model.graph.add_directed_edge(mpg, horsepower, {})
# collider acceleration -> displacement <- weight
model.graph.add_directed_edge(weight, displacement, {})
model.graph.add_directed_edge(acceleration, displacement, {})

model.execute_pipeline_steps()
# test NonColliderTest
self.assertTrue(model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge()))

def test_only_non_collider_rule_on_loop_test_model(self):
pipeline = [NonColliderTest()]
model = graph_model_factory(
Algorithm(
pipeline_steps=pipeline,
edge_types=[DirectedEdge(), UndirectedEdge()],
name="TestLoop",
)
)()
model.graph = GraphManager()
x = model.graph.add_node("X", [])
y = model.graph.add_node("Y", [])
z = model.graph.add_node("Z", [])
w = model.graph.add_node("W", [])
model.graph.add_edge(z, w, {})
model.graph.add_directed_edge(x, z, {})
model.graph.add_directed_edge(y, z, {})
model.execute_pipeline_steps()
self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge()))

def test_loop(self):
pipeline = [*PC_ORIENTATION_RULES]
model = graph_model_factory(
Algorithm(
pipeline_steps=pipeline,
edge_types=[DirectedEdge(), UndirectedEdge()],
name="TestLoop",
)
)()
model.graph = GraphManager()
x = model.graph.add_node("X", [])
y = model.graph.add_node("Y", [])
z = model.graph.add_node("Z", [])
w = model.graph.add_node("W", [])
model.graph.add_edge(z, w, {})
model.graph.add_directed_edge(x, z, {})
model.graph.add_directed_edge(y, z, {})
model.execute_pipeline_steps()
self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge()))

def test_loop_two_iterations(self):
pipeline = [*PC_ORIENTATION_RULES]
model = graph_model_factory(
Algorithm(
pipeline_steps=pipeline,
edge_types=[DirectedEdge(), UndirectedEdge()],
name="TestLoop",
)
)()
model.graph = GraphManager()
x = model.graph.add_node("X", [])
y = model.graph.add_node("Y", [])
z = model.graph.add_node("Z", [])
w = model.graph.add_node("W", [])
v = model.graph.add_node("V", [])
model.graph.add_edge(z, w, {})
model.graph.add_edge(w, v, {})
model.graph.add_directed_edge(x, z, {})
model.graph.add_directed_edge(y, z, {})
model.execute_pipeline_steps()
self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(w, v, DirectedEdge()))

def test_only_noncollider_rule_on_loop_model_after_one_iteration(self):
pipeline = [NonColliderTest()]
model = graph_model_factory(
Algorithm(
pipeline_steps=pipeline,
edge_types=[DirectedEdge(), UndirectedEdge()],
name="TestLoop",
)
)()
model.graph = GraphManager()
x = model.graph.add_node("X", [])
y = model.graph.add_node("Y", [])
z = model.graph.add_node("Z", [])
w = model.graph.add_node("W", [])
v = model.graph.add_node("V", [])
model.graph.add_edge(w, v, {})
model.graph.add_directed_edge(z, w, {})
model.graph.add_directed_edge(x, z, {})
model.graph.add_directed_edge(y, z, {})
model.execute_pipeline_steps()
self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge()))
self.assertTrue(model.graph.edge_of_type_exists(w, v, DirectedEdge()))
def test_non_collider_loop_auto_mpg_graph(self):
pipeline = [*PC_ORIENTATION_RULES]
model = graph_model_factory(
Expand Down
8 changes: 6 additions & 2 deletions tests/test_pc_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ComputeDirectEffectsInDAGsMultivariateRegression,
)
from causy.common_pipeline_steps.calculation import CalculatePearsonCorrelations
from causy.edge_types import DirectedEdge
from causy.edge_types import DirectedEdge, UndirectedEdge
from causy.generators import PairsWithNeighboursGenerator
from causy.graph_model import graph_model_factory
from causy.causal_discovery.constraint.independence_tests.common import (
Expand Down Expand Up @@ -108,11 +108,15 @@ def test_pc_e2e_auto_mpg(self):
self.assertEqual(pc.graph.edge_exists("horsepower", "cylinders"), False)

# directtions
self.assertEqual(pc.graph.edge_of_type_exists("mpg", "weight", UndirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("weight", "horsepower", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("horsepower", "displacement", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("weight", "displacement", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("mpg", "horsepower", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "horsepower", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "displacement", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("displacement", "cylinders", DirectedEdge()), True)
self.assertEqual(pc.graph.edge_of_type_exists("horsepower", "displacement", DirectedEdge()), True)




Expand Down

0 comments on commit 5272604

Please sign in to comment.