Module libratools.lbt_datasets
The libratools.lbt_datasets module includes utilities to load, manipulate and save trajectory datasets.
Expand source code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
The libratools.lbt_datasets module includes utilities to load, manipulate
and save trajectory datasets.
"""
import os  # standard library
import locale
import pathlib
import configparser

import yaml  # 3rd party packages
import numpy as np
import pandas as pd
__author__ = "Vincent (Vince) J. Straub"
__email__ = "vincejstraub@gmail.com"
__status__ = "Testing"
# set config file depending on whether process.py or app.py is being run
if pathlib.Path.cwd().name == 'Processing':
    CONFIG_PATH = pathlib.Path.cwd() / './libratools/libratools/config.ini'
elif pathlib.Path.cwd().name == 'DevExDashboard':
    CONFIG_PATH = pathlib.Path.cwd().parents[1] / './libratools/libratools/config.ini'
class configReader:
__conf = None
@staticmethod
def config():
if configReader.__conf is None: # read only once, lazy
configReader.__conf = configparser.ConfigParser()
configReader.__conf.read(CONFIG_PATH)
return configReader.__conf
# read directory configuration for global vars
BIOTRACKER_COLS = configReader.config()['VARS']['BIOTRACKER_COLS'].split(',\n')
def check_columns(df, columns=BIOTRACKER_COLS):
    """
    Checks that the dataframe has the expected columns; returns the dataframe
    with columns redefined and index reset if not.
    Args:
        df (pandas.DataFrame): dataframe to check.
        columns (list, default=BIOTRACKER_COLS): list of reference columns
            that should be in dataframe.
    """
    if list(df.columns) != columns:
        df = df.reset_index()
        df.columns = columns
    return df
def load_trajectory(file_path, dropna=False, na_summary=True, skiprows=3,
warn_bad_lines=True, sep=';', cols=BIOTRACKER_COLS,
keycols=['FRAME', 'x', 'y']):
"""
Loads a CSV file generated from BioTracker using Pandas and numpy.
Note that the first 3 rows containing metadata and lines with too
many commas are automatically dropped. Whether to drop rows with
missing values (NaN) is left up to the user.
Args:
file_path (string): path to file.
        dropna (bool, default=False): if dropna=False NaN rows are kept, if
            dropna=True NaN rows are dropped (where at least one element is
            missing).
na_summary (bool, default=True): if na_summary=True, the number of
rows dropped is displayed as an int.
skiprows (int): number of rows to skip.
warn_bad_lines (bool, default=True): If error_bad_lines is False,
and warn_bad_lines is True, a warning for each “bad line” will
be output.
        sep (str, default=';'): separator to use.
cols (list, default=BIOTRACKER_COLS): list of expected column values.
keycols (list, default=['FRAME', 'x', 'y']): list of key columns to
check for missing values.
    Returns:
        A pandas.DataFrame and a numpy array of the same data.
"""
# message to display if NaN values detected
NA_MSG = 'Missing FRAME, x, and y values detected for file:\n'
# read csv file
df = pd.read_csv(file_path, skiprows=skiprows, delimiter=sep,
error_bad_lines=False, warn_bad_lines=warn_bad_lines)
# check columns exist
df = check_columns(df, columns=cols)
# check for missing values in key columns
    if df[keycols].isna().any().any():
# store info on rows with missing values
num_na_rows = np.count_nonzero(df[keycols].isna())
# decide whether to drop rows and display summary info
        if dropna and na_summary:
            print(NA_MSG + file_path)
            df = df.dropna()
            print('Rows dropped: {}.\n'.format(num_na_rows))
        elif not dropna and na_summary:
            print(NA_MSG + file_path)
            print('Rows with missing values: {}.\n'.format(num_na_rows))
        elif dropna and not na_summary:
            df = df.dropna()
# convert timeString column to datetime
date_time_format = '%a %b %d %H:%M:%S %Y'
# set locale to German time for converting timeString column
locale.setlocale(locale.LC_ALL, ('de', 'utf-8'))
df['timeString'] = pd.to_datetime(df['timeString'], errors='ignore',
format=date_time_format)
# convert DataFrame to numpy array
data = df.to_numpy()
return df, data
def load_npz(file_path, array=''):
"""
Loads NPZ file from disk and returns as numpy array, optionally
returning a single array.
Args:
file_path (str): path to file.
array (str, default=''): array key to index NPZ file, if
array='' NPZ object is returned.
Returns:
f (numpy array).
"""
f = np.load(file_path)
if array != '':
try:
data = f[array]
return data
except KeyError:
print(f'{array} is not a key in the NPZ file.')
else:
return f
def read_file_paths(indir='cwd', extension='.csv', warning=True, suffix=False,
suffix_str=''):
"""
Stores paths of files with specified file type in a list; first looks
in current directory before asking user to provide alternative directory
path if none are found.
Args:
indir (str, default='cwd'): input directory containing files.
        extension (str, default='.csv'): file extension.
warning (bool default=True): if warning=True, an informational
message is displayed in case no files are found.
suffix (bool default=False): if suffix=True, only file paths
ending in suffix_str are returned.
        suffix_str (str, default=''): suffix.
    Returns:
        A list of file paths and the number of files found.
"""
    # search for files in current working directory if no indir is provided
    extension = extension.lower()
    extension_cap = extension.capitalize()
    if indir == 'cwd':
        indir = pathlib.Path.cwd()
        file_paths = [p for p in indir.rglob(f'*{extension}')]
        num_files = len(file_paths)
        # warn user if none are found
        if num_files == 0:
            print(f'No {extension_cap} files found in current working directory.')
# check provided directory path exists and read files
    else:
        assert os.path.exists(indir), 'Directory path not found.'
        indir = pathlib.Path(indir)
        file_paths = [p for p in indir.rglob(f'*{extension}')]
# check CSV files exist
num_files = len(file_paths)
if num_files == 0 and warning is True:
print(f'No {extension_cap} files found in {indir}.')
    if suffix:
        files_dir = pathlib.Path(file_paths[0]).parents[0]
        file_stems = [pathlib.Path(p).stem for p in file_paths]
        # keep only files whose stem ends with the given suffix string
        files = [f + extension for f in file_stems if f.endswith(suffix_str)]
        file_paths = [files_dir / f for f in files]
return file_paths, num_files
def find_dir(path='cwd', prefix='', suffix=''):
"""
    Returns the name of a subdirectory that begins and ends with specific
    strings by using os.walk() to search all directories below the given
    root directory.
Args:
path (str, default=cwd): directory to search, if default=cwd,
the current working directory is searched.
prefix (str, default=''): prefix of subdirectory path.
suffix (str, default=''): suffix of subdirectory path.
Returns:
dir_path (str).
"""
prefix_str = str(prefix)
suffix_str = str(suffix)
date_dir = None
if path == 'cwd':
path = pathlib.Path.cwd()
    for root, dirs, files in os.walk(path):
        for d in dirs:
            if d.startswith(prefix_str) and d.endswith(suffix_str):
                date_dir = d
if date_dir is None:
print(f'Directory ending with {suffix} not found.')
else:
return date_dir
def list_dirs(parent_dir_path):
"""
Returns all child directory names in parent directory as a list.
Args:
parent_dir_path (str): path to directory containing subdirectories.
"""
return [d for d in os.listdir(parent_dir_path) if
os.path.isdir(pathlib.Path(parent_dir_path, d))]
def read_subdir_paths(parent_dir=''):
"""
    Returns directory paths for all subdirectories in the provided directory
    path as strings in a list, and the number of subdirectories as an integer.
    Args:
        parent_dir (str, default=''): directory in which to locate all
            subdirectories, defaults to current working directory if no
            path is provided.
"""
if parent_dir == '':
parent_dir = pathlib.Path.cwd()
# locate subdirectories
    try:
        subdirs = [str(pathlib.Path(parent_dir) / d) for d in list_dirs(parent_dir)]
    except FileNotFoundError:
        print('Provided directory path not found.')
        return [], 0
# store directory count as number of recordings
num_dirs = len(set(subdirs))
# load each file
file_paths = []
for subdir in subdirs:
file_path, _ = read_file_paths(indir=subdir)
for file in file_path:
file_paths.append(file)
return file_paths, num_dirs
def read_metadata(file_path, num_comment_lines=3):
"""
Returns metadata stored as comments in the first few lines of a
BioTracker-generated CSV file as list where each comment is an item
stored as a string.
Args:
file_path (str): path to BioTracker-generated CSV file.
num_comment_lines (int, default=3): number of lines at the beginning
of file that contain comments.
"""
    with open(file_path) as file:
        # read the specified number of comment lines
        metadata = [file.readline() for _ in range(num_comment_lines)]
return metadata
def extract_comments_as_dict(dic):
"""
Takes list of key-value pair comments and returns dict by
splitting on standard python chars # and \n.
"""
comments = [comment.split('#')[1].strip() for comment in dic]
keys = [val.split(':')[0] for val in comments]
keys_comments = dict(zip(keys, comments))
values = [k.replace(j + ':', '').strip() for j, k in keys_comments.items()]
dic = dict(zip(keys, values))
for key in dic.keys():
dic[key] = dic[key].strip()
try:
dic[key] = float(dic[key])
except ValueError:
pass
return dic
def get_chunk_number_from_path(file_path, dir_sep='/', subdir_sep='.',
as_str=False):
"""
    Returns the chunk number encoded in the file name of the given path.
    Args:
        file_path (str): path to file.
        dir_sep (str, default='/'): character that separates directories and
            files in file path.
        subdir_sep (str, default='.'): character that further separates
            directories and files in file path.
        as_str (bool, default=False): if as_str=True, chunk number is returned
            as string.
"""
# split file path up using '/' and '.' separator
_chunk_num = pathlib.Path(file_path).stem.split(f'{dir_sep}')[-1].split(f'{subdir_sep}')
try:
chunk_num = int(_chunk_num[0])
if as_str is True:
return str(chunk_num)
else:
return chunk_num
except ValueError:
print(f'Chunk value {_chunk_num[0]} is not an integer value.')
def save_trajectory_to_csv(df, f_name='', outdir='', metadata='',
extension='.csv', save_msg=True, add_metadata=True,
suffix='_processed'):
"""
Saves pandas.DataFrame object as a CSV file and prepends any metadata
provided in a file object.
Args:
        df (pandas.DataFrame): dataframe to be saved to file.
        metadata (list, str): metadata to be stored to file, can be either
            a string or a list of strings.
        f_name (str): file path.
        outdir: output directory.
        extension (str, default='.csv'): file extension.
        save_msg (bool, default=True): prints message to confirm track
            has been saved if save_msg=True.
        add_metadata (bool, default=True): adds metadata comments to file
            as header if add_metadata=True.
        suffix (str, default='_processed'): suffix to add to the end of file
            when saving.
"""
    # save DataFrame to CSV
    f = pathlib.Path(f_name).stem + suffix + extension
    path = pathlib.Path(outdir) / f
df.to_csv(path, sep=',', encoding='utf-8',
index=False)
# optionally prepend metadata to CSV
if add_metadata is True:
prepend_comments_to_csv(path, metadata)
# optionally confirm saving
    if save_msg:
        if suffix == '_processed':
            print(f'Processed {pathlib.Path(f_name).stem}{extension} and saved file to disk.\n')
        else:
            print(f'Merged {pathlib.Path(f_name).stem}{extension} and saved file to disk.')
def prepend_comments_to_csv(file, comments, extension='.csv'):
"""
    Inserts a list of strings as new lines at the beginning of a CSV file.
"""
# define name of temporary dummy file
file_path = pathlib.Path(file).parents[0]
file_name = pathlib.Path(file).stem
temp_file = file_path / (file_name + '.bak')
# open given original file in read mode and dummy file in write mode
with open(file, 'r') as read_obj, open(temp_file, 'w') as write_obj:
# iterate over list of comments and write them to dummy file as lines
for line in comments:
write_obj.write(line)
# read lines from original file and append them to the dummy file
for line in read_obj:
write_obj.write(line)
    # remove original file
    pathlib.Path(file).unlink()
# rename dummy file as the original file
new_extension = temp_file.with_suffix(extension)
temp_file.rename(new_extension)
def read_yaml_as_dict(path):
    """
    Returns the contents of a YAML file as a dict.
    """
    dic = None
    with open(path, 'r') as stream:
        try:
            dic = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
    return dic
def dict_to_comments(dic, sep=': '):
"""
    Takes dictionary key, value pairs and returns a list of comments.
"""
comments = ['# '+str(k)+sep+str(v)+'\n' for k, v in dic.items()]
return comments
Functions
def check_columns(df, columns=['FRAME', 'MillisecsByFPS', 'objectName', 'valid', 'id', 'coordinateUnit', 'x', 'y', 'rad', 'deg', 'xpx', 'ypx', 'time', 'timeString'])
-
Checks that the dataframe has the expected columns; returns the dataframe with columns redefined and index reset if not.
Args
df
:pandas.DataFrame
- dataframe to check.
columns
:list
, default=BIOTRACKER_COLS
- list of reference columns that should be in dataframe.
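A minimal usage sketch (the file name and the assumption that pandas absorbed the first column into the index are hypothetical):
import pandas as pd
from libratools import lbt_datasets

# hypothetical case: pandas absorbed the first column into the index,
# leaving 13 named columns instead of the expected 14
raw = pd.read_csv('track_0.csv', sep=';', skiprows=3, index_col=0)
df = lbt_datasets.check_columns(raw)
print(list(df.columns) == lbt_datasets.BIOTRACKER_COLS)  # True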
def dict_to_comments(dic, sep=': ')
-
Takes dictionary key, value pairs and returns a list of comments.
def extract_comments_as_dict(dic)
-
Takes a list of key-value pair comments and returns a dict by splitting on the standard Python characters '#' and '\n'.
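A small sketch with made-up comment strings; values that parse as numbers are converted to float:
from libratools import lbt_datasets

comments = ['# fps: 25\n', '# arena: tank_3\n']  # hypothetical comment lines
meta = lbt_datasets.extract_comments_as_dict(comments)
print(meta)  # {'fps': 25.0, 'arena': 'tank_3'}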
def find_dir(path='cwd', prefix='', suffix='')
-
Returns the name of a subdirectory that begins and ends with specific strings by using os.walk() to search all directories below the given root directory.
Args
path
:str
, default=cwd
- directory to search, if default=cwd, the current working directory is searched.
prefix
:str
, default=''
- prefix of subdirectory path.
suffix
:str
, default=''
- suffix of subdirectory path.
Returns
dir_path (str).
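A short sketch, assuming a hypothetical date-stamped directory layout under the current working directory:
from libratools import lbt_datasets

# look for a subdirectory such as '2021-06-01_trial'
trial_dir = lbt_datasets.find_dir(prefix='2021', suffix='_trial')
if trial_dir is not None:
    print(f'Found directory: {trial_dir}')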
def get_chunk_number_from_path(file_path, dir_sep='/', subdir_sep='.', as_str=False)
-
Returns the chunk number encoded in the file name of the given path.
Args
file_path
:str
- path to file.
dir_sep
:str
, default='/'
- character that separates directories and files in file path.
subdir_sep
:str
, default='.'
- character that further separates directories and files in file path.
as_str
:bool
, default=False
- if as_str=True, chunk number is returned as string.
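A minimal sketch, assuming a hypothetical layout in which each chunk is saved as '<chunk number>.csv':
from libratools import lbt_datasets

chunk = lbt_datasets.get_chunk_number_from_path('trial_01/3.csv')
print(chunk)  # 3
print(lbt_datasets.get_chunk_number_from_path('trial_01/3.csv', as_str=True))  # '3'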
def list_dirs(parent_dir_path)
-
Returns all child directory names in parent directory as a list.
Args
parent_dir_path
:str
- path to directory containing subdirectories.
def load_npz(file_path, array='')
-
Loads NPZ file from disk and returns as numpy array, optionally returning a single array.
Args
file_path
:str
- path to file.
array
:str
, default=''
- array key to index NPZ file, if array='' NPZ object is returned.
Returns
f (numpy array).
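A short sketch using a hypothetical NPZ file with one named array:
import numpy as np
from libratools import lbt_datasets

np.savez('trajectory.npz', xy=np.zeros((100, 2)))          # hypothetical file
xy = lbt_datasets.load_npz('trajectory.npz', array='xy')   # returns the 'xy' array
npz = lbt_datasets.load_npz('trajectory.npz')              # returns the whole NpzFile object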
def load_trajectory(file_path, dropna=False, na_summary=True, skiprows=3, warn_bad_lines=True, sep=';', cols=['FRAME', 'MillisecsByFPS', 'objectName', 'valid', 'id', 'coordinateUnit', 'x', 'y', 'rad', 'deg', 'xpx', 'ypx', 'time', 'timeString'], keycols=['FRAME', 'x', 'y'])
-
Loads a CSV file generated from BioTracker using Pandas and numpy. Note that the first 3 rows containing metadata and lines with too many commas are automatically dropped. Whether to drop rows with missing values (NaN) is left up to the user.
Args
file_path
:string
- path to file.
dropna
:bool
, default=False
- if dropna=False NaN rows are kept, if dropna=True NaN rows are dropped (where at least one element is missing).
na_summary
:bool
, default=True
- if na_summary=True, the number of rows dropped is displayed as an int.
skiprows
:int
- number of rows to skip.
warn_bad_lines
:bool
, default=True
- If error_bad_lines is False, and warn_bad_lines is True, a warning for each “bad line” will be output.
sep
:str
, default=';'
- separator to use.
cols
:list
, default=BIOTRACKER_COLS
- list of expected column values.
keycols
:list
, default=['FRAME', 'x', 'y']
- list of key columns to check for missing values.
Returns
A pandas.DataFrame and a numpy array of the same data.
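A minimal usage sketch, assuming a hypothetical BioTracker export 'track_0.csv' in the working directory (a German locale must be available for parsing the timeString column):
from libratools import lbt_datasets

df, data = lbt_datasets.load_trajectory('track_0.csv', dropna=True)
print(df[['FRAME', 'x', 'y']].head())
print(data.shape)  # same rows and columns as the dataframe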
def prepend_comments_to_csv(file, comments, extension='.csv')
-
Inserts a list of strings as new lines at the beginning of a CSV file.
def read_file_paths(indir='cwd', extension='.csv', warning=True, suffix=False, suffix_str='')
-
Stores paths of files with specified file type in a list; first looks in current directory before asking user to provide alternative directory path if none are found.
Args
indir
:str
, default='cwd'
- input directory containing files.
extension
:str
, default='.csv'
- file extension.
warning
:bool default=True
- if warning=True, an informational message is displayed in case no files are found.
suffix
:bool default=False
- if suffix=True, only file paths ending in suffix_str are returned.
suffix_str
:str
, default=''
- suffix.
Returns
A list of file paths and the number of files found.
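A short sketch searching a hypothetical recordings directory for processed CSV files only:
from libratools import lbt_datasets

paths, n = lbt_datasets.read_file_paths(indir='recordings/trial_01',
                                        suffix=True, suffix_str='_processed')
print(f'{n} CSV files found, {len(paths)} of them end in _processed')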
def read_metadata(file_path, num_comment_lines=3)
-
Returns metadata stored as comments in the first few lines of a BioTracker-generated CSV file as list where each comment is an item stored as a string.
Args
file_path
:str
- path to BioTracker-generated CSV file.
num_comment_lines
:int
, default=3
- number of lines at the beginning of file that contain comments.
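A minimal sketch, pairing read_metadata() with extract_comments_as_dict() on a hypothetical BioTracker CSV:
from libratools import lbt_datasets

comments = lbt_datasets.read_metadata('track_0.csv')
meta = lbt_datasets.extract_comments_as_dict(comments)
print(meta)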
def read_subdir_paths(parent_dir='')
-
Returns directory paths for all subdirectories in the provided directory path as strings in a list, and the number of subdirectories as an integer.
Args
parent_dir
:str
, default=''
- directory in which to locate all subdirectories, defaults to current working directory if no path is provided.
def read_yaml_as_dict(path)
-
Returns the contents of a YAML file as a dict.
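A short sketch, pairing read_yaml_as_dict() with dict_to_comments() on a hypothetical YAML file of recording parameters:
from libratools import lbt_datasets

params = lbt_datasets.read_yaml_as_dict('recording.yaml')
comments = lbt_datasets.dict_to_comments(params)
print(comments)  # e.g. ['# fps: 25\n', '# arena: tank_3\n']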
def save_trajectory_to_csv(df, f_name='', outdir='', metadata='', extension='.csv', save_msg=True, add_metadata=True, suffix='_processed')
-
Saves pandas.DataFrame object as a CSV file and prepends any metadata provided in a file object.
Args
df
:pandas.DataFrame
- dataframe to be saved to file.
metadata
:list, str
- metadata to be stored to file, can be either a string or a list of strings.
f_name
:str
- file path.
outdir
- output directory.
save_msg
:bool
, default=True
- prints message to confirm track has been saved if save_msg=True.
add_metadata
:bool
, default=True
- adds metadata comments to file as header if add_metadata=True.
suffix
:str
, default='_processed'
- suffix to add to the end of file when saving.
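A minimal end-to-end sketch, assuming the hypothetical file 'track_0.csv' from the load_trajectory() example:
import pathlib
from libratools import lbt_datasets

metadata = lbt_datasets.read_metadata('track_0.csv')
df, _ = lbt_datasets.load_trajectory('track_0.csv')
lbt_datasets.save_trajectory_to_csv(df, f_name='track_0.csv',
                                    outdir=pathlib.Path('.'), metadata=metadata)
# writes 'track_0_processed.csv' with the original comment lines prepended as a header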
Classes
class configReader
-
Reads the module configuration file (config.ini) once on first access and caches the parsed configparser.ConfigParser object.
Static methods
def config()
-
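A short sketch reading the cached configuration (CONFIG_PATH must resolve, i.e. the working directory must be 'Processing' or 'DevExDashboard'):
from libratools import lbt_datasets

conf = lbt_datasets.configReader.config()
print(conf['VARS']['BIOTRACKER_COLS'])  # the expected BioTracker column names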