Skip to content

Commit

Permalink
Merge pull request #733 from sfu-db/memleak
Browse files Browse the repository at this point in the history
fix memory leak
  • Loading branch information
wangxiaoying authored Jan 22, 2025
2 parents a72a83c + 0f98a95 commit ce973e5
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 101 deletions.
3 changes: 2 additions & 1 deletion connectorx-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ license = "MIT"
maintainers = ["Weiyuan Wu <[email protected]>"]
name = "connectorx"
readme = "README.md" # Markdown files are supported
version = "0.4.1-alpha1"

[project]
name = "connectorx" # Target file name of maturin build
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.10"
version = "0.4.1-alpha1"
dynamic = ["version"]

[tool.poetry.dependencies]
dask = {version = "^2021", optional = true, extras = ["dataframe"]}
Expand Down
22 changes: 11 additions & 11 deletions connectorx-python/src/pandas/destination.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
pandas_columns::{
ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn,
Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock,
ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, ExtractBlockFromBound, Float64Block,
HasPandasColumn, Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock,
},
pystring::PyString,
typesystem::{PandasArrayType, PandasBlockType, PandasTypeSystem},
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<'py> Destination for PandasDestination<'py> {
let buf = &self.block_datas[idx];
match block.dt {
PandasBlockType::Boolean(_) => {
let bblock = buf.extract::<BooleanBlock>()?;
let bblock = BooleanBlock::extract_block(buf)?;

let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
Expand All @@ -227,7 +227,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Float64 => {
let fblock = buf.extract::<Float64Block>()?;
let fblock = Float64Block::extract_block(buf)?;
let fcols = fblock.split()?;
for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
partitioned_columns[cid] = fcol
Expand All @@ -238,7 +238,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::BooleanArray => {
let bblock = buf.extract::<ArrayBlock<bool>>()?;
let bblock = ArrayBlock::<bool>::extract_block(buf)?;
let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
partitioned_columns[cid] = bcol
Expand All @@ -249,7 +249,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Float64Array => {
let fblock = buf.extract::<ArrayBlock<f64>>()?;
let fblock = ArrayBlock::<f64>::extract_block(buf)?;
let fcols = fblock.split()?;
for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
partitioned_columns[cid] = fcol
Expand All @@ -260,7 +260,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Int64Array => {
let fblock = buf.extract::<ArrayBlock<i64>>()?;
let fblock = ArrayBlock::<i64>::extract_block(buf)?;
let fcols = fblock.split()?;
for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
partitioned_columns[cid] = fcol
Expand All @@ -271,7 +271,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Int64(_) => {
let ublock = buf.extract::<Int64Block>()?;
let ublock = Int64Block::extract_block(buf)?;
let ucols = ublock.split()?;
for (&cid, ucol) in block.cids.iter().zip_eq(ucols) {
partitioned_columns[cid] = ucol
Expand All @@ -282,7 +282,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::String => {
let sblock = buf.extract::<StringBlock>()?;
let sblock = StringBlock::extract_block(buf)?;
let scols = sblock.split()?;
for (&cid, scol) in block.cids.iter().zip_eq(scols) {
partitioned_columns[cid] = scol
Expand All @@ -293,7 +293,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Bytes => {
let bblock = buf.extract::<BytesBlock>()?;
let bblock = BytesBlock::extract_block(buf)?;
let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
partitioned_columns[cid] = bcol
Expand All @@ -304,7 +304,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::DateTime => {
let dblock = buf.extract::<DateTimeBlock>()?;
let dblock = DateTimeBlock::extract_block(buf)?;
let dcols = dblock.split()?;
for (&cid, dcol) in block.cids.iter().zip_eq(dcols) {
partitioned_columns[cid] = dcol
Expand Down
17 changes: 8 additions & 9 deletions connectorx-python/src/pandas/pandas_columns/array.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
GIL_MUTEX,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::{Element, PyArray, PyArrayDescr};
use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, Python, ToPyObject};
use numpy::{Element, PyArray, PyArrayDescr, PyArrayMethods};
use pyo3::{types::PyAnyMethods, Bound, Py, PyAny, PyResult, Python, ToPyObject};
use std::any::TypeId;
use std::marker::PhantomData;

Expand All @@ -30,8 +33,8 @@ pub struct ArrayBlock<'a, V> {
_value_type: PhantomData<V>,
}

impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a, V> ExtractBlockFromBound<'a> for ArrayBlock<'a, V> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "object")?;
let array = ob.downcast::<PyArray<PyList, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Expand All @@ -41,10 +44,6 @@ impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> {
_value_type: PhantomData,
})
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a, V> ArrayBlock<'a, V> {
Expand Down
24 changes: 13 additions & 11 deletions connectorx-python/src/pandas/pandas_columns/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2};
use numpy::{PyArray, PyArray1};
use pyo3::{types::PyTuple, FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArray1, PyArrayMethods};
use pyo3::{
types::{PyAnyMethods, PyTuple, PyTupleMethods},
PyAny, PyResult,
};
use std::any::TypeId;

// Boolean
pub enum BooleanBlock<'a> {
NumPy(ArrayViewMut2<'a, bool>),
Extention(ArrayViewMut1<'a, bool>, ArrayViewMut1<'a, bool>),
}
impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {

impl<'a> ExtractBlockFromBound<'a> for BooleanBlock<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
if let Ok(array) = ob.downcast::<PyArray<bool, Ix2>>() {
// if numpy array
check_dtype(ob, "bool")?;
Expand All @@ -22,8 +28,8 @@ impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
} else {
// if extension array
let tuple = ob.downcast::<PyTuple>()?;
let data = tuple.get_item(0)?;
let mask = tuple.get_item(1)?;
let data = tuple.as_slice().get(0).unwrap();
let mask = tuple.as_slice().get(1).unwrap();
check_dtype(data, "bool")?;
check_dtype(mask, "bool")?;

Expand All @@ -33,10 +39,6 @@ impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
))
}
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> BooleanBlock<'a> {
Expand Down
17 changes: 8 additions & 9 deletions connectorx-python/src/pandas/pandas_columns/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
GIL_MUTEX,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::{Element, PyArray, PyArrayDescr};
use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, Python};
use numpy::{Element, PyArray, PyArrayDescr, PyArrayMethods};
use pyo3::{types::PyAnyMethods, Bound, Py, PyAny, PyResult, Python};
use std::any::TypeId;

#[derive(Clone)]
Expand All @@ -28,8 +31,8 @@ pub struct BytesBlock<'a> {
buf_size_mb: usize,
}

impl<'a> FromPyObject<'a> for BytesBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a> ExtractBlockFromBound<'a> for BytesBlock<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "object")?;
let array = ob.downcast::<PyArray<PyBytes, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Expand All @@ -38,10 +41,6 @@ impl<'a> FromPyObject<'a> for BytesBlock<'a> {
buf_size_mb: 16, // in MB
})
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> BytesBlock<'a> {
Expand Down
16 changes: 7 additions & 9 deletions connectorx-python/src/pandas/pandas_columns/datetime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::PyArray;
use pyo3::{FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArrayMethods};
use pyo3::{types::PyAnyMethods, PyAny, PyResult};
use std::any::TypeId;

// datetime64 is represented in int64 in numpy
Expand All @@ -14,17 +16,13 @@ pub struct DateTimeBlock<'a> {
data: ArrayViewMut2<'a, i64>,
}

impl<'a> FromPyObject<'a> for DateTimeBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a> ExtractBlockFromBound<'a> for DateTimeBlock<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "int64")?;
let array = ob.downcast::<PyArray<i64, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Ok(DateTimeBlock { data })
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> DateTimeBlock<'a> {
Expand Down
20 changes: 9 additions & 11 deletions connectorx-python/src/pandas/pandas_columns/float64.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::PyArray;
use pyo3::{FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArrayMethods};
use pyo3::{types::PyAnyMethods, PyAny, PyResult};
use std::any::TypeId;

// Float
pub struct Float64Block<'a> {
data: ArrayViewMut2<'a, f64>,
}

impl<'a> FromPyObject<'a> for Float64Block<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a> ExtractBlockFromBound<'a> for Float64Block<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "float64")?;
let array = ob.downcast::<PyArray<f64, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
let array: &pyo3::Bound<'a, PyArray<f64, Ix2>> = ob.downcast()?;
let data: ArrayViewMut2<'a, f64> = unsafe { array.as_array_mut() };
Ok(Float64Block { data })
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> Float64Block<'a> {
Expand Down
26 changes: 14 additions & 12 deletions connectorx-python/src/pandas/pandas_columns/int64.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2};
use numpy::{PyArray, PyArray1};
use pyo3::{types::PyTuple, FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArray1, PyArrayMethods};
use pyo3::{
types::{PyAnyMethods, PyTuple, PyTupleMethods},
PyAny, PyResult,
};
use std::any::TypeId;

pub enum Int64Block<'a> {
NumPy(ArrayViewMut2<'a, i64>),
Extention(ArrayViewMut1<'a, i64>, ArrayViewMut1<'a, bool>),
}
impl<'a> FromPyObject<'a> for Int64Block<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {

impl<'a> ExtractBlockFromBound<'a> for Int64Block<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
if let Ok(array) = ob.downcast::<PyArray<i64, Ix2>>() {
check_dtype(ob, "int64")?;
let data = unsafe { array.as_array_mut() };
Ok(Int64Block::NumPy(data))
} else {
let tuple = ob.downcast::<PyTuple>()?;
let data = tuple.get_item(0)?;
let mask = tuple.get_item(1)?;
// let data = tuple.get_borrowed_item(0)?;
let data = tuple.as_slice().get(0).unwrap();
let mask = tuple.as_slice().get(1).unwrap();
check_dtype(data, "int64")?;
check_dtype(mask, "bool")?;

Ok(Int64Block::Extention(
unsafe { data.downcast::<PyArray1<i64>>()?.as_array_mut() },
unsafe { mask.downcast::<PyArray1<bool>>()?.as_array_mut() },
))
}
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> Int64Block<'a> {
Expand Down
Loading

0 comments on commit ce973e5

Please sign in to comment.