Module libratools.lbt_utils
The libratools.lbt_utils module includes various utilities and private functions.
Expand source code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
The libratools.lbt_utils module includes various utilities and private
functions.
"""
import datetime  # standard library
import calendar
import statistics
import numpy as np  # 3rd party packages
import pandas as pd
from . import lbt_datasets # local imports
__author__ = "Vincent (Vince) J. Straub"
__email__ = "vincejstraub@gmail.com"
__status__ = "Testing"
def count_dropped_frames(x):
"""
Returns the number of NaN frames in the provided array.
"""
x_arr = np.array(x)
num_nans = np.count_nonzero(np.isnan(x_arr))
return num_nans
def count_missing_values(df, cols, idxs=False):
"""
Returns the number of rows with missing values for each provided
column of a DataFrame as well as the total number for all columns.
Args:
df (DataFrame): pandas DataFrame.
cols (list): pandas DataFrame columns to check for missing row values.
idxs (bool, default=False): if idxs=True, the index for each row is
also returned.
Returns:
A dict keyed by column name with the number of missing rows (and,
if idxs=True, their indices) as values.
"""
na_rows = {}
# collect the total number of missing rows and row index
missing_val_idxs = list(df.index[df.isna().any(axis=1)])
if len(missing_val_idxs) > 0:
na_rows['total_nans_across_cols'] = df.isna().values.sum()
else:
na_rows['total_nans_across_cols'] = 0
# collect the number of missing rows and row index for each column
for col in list(cols):
col_name = col + '_nans'
missing_val_idxs = df[df[col].isnull()].index.tolist()
if len(missing_val_idxs) > 0:
na_rows[col_name] = len(missing_val_idxs)
if idxs is True:
na_rows[col_name + '_idxs'] = missing_val_idxs
else:
na_rows[col_name] = 0
return na_rows
def partition_segment(df, time_series, segment_num, cols=['x', 'y'], thresh=5,
unit='seconds', segment_limit=2, data_loss_limit=0.5):
"""
Returns or discards a dataset using the following decision rule:
if more than 50% (default value) of the recording is intact in no more
than 2 (default value) continuous segments, the dataset is deemed worth
using and is kept. Continuous is defined by default as 'no time interval
gap between consecutive frames greater than 5 seconds (5000 milliseconds)'.
If the dataset is kept, a dict of DataFrames is returned; if discarded,
an empty dict and 0 are returned.
Args:
df (DataFrame): pandas DataFrame containing time stamps.
time_series (Series): series of time stamp/frame values.
segment_num (int): segment number, used in skip messages.
cols (list): pandas DataFrame columns to check for missing row values.
thresh (int, default=5): threshold for deciding when to create a
new DataFrame based on change in time interval value.
unit (str, default='seconds'): time unit for the thresh value.
segment_limit (int, default=2): maximum number of continuous blocks.
data_loss_limit (float, default=0.5): threshold that determines maximum
amount of data loss permitted before a dataset is discarded.
Returns:
Dict of DataFrames and the number of DataFrames as an integer.
"""
if unit == 'seconds':
thresh_val = thresh * 1000
elif unit == 'milliseconds':
thresh_val = thresh
else:
raise ValueError("Time unit must be in seconds or milliseconds.")
# compute number of missing vals then drop these rows from a dummy df
nan_percent = (df[cols].isnull().sum() / len(df)).sum()
temp_df = df.copy()
temp_df.dropna(inplace=True)
# compute time change between frames and create new column
temp_df['MillisecsBetweenFRAMES'] = (time_series - time_series.shift())
# append a new DataFrame when time interval value exceeds threshold
temp_dfs = {}
for _, g in temp_df.groupby((temp_df.MillisecsBetweenFRAMES.diff()
> thresh_val).cumsum()):
temp_dfs[_] = g
dfs = {}
# implement decision rule to decide whether to keep dataset
if len(temp_dfs.keys()) > segment_limit and nan_percent <= data_loss_limit:
print(f'Skipped segment {segment_num}, segment limit surpassed.')
return dfs, 0
elif len(temp_dfs.keys()) <= segment_limit and nan_percent > data_loss_limit:
print(f'Skipped segment {segment_num}, data loss limit surpassed.')
return dfs, 0
elif len(temp_dfs.keys()) > segment_limit and nan_percent > data_loss_limit:
print(f'Skipped segment {segment_num}, data loss and segment limit surpassed.')
return dfs, 0
else:
df['MillisecsBetweenFRAMES'] = (time_series-time_series.shift())
# return original DataFrame with nans
for _, g in df.groupby((df.MillisecsBetweenFRAMES.diff() > thresh_val).cumsum()):
dfs[_] = g
# return dfs and number of dfs for indexing
num_dfs = len(dfs.keys())
return dfs, num_dfs
def aggregate_segments(dfs, time_interval=40, save_trajectory=False,
metadata=[''], file_name='', suffix='_processed',
save_msg=True, outdir=''):
"""
Returns a DataFrame aggregated from the individual segment DataFrames,
optionally saving the combined file to disk, using the index as a
reference column to maintain a global FRAME count.
Args:
dfs (dict): dict keyed by segment number, each value holding a 'data'
DataFrame and a 'metadata' dict.
time_interval (int, default=40): modal time interval.
save_trajectory (bool, default=False): if save_trajectory=True, aggregated
CSV file is saved to disk.
metadata (list, default=['']): list of strings to prepend to CSV file
if saved to disk.
outdir (str, default='.'): path of directory where to save aggregated
file, default is current working directory.
file_name (str, default=''): file name.
suffix (str, default='_processed'): suffix to add to the end of file
when saving.
Returns:
Dict holding the aggregated DataFrame ('data') and a metadata dict ('metadata').
"""
all_dfs = {}
dropped_frames_counter = 0
MillisecsByFPS_counter = 0
# load CSV and append to list with segment file name as new column
for segment in dfs.keys():
# create new local FRAME column
dfs[segment]['data'].insert(
loc=1, column='localFRAME', value=dfs[segment]['data'].index)
# set FRAME as index to maintain cumulative count when concatenating
dfs[segment]['data'].set_index('FRAME', inplace=True)
# create new column which stores video chunk segment number
dfs[segment]['data']['chunk_segment'] = segment
# add last MillisecsByFPS value to counter to maintain cumulative count
if segment == list(dfs.keys())[0]:
last_MillisecsByFPS_val = dfs[segment]['data']['MillisecsByFPS'].iloc[-1]
# add time_interval as MillisecsByFPS starts at 0
MillisecsByFPS_counter += (last_MillisecsByFPS_val + time_interval)
# add 0 to first row of first segment
dfs[segment]['data'].at[0, 'MillisecsBetweenFRAMES'] = 0
else:
last_MillisecsByFPS_val = dfs[segment]['data']['MillisecsByFPS'].iloc[-1]
# skip first segment when adding MillisecsByFPS_counter
dfs[segment]['data']['MillisecsByFPS'] = dfs[segment]['data']['MillisecsByFPS'] + \
MillisecsByFPS_counter
MillisecsByFPS_counter += (last_MillisecsByFPS_val + time_interval)
# add time_interval to first row of subsequent segments
dfs[segment]['data'].at[0, 'MillisecsBetweenFRAMES'] = time_interval
all_dfs[segment] = {'data': dfs[segment]['data']}
dropped_frames_counter += dfs[segment]['metadata']['dropped_frames']
# aggregate DataFrames and insert global FRAME as a separate column
list_of_dfs = [all_dfs[i]['data'] for i in all_dfs.keys()]
aggregate_df = pd.concat(list_of_dfs, ignore_index=True)
aggregate_df.insert(loc=0, column='FRAME', value=aggregate_df.index)
aggregate_df.rename(columns={'FRAME': 'globalFRAME'}, inplace=True)
# clean metadata by removing comment chars
metadata_clean = [i.split('#')[1].strip() for i in metadata]
# add dropped frames, missing values count and metadata to dict
aggregate_data_metadata_df = {
'data': aggregate_df,
'metadata':{
'source_name': metadata_clean[0],
'source_fps': metadata_clean[1],
'generation_time': metadata_clean[2],
'dropped_frames': dropped_frames_counter},
}
# save aggregated DataFrame to disk as CSV and/or return for preprocessing
if save_trajectory is True:
# drop video chunk number from file name
lbt_datasets.save_trajectory_to_csv(
aggregate_df, metadata=metadata, f_name=file_name,
outdir=outdir, suffix=suffix, save_msg=save_msg)
return aggregate_data_metadata_df
def split_series_on_datestr(df, col, HH='', MM='', SS='', YYYYMMDD=''):
"""
Splits a pandas.DataFrame on a datetime column, returning the rows after
the 'YYYY-MM-DD HH:MM:SS' timestamp built from the arguments.
Args:
df (pandas.DataFrame): dataframe to split.
col (str): name of the datetime column to split on.
HH (str, default=''): hour value in 24-hour clock format.
MM (str, default=''): minute value.
SS (str, default=''): second value.
YYYYMMDD (str, default=''): date string; if empty, the date is taken
from the first row of col.
Returns:
df_split (pandas.DataFrame)
"""
if YYYYMMDD == '':
yyyymmdd = str(df[col][0].to_period('D'))
else:
yyyymmdd = f'{YYYYMMDD[:4]}-{YYYYMMDD[4:6]}-{YYYYMMDD[-2:]}'
ts = f'{yyyymmdd} {HH}:{MM}:{SS}'
split_ts = datetime.datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
df_split = df.loc[df[col] > split_ts]
return df_split
def strptime_date_arg(date):
"""
Returns a date string reformatted using the format %Y-%m-%d, where the
input string is expected to be in the format YYYYMMDD.
"""
try:
int(date)
except ValueError:
print(f'{date} does not exclusively contain integers.')
try:
parsed = datetime.datetime.strptime(date, '%Y%m%d')
datetime_obj = parsed.strftime('%Y-%m-%d')
return datetime_obj
except ValueError:
print(f'{date} is not in the format YYYYMMDD.')
def get_date(string=True, delta=0, date_format='%Y%m%d'):
"""
Returns date for which to preprocess CSV files generated by BioTracker, the
default behavior is to return today's date as a 'YYYYMMDD' string.
Args:
string (bool, default=True): if string=True, datetime object is
returned as a string.
delta (int, default=0): number of days to add or subtract to current
date to get desired date, if default=0 current date is returned.
date_format (str, default='%Y%m%d'): format for datetime string, if
default='%Y%m%d' date will be in the form 'YYYYMMDD'.
Returns:
date (datetime or str).
"""
date = datetime.datetime.now() + datetime.timedelta(delta)
if string:
return date.strftime(f'{date_format}')
else:
return date
def unixtime_to_strtime(ut, str_format='%Y-%m-%d %H:%M:%S.%f'):
"""
Returns a unix timestamp as a datetime-formatted string.
Args:
ut (float): unix time stamp.
str_format(str, default='%Y-%m-%d %H:%M:%S.%f'): format
for datetime object.
Returns:
ts (str): formatted timestamp string.
"""
ts = float(ut)
try:
ts = datetime.datetime.utcfromtimestamp(ts)
ts = ts.strftime(f'{str_format}')
return ts
except ValueError:
print('Unix time stamp must be in seconds, not milliseconds.')
def to_local_datetime(utc_dt):
"""
Converts a UTC datetime to a local datetime according to the host
timezone.
Args:
utc_dt (datetime.datetime): utc datetime object.
Returns:
local timezone datetime
"""
try:
return datetime.datetime.fromtimestamp(calendar.timegm(utc_dt.timetuple()))
except ValueError:
print('respective loopbio array missing frame_time value')
return None
def convert_seconds(s):
"""
Converts seconds to minutes and hours.
Args:
s (float): value in seconds.
Returns:
mins (float): s in minutes
hrs (float): s in hours.
"""
mins = s / 60
hrs = s / 3600
return mins, hrs
def convert_milliseconds(ms):
"""
Converts milliseconds to seconds, minutes, hours, and days.
Args:
ms (float): value in milliseconds.
Returns:
MillisecsBySecsMinsHrsDays (dict): ms in seconds, minutes, hours, days.
"""
secs = ms / 1000.0
mins = secs / 60.0
hrs = mins / 60.0
days = hrs / 24.0
MillisecsBySecsMinsHrsDays = {
'secs': secs, 'mins': mins, 'hrs': hrs, 'days': days
}
return MillisecsBySecsMinsHrsDays
def add_nested_vals_to_dict(dic1, dic2, key3='metadata',
keys2=['treatment', 'begin_treatment']):
"""
Add values from one nested dictionary to another.
"""
for key1 in dic1.keys():
for key2 in keys2:
dic1[key1][key3][key2] = dic2[key1][key2]
return dic1
def sum_dict_vals(dic, keys=['x_nans', 'y_nans', 'globalFRAME']):
"""
Returns the maximum value among selected keys of a dictionary.
Args:
dic (dict): dict to iterate through.
keys (list, default=['x_nans', 'y_nans', 'globalFRAME']): keys to
iterate over.
Returns:
val (int): maximum value
"""
val = max([dic[key] for key in dic.keys() if key in keys])
return val
def validate_date_arg(date, date_format='YYYYMMDD'):
"""
Validates that a date string is in the format YYYYMMDD, raising a
ValueError if not.
Args:
date (str): date to validate.
"""
try:
datetime.datetime.strptime(date, '%Y%m%d')
except ValueError:
raise ValueError(f'Incorrect date argument, expected: {date_format}')
def strlist_to_intlist(strlist):
"""
Returns list of strings as list of ints.
"""
try:
intlist = [int(i) for i in strlist]
return intlist
except ValueError:
print('The list argument does not exclusively contain numbers.')
def update_setup(dic, new_vals):
"""
Appends new values to the lists stored in an existing dictionary.
"""
for k, v in new_vals.items():
dic[k].append(v)
return dic
def _is_type(obj, object_type):
"""
Checks an object's type against the expected type.
"""
if not isinstance(obj, object_type):
print(f'Input is {type(obj)}, expected {object_type}.')
def _has_cols(df, cols={'x', 'y'}):
"""
Checks if specified columns are in pandas.DataFrame.
"""
if not cols.issubset(df.columns):
print(f'The following columns are all required: {cols}')
def drop_missing_coord_vals(df, cols=['x', 'y', 'rad',
'deg', 'xpx', 'ypx']):
"""
Returns the DataFrame after dropping rows where all of the selected
columns are NaN.
"""
df.dropna(subset=cols, how='all', inplace=True)
return df
def get_nested_dict_values(nested_dic, key1='metadata', key2='activity'):
"""
Returns list of values as floats for select key of a nested dictionary.
"""
vals = [float(nested_dic[key][key1][key2]) for key in nested_dic.keys()]
return vals
def get_modal_col_vals(df, cols=['objectName',
'valid', 'id', 'coordinateUnit']):
"""
Returns most common values for select DataFrame columns as dict.
"""
cols_vals = {col: statistics.mode(df[col]) for col in cols}
return cols_vals
def zip_lists(list_a, list_b, list_c,
key1='treatment', key2='begin_treatment'):
"""
Zips multiple lists into a nested dictionary keyed by the first list.
"""
res = {a:{key1: b, key2: c} for a, b, c in zip(list_a, list_b, list_c)}
return res
def get_vals_above_thresh(df, col, thresh):
"""
Returns the rows of a DataFrame where values in the selected column
meet or exceed thresh.
"""
return df[df[col] >= thresh]
Functions
def add_nested_vals_to_dict(dic1, dic2, key3='metadata', keys2=['treatment', 'begin_treatment'])
-
Add values from one nested dictionary to another.
def aggregate_segments(dfs, time_interval=40, save_trajectory=False, metadata=[''], file_name='', suffix='_processed', save_msg=True, outdir='')
-
Returns a DataFrame aggregated from the individual segment DataFrames, optionally saving the combined file to disk, using the index as a reference column to maintain a global FRAME count.
Args
dfs (dict): dict keyed by segment number, each value holding a 'data' DataFrame and a 'metadata' dict.
time_interval (int, default=40): modal time interval.
save_trajectory (bool, default=False): if save_trajectory=True, aggregated CSV file is saved to disk.
metadata (list, default=['']): list of strings to prepend to CSV file if saved to disk.
outdir (str, default=''): path of directory where to save aggregated file, default is current working directory.
file_name (str, default=''): file name.
suffix (str, default='_processed'): suffix to add to the end of the file name when saving.
Returns
Dict holding the aggregated DataFrame ('data') and a metadata dict ('metadata').
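A minimal sketch of the expected input shape, assuming two hypothetical segment DataFrames seg0_df and seg1_df as produced by partition_segment (each with 'FRAME', 'MillisecsByFPS' and 'MillisecsBetweenFRAMES' columns and a FRAME index starting at 0) and a hypothetical three-line BioTracker header:
>>> seg_dfs = {
...     0: {'data': seg0_df, 'metadata': {'dropped_frames': 2}},
...     1: {'data': seg1_df, 'metadata': {'dropped_frames': 0}},
... }
>>> header = ['# source: trial_01.csv', '# fps: 25', '# generated: 2021-04-01']
>>> result = aggregate_segments(seg_dfs, time_interval=40, metadata=header)
>>> result['metadata']['dropped_frames']   # dropped frames summed across segments
2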
def convert_milliseconds(ms)
-
Converts milliseconds to seconds, minutes, hours, and days.
Args
ms (float): value in milliseconds.
Returns
MillisecsBySecsMinsHrsDays (dict): ms in seconds, minutes, hours, days.
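For example, converting 90000 ms and reading two of the returned dict entries:
>>> times = convert_milliseconds(90000)
>>> times['secs'], times['mins']
(90.0, 1.5)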
def convert_seconds(s)
-
Converts seconds to minutes and hours.
Args
s (float): value in seconds.
Returns
mins (float): s in minutes.
hrs (float): s in hours.
def count_dropped_frames(x)
-
Returns the number of NaN frames in the provided array.
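For example, counting the NaN entries in a small coordinate array (numpy imported as np):
>>> count_dropped_frames([0.5, np.nan, 1.2, np.nan])
2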
def count_missing_values(df, cols, idxs=False)
-
Returns the number of rows with missing values for each provided column of a DataFrame as well as the total number for all columns.
Args
df (DataFrame): pandas DataFrame.
cols (list): pandas DataFrame columns to check for missing row values.
idxs (bool, default=False): if idxs=True, the index for each row is also returned.
Returns
A dict keyed by column name with the number of missing rows (and, if idxs=True, their indices) as values.
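A small worked example on a three-row DataFrame (pandas imported as pd, numpy as np):
>>> df = pd.DataFrame({'x': [1.0, np.nan, 3.0], 'y': [np.nan, np.nan, 6.0]})
>>> na = count_missing_values(df, cols=['x', 'y'], idxs=True)
>>> na['x_nans'], na['y_nans_idxs']
(1, [0, 1])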
def drop_missing_coord_vals(df, cols=['x', 'y', 'rad', 'deg', 'xpx', 'ypx'])
-
Returns the DataFrame after dropping rows where all of the selected columns are NaN.
def get_date(string=True, delta=0, date_format='%Y%m%d')
-
Returns date for which to preprocess CSV files generated by BioTracker, the default behavior is to return today's date as a 'YYYYMMDD' string.
Args
string (bool, default=True): if string=True, datetime object is returned as a string.
delta (int, default=0): number of days to add or subtract to the current date to get the desired date; if default=0 the current date is returned.
date_format (str, default='%Y%m%d'): format for the datetime string; if default='%Y%m%d' the date will be in the form 'YYYYMMDD'.
Returns
date (datetime or str).
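Typical calls look like the following (outputs depend on the current date, so none are shown):
>>> get_date(delta=-1)                 # yesterday as a 'YYYYMMDD' string
>>> get_date(string=False)             # today's date as a datetime object
>>> get_date(date_format='%Y-%m-%d')   # today with a custom format, e.g. '2021-04-01'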
def get_modal_col_vals(df, cols=['objectName', 'valid', 'id', 'coordinateUnit'])
-
Returns most common values for select DataFrame columns as dict.
def get_nested_dict_values(nested_dic, key1='metadata', key2='activity')
-
Returns list of values as floats for select key of a nested dictionary.
def get_vals_above_thresh(df, col, thresh)
-
Returns the rows of a DataFrame where values in the selected column meet or exceed thresh.
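For example, keeping the rows at or above a threshold (the 'speed' column name is purely illustrative; pandas imported as pd):
>>> df = pd.DataFrame({'speed': [0.1, 2.5, 7.8]})
>>> len(get_vals_above_thresh(df, 'speed', thresh=2.5))
2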
def partition_segment(df, time_series, segment_num, cols=['x', 'y'], thresh=5, unit='seconds', segment_limit=2, data_loss_limit=0.5)
-
Returns or discards a dataset using the following decision rule: if more than 50% (default value) of the recording is intact in no more than 2 (default value) continuous segments, the dataset is deemed worth using and is kept. Continuous is defined by default as 'no time interval gap between consecutive frames greater than 5 seconds (5000 milliseconds)'. If the dataset is kept, a dict of DataFrames is returned; if discarded, an empty dict and 0 are returned.
Args
df (DataFrame): pandas DataFrame containing time stamps.
time_series (Series): series of time stamp/frame values.
segment_num (int): segment number, used in skip messages.
cols (list): pandas DataFrame columns to check for missing row values.
thresh (int, default=5): threshold for deciding when to create a new DataFrame based on change in time interval value.
unit (str, default='seconds'): time unit for the thresh value.
segment_limit (int, default=2): maximum number of continuous blocks.
data_loss_limit (float, default=0.5): threshold that determines the maximum amount of data loss permitted before a dataset is discarded.
Returns
Dict of DataFrames and the number of DataFrames as an integer.
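A sketch of typical use, assuming a hypothetical segment DataFrame trial_df with 'x', 'y' and a 'MillisecsByFPS' timestamp column; the outputs are not shown because they depend on the data:
>>> segments, n = partition_segment(trial_df, trial_df['MillisecsByFPS'],
...                                 segment_num=3, thresh=5, unit='seconds',
...                                 segment_limit=2, data_loss_limit=0.5)
>>> n           # number of continuous sub-DataFrames, or 0 if the segment was discarded
>>> segments    # dict of sub-DataFrames keyed by group number, or an empty dict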
def split_series_on_datestr(df, col, HH='', MM='', SS='', YYYYMMDD='')
-
Splits a pandas.DataFrame on a datetime column, returning the rows after the 'YYYY-MM-DD HH:MM:SS' timestamp built from the arguments.
Args
df (pandas.DataFrame): dataframe to split.
col (str): name of the datetime column to split on.
HH (str, default=''): hour value in 24-hour clock format.
MM (str, default=''): minute value.
SS (str, default=''): second value.
YYYYMMDD (str, default=''): date string; if empty, the date is taken from the first row of col.
Returns
df_split (pandas.DataFrame)
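For example, keeping only the rows after 10:00:00 on the date of the first row (the 'time' column name is illustrative; pandas imported as pd):
>>> df = pd.DataFrame({'time': pd.to_datetime(['2021-04-01 09:59:00',
...                                            '2021-04-01 10:01:00'])})
>>> len(split_series_on_datestr(df, 'time', HH='10', MM='00', SS='00'))
1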
def strlist_to_intlist(strlist)
-
Returns list of strings as list of ints.
def strptime_date_arg(date)
-
Returns a date string reformatted using the format %Y-%m-%d, where the input string is expected to be in the format YYYYMMDD.
def sum_dict_vals(dic, keys=['x_nans', 'y_nans', 'globalFRAME'])
-
Returns the maximum value among selected keys of a dictionary.
Args
dic (dict): dict to iterate through.
keys (list, default=['x_nans', 'y_nans', 'globalFRAME']): keys to iterate over.
Returns
val (int): maximum value.
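Note that, despite the name, the function returns the maximum of the selected values; keys outside the keys list (here 'other') are ignored:
>>> sum_dict_vals({'x_nans': 4, 'y_nans': 9, 'globalFRAME': 2, 'other': 99})
9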
def to_local_datetime(utc_dt)
-
Converts a UTC datetime to a local datetime according to the host timezone.
Args
utc_dt (datetime.datetime): UTC datetime object.
Returns
local timezone datetime
def unixtime_to_strtime(ut, str_format='%Y-%m-%d %H:%M:%S.%f')
-
Returns a unix timestamp as a datetime-formatted string.
Args
ut (float): unix time stamp in seconds.
str_format (str, default='%Y-%m-%d %H:%M:%S.%f'): format for the datetime string.
Returns
ts (str): formatted timestamp string.
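For example, formatting a second-resolution unix timestamp:
>>> unixtime_to_strtime(1609459200.25)
'2021-01-01 00:00:00.250000'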
def update_setup(dic, new_vals)
-
Appends new values to the lists stored in an existing dictionary.
def validate_date_arg(date, date_format='YYYYMMDD')
-
Validates that a date string is in the format YYYYMMDD, raising a ValueError if not.
Args
date (str): date to validate.
def zip_lists(list_a, list_b, list_c, key1='treatment', key2='begin_treatment')
-
Zips multiple lists into a nested dictionary keyed by the first list.
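For example, building a per-individual setup dict from illustrative IDs, treatments and start dates:
>>> setup = zip_lists(['fish_01', 'fish_02'], ['control', 'drug'],
...                   ['20210401', '20210405'])
>>> setup['fish_02']
{'treatment': 'drug', 'begin_treatment': '20210405'}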