Add arrow as a backend for speedup of around 10-20x for PLP and other Hades packages #35
Comments
Arrow sounds great! I have been struggling with the speed of our current implementation at times, especially with massive data sets.
+1 from me - SQLite is very slow; it would be great to have data manipulation done ~10x-20x faster!
This sounds great. I'll work on it for the next release.
@egillax -- whether or not we change the backend of |
Arrow was considered when we were looking to replace ff, but at the time I found it to be unstable. It also appears to keep all the data in memory, which is the problem we're trying to avoid with Andromeda.
Hi @schuemie, it possibly was unstable at that time; version 1.0 didn't come out until July 2020. I've not found it unstable, and it seems to have been developing fast since then. It's never crashed on me like it seems to have done for you in 2020. Just to clarify, all the above timings use arrow datasets, which are for working with file-based datasets without loading them into memory. This also appears not to have been around in 2020. This is all done using dplyr verbs, for which arrow has been adding support rapidly over the last year. Aggregations were only added recently.
A little more data. I ran three versions of tidyCovariates from FeatureExtraction: the arrow implementation from my plp fork, an in-memory version I made, and the original version which uses andromeda with RSQLite. I benchmarked it using bench, which measures memory allocation as well. I used simulated data with 10000 subjects. benchmark <- bench::mark(tidyCovariateDataArrow(plpData$covariateData),
tidyCovariateDataMemory(memoryData),
FeatureExtraction::tidyCovariateData(plpAndromeda$covariateData),
check = FALSE, filter_gc = FALSE)
# A tibble: 3 × 13
expression min median `itr/sec` mem_alloc `gc/sec` n_itr n_gc total_time result memory time gc
<bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl> <int> <dbl> <bch:tm> <list> <list> <list> <list>
1 tidyCovariateDataArrow(plpData$covariateData) 985ms 985ms 1.02 27.88MB 1.02 1 1 985ms <NULL> <Rprofmem [4,547 × 3]> <bench_tm> <tibble>
2 tidyCovariateDataMemory(memoryData) 4.5s 4.5s 0.222 2.47GB 2.89 1 13 4.5s <NULL> <Rprofmem> <bench_tm> <tibble>
3 FeatureExtraction::tidyCovariateData(plpAndromeda$covariateData) 31.2s 31.2s 0.0321 505.3MB 0.0642 1 2 31.2s <NULL> <Rprofmem [12,739 × 3]> <bench_tm> <tibble> As can be seen, the arrow implementation is faster than the in-memory version and uses the least memory of all the approaches. Another interesting thing I hadn't mentioned before is that the file size of the arrow dataset is smaller, in this case 253MB vs 577MB for RSQLite.
Cool! That does look promising.
Here are some more experiments with the arrow functionality. I think this could work. I guess the directory structure would be something like AndromedaTempFolder/AndromedaObjectFolder/Dataset/file.feather. We'd be trading SQL for the dplyr interface implemented in arrow, which would likely be more limited. What would be helpful for me is to better understand the operations/functions Andromeda needs to support (e.g. datediff?). Is there anything in SQL that is not available in arrow that we need? library(arrow)
#>
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#>
#> timestamp
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(nycflights13)
write_dataset(airlines, "airlines", format = "feather")
write_dataset(flights, "flights", format = "feather")
andr <- list()
andr$airlines <- open_dataset("airlines", format = "feather")
andr$flights <- open_dataset("flights", format = "feather")
# count rows
andr$flights %>%
tally() %>%
collect()
#> # A tibble: 1 × 1
#> n
#> <int>
#> 1 336776
# head
andr$flights %>%
head(10) %>%
collect()
#> # A tibble: 10 × 19
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 517 515 2 830 819
#> 2 2013 1 1 533 529 4 850 830
#> 3 2013 1 1 542 540 2 923 850
#> 4 2013 1 1 544 545 -1 1004 1022
#> 5 2013 1 1 554 600 -6 812 837
#> 6 2013 1 1 554 558 -4 740 728
#> 7 2013 1 1 555 600 -5 913 854
#> 8 2013 1 1 557 600 -3 709 723
#> 9 2013 1 1 557 600 -3 838 846
#> 10 2013 1 1 558 600 -2 753 745
#> # … with 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> # tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> # hour <dbl>, minute <dbl>, time_hour <dttm>
# joins
andr$flights %>%
inner_join(andr$airlines, by = "carrier") %>%
collect()
#> # A tibble: 336,776 × 20
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 517 515 2 830 819
#> 2 2013 1 1 533 529 4 850 830
#> 3 2013 1 1 542 540 2 923 850
#> 4 2013 1 1 544 545 -1 1004 1022
#> 5 2013 1 1 554 600 -6 812 837
#> 6 2013 1 1 554 558 -4 740 728
#> 7 2013 1 1 555 600 -5 913 854
#> 8 2013 1 1 557 600 -3 709 723
#> 9 2013 1 1 557 600 -3 838 846
#> 10 2013 1 1 558 600 -2 753 745
#> # … with 336,766 more rows, and 12 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>,
#> # name <chr>
# joins with dataframe in R
andr$flights %>%
inner_join(airlines, by = "carrier") %>%
collect()
#> # A tibble: 336,776 × 20
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 517 515 2 830 819
#> 2 2013 1 1 533 529 4 850 830
#> 3 2013 1 1 542 540 2 923 850
#> 4 2013 1 1 544 545 -1 1004 1022
#> 5 2013 1 1 554 600 -6 812 837
#> 6 2013 1 1 554 558 -4 740 728
#> 7 2013 1 1 555 600 -5 913 854
#> 8 2013 1 1 557 600 -3 709 723
#> 9 2013 1 1 557 600 -3 838 846
#> 10 2013 1 1 558 600 -2 753 745
#> # … with 336,766 more rows, and 12 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>,
#> # name <chr>
# I'm kind of surprised this works
# filtering joins with data in R
andr$flights %>%
semi_join(head(airlines, 1), by = "carrier") %>%
collect()
#> # A tibble: 18,460 × 19
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 810 810 0 1048 1037
#> 2 2013 1 1 1451 1500 -9 1634 1636
#> 3 2013 1 1 1452 1455 -3 1637 1639
#> 4 2013 1 1 1454 1500 -6 1635 1636
#> 5 2013 1 1 1507 1515 -8 1651 1656
#> 6 2013 1 1 1530 1530 0 1650 1655
#> 7 2013 1 1 1546 1540 6 1753 1748
#> 8 2013 1 1 1550 1550 0 1844 1831
#> 9 2013 1 1 1552 1600 -8 1749 1757
#> 10 2013 1 1 1554 1600 -6 1701 1734
#> # … with 18,450 more rows, and 11 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>
# filtering join with data in arrow
andr$flights %>%
semi_join(head(andr$airlines, 1), by = "carrier") %>%
collect()
#> # A tibble: 18,460 × 19
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 810 810 0 1048 1037
#> 2 2013 1 1 1451 1500 -9 1634 1636
#> 3 2013 1 1 1452 1455 -3 1637 1639
#> 4 2013 1 1 1454 1500 -6 1635 1636
#> 5 2013 1 1 1507 1515 -8 1651 1656
#> 6 2013 1 1 1530 1530 0 1650 1655
#> 7 2013 1 1 1546 1540 6 1753 1748
#> 8 2013 1 1 1550 1550 0 1844 1831
#> 9 2013 1 1 1552 1600 -8 1749 1757
#> 10 2013 1 1 1554 1600 -6 1701 1734
#> # … with 18,450 more rows, and 11 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>
# filter using data in arrow (not supported)
andr$flights %>%
filter(carrier %in% andr$airlines$carrier[1:3]) %>%
collect()
#> Error: Filter expression not supported for Arrow Datasets: carrier %in% andr$airlines$carrier[1:3]
#> Call collect() first to pull data into R.
# filter using values in R
andr$flights %>%
filter(carrier %in% airlines$carrier[1:3]) %>%
collect()
#> # A tibble: 51,903 × 19
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 542 540 2 923 850
#> 2 2013 1 1 558 600 -2 753 745
#> 3 2013 1 1 559 600 -1 941 910
#> 4 2013 1 1 606 610 -4 858 910
#> 5 2013 1 1 623 610 13 920 915
#> 6 2013 1 1 628 630 -2 1137 1140
#> 7 2013 1 1 629 630 -1 824 810
#> 8 2013 1 1 635 635 0 1028 940
#> 9 2013 1 1 656 700 -4 854 850
#> 10 2013 1 1 656 659 -3 949 959
#> # … with 51,893 more rows, and 11 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>
# sum
andr$flights %>%
count(carrier) %>%
inner_join(andr$airlines, by = "carrier") %>%
mutate(number_of_flights = n) %>%
select(name, number_of_flights) %>%
collect()
#> # A tibble: 16 × 2
#> name number_of_flights
#> <chr> <int>
#> 1 United Air Lines Inc. 58665
#> 2 American Airlines Inc. 32729
#> 3 JetBlue Airways 54635
#> 4 Delta Air Lines Inc. 48110
#> 5 ExpressJet Airlines Inc. 54173
#> 6 Envoy Air 26397
#> 7 US Airways Inc. 20536
#> 8 Southwest Airlines Co. 12275
#> 9 Virgin America 5162
#> 10 AirTran Airways Corporation 3260
#> 11 Alaska Airlines Inc. 714
#> 12 Endeavor Air Inc. 18460
#> 13 Frontier Airlines Inc. 685
#> 14 Hawaiian Airlines Inc. 342
#> 15 Mesa Airlines Inc. 601
#> 16 SkyWest Airlines Inc. 32
# distinct
andr$flights %>%
distinct(carrier) %>%
left_join(andr$airlines, by = "carrier") %>%
collect()
#> # A tibble: 16 × 2
#> carrier name
#> <chr> <chr>
#> 1 UA United Air Lines Inc.
#> 2 AA American Airlines Inc.
#> 3 B6 JetBlue Airways
#> 4 DL Delta Air Lines Inc.
#> 5 EV ExpressJet Airlines Inc.
#> 6 MQ Envoy Air
#> 7 US US Airways Inc.
#> 8 WN Southwest Airlines Co.
#> 9 VX Virgin America
#> 10 FL AirTran Airways Corporation
#> 11 AS Alaska Airlines Inc.
#> 12 9E Endeavor Air Inc.
#> 13 F9 Frontier Airlines Inc.
#> 14 HA Hawaiian Airlines Inc.
#> 15 YV Mesa Airlines Inc.
#> 16 OO SkyWest Airlines Inc.
# mean, min, max
andr$flights %>%
group_by(carrier) %>%
summarise(mean_delay = mean(dep_delay, na.rm = TRUE),
min_delay = min(dep_delay, na.rm = TRUE),
max_delay = max(dep_delay, na.rm = TRUE)) %>%
left_join(andr$airlines, by = "carrier") %>%
select(name, mean_delay, min_delay, max_delay) %>%
arrange(desc(mean_delay)) %>%
collect()
#> # A tibble: 16 × 4
#> name mean_delay min_delay max_delay
#> <chr> <dbl> <dbl> <dbl>
#> 1 Frontier Airlines Inc. 20.2 -27 853
#> 2 ExpressJet Airlines Inc. 20.0 -32 548
#> 3 Mesa Airlines Inc. 19.0 -16 387
#> 4 AirTran Airways Corporation 18.7 -22 602
#> 5 Southwest Airlines Co. 17.7 -13 471
#> 6 Endeavor Air Inc. 16.7 -24 747
#> 7 JetBlue Airways 13.0 -43 502
#> 8 Virgin America 12.9 -20 653
#> 9 SkyWest Airlines Inc. 12.6 -14 154
#> 10 United Air Lines Inc. 12.1 -20 483
#> 11 Envoy Air 10.6 -26 1137
#> 12 Delta Air Lines Inc. 9.26 -33 960
#> 13 American Airlines Inc. 8.59 -24 1014
#> 14 Alaska Airlines Inc. 5.80 -21 225
#> 15 Hawaiian Airlines Inc. 4.90 -16 1301
#> 16 US Airways Inc. 3.78 -19 500
# date functions
library(wakefield)
#>
#> Attaching package: 'wakefield'
#> The following object is masked from 'package:dplyr':
#>
#> id
#> The following object is masked from 'package:arrow':
#>
#> string
date_df <- tibble::tibble(date1 = date_stamp(1e5, T), date2 = date_stamp(1e5, T))
write_dataset(date_df, "date_df", format = "feather")
andr$date_df <- open_dataset("date_df", format = "feather")
# date types are preserved
andr$date_df %>%
collect()
#> # A tibble: 100,000 × 2
#> date1 date2
#> * <date> <date>
#> 1 2022-01-27 2021-08-27
#> 2 2021-05-27 2021-07-27
#> 3 2021-05-27 2021-11-27
#> 4 2021-10-27 2022-04-27
#> 5 2021-05-27 2021-07-27
#> 6 2021-10-27 2021-07-27
#> 7 2022-02-27 2021-06-27
#> 8 2021-11-27 2021-10-27
#> 9 2022-01-27 2022-01-27
#> 10 2022-03-27 2022-01-27
#> # … with 99,990 more rows
# date difference does not work
andr$date_df %>%
mutate(date_diff = date2 - date1) %>%
collect()
#> Error in `handle_csv_read_error()`:
#> ! NotImplemented: Function 'subtract_checked' has no kernel matching input types (array[date32[day]], array[date32[day]])
# probably not much we can do with dates without first pulling the data into R
andr$date_df %>%
mutate(year = lubridate::year(date1)) %>%
collect()
#> Error: Expression lubridate::year(date1) not supported in Arrow
#> Call collect() first to pull data into R.
andr$date_df %>%
mutate(min_date = min(date1)) %>%
collect()
#> Error: window functions not currently supported in Arrow
#> Call collect() first to pull data into R.
# interestingly this works
andr$date_df %>%
mutate(date_number = as.integer(date1)) %>%
collect()
#> # A tibble: 100,000 × 3
#> date1 date2 date_number
#> * <date> <date> <int>
#> 1 2022-01-27 2021-08-27 19019
#> 2 2021-05-27 2021-07-27 18774
#> 3 2021-05-27 2021-11-27 18774
#> 4 2021-10-27 2022-04-27 18927
#> 5 2021-05-27 2021-07-27 18774
#> 6 2021-10-27 2021-07-27 18927
#> 7 2022-02-27 2021-06-27 19050
#> 8 2021-11-27 2021-10-27 18958
#> 9 2022-01-27 2022-01-27 19019
#> 10 2022-03-27 2022-01-27 19078
#> # … with 99,990 more rows
andr$date_df %>%
summarise(min_date_number = min(as.integer(date1))) %>%
collect()
#> # A tibble: 1 × 1
#> min_date_number
#> <int>
#> 1 18774
# window functions are not supported
andr$flights %>%
group_by(carrier) %>%
mutate(mean_delay = mean(dep_delay))
#> Error: window functions not currently supported in Arrow
#> Call collect() first to pull data into R.
# can we create a modified copy of an arrow table without pulling the data into R?
andr$flights %>%
semi_join(andr$airlines[1:4,], by = "carrier") %>%
write_dataset("flights2", format = "feather")
andr$flights2 <- open_dataset("flights2", format = "feather")
andr$flights2 %>%
collect()
#> # A tibble: 106,538 × 19
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 542 540 2 923 850
#> 2 2013 1 1 544 545 -1 1004 1022
#> 3 2013 1 1 555 600 -5 913 854
#> 4 2013 1 1 557 600 -3 838 846
#> 5 2013 1 1 558 600 -2 753 745
#> 6 2013 1 1 558 600 -2 849 851
#> 7 2013 1 1 558 600 -2 853 856
#> 8 2013 1 1 559 600 -1 941 910
#> 9 2013 1 1 559 559 0 702 706
#> 10 2013 1 1 600 600 0 851 858
#> # … with 106,528 more rows, and 11 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>
# check memory usage
bench::mark(andr$flights %>% write_dataset("flights2", format = "feather"),
andr$flights %>% collect() %>% write_dataset("flights2", format = "feather"),
check = FALSE, filter_gc = FALSE)
#> # A tibble: 2 × 6
#> expression
#> <bch:expr>
#> 1 andr$flights %>% write_dataset("flights2", format = "feather")
#> 2 andr$flights %>% collect() %>% write_dataset("flights2", format = "feather")
#> # … with 5 more variables: min <bch:tm>, median <bch:tm>, `itr/sec` <dbl>,
#> # mem_alloc <bch:byt>, `gc/sec` <dbl>
# seems like joins are happening outside of R
bench::mark(andr$flights %>% inner_join(andr$airlines, by = "carrier") %>% collect(),
flights %>% inner_join(airlines, by = "carrier") %>% collect(),
check = FALSE, filter_gc = FALSE)
#> # A tibble: 2 × 6
#> expression
#> <bch:expr>
#> 1 andr$flights %>% inner_join(andr$airlines, by = "carrier") %>% collect()
#> 2 flights %>% inner_join(airlines, by = "carrier") %>% collect()
#> # … with 5 more variables: min <bch:tm>, median <bch:tm>, `itr/sec` <dbl>,
#> # mem_alloc <bch:byt>, `gc/sec` <dbl> Created on 2022-04-27 by the reprex package (v2.0.1) |
I haven't tried it but DuckDB can be used for SQL access to Apache Arrow tables: |
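A minimal sketch of that idea (illustrative only and untested; it assumes the duckdb and DBI packages are installed and reuses the "flights" feather dataset created earlier in the thread):
library(arrow)
library(duckdb)
library(DBI)
ds <- open_dataset("flights", format = "feather")
con <- dbConnect(duckdb())
# expose the arrow dataset to DuckDB as a virtual table, without copying it into the database
duckdb_register_arrow(con, "flights", ds)
dbGetQuery(con, "SELECT carrier, COUNT(*) AS n FROM flights GROUP BY carrier")
dbDisconnect(con, shutdown = TRUE)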
I don't think we need the SQL at all. Andromeda was set up to not rely on SQL, just on dplyr. The most complicated uses of Andromeda I'm aware of are this code in FeatureExtraction and this code in CohortMethod. I'm not aware of any datediffs needed. @jreps: what are your requirements for Andromeda operations?
I started working on an Andromeda implementation that uses arrow (no SQL or duckdb). I have not checked in my code yet. I'm currently stuck on this function. I switched to using S3. The current implementation has S4 methods but creates the object by assigning the class attribute only and doesn't call
Seems like I need NextMethod().
I think I have the basic idea working for assignment. "[[<-.Andromeda" <- function(x, i, value) {
print("using Andromeda method")
arrow::write_dataset(value, file.path("temp", i), format = "feather")
value <- arrow::open_dataset(file.path("temp", i), format = "feather")
NextMethod()
}
a <- list()
class(a) <- "Andromeda"
a[["cars"]] <- cars
#> [1] "using Andromeda method"
class(a$cars)
#> [1] "FileSystemDataset" "Dataset" "ArrowObject"
#> [4] "R6" Created on 2022-05-02 by the reprex package (v2.0.1) |
There is a problem with loading and saving zero-row dataframes. arrow::write_dataset() does nothing with a zero-row dataframe. df <- cars[cars$speed > 1e6,]
print(df)
#> [1] speed dist
#> <0 rows> (or 0-length row.names)
arrow::write_dataset(df, here::here("df"), format = "parquet")
d <- arrow::open_dataset(here::here("df"), format = "parquet")
#> Error: IOError: Cannot list directory '.../RtmpvMgShC/reprex-850311752e99-bonny-fox/df'. Detail: [errno 2] No such file or directory Created on 2022-05-03 by the reprex package (v2.0.1) |
I pushed my initial implementation to the arrow branch in case anyone wants to give it a try. Arrow greatly simplifies Andromeda because we only have to deal with a set of files rather than a database (e.g. indexes are not needed anymore). I could use help thinking through how the functions in the Operations.R file should be adapted to take advantage of arrow. I'm not sure the current implementations make as much sense when you have partitioned feather files. Perhaps the batches should be the partitions. Maybe we can take advantage of arrow::map_batches().
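For comparison, here is a rough sketch (my own illustration, not code from the branch) of batch-at-a-time processing over an arrow dataset using a Scanner and its record-batch reader, which is roughly what a batchApply replacement would need to do; the helper name is made up:
library(arrow)
batchApplySketch <- function(dataset, fun, ...) {
  # stream the dataset one RecordBatch at a time instead of materializing it
  reader <- Scanner$create(dataset)$ToRecordBatchReader()
  batch <- reader$read_next_batch()
  while (!is.null(batch)) {
    # hand each batch to the user function as a plain data frame
    fun(as.data.frame(batch), ...)
    batch <- reader$read_next_batch()
  }
  invisible(NULL)
}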
Great work @ablack3 ! I tried playing with the new branch. One behavior that is different (arguably better) with the arrow backend is that copies are independent: a <- Andromeda::andromeda()
b <- a
b$cars <- cars
a
# # Andromeda object
# # Physical location: C:\Users\mschuemi.EU\AppData\Local\Temp\Rtmp8IUZjQ\file2c287dc86e5c
#
# Tables:
b
# # Andromeda object
# # Physical location: C:\Users\mschuemi.EU\AppData\Local\Temp\Rtmp8IUZjQ\file2c287dc86e5c
#
# Tables:
# $cars (speed, dist) However, I can't seem to actually access the tables? a <- Andromeda::andromeda(cars = cars)
colnames(a$cars)
# NULL |
Thanks for giving it a try! I think you found a bug: copies are not actually independent. a and b are two different Andromeda objects that point to the same files. The 'path' attribute points to the folder where the feather files are stored. library(Andromeda)
a <- andromeda(cars = cars)
# names is implemented by the arrow package
names(a$cars)
#> [1] "speed" "dist"
# colnames is not implemented by arrow but could be added in Andromeda
colnames(a$cars)
#> NULL
# colnames works if you pull the data into R first
colnames(collect(a$cars))
#> [1] "speed" "dist"
b <- a
attr(a, "path") == attr(b, "path")
#> [1] TRUE
list.dirs(attr(a, "path"), full.names = F)
#> [1] "" "cars"
list.dirs(attr(a, "path"), full.names = F)
#> [1] "" "cars"
b$cars <- NULL
list.dirs(attr(a, "path"), full.names = F)
#> [1] ""
list.dirs(attr(a, "path"), full.names = F)
#> [1] ""
a
#> # Andromeda object
#> # Physical location: /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpmPXb7A/file1274499a75dd
#>
#> Tables:
#> $cars (speed, dist)
a$cars %>% collect()
#> Error in `handle_csv_read_error()`:
#> ! IOError: Failed to open local file '/private/var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T/RtmpmPXb7A/file1274499a75dd/cars/part-0.feather'. Detail: [errno 2] No such file or directory Created on 2022-05-10 by the reprex package (v2.0.1) |
This is tricky. I fixed 'names.Andromeda', but if someone copies an Andromeda object I can't really update all the other andromeda objects that refer to the same file location. So perhaps it would be better to create a copy every time an andromeda object is modified? However, this seems like potentially a lot of unnecessary copying of large datasets. Maybe another option is to use reference classes. library(Andromeda)
a <- andromeda(cars = cars)
b <- a
names(a)
#> [1] "cars"
# remove the cars file
b$cars <- NULL
# the new state is reflected in both a and b
a
#> # Andromeda object
#> # Physical location: /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpwFFo6h/file1182f64ac99ff
#>
#> Tables:
b
#> # Andromeda object
#> # Physical location: /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpwFFo6h/file1182f64ac99ff
#>
#> Tables:
names(a)
#> character(0)
# however a still contains a reference to the file that no longer exists
length(a)
#> [1] 1
length(b)
#> [1] 0
a$cars
#> FileSystemDataset with 1 Feather file
#> speed: double
#> dist: double
#>
#> See $metadata for additional Schema metadata
a$cars %>% collect()
#> Error in `handle_csv_read_error()`:
#> ! IOError: Failed to open local file '/private/var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T/RtmpwFFo6h/file1182f64ac99ff/cars/part-0.feather'. Detail: [errno 2] No such file or directory Created on 2022-05-11 by the reprex package (v2.0.1) |
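To illustrate the reference-class option mentioned above, here is a minimal R6 sketch (purely illustrative; the class and method names are made up). Because R6 objects have reference semantics, a copy points to the same underlying object, so removing a table through one reference is immediately visible through the other:
library(R6)
library(arrow)
AndromedaR6 <- R6Class("AndromedaR6",
  public = list(
    path = NULL,
    initialize = function(path = tempfile()) {
      dir.create(path)
      self$path <- path
    },
    set = function(name, value) {
      write_dataset(value, file.path(self$path, name), format = "feather")
      invisible(self)
    },
    get = function(name) {
      open_dataset(file.path(self$path, name), format = "feather")
    },
    remove = function(name) {
      unlink(file.path(self$path, name), recursive = TRUE)
      invisible(self)
    }
  )
)
a <- AndromedaR6$new()
a$set("cars", cars)
b <- a           # b and a refer to the same object
b$remove("cars") # the removal is visible through a as well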
UPDATE: I think I have a solution for copies of the andromeda object by syncing the dataset reference with the existing files every time any method is called. The arrow branch can now handle zero-row dataframes, but if a dplyr query produces a result that has zero rows which is then written to andromeda, the assignment fails (NULL is assigned), so that is still an issue. I created an issue about it on the arrow Jira https://issues.apache.org/jira/browse/ARROW-16575.
I think the arrow branch is ready for some alpha testing and some early user feedback. If this seems like a significant improvement and the way to go I can work on implementing the apply functions. I should also create some meaningful performance tests. |
I think this could work for appending. Feather files don't seem to support appending, but they can be read and written without copying (not sure how this works but it is pretty cool). library(dplyr)
library(arrow)
library(wakefield)
# create a dataframe with 10 million rows
df <- r_data_theme(n = 1e7, "survey")
df
#> # A tibble: 10,000,000 × 11
#> ID Item_1 Item_2 Item_3 Item_4 Item_5 Item_6 Item_7 Item_8 Item_9 Item_10
#> <chr> <int> <int> <int> <int> <int> <int> <int> <int> <int> <int>
#> 1 00001 1 4 2 4 5 2 3 3 2 5
#> 2 00002 5 2 4 3 5 1 5 4 1 5
#> 3 00003 3 3 2 4 2 4 1 5 2 3
#> 4 00004 3 4 4 4 1 5 2 1 2 4
#> 5 00005 4 4 2 1 5 5 3 3 4 4
#> 6 00006 2 4 5 4 2 4 1 3 5 3
#> 7 00007 3 5 5 3 2 4 4 2 5 3
#> 8 00008 1 1 5 3 3 2 3 3 4 4
#> 9 00009 4 4 2 2 2 3 3 2 4 3
#> 10 00010 4 3 5 1 2 5 3 4 2 3
#> # … with 9,999,990 more rows
# save the data in arrow format and open a link to it in R
system.time({
write_dataset(df, "df", "arrow")
d <- open_dataset("df", format = "arrow")
})
#> user system elapsed
#> 0.594 0.155 0.792
# append to an existing file system dataset
system.time({
s <- Scanner$create(d)
write_dataset(concat_tables(s$ToTable(), arrow_table(df)), "df", format = "arrow")
d <- open_dataset("df", format = "arrow")
})
#> user system elapsed
#> 0.575 0.377 0.940
nrow(d)
#> [1] 20000000
# remove the file
unlink("df", recursive = T) Created on 2022-05-18 by the reprex package (v2.0.1) Kind of crazy how fast arrow read/writes are. |
Hi @ablack3, I did some testing. Looks very good. I could run the tidyCovariate function on the new andromeda object by changing one word (one However, the reason the checks are failing is that FeatureExtraction has its own S4 class, covariateData, which inherits from the Andromeda class. Now that you changed Andromeda to S3, maybe this inheritance is broken? At least, running the following used to give TRUE but now gives FALSE: > covData <- FeatureExtraction:::createEmptyCovariateData(cohortId=1, aggregated=F, temporal=F)
> inherits(covData, 'Andromeda')
[1] FALSE Also another of the test |
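For reference, the standard mechanism for letting an S4 class formally inherit from an S3 class is setOldClass; a minimal sketch (illustrative only, not necessarily what FeatureExtraction ended up doing, and the class name here is made up):
library(methods)
# register the S3 class so S4 classes can use it in contains =
setOldClass("Andromeda")
# an S4 class that declares inheritance from the S3 Andromeda class
setClass("CovariateDataSketch", contains = "Andromeda")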
So I think there are a couple of next steps for this issue (which I think will solve all of the currently open issues on Andromeda).
|
Could you explain a bit more about what you mean by (2) ('Asses if actually...')? The |
With regards to Also @ablack3 I see a possible issue with your appendToTable implementation. By converting to an arrow table, I believe you are loading everything into memory. Did you try this with data bigger than memory?
I want to check that all of the remaining functions are necessary and need to be implemented on arrow, or whether any of them can be deprecated. The remaining functions are batchApply and groupApply; possibly their functionality could be covered by map_batches. One problem with map_batches is that "This is experimental and not recommended for production use." Should we use map_batches? Is map_batches a drop-in replacement for batchApply?
But batchApply does read all the data into R, just not all at once. Possibly a dumb question, but how does this save space? If I have 10GB of data and read it in 1GB at a time, don't I still have to allocate 10GB of RAM? Is the limitation the size of a single object (e.g. I can create 10 objects of 1GB each but not a single object of 10GB)? Or perhaps garbage collection is running in between reads?
I think I'm lacking a good mental model for how arrow works. Where, when, and how is the data processing actually done? I understand that arrow can read in only the data that is necessary, but aggregation is actually performed in R when collect is called, right? There is an arrow course at the upcoming useR conference but unfortunately it is full. :/ One thing I did learn is that arrow supports "zero copy reads", so apparently even though I convert the "Dataset" into an arrow table it is somehow read into memory without being read into memory. Black magic I guess?
So it looks like reading in an Arrow table does not require much memory. But I did notice that RStudio reported the "memory used by session" increased to about 2 GB. Aside: the torch package is on CRAN and the tarball is 42 megabytes. I thought CRAN had a size limitation of 5 MB. Sorry for all the questions and thanks for the input!
The idea of
I prefer we wrap |
+1 for wrapping @ablack3 I believe the arrow table is an in-memory object. I think what you are measuring with I did some experiments with the nyc_taxi dataset referenced on the arrow website. After some initial troubles, where I had to exclude some columns that were giving me issues, I managed to crash my R session by calling I did come up with a Linux-specific way of measuring the peak memory usage of my R session process. It's option number 3 from here. Then I can run the emulated appendToTable process on the nyc-taxi dataset: library(dplyr)
library(arrow)
# some of the columns in the dataset were giving me errors, something about
# strings and null, so I made a new dataset without string or null columns
# ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
#
# write_dataset(ds %>% select(-c(vendor_id, store_and_fwd_flag,
# payment_type, rate_code_id)),
# path,
# partitioning = c('year', "month"))
path <- './new_taxi/'
new_ds <- open_dataset(path)
subset <- new_ds %>% filter(year==2018) %>% filter(month==1) %>% compute()
scanner <- Scanner$create(new_ds %>% filter(year==2017))
table <- scanner$ToTable()
arrow::concat_tables(subset, table) This results in about 9 GiB of peak memory use. On the other hand I made an alternative implementation which dumps the dataframe to be appended to a file and then creates a new dataset from the list of files of the current dataset and the dataframe file. library(dplyr)
library(arrow)
path <- './new_taxi/'
new_ds <- open_dataset(path)
subset <- new_ds %>% filter(year==2018) %>% filter(month==1) %>% compute()
tempFile <- tempfile()
write_parquet(subset, tempFile)
appended_ds <- open_dataset(c(new_ds$files, tempFile)) This gives me about 1 GiB peak memory usage of the process.
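Wrapped up as a function, that low-memory approach could look roughly like this (a sketch only; the function name is made up and it assumes the dataset was opened from parquet files):
library(arrow)
appendToTableSketch <- function(dataset, newData) {
  # write only the new rows to disk, then re-open the dataset over the combined file list
  tempFile <- tempfile(fileext = ".parquet")
  write_parquet(newData, tempFile)
  open_dataset(c(dataset$files, tempFile))
}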
Thanks for the experiments. I'll implement batchApply. I think my implementation was inspired by Wes McKinney's comment here: |
I did try this today as well with feather files instead of parquet, with the same results. I don't think zero-copy means the data is still on disk; I think it means that the data is moved to the destination without any unnecessary copies. See for example this.
Hi @ablack3, looking forward to this and I think this is a great job. The new implementation of arrow creates a json file to store attributes instead of the previous rds format. I believe with a small patch to identify the specification format and read it within
Good point @solis9753. Thinking through it a bit more, both the attribute format and the data table format have changed, so I think you're suggesting that if loadAndromeda were used on an old-format object it would read the SQLite database and rds object and convert it to arrow-feather files (the new format). Is that right?
That's right. And yes on a second look the data table format has to be read correctly as well. |
Ok, yeah, that seems like it might be a good idea. Can I ask a question: how long do you store saved Andromeda objects? Are you using them for permanent storage of results?
Not so experienced, but I definitely keep them until publication. However, I am not sure anymore it is a good idea. |
To allow for a smooth transition, would it be possible to first release an Andromeda version with the old backend, but already having the |
On Windows, a <- Andromeda::andromeda(cars = cars)
x <- collect(a$cars)
close(a)
# Attempt to remove andromeda file unsuccessful. And indeed the andromeda temp folder still shows the file exists. |
Just documenting some weird behavior here when renaming columns on a zero-row table: # When there's data, it's fine:
a <- Andromeda::andromeda(cars = filter(cars, speed > 1))
a$cars <- dplyr::rename_with(a$cars, toupper)
# When there's no data, we get an error message:
a <- Andromeda::andromeda(cars = filter(cars, speed > 1000))
a$cars <- dplyr::rename_with(a$cars, toupper)
# Error: IOError: Cannot list directory 'D:/temp/Rtemp/RtmpM3Ubhh/temp3078133b153c'. Detail: [Windows error 3] The system cannot find the path specified. This seems like an arrow issue. |
Or maybe not. This works fine too: a <- Andromeda::andromeda(cars = filter(cars, speed > 1000))
a$cars2 <- dplyr::rename_with(a$cars, toupper) So maybe it's the temp copy we make when replacing a table slot in an Andromeda object? |
It seems Andromeda is still not able to remove files sometimes on Windows. This happens sporadically and I haven't been able to reproduce it without using the PLP (arrow_S4 branch of Plp, FeatureExtraction and Andromeda needed). I've tried putting a browser in the Note the reprex below fails in different places, sometimes inside the Reprex: library(arrow)
#>
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#>
#> timestamp
library(PatientLevelPrediction) # branch arrow_S4 of plp, featureExtraction and Andromeda
data("plpDataSimulationProfile")
for (i in 1:100) {
print(i)
# simulate some data
plpData <- PatientLevelPrediction::simulatePlpData(plpDataSimulationProfile,
n=100)
plpData$covariateData$covariateRef <- NULL
}
#> [1] 1
#> Generating covariates
#> Generating cohorts
#> Generating outcomes
#> Error in `removeTableIfExists()` at Andromeda/R/Object.R:255:4:
#> ! Removal of Andromeda dataset coefficients failed.
#> Backtrace:
#> ▆
#> 1. └─PatientLevelPrediction::simulatePlpData(...)
#> 2. └─PatientLevelPrediction:::predictCyclopsType(...) at PatientLevelPrediction/R/Simulation.R:138:4
#> 3. ├─methods (local) `$<-`(`*tmp*`, coefficients, value = `<NULL>`) at PatientLevelPrediction/R/Simulation.R:138:4
#> 4. └─Andromeda (local) `$<-`(`*tmp*`, coefficients, value = `<NULL>`)
#> 5. ├─methods (local) `[[<-`(`*tmp*`, name, value = `<NULL>`) at Andromeda/R/Object.R:225:2
#> 6. └─Andromeda (local) `[[<-`(`*tmp*`, name, value = `<NULL>`)
#> 7. └─Andromeda (local) .local(x, i, ..., value)
#> 8. └─Andromeda (local) removeTableIfExists(x, i) at Andromeda/R/Object.R:255:4
#> 9. └─rlang::abort(paste("Removal of Andromeda dataset", i, "failed.")) at Andromeda/R/Object.R:247:8 Created on 2022-12-05 with reprex v2.0.2 |
Yes of course. I'm stumped on the file removal issue. Someone suggested "And at least it should be easy for a R developer to dump logs on C++ file opens and closes, and see what happens exactly here." https://issues.apache.org/jira/browse/ARROW-16421. I'm not sure how to do that. |
I too have no clue. I'm not even able to compile the arrow package on Windows. When I hit Install I get
which likely comes from here, and causes all sorts of downstream compile issues. Have you been able to compile it? |
(Figured it out: you need to check out a release tag (e.g. |
Just as a comment: I read this thread because I was also interested in handling large out-of-memory health data. My tests with some typically quite demanding group_by, join, and summarize operations show that duckdb is currently a factor of 4-10 faster than RSQLite and gets the same results, while arrow alone cannot correctly handle group_by in combination with window functions and does not consistently throw errors. So I decided to go for a local duckdb in combination with arkdb to get data in and out.
I have to add a couple of limitations to my previous comment. My previous speed comparisons were based on artificial data and simple join and sorting operations. Now I have tested a few more real-life examples with actual data and frequent operations (checking for unique rows, joining, group_by and counts).
A small update regarding the file lock issue on Windows. In the upcoming arrow version (11) there have been added
So in summary I think these file lock issues can be solved by using the new methods, no |
Thanks for the information @egillax! I guess I'll try calling unlink and if that fails use the |
Both of these code snippets are crashing R.
I'm using arrow 10.0.1 on R 4.2. |
Would it be possible to add support for Unrelated, when I use
Apart from the warning being annoying, the message means we'll need to add |
dimnames.ArrowObject <- function(x, do.NULL = TRUE, prefix = "col") {
return(list(NULL, names(x)))
} |
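With a dimnames method like that defined, colnames() should work on the table reference directly, since the default colnames() is built on dimnames() (hypothetical example, assuming an open Andromeda object a containing the cars table):
colnames(a$cars)
#> [1] "speed" "dist"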
I'm still seeing Andromeda temp objects not being cleaned up on the disk. Did you test that these get cleaned up eventually?
That |
isAndromedaTable has been added. It's just this
I can't easily reproduce the file cleanup problem or write a test for it to make sure my code fixes it. A test like that would be really helpful. I made an attempt to fix it on the develop branch. |
Arrow tables can also inherit from |
Yes I can by end of the day Friday if not before. |
Well, that deadline came and went. Working on it today.
I think this is complete. Andromeda 0.6.3 is on CRAN: https://cran.rstudio.com/web/packages/Andromeda/index.html
Great! Could you also merge 0.6.3 into main, just to avoid confusion? |
While testing FeatureExtraction with the
|
Never mind, I (again) forgot how S4 inheritance works. After installing the new version of Andromeda, I forgot to remove and reinstall FeatureExtraction, so CovariateData was still inheriting from the old version. Works well now.
Hello everyone,
Recently I've been experimenting with using Apache Arrow for out-of-memory computation instead of andromeda with RSQLite. The speedups I'm getting are quite significant. Arrow can use dplyr verbs, so the code changes required are minimal. I've already implemented it for a regular PLP workflow and tested it using simulated data (using simulatePlpData) for 10e3, 100e3 and 1e6 cohort sizes (my fork of PLP is here and of Cyclops here). There's an example script in extras/testArrow.R
The following numbers are with all default settings and Lasso; I used the same simulated dataset and the same splitSeed for each comparison.
10e3 - Normal PLP: 6.7 minutes, Arrow PLP: 21.5 seconds - a speedup of 19x
100e3 - Normal PLP: 58.5 minutes, Arrow PLP: 5.2 minutes - a speedup of 11x
1e6 - Normal PLP: 18.9 hours, Arrow PLP: 2.5 hours - a speedup of 7.6x
My suggestion would be to add arrow as a backend. It should be quite simple. The function I used to create the arrow dataset is defined here. I also have a function to convert andromeda tables to arrow here.
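For illustration, such a conversion could look roughly like this (a sketch only, not the linked helper; the function name and directory layout are assumptions):
library(arrow)
library(Andromeda)
andromedaTableToArrowSketch <- function(andromeda, tableName, outDir) {
  dir.create(file.path(outDir, tableName), recursive = TRUE, showWarnings = FALSE)
  part <- 0
  # stream the SQLite-backed table out in batches, writing one feather file per batch
  Andromeda::batchApply(andromeda[[tableName]], function(batch) {
    part <<- part + 1
    write_feather(batch, file.path(outDir, tableName, sprintf("part-%d.feather", part)))
  })
  open_dataset(file.path(outDir, tableName), format = "feather")
}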
I also saw that there has been some duckdb experimentation/discussion here. By adding arrow you'd get duckdb for free, since there is tight integration and near-copyless conversion between the two formats. So, for example, you can do something like:
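An untested illustration of the kind of thing meant (not the author's snippet; it assumes the duckdb package is installed, and covariateId is a hypothetical column name):
library(arrow)
library(dplyr)
ds <- open_dataset("covariates", format = "feather")  # hypothetical on-disk dataset
ds %>%
  to_duckdb() %>%              # near-copyless hand-off of the arrow data to DuckDB
  group_by(covariateId) %>%
  summarise(n = n()) %>%
  collect()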
I've not tested this, though, since everything I had to do was already available in arrow.
Tagging various people for visibility and people I have discussed this with at some point.
@ablack3 @schuemie @jreps @rekkasa @tomseinen @lhjohn
Regards
Egill Fridgeirsson