Skip to content

Commit

Permalink
wip: refactor controller
Browse files Browse the repository at this point in the history
  • Loading branch information
aMahanna committed Jan 7, 2025
1 parent 6bf7dde commit 4525a5a
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 315 deletions.
40 changes: 31 additions & 9 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ env:
PACKAGE_DIR: adbnx_adapter
TESTS_DIR: tests
jobs:
build:
lint:
runs-on: ubuntu-latest
continue-on-error: true
strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python: ["3.9", "3.10", "3.11", "3.12"]
name: Python ${{ matrix.python }}
steps:
- uses: actions/checkout@v4
Expand All @@ -25,12 +24,6 @@ jobs:
cache: 'pip'
cache-dependency-path: setup.py

- name: Set up ArangoDB Instance via Docker
run: docker create --name adb -p 8529:8529 -e ARANGO_ROOT_PASSWORD= arangodb/arangodb

- name: Start ArangoDB Instance
run: docker start adb

- name: Setup pip
run: python -m pip install --upgrade pip setuptools wheel

Expand All @@ -49,6 +42,35 @@ jobs:
- name: Run mypy
run: mypy ${{env.PACKAGE_DIR}} ${{env.TESTS_DIR}}

test:
runs-on: ubuntu-latest
continue-on-error: true
strategy:
matrix:
python: ["3.9", "3.10", "3.11", "3.12"]
name: Python ${{ matrix.python }}
steps:
- uses: actions/checkout@v4

- name: Setup Python ${{ matrix.python }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}
cache: 'pip'
cache-dependency-path: setup.py

- name: Set up ArangoDB Instance via Docker
run: docker create --name adb -p 8529:8529 -e ARANGO_ROOT_PASSWORD= arangodb/arangodb

- name: Start ArangoDB Instance
run: docker start adb

- name: Setup pip
run: python -m pip install --upgrade pip setuptools wheel

- name: Install packages
run: pip install .[dev]

- name: Run pytest
run: pytest --cov=${{env.PACKAGE_DIR}} --cov-report xml --cov-report term-missing -v --color=yes --no-cov-on-fail --code-highlight=yes

Expand Down
51 changes: 11 additions & 40 deletions adbnx_adapter/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# -*- coding: utf-8 -*-

from abc import ABC
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set

from arango.graph import Graph as ADBGraph
from networkx.classes.graph import Graph as NXGraph
from networkx.classes.multidigraph import MultiGraph as NXMultiDiGraph

from .typings import ArangoMetagraph, Json, NxData, NxId
from .typings import ArangoMetagraph, Json, NxId, NxData


class Abstract_ADBNX_Adapter(ABC):
Expand Down Expand Up @@ -50,7 +50,7 @@ def networkx_to_arangodb(
edge_definitions: Optional[List[Json]] = None,
orphan_collections: Optional[List[str]] = None,
overwrite_graph: bool = False,
batch_size: Optional[int] = None,
batch_size: int = 1000,
use_async: bool = False,
**adb_import_kwargs: Any,
) -> ADBGraph:
Expand All @@ -64,49 +64,20 @@ def _prepare_arangodb_vertex(self, adb_vertex: Json, col: str) -> None:
def _prepare_arangodb_edge(self, adb_edge: Json, col: str) -> None:
raise NotImplementedError # pragma: no cover

def _identify_networkx_node(
self, nx_node_id: NxId, nx_node: NxData, adb_v_cols: List[str]
) -> str:
raise NotImplementedError # pragma: no cover

def _identify_networkx_edge(
self,
nx_edge: NxData,
from_node_id: NxId,
to_node_id: NxId,
nx_map: Dict[NxId, str],
adb_e_cols: List[str],
) -> str:
raise NotImplementedError # pragma: no cover

def _keyify_networkx_node(
self, i: int, nx_node_id: NxId, nx_node: NxData, col: str
) -> str:
def _prepare_networkx_node(
self, i: int, nx_node_id: Any, nx_node: NxData, adb_v_cols: List[str]
) -> tuple[str, str]:
raise NotImplementedError # pragma: no cover

def _keyify_networkx_edge(
def _prepare_networkx_edge(
self,
i: int,
nx_edge: NxData,
from_node_id: NxId,
to_node_id: NxId,
nx_map: Dict[NxId, str],
col: str,
) -> Union[str, None]:
raise NotImplementedError # pragma: no cover

def _prepare_networkx_node(
self,
nx_node: Json,
col: str,
) -> None:
raise NotImplementedError # pragma: no cover

def _prepare_networkx_edge(
self,
nx_edge: Json,
col: str,
) -> None:
nx_edge: NxData,
adb_e_cols: List[str],
nx_map: Dict[Any, str],
) -> tuple[str, str | None]:
raise NotImplementedError # pragma: no cover

@property
Expand Down
83 changes: 30 additions & 53 deletions adbnx_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ def __init__(
):
self.set_logging(logging_lvl)

if issubclass(type(db), StandardDatabase) is False:
if isinstance(db, StandardDatabase) is False:
msg = "**db** parameter must inherit from arango.database.StandardDatabase"
raise TypeError(msg)

if issubclass(type(controller), ADBNX_Controller) is False:
if isinstance(controller, ADBNX_Controller) is False:
msg = "**controller** parameter must inherit from ADBNX_Controller"
raise TypeError(msg)

Expand Down Expand Up @@ -261,7 +261,7 @@ def networkx_to_arangodb(
edge_definitions: Optional[List[Json]] = None,
orphan_collections: Optional[List[str]] = None,
overwrite_graph: bool = False,
batch_size: Optional[int] = None,
batch_size: int = 1000,
use_async: bool = False,
**adb_import_kwargs: Any,
) -> ADBGraph:
Expand All @@ -285,7 +285,7 @@ def networkx_to_arangodb(
:type overwrite_graph: bool
:param batch_size: If specified, runs the ArangoDB Data Ingestion
process for every **batch_size** NetworkX nodes/edges within **nx_graph**.
Defaults to `len(nx_nodes)` & `len(nx_edges)`.
Defaults to `len(nx_nodes)` & `len(nx_edges)`. Defaults to 1000.
:type batch_size: int | None
:param use_async: Performs asynchronous ArangoDB ingestion if enabled.
Defaults to False.
Expand Down Expand Up @@ -339,13 +339,12 @@ def networkx_to_arangodb(
nx_node: NxData

nx_nodes = nx_graph.nodes(data=True)
node_batch_size = batch_size or len(nx_nodes)

bar_progress = get_bar_progress("(NX → ADB): Nodes", "#97C423")
bar_progress_task = bar_progress.add_task("Nodes", total=len(nx_nodes))

with Live(Group(bar_progress, spinner_progress)):
for i, (nx_id, nx_node) in enumerate(nx_nodes):
for i, (nx_id, nx_node) in enumerate(nx_nodes, 1):
bar_progress.advance(bar_progress_task)

# 1. Process NetworkX node
Expand All @@ -356,11 +355,10 @@ def networkx_to_arangodb(
nx_map,
adb_docs,
adb_v_cols,
has_one_v_col,
)

# 2. Insert batch of nodes
if i and i % node_batch_size == 0:
if i % batch_size == 0:
self.__insert_adb_docs(
spinner_progress, adb_docs, use_async, **adb_import_kwargs
)
Expand All @@ -379,13 +377,12 @@ def networkx_to_arangodb(
nx_edge: NxData

nx_edges = nx_graph.edges(data=True)
edge_batch_size = batch_size or len(nx_edges)

bar_progress = get_bar_progress("(NX → ADB): Edges", "#5E3108")
bar_progress_task = bar_progress.add_task("Edges", total=len(nx_edges))

with Live(Group(bar_progress, spinner_progress)):
for i, (from_node_id, to_node_id, nx_edge) in enumerate(nx_edges):
for i, (from_node_id, to_node_id, nx_edge) in enumerate(nx_edges, 1):
bar_progress.advance(bar_progress_task)

# 1. Process NetworkX edge
Expand All @@ -397,11 +394,10 @@ def networkx_to_arangodb(
nx_map,
adb_docs,
adb_e_cols,
has_one_e_col,
)

# 2. Insert batch of edges
if i and i % edge_batch_size == 0:
if i % batch_size == 0:
self.__insert_adb_docs(
spinner_progress, adb_docs, use_async, **adb_import_kwargs
)
Expand Down Expand Up @@ -445,6 +441,7 @@ def __fetch_adb_docs(
:rtype: Tuple[arango.cursor.Cursor, int]
"""
aql_return_value = "doc"

if explicit_metagraph:
default_keys = ["_id", "_key"]
default_keys += ["_from", "_to"] if is_edge else []
Expand Down Expand Up @@ -528,6 +525,8 @@ def __process_adb_vertex(
self.__cntrl._prepare_arangodb_vertex(adb_v, v_col)
nx_id: str = adb_v["_id"]

del adb_v["_id"]

if adb_id != nx_id:
adb_map[adb_id] = nx_id

Expand Down Expand Up @@ -605,7 +604,6 @@ def __process_nx_node(
nx_map: Dict[NxId, str],
adb_docs: DefaultDict[str, List[Json]],
adb_v_cols: List[str],
has_one_v_col: bool,
) -> None:
"""NetworkX -> ArangoDB: Processes a NetworkX node.
Expand All @@ -621,30 +619,23 @@ def __process_nx_node(
:type adb_docs: DefaultDict[str, List[Dict[str, Any]]]
:param adb_v_cols: The ArangoDB vertex collections.
:type adb_v_cols: List[str]
:param has_one_v_col: True if the Graph has one Vertex collection.
:type has_one_v_col: bool
"""
logger.debug(f"N{i}: {nx_id}")

col = (
adb_v_cols[0]
if has_one_v_col
else self.__cntrl._identify_networkx_node(nx_id, nx_node, adb_v_cols)
)
col, key = self.__cntrl._prepare_networkx_node(i, nx_id, nx_node, adb_v_cols)

if not has_one_v_col and col not in adb_v_cols:
if col not in adb_v_cols:
msg = f"'{nx_id}' identified as '{col}', which is not in {adb_v_cols}"
raise ValueError(msg)

key = self.__cntrl._keyify_networkx_node(i, nx_id, nx_node, col)

nx_node["_key"] = key
if not key:
msg = f"'{nx_id}' has no _key value"
raise ValueError(msg)

_id = f"{col}/{key}"
if _id != nx_id:
nx_map[nx_id] = _id
if f"{col}/{key}" != nx_id:
nx_map[nx_id] = f"{col}/{key}"

self.__cntrl._prepare_networkx_node(nx_node, col)
nx_node["_key"] = key
adb_docs[col].append(nx_node)

def __process_nx_edge(
Expand All @@ -656,7 +647,6 @@ def __process_nx_edge(
nx_map: Dict[NxId, str],
adb_docs: DefaultDict[str, List[Json]],
adb_e_cols: List[str],
has_one_e_col: bool,
) -> None:
"""NetworkX -> ArangoDB: Processes a NetworkX edge.
Expand All @@ -674,43 +664,29 @@ def __process_nx_edge(
:type adb_docs: DefaultDict[str, List[Dict[str, Any]]]
:param adb_e_cols: The ArangoDB edge collections.
:type adb_e_cols: List[str]
:param has_one_e_col: True if the Graph has one Edge collection.
:type has_one_e_col: bool
"""
edge_str = f"({from_node_id}, {to_node_id})"
logger.debug(f"E{i}: {edge_str}")

col = (
adb_e_cols[0]
if has_one_e_col
else self.__cntrl._identify_networkx_edge(
nx_edge,
from_node_id,
to_node_id,
nx_map,
adb_e_cols,
)
)

if not has_one_e_col and col not in adb_e_cols:
msg = f"{edge_str} identified as '{col}', which is not in {adb_e_cols}"
raise ValueError(msg)

key = self.__cntrl._keyify_networkx_edge(
col, key = self.__cntrl._prepare_networkx_edge(
i,
nx_edge,
from_node_id,
to_node_id,
nx_edge,
adb_e_cols,
nx_map,
col,
)

nx_edge["_from"] = nx_map.get(from_node_id, from_node_id)
nx_edge["_to"] = nx_map.get(to_node_id, to_node_id)
if col not in adb_e_cols:
msg = f"{edge_str} identified as '{col}', which is not in {adb_e_cols}"
raise ValueError(msg)

if key:
nx_edge["_key"] = key

self.__cntrl._prepare_networkx_edge(nx_edge, col)
nx_edge["_from"] = nx_map.get(from_node_id, from_node_id)
nx_edge["_to"] = nx_map.get(to_node_id, to_node_id)

adb_docs[col].append(nx_edge)

def __insert_adb_docs(
Expand Down Expand Up @@ -747,6 +723,7 @@ def __insert_adb_docs(
action = f"ADB Import: '{col}' ({len(doc_list)})"
spinner_progress_task = spinner_progress.add_task("", action=action)

logger.debug(action)
result = db.collection(col).import_bulk(doc_list, **adb_import_kwargs)
logger.debug(result)

Expand Down
Loading

0 comments on commit 4525a5a

Please sign in to comment.