Source code for gridlib.io.survival_function

import csv
import pathlib
from typing import Dict, Union

import numpy as np

from .. import data_utils


def _t_tl_sort_key(t_tl):
    """Return a sort key that orders time-lapse keys by their numeric value.

    Keys that look like ``"<number>s"`` are ordered numerically. Keys that
    cannot be parsed are placed after all parsable keys, in plain string
    order, so sorting never fails on an unexpected key.
    """
    text = str(t_tl)
    number = text[:-1] if text.endswith("s") else text
    try:
        return (0, float(number), text)
    except ValueError:
        return (1, 0.0, text)


def write_data_survival_function(
    path: Union[str, pathlib.Path], data: Dict[str, Dict[str, np.ndarray]]
):
    """Write the survival function data to a csv file.

    Parameters
    ----------
    path: str or pathlib.Path
        Path to the location of the file, where to save to. If the file does
        not exist, it will be created.
    data: Dict[str, Dict[str, np.ndarray]]
        Data of the survival function to write to a csv file. The dictionary
        structure is as follows:

        {
            f"{t_tl}": {
                "time": np.ndarray with the time points,
                "value": np.ndarray with the survival function values,
            }
        }

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the path suffix does not end with .csv

    Notes
    -----
    The .csv file with the survival time distributions will look as follows
    (without the header)::

        time-lapse 1, value,               time-lapse 2, value, ...
        Delta t_1,    f1(Delta t_1),       Delta t_2,    f2(Delta t_2)
        2*Delta t_1,  f1(2*Delta t_1),     2*Delta t_2,  f2(2*Delta t_2)
        3*Delta t_1,  f1(3*Delta t_1),     3*Delta t_2,  f2(3*Delta t_2)
        ...,          ...,                 ...,          ...

    Columns are ordered by ascending time-lapse time. Conditions with fewer
    time points than the longest one are padded with empty cells.
    """
    if isinstance(path, str):
        path = pathlib.Path(path)

    if path.suffix != ".csv":
        raise ValueError(f"path should end with .csv, but ends with {path.suffix}")

    # Normalise the time-lapse keys first. The helper is expected to express
    # every key in seconds (e.g. "1s") -- confirm against data_utils. A common
    # unit is what makes the numeric ordering below meaningful.
    data = data_utils.fmt_t_str_data(data)

    # Sort the time-lapse conditions from low to high. The keys are strings,
    # so a plain sort would be lexicographic ("10s" before "2s"); sort on the
    # numeric value instead.
    t_tl_all = sorted(data.keys(), key=_t_tl_sort_key)

    # The number of rows is set by the longest "time" array.
    max_length = max((data[t_tl]["time"].shape[0] for t_tl in t_tl_all), default=0)

    rows_to_write = []
    for i in range(max_length):
        row = []
        for t_tl in t_tl_all:
            time = data[t_tl]["time"]
            value = data[t_tl]["value"]
            # Pad with empty cells once a condition runs out of points.
            row.append(time[i] if i < time.shape[0] else "")
            row.append(value[i] if i < value.shape[0] else "")
        rows_to_write.append(tuple(row))

    # newline="" lets the csv module control the line endings itself.
    with open(path, "w", newline="") as csvfile:
        csv.writer(csvfile).writerows(rows_to_write)

    print(f"Writing data to {path} is finished.")
def read_data_survival_function(
    path: Union[str, pathlib.Path]
) -> Dict[str, Dict[str, np.ndarray]]:
    """Read the survival function data from a csv file.

    Parameters
    ----------
    path: str or pathlib.Path
        Path to the location of the .csv file containing the survival time
        distributions.

    Returns
    -------
    data: Dict[str, Dict[str, np.ndarray]]
        Data of the survival function read from the csv file. The dictionary
        structure is as follows:

        {
            f"{t_tl}": {
                "time": np.ndarray with the time points,
                "value": np.ndarray with the survival function values,
            }
        }

    Raises
    ------
    ValueError
        If the path suffix does not end with .csv, or if the file contains
        fewer than two rows (two rows are needed to derive the time-lapse
        time of every condition).
    FileNotFoundError
        If the file is not present at path.

    Notes
    -----
    The .csv file with the survival time distributions should look like this
    (without the header)::

        time-lapse 1, value,               time-lapse 2, value, ...
        Delta t_1,    f1(Delta t_1),       Delta t_2,    f2(Delta t_2)
        2*Delta t_1,  f1(2*Delta t_1),     2*Delta t_2,  f2(2*Delta t_2)
        3*Delta t_1,  f1(3*Delta t_1),     3*Delta t_2,  f2(3*Delta t_2)
        ...,          ...,                 ...,          ...

    Empty cells (and cells missing from short or blank rows) are skipped.
    Survival function values are truncated to integers.
    """
    if isinstance(path, str):
        path = pathlib.Path(path)

    if path.suffix != ".csv":
        raise ValueError(f"path should end with .csv, but ends with {path.suffix}")

    # Read every row once, instead of peeking at two rows and rewinding.
    with open(path, "r", newline="") as csvfile:
        rows = list(csv.reader(csvfile))

    if len(rows) < 2:
        raise ValueError(
            f"{path} should contain at least two rows, but contains {len(rows)}"
        )

    row1, row2 = rows[0], rows[1]

    # Every time-lapse condition consists of two columns: the time column and
    # the value column.
    num_tls = len(row1) // 2

    data = dict()
    for i in range(num_tls):
        # The time-lapse condition is determined by the difference between
        # the first and second time point of its time column.
        time_1 = float(row1[2 * i])
        time_2 = float(row2[2 * i])
        time_s = round(time_2 - time_1, 5)
        data[f"{time_s}s"] = dict()

    # Format the keys correctly: due to rounding a key may be e.g. "1.0s",
    # which the helper is expected to turn into "1s".
    data = data_utils.fmt_t_str_data(data)

    for row in rows:
        # Column pair i belongs to the i-th key. This relies on the key order
        # of the formatted dictionary matching the column order.
        for i, t_tl in zip(range(num_tls), data):
            time_idx = 2 * i
            value_idx = time_idx + 1

            # Treat cells missing from short or blank rows as empty cells.
            time_s_str = row[time_idx] if time_idx < len(row) else ""
            if time_s_str != "":
                data[t_tl].setdefault("time", []).append(
                    round(float(time_s_str), 4)
                )

            val_str = row[value_idx] if value_idx < len(row) else ""
            if val_str != "":
                # The value could be written as ###.0, so convert the string
                # to float first and only then to int.
                data[t_tl].setdefault("value", []).append(int(float(val_str)))

    # After the lists are completely filled, convert them to arrays.
    for series in data.values():
        for key in list(series):
            series[key] = np.array(series[key])

    return data