Skip to content

Commit

Permalink
Merge yaib configs (#12)
Browse files Browse the repository at this point in the history
* define SIC data structure

* add SIC to auto attach

* add SIC loading helpers

* add callback hooks to postprocess tbls on import

* add callback to deserialise sicdb data_float_h

* add missing tbl_callback function

* add sic_itm inspired by hrd_itm

* adjust data_float_h config to recent changes

* add hr and crea as examples for sicdb

* add sex and death concepts for sic

* add vitals, labs, height, and weight concepts for sic

* add age and los_icu concepts

* add most medication concepts for sic

* fix preproc for data_float_h

some values are only taken once during the hour and thus have a cnt=1 and rawdata=NA. The actual data is stored in Val, which otherwise holds the average. Since after expansion, rawdata is the main data field, the value from Val needs to be moved to rawdata in this case.

* add OMR to miiv

* add miiv omr

* load_concepts() concepts arg doc fix

* load_concepts.integer() src NULL fix

* Fix sic config

* Properly support full rawdata found in sic

* Remove print

* Add utility functions proposed by `prockenschaub` here: https://github.com/eth-mds/ricu/pull/30/files

* Fix configs for `sic` based on `prockenschaub`

* Fix `sic` configs based on https://github.com/prockenschaub/ricu-package/tree/sicdb

* Remove prints and use ricu msg

* Remove redundant `report_probolems`

* Add prints and tempdir arg

* Cleanup prints

* Fix blood_gas config

* Fix sic table config

* Use finer resolution rawdata where available

* Pass tbl callback correctly

* Fix missing callback application

* Apply callback before split_write

* Config updates:
- Fix sic bugs
- Slack temp range

* Fix configs

* Fix callback

* Start to integrate YAIB configs

* Add `outcome` and `output` configs

* Use `apply_map` for `sic` `sex`

* Add `vitals` and `respiratory`

* Add callbacks and `sic` support for `urine_rate`

* Add `death_icu` for `sic`

* Support `sao2`, `spo2` and combination in `o2sat`

* Add empty `phn_dur`

* Add circ callbacks

* Fix `susp_inf_abx_cont` calls

* Add `patient_id`

---------

Co-authored-by: prockenschaub <[email protected]>
Co-authored-by: Drago <[email protected]>
  • Loading branch information
3 people authored Apr 3, 2024
1 parent 00424ee commit 0f8362d
Show file tree
Hide file tree
Showing 15 changed files with 1,376 additions and 86 deletions.
183 changes: 183 additions & 0 deletions R/callback-circ-fail.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# ==============================================================================
#
# Callback for circulatory failure
# Implementation by `prockenschaub`
# from: https://github.com/prockenschaub/icuDG-preprocessing/blob/main/R/callback-circ-fail.R
#
# ==============================================================================

circ_fail <- function (..., lact_thresh = 2, map_thresh = 65,
fill_for = hours(3L), cond_win = mins(45L),
cond_dur = mins(30L), keep_components = FALSE,
interval = NULL, by_ref = FALSE) {
cnc <- c("map", "cf_treat", "lact")
res <- ricu:::collect_dots(cnc, interval, ...)
assert_that(lact_thresh >= 0, map_thresh >= 0,
ricu:::is_interval(fill_for), ricu:::is_interval(cond_win),
ricu:::is_interval(cond_dur), is.flag(keep_components),
units(cond_win) == units(cond_dur), cond_dur < cond_win)

map <- res[["map"]]
cf_treat <- res[["cf_treat"]]
lact <- res[["lact"]]

if (!by_ref) {
map <- copy(map)
cf_treat <- copy(cf_treat)
lact <- copy(lact)
}

id <- id_vars(map)
step_size <- interval(map)

assert_that(units(step_size) == units(cond_win))

p <- as.numeric(cond_dur) / as.numeric(cond_win)
steps <- as.integer(cond_win / as.numeric(step_size))

# Interpolate lactate values
map_times <- map[, .SD, .SDcols = meta_vars(map)]
map_limits <- ricu::collapse(map_times, as_win_tbl = FALSE)
grid_times <- fill_gaps(map_times, map_limits)
lact <- interpolate_lactate(lact, grid_times, lact_thresh, fill_for)

# Combine MAP, vasopress/inotrope meds, and lactate to define cf
res <- merge_lst(list(map, cf_treat, lact))

.rmean <- function(x) frollmean(x, steps, align = "center")
.thresh <- function(x, op, val) fifelse(!is.na(x), op(x, val), FALSE)

res[, miss := pmax(.rmean(is.na(map)), .rmean(is.na(lact))) == 1, by = c(id)]
res[, low_map := .rmean(.thresh(map, `<`, map_thresh)), by = c(id)]
res[, treated := .rmean(!is.na(cf_treat)), by = c(id)]
res[, high_lact := .rmean(.thresh(lact, `>`, lact_thresh)), by = c(id)]
res[, circ_fail := fcase(
miss, NA,
(low_map <= p | treated <= p) & high_lact <= p, FALSE,
(low_map > p | treated > p) & high_lact > p, TRUE,
default = NA
)]

cols_rm <- c("miss", "low_map", "treated", "high_lact")
if (!keep_components) {
cols_rm <- c(cols_rm, "map", "cf_treat", "lact")
}
res <- rm_cols(res, cols_rm, skip_absent = TRUE, by_ref = TRUE)

res
}

approx <- function(x, y = NULL, xout, ...) {
if (length(x) == 1) {
return(list(x = x, y = y))
}
stats::approx(x, y, xout, ...)
}

# TODO: bring in line with ricu::replace_na
replace_na <- function(x, val, type = "const", max_n = Inf, ...) {
seq_num <- rleid(is.na(x))
imp_cnt <- sapply(split(x, seq_num), function(x) seq_along(x))

if (type == "nocb") {
imp_cnt <- Map(rev, imp_cnt)
}

imp_cnt <- Reduce(c, imp_cnt)

if (identical(type, "const")) {
repl <- data.table::nafill(x, type, val, ...)
}
else {
repl <- data.table::nafill(x, type, ...)
}

fifelse(imp_cnt <= max_n, repl, x)
}

interpolate_lactate <- function(df, grid_times, thresh, fill_win = hours(3L)) {
id <- id_vars(df)
ind <- index_var(df)
val <- data_var(df)

step_size <- interval(df)
fill_win <- ricu:::re_time(fill_win, step_size)

df[, abn := .SD[[val]] >= thresh] # TODO: generalise to allow for < thresh
df[, tdiff := c(diff(.SD[[ind]]), NA_real_), by = c(id)]

# Linearly interpolate everything
rep_to_interpol_grid <- function(x, d) {
rep(x, times = fifelse(is.na(d), 1., as.numeric(d) / as.numeric(step_size)))
}

int_lin_cond <- expr(abn == shift(abn, type = "lead") | tdiff < 2 * fill_win)
df[, int_lin := eval(int_lin_cond), by = c(id)]

df <- df[, c(
# Expand all existing columns to match length of interpolation
lapply(.SD, rep_to_interpol_grid, d = tdiff),
# Linearly interpolate every observation (for speed, ffill/bfill later)
approx(.SD[[ind]], .SD[[val]], xout = seq(as.numeric(.SD[[ind]][1]), as.numeric(.SD[[ind]][.N]), by = as.numeric(step_size)))
),
by = c(id)
]

df[x == get(ind), int_lin := TRUE]
df[int_lin == FALSE , y := NA_real_]

# Replace linear with forward/backward fill if consecutive values crossed
# thresh and are more than `fill_win` apart
max_n <- as.numeric(fill_win) / as.numeric(step_size)
df[, y := fifelse(!int_lin, replace_na(y, type = "locf", max_n = max_n), y), by = c(id)]
df[, y := fifelse(!int_lin, replace_na(y, type = "nocb", max_n = max_n), y), by = c(id)]

# Clean up table
df[, c(ind) := as.difftime(x, units = attr(step_size, "units"))]
df[, c(val) := y]
df <- df[, .SD, .SDcols = c(id, ind, val, "abn")]

# Forward/backward fill the first and last observation over grid
df <- merge(df, grid_times, all = TRUE)
df[, abn_int := replace_na(as.integer(abn), type = "nocb"), by = c(id)]
df[, c(val) := fcase(
!is.na(abn), .SD[[val]], # If not first, do nothing
abn_int == 0, replace_na(.SD[[val]], type = "nocb"), # If first val normal, infinite bfill
abn_int == 1, replace_na(.SD[[val]], type = "nocb", max_n = max_n), # If first val abnormal, bfill `max_n` steps
rep(TRUE, .N), .SD[[val]]
),
by = c(id)
]
df[, abn_int := replace_na(as.integer(abn), type = "locf"), by = c(id)]
df[, c(val) := fcase(
!is.na(abn), .SD[[val]], # If not last, do nothing
abn_int == 0, replace_na(.SD[[val]], type = "locf"), # If last val normal, infinite ffill
abn_int == 1, replace_na(.SD[[val]], type = "locf", max_n = max_n), # If last val abnormal, ffill `max_n` steps
rep(TRUE, .N), .SD[[val]]
),
by = c(id)
]

df[, .SD, .SDcols = c(id, ind, val)]
}

cf_treat <- function(..., interval = NULL) {

cnc <- c("epi_dur", "norepi_dur", "dopa_dur", "dobu_dur", "adh_dur", "phn_dur",
"levo_dur", "milrin_dur", "teophyllin_dur")
res <- ricu:::collect_dots(cnc, interval, ..., merge_dat = TRUE)
unt <- ricu::time_unit(res)

res <- res[, c(cnc) := lapply(.SD, as.difftime, units = unt), .SDcols = cnc]
res <- res[, c("cf_treat", cnc) := list(pmax(
get("dopa_dur"), get("norepi_dur"), get("dobu_dur"), get("epi_dur"),
get("adh_dur"), get("phn_dur"), get("levo_dur"), get("milrin_dur"), get("teophyllin_dur"),
na.rm = TRUE), NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
]

res <- expand(res, index_var(res), "cf_treat")
res <- unique(res)
res <- res[, c("cf_treat") := TRUE]

res
}
24 changes: 24 additions & 0 deletions R/callback-cncpt.R
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,30 @@ bmi <- function(..., interval = NULL) {
res
}

#' @rdname callback_cncpt
#' @export
o2sat_lab_first <- function(..., interval = NULL) {

# Pulse Oxymetry: `spo2`
# Arterial Blood Gas: `sao2`
cnc <- c("sao2", "spo2")
res <- collect_dots(cnc, interval, ..., merge_dat = TRUE)

# default to sao2 (arterial blood gas)
res <- res[, o2sat := sao2]

# if sao2 is missing, use spo2 (pulse oxymetry)
res <- res[is.na(sao2), o2sat := spo2]

# Filter out values below 50 and above 100
res <- filter_bounds(res, "o2sat", 50, 100)

# remove sao2 and spo2 columns
res <- rm_cols(res, cnc, by_ref = TRUE)

res
}

#' @rdname callback_cncpt
#' @export
norepi_equiv <- function(..., interval = NULL) {
Expand Down
63 changes: 63 additions & 0 deletions R/callback-icu-mortality.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# ==============================================================================
#
# Refined ICU mortality callbacks
#
# based on YAIB: https://github.com/rvandewater/YAIB-cohorts/tree/main/ricu-extensions/callbacks
# ==============================================================================

aumc_death_icu <- function (x, val_var, death_date, ...) {
# Identify ICU mortality in AUMCdb via the discharge destination field. Use
# discharge time from the ICU as death time, as date of death sometimes only
# contain the date part and no time (i.e., 00:00:00).
#
# See discussions here:
# https://github.com/AmsterdamUMC/AmsterdamUMCdb/issues/56
# https://github.com/AmsterdamUMC/AmsterdamUMCdb/issues/61
idx <- index_var(x)
x[, `:=`(c(val_var), ricu:::is_true(get(val_var) == "Overleden"))]
x[get(death_date) - get(idx) > hours(24L), `:=`(c(val_var), FALSE)]
x
}

hirid_death_icu <- function (x, val_var, sub_var, env, ...) {
dis <- "discharge_status"
idx <- index_var(x)
idc <- id_vars(x)
res <- dt_gforce(x, "last", by = idc, vars = idx)
tmp <- load_id(env[["general"]], cols = dis)
res <- merge(res, tmp[ricu:::is_true(get(dis) == "dead"), ])
res <- res[, `:=`(c(val_var, dis), list(TRUE, NULL))]
res
}

mi_death_icu <- function(x, transfers, icu_wards, ...) {
# Look for all hospital deaths in which the last careunit was an ICU.
# See discussion here: https://github.com/MIT-LCP/mimic-code/issues/874
id <- id_vars(transfers)
lead <- function(x) data.table:::shift(x, type = "lead")

transfers[, is_last := ricu:::is_true(lead(eventtype) == "discharge")]
last_ward <- transfers[, .(ward = ward[is_last]), by = c(id)]
last_ward[, "is_icu" := .(ricu:::is_true(ward %in% icu_wards))]

dat <- data_var(x)
x[(last_ward[is_icu == FALSE]), c(dat) := 0L]
x[, c(dat) := ricu:::is_true(get(dat) == 1L)]
x
}

mimic_death_icu <- function(x, env, ...){
icu_wards <- sort(unique(env[["icustays"]]$first_careunit))
transfers <- load_ts(env[["transfers"]], id_var = "hadm_id", index_var = "intime", interval = mins(1L))
transfers <- change_id(transfers, "icustay", as_src_cfg(env), id_type = TRUE)
rename_cols(transfers, "ward", "curr_careunit", by_ref = TRUE)
mi_death_icu(x, transfers, icu_wards, ...)
}

miiv_death_icu <- function(x, env, ...){
icu_wards <- sort(unique(env[["icustays"]]$first_careunit))
transfers <- load_ts(env[["transfers"]], index_var = "intime")
rename_cols(transfers, "ward", "careunit", by_ref = TRUE)
mi_death_icu(x, transfers, icu_wards, ...)
}

Loading

0 comments on commit 0f8362d

Please sign in to comment.