Module libratools.lbt_outlier_detection
The libratools.lbt_outlier_detection module includes methods for detecting point and subsequence outliers.
Expand source code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
The libratools.lbt_outlier_detection module includes methods for detecting
point and subsequence outliers.
"""
import numpy as np # 3rd party packages
import pandas as pd
from . import lbt_utils # local imports
from . import lbt_metrics
__author__ = "Vincent (Vince) J. Straub"
__email__ = "vincejstraub@gmail.com"
__status__ = "Testing"
def run_detection(trajectory, step_col='stepLength', frame_col='globalFRAME',
                  segment_col='chunk_segment', x='x', y='y', thresh=5, fps=5,
                  seconds=1, spike_method='interpolate',
                  spike_seq_method='exclude', corrupt_thresh=10):
    """
    Runs the full outlier detection pipeline on a movement track: point
    outliers (spikes) first, then consecutive spike sequences, then prolonged
    spikes, and finally labels the track as likely corrupt when the share of
    outlying data points reaches a user-defined threshold.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        segment_col (str, default='chunk_segment'): DataFrame column
            containing count of trajectory segment.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): number of frames per second.
        seconds (int, default=1): number of seconds at the start of each
            segment in which spikes are treated as expected.
        spike_method (str, default='interpolate'): method for handling
            spikes, either 'interpolate' or 'keep'.
        spike_seq_method (str, default='exclude'): method for handling spike
            sequences, either 'interpolate' or 'exclude' (label as NaN).
        corrupt_thresh (int, default=10): percentage of valid data points
            that may be labelled as outliers; at or above this value the
            track is labelled as likely corrupted.

    Returns:
        trajectory (pandas.DataFrame): movement track with outliers
            excluded or interpolated.
        stats (dict): merged statistics of all detection passes plus
            'num_detected_spike_seqs', 'spike_seq_idxs',
            'total_num_detected_spikes' and 'corruption_likelihood'
            ('positive' or 'negative').
    """
    # detect point outliers or 'spikes' and handle accordingly
    trajectory, spike_stats = detect_spikes(
        trajectory, step_col, frame_col, segment_col, x, y, thresh,
        fps, seconds, spike_method)
    # detect subsequence outliers and handle accordingly; each pass keeps its
    # own stats so that the first result is not overwritten by the second
    trajectory, spike_seq_stats = detect_spike_seqs(
        trajectory, step_col, frame_col, x, y, thresh,
        fps, seconds, spike_seq_method)
    trajectory, prolonged_spike_stats = detect_prolonged_spikes(
        trajectory, step_col, frame_col, x, y, thresh,
        fps, seconds, spike_seq_method)

    # merge outlier stats
    stats = {**prolonged_spike_stats, **spike_stats}
    stats['num_detected_spike_seqs'] = spike_seq_stats[
        'num_detected_prolonged_spikes']
    stats['spike_seq_idxs'] = spike_seq_stats['prolonged_spike_idxs']

    # collect every frame flagged by either subsequence pass; prolonged spikes
    # may be reported as one list of frames per spike, so flatten them
    subsequence_frames = set(spike_seq_stats['prolonged_spike_idxs'])
    for entry in prolonged_spike_stats['prolonged_spike_idxs']:
        if isinstance(entry, (list, tuple, set)):
            subsequence_frames.update(entry)
        else:
            subsequence_frames.add(entry)

    # remove spikes which are also part of a sequence
    # NOTE(review): detect_spikes reports frames before it renumbers the
    # frame column, the later passes after it - confirm both numberings
    # agree when leading frames were dropped.
    stats['spike_idxs'] = [
        i for i in stats['spike_idxs'] if i not in subsequence_frames]

    # create new combined total of data points deemed to be spikes
    stats['num_detected_spikes'] = len(stats['spike_idxs'])
    stats['total_num_detected_spikes'] = (
        len(subsequence_frames)
        + stats['num_detected_spikes']
        + stats['num_expected_spikes'])

    # label outlying trajectories if total number of outliers exceeds threshold
    num_data_points = min(trajectory[[x, y]].notnull().sum())
    if num_data_points == 0:
        # no valid position is left, so the track cannot be trusted at all
        is_corrupt = True
    else:
        outlier_pct = stats['total_num_detected_spikes'] / num_data_points * 100
        is_corrupt = outlier_pct >= corrupt_thresh
    stats['corruption_likelihood'] = 'positive' if is_corrupt else 'negative'
    return trajectory, stats
def detect_spikes(trajectory, step_col='stepLength', frame_col='globalFRAME',
                  segment_col='chunk_segment', x='x', y='y', thresh=5, fps=5,
                  seconds=1, method='interpolate'):
    """
    Handles point outliers, or spikes, in a movement track. Spikes at the
    start of a segment are expected: they are dropped together with all
    preceding frames of that segment. Remaining spikes are locations whose
    incoming and outgoing step lengths both reach the threshold; they are
    either kept or linearly interpolated.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        segment_col (str, default='chunk_segment'): DataFrame column
            containing count of trajectory segment.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): number of frames per second.
        seconds (int, default=1): number of seconds at the start of each
            segment in which spikes are treated as expected.
        method (str, default='interpolate'): 'interpolate' replaces spike
            coordinates by linear interpolation; 'keep' (or any other value)
            leaves them untouched.

    Returns:
        trajectory (pandas.DataFrame): new DataFrame without the expected
            spikes, with renumbered frame column and updated step lengths.
        stats (dict): 'num_expected_spikes', 'num_detected_spikes' and
            'spike_idxs' (frame numbers as they were before renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # detect expected spikes that occur at beginning of each recording
    idx_sec = seconds * fps
    expected_spikes = []
    for segment in trajectory[segment_col].unique():
        df = trajectory.loc[trajectory[segment_col] == segment]
        if df.empty:
            # a NaN segment label never equals itself, nothing to inspect
            continue
        # work in frame numbers throughout instead of mixing them with
        # index labels, which only agree for a default index
        first_frame = df[frame_col].iloc[0]
        in_window = df[frame_col] <= first_frame + idx_sec
        early_spikes = df.loc[(df[step_col] >= thresh) & in_window, frame_col]
        if early_spikes.empty:
            continue
        # the last early spike and every frame of the segment before it
        to_discard = df[frame_col].between(first_frame, early_spikes.max())
        expected_spikes.extend(df.loc[to_discard, frame_col].tolist())

    # reset start frame by listwise excluding expected spikes and all
    # preceding frames; copy so later assignments never hit a view
    is_expected = trajectory[frame_col].isin(expected_spikes)
    trajectory = trajectory.loc[~is_expected].copy()

    # detect spikes with thresh-exceeding incoming and outgoing step lengths
    incoming = trajectory[step_col]
    outgoing = incoming.shift(-1)
    is_spike = (incoming >= thresh) & (outgoing >= thresh)
    spike_idxs = trajectory.loc[is_spike, frame_col].tolist()

    # label spikes as NaN and then interpolate linearly, or keep them
    if method == 'interpolate':
        trajectory.loc[is_spike, [x, y]] = np.nan
        trajectory[[x, y]] = trajectory[[x, y]].interpolate()

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)
    # update step length
    trajectory[step_col] = lbt_metrics.get_step_len(
        trajectory[x], trajectory[y])

    # return outlier handling statistics
    stats['num_expected_spikes'] = len(expected_spikes)
    stats['num_detected_spikes'] = len(spike_idxs)
    stats['spike_idxs'] = spike_idxs
    return trajectory, stats
def detect_spike_seqs(trajectory, step_col='stepLength',
                      frame_col='globalFRAME', x='x', y='y', thresh=5, fps=5,
                      seconds=1, spike_seq_method='exclude'):
    """
    Handles consecutive sequences of point outliers in a movement track. A
    frame is flagged when its step length reaches the threshold and the next
    thresh-exceeding frame is the directly following frame. Flagged
    positions are either excluded (labelled as NaN) or linearly
    interpolated.

    Args:
        trajectory (pandas.DataFrame): movement track; modified in place.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): unused, kept for a uniform call signature.
        seconds (int, default=1): unused, kept for a uniform call signature.
        spike_seq_method (str, default='exclude'): 'interpolate' replaces
            the flagged coordinates by linear interpolation; 'exclude' labels
            the coordinates and the step length as NaN.

    Returns:
        trajectory (pandas.DataFrame): movement track with flagged positions
            excluded or interpolated and a renumbered frame column.
        stats (dict): 'num_detected_prolonged_spikes' and
            'prolonged_spike_idxs' (sorted frame numbers as they were before
            renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # frames with thresh-exceeding step lengths
    spike_frames = trajectory.loc[trajectory[step_col] >= thresh, frame_col]
    # a spike belongs to a sequence when the next spike is the very next
    # frame; the last spike has no successor (NaN) and never matches
    in_sequence = spike_frames.shift(-1) == (spike_frames + 1)
    prolonged_spikes_idxs = sorted(set(spike_frames[in_sequence].tolist()))

    # handle all flagged rows in one pass; repeating this per spike would
    # produce the same result at a much higher cost
    if prolonged_spikes_idxs:
        flagged_rows = trajectory[frame_col].isin(prolonged_spikes_idxs)
        if spike_seq_method == 'interpolate':
            # label spikes as NaN and then interpolate linearly
            trajectory.loc[flagged_rows, [x, y]] = np.nan
            trajectory[[x, y]] = trajectory[[x, y]].interpolate()
        elif spike_seq_method == 'exclude':
            # label positions and their step lengths as NaN
            trajectory.loc[flagged_rows, [x, y, step_col]] = np.nan

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)

    # return outlier handling statistics
    stats['num_detected_prolonged_spikes'] = len(prolonged_spikes_idxs)
    stats['prolonged_spike_idxs'] = list(prolonged_spikes_idxs)
    return trajectory, stats
def detect_prolonged_spikes(trajectory, step_col='stepLength',
                            frame_col='globalFRAME', x='x', y='y', thresh=5,
                            fps=5, seconds=1, spike_seq_method='exclude'):
    """
    Handles subsequence outliers, or prolonged spikes, in a movement track.
    Two successive thresh-exceeding frames that are more than one frame
    apart are treated as the bounds of a prolonged spike; the positions from
    the first bound up to (but not including) the second are either excluded
    (labelled as NaN) or linearly interpolated. A frame that already closes
    one prolonged spike does not open another one.

    Args:
        trajectory (pandas.DataFrame): movement track; modified in place.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): unused, kept for a uniform call signature.
        seconds (int, default=1): unused, kept for a uniform call signature.
        spike_seq_method (str, default='exclude'): 'interpolate' replaces
            the flagged coordinates by linear interpolation; 'exclude' labels
            the coordinates and the step length as NaN; any other value only
            reports the prolonged spikes.

    Returns:
        trajectory (pandas.DataFrame): movement track with flagged positions
            excluded or interpolated and a renumbered frame column.
        stats (dict): 'num_detected_prolonged_spikes' (number of distinct
            flagged frames) and 'prolonged_spike_idxs' (one list of frame
            numbers per prolonged spike, numbered as before renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # frames with thresh-exceeding step lengths
    spike_frames = trajectory.loc[
        trajectory[step_col] >= thresh, frame_col].dropna()
    # map every spike frame to the frame of the spike following it; the last
    # spike has no successor and is deliberately left out of the mapping
    next_by_frame = {
        int(frame): int(following)
        for frame, following in zip(spike_frames.tolist(),
                                    spike_frames.shift(-1).tolist())
        if not pd.isnull(following)
    }

    # both bounds of every gap between non-consecutive spikes are candidates
    candidates = set()
    for frame, following in next_by_frame.items():
        if following > frame + 1:
            candidates.update((frame, following))

    # detect prolonged spikes and handle accordingly
    prolonged_spikes_idxs = []
    spike_edges = set()
    # walk the candidates in frame order: pairing the bounds depends on the
    # order of visits, and a set does not guarantee any
    for spike in sorted(candidates):
        if spike in spike_edges or spike not in next_by_frame:
            continue
        next_spike = next_by_frame[spike]
        # define prolonged spike
        prolonged_spike_idx = list(range(spike, next_spike))
        prolonged_spikes_idxs.append(prolonged_spike_idx)
        spike_edges.update((spike, next_spike))
        # handle prolonged spike
        flagged_rows = trajectory[frame_col].isin(prolonged_spike_idx)
        if spike_seq_method == 'interpolate':
            # label as NaN and then interpolate linearly
            trajectory.loc[flagged_rows, [x, y]] = np.nan
            trajectory[[x, y]] = trajectory[[x, y]].interpolate()
        elif spike_seq_method == 'exclude':
            # label positions and their step lengths as NaN
            trajectory.loc[flagged_rows, [x, y, step_col]] = np.nan

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)

    # return outlier handling statistics
    flagged_frames = {
        frame for seq in prolonged_spikes_idxs for frame in seq}
    stats['num_detected_prolonged_spikes'] = len(flagged_frames)
    stats['prolonged_spike_idxs'] = list(prolonged_spikes_idxs)
    return trajectory, stats
def exclude_step_lengths(trajectory, step_col='stepLength',
                         frame_col='globalFRAME', x='x', y='y', thresh=5,
                         method='exclude'):
    """
    Handles false step lengths in a movement track: thresh-exceeding step
    lengths whose directly preceding frame has a NaN step length. They are
    either excluded (labelled as NaN) or their rows are dropped.

    Args:
        trajectory (pandas.DataFrame): movement track; modified in place
            unless method is 'drop'.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        method (str, default='exclude'): 'exclude' labels the false step
            lengths as NaN; 'drop' removes their rows.

    Returns:
        trajectory (pandas.DataFrame): movement track with false step
            lengths excluded or dropped and a renumbered frame column.
        stats (dict): 'num_excess_spikes' and 'excess_spike_idxs' (frame
            numbers as they were before renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # detect absolute spikes with thresh-exceeding incoming step lengths
    steps = trajectory[[frame_col, step_col]]
    spike_idxs = steps.loc[steps[step_col] >= thresh, frame_col]

    # determine whether they directly follow an excluded data point
    step_by_frame = dict(
        zip(steps[frame_col].tolist(), steps[step_col].tolist()))
    excess_spike_idxs = []
    for frame in spike_idxs.tolist():
        previous_frame = frame - 1
        # a spike without a predecessor row (e.g. the very first frame)
        # cannot follow an excluded point, so it is left alone
        if (previous_frame in step_by_frame
                and pd.isnull(step_by_frame[previous_frame])):
            excess_spike_idxs.append(frame)

    # select rows by frame number; index labels only agree with frame
    # numbers for a default index
    is_excess = trajectory[frame_col].isin(excess_spike_idxs)
    if method == 'exclude':
        trajectory.loc[is_excess, [step_col]] = np.nan
    elif method == 'drop':
        trajectory = trajectory.loc[~is_excess].copy()

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)

    # return outlier handling statistics
    stats['num_excess_spikes'] = len(excess_spike_idxs)
    stats['excess_spike_idxs'] = excess_spike_idxs
    return trajectory, stats
def exclude_interpolated_points(trajectory, step_col='stepLength',
                                frame_col='globalFRAME', x='x', y='y',
                                thresh=5, method='exclude'):
    """
    Handles every data point of a movement track whose step length reaches
    the threshold: the point is either excluded (labelled as NaN) or its row
    is dropped.

    Args:
        trajectory (pandas.DataFrame): movement track; modified in place
            unless method is 'drop'.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        method (str, default='exclude'): 'exclude' labels coordinates and
            step length of the spikes as NaN; 'drop' removes their rows.

    Returns:
        trajectory (pandas.DataFrame): movement track with spikes excluded
            or dropped and a renumbered frame column.
        stats (dict): 'num_excess_spikes' and 'excess_spike_idxs' (a
            pandas.Series of frame numbers as they were before renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # detect absolute spikes with thresh-exceeding incoming step lengths
    steps = trajectory[[frame_col, step_col]]
    excess_spike_idxs = steps.loc[steps[step_col] >= thresh, frame_col]

    # select rows by frame number; index labels only agree with frame
    # numbers for a default index
    is_excess = trajectory[frame_col].isin(excess_spike_idxs)
    if method == 'exclude':
        trajectory.loc[is_excess, [x, y, step_col]] = np.nan
    elif method == 'drop':
        trajectory = trajectory.loc[~is_excess].copy()

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)

    # return outlier handling statistics
    stats['num_excess_spikes'] = len(excess_spike_idxs)
    stats['excess_spike_idxs'] = excess_spike_idxs
    return trajectory, stats
def check_step_length_col(trajectory, step_col='stepLength', x='x', y='y'):
    """
    Ensures that the trajectory carries a step length column. If the column
    is missing it is computed from the x- and y-coordinate columns and added
    to the DataFrame in place; an existing column is left untouched.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.

    Returns:
        trajectory (pandas.DataFrame): the DataFrame that was passed in.
    """
    already_present = step_col in trajectory.columns
    if already_present:
        return trajectory
    step_lengths = lbt_metrics.get_step_len(trajectory[x], trajectory[y])
    trajectory[step_col] = step_lengths
    return trajectory
Functions
def check_step_length_col(trajectory, step_col='stepLength', x='x', y='y')
-
Add column to trajectory with step length between rows of x and y-coordinates.
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
x (str, default='x'): DataFrame column containing x-coordinate. y (str, default='y'): DataFrame column containing y-coordinate.
Returns
trajectory.
Expand source code
def check_step_length_col(trajectory, step_col='stepLength', x='x', y='y'):
    """
    Ensures that the trajectory carries a step length column. If the column
    is missing it is computed from the x- and y-coordinate columns and added
    to the DataFrame in place; an existing column is left untouched.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.

    Returns:
        trajectory (pandas.DataFrame): the DataFrame that was passed in.
    """
    already_present = step_col in trajectory.columns
    if already_present:
        return trajectory
    step_lengths = lbt_metrics.get_step_len(trajectory[x], trajectory[y])
    trajectory[step_col] = step_lengths
    return trajectory
def detect_prolonged_spikes(trajectory, step_col='stepLength', frame_col='globalFRAME', x='x', y='y', thresh=5, fps=5, seconds=1, spike_seq_method='exclude')
-
Drops subsequence outliers, or prolonged spikes, from a movement track or excludes them (marks them as NaN) by identifying the bounds and removing positions between them. Detected spikes can in turn be either excluded or linearly interpolated.
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
- frame_col (str, default='globalFRAME'): DataFrame column containing
- frame count.
- x (str, default='x'): DataFrame column containing x-coordinate.
- y (str, default='y'): DataFrame column containing y-coordinate.
thresh
:int, default=5
- Maximum step length between frames.
fps
:int
, default=5
- number of frames per second.
seconds
:int
, default=1
- number of seconds for which to exclude rows before commencing outlier detection.
spike_seq_method (str, default='exclude'): method for handling spikes, can be either 'interpolate' or 'exclude', which labels data points as NaN.
Returns
trajectory (pandas.DataFrame): movement track with outliers excluded or interpolated. stats (dict): excluded frame count and number of outliers detected count.
Expand source code
def detect_prolonged_spikes(trajectory, step_col='stepLength',
                            frame_col='globalFRAME', x='x', y='y', thresh=5,
                            fps=5, seconds=1, spike_seq_method='exclude'):
    """
    Handles subsequence outliers, or prolonged spikes, in a movement track.
    Two successive thresh-exceeding frames that are more than one frame
    apart are treated as the bounds of a prolonged spike; the positions from
    the first bound up to (but not including) the second are either excluded
    (labelled as NaN) or linearly interpolated. A frame that already closes
    one prolonged spike does not open another one.

    Args:
        trajectory (pandas.DataFrame): movement track; modified in place.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): unused, kept for a uniform call signature.
        seconds (int, default=1): unused, kept for a uniform call signature.
        spike_seq_method (str, default='exclude'): 'interpolate' replaces
            the flagged coordinates by linear interpolation; 'exclude' labels
            the coordinates and the step length as NaN; any other value only
            reports the prolonged spikes.

    Returns:
        trajectory (pandas.DataFrame): movement track with flagged positions
            excluded or interpolated and a renumbered frame column.
        stats (dict): 'num_detected_prolonged_spikes' (number of distinct
            flagged frames) and 'prolonged_spike_idxs' (one list of frame
            numbers per prolonged spike, numbered as before renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # frames with thresh-exceeding step lengths
    spike_frames = trajectory.loc[
        trajectory[step_col] >= thresh, frame_col].dropna()
    # map every spike frame to the frame of the spike following it; the last
    # spike has no successor and is deliberately left out of the mapping
    next_by_frame = {
        int(frame): int(following)
        for frame, following in zip(spike_frames.tolist(),
                                    spike_frames.shift(-1).tolist())
        if not pd.isnull(following)
    }

    # both bounds of every gap between non-consecutive spikes are candidates
    candidates = set()
    for frame, following in next_by_frame.items():
        if following > frame + 1:
            candidates.update((frame, following))

    # detect prolonged spikes and handle accordingly
    prolonged_spikes_idxs = []
    spike_edges = set()
    # walk the candidates in frame order: pairing the bounds depends on the
    # order of visits, and a set does not guarantee any
    for spike in sorted(candidates):
        if spike in spike_edges or spike not in next_by_frame:
            continue
        next_spike = next_by_frame[spike]
        # define prolonged spike
        prolonged_spike_idx = list(range(spike, next_spike))
        prolonged_spikes_idxs.append(prolonged_spike_idx)
        spike_edges.update((spike, next_spike))
        # handle prolonged spike
        flagged_rows = trajectory[frame_col].isin(prolonged_spike_idx)
        if spike_seq_method == 'interpolate':
            # label as NaN and then interpolate linearly
            trajectory.loc[flagged_rows, [x, y]] = np.nan
            trajectory[[x, y]] = trajectory[[x, y]].interpolate()
        elif spike_seq_method == 'exclude':
            # label positions and their step lengths as NaN
            trajectory.loc[flagged_rows, [x, y, step_col]] = np.nan

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)

    # return outlier handling statistics
    flagged_frames = {
        frame for seq in prolonged_spikes_idxs for frame in seq}
    stats['num_detected_prolonged_spikes'] = len(flagged_frames)
    stats['prolonged_spike_idxs'] = list(prolonged_spikes_idxs)
    return trajectory, stats
def detect_spike_seqs(trajectory, step_col='stepLength', frame_col='globalFRAME', x='x', y='y', thresh=5, fps=5, seconds=1, spike_seq_method='exclude')
-
Drops consecutive sequences of point outliers from a movement track or excludes them (marks them as NaN) by identifying the bounds and removing positions between them. Detected spikes can in turn be either excluded or linearly interpolated.
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
- frame_col (str, default='globalFRAME'): DataFrame column containing
- frame count.
- x (str, default='x'): DataFrame column containing x-coordinate.
- y (str, default='y'): DataFrame column containing y-coordinate.
thresh
:int, default=5
- Maximum step length between frames.
fps
:int
, default=5
- number of frames per second.
seconds
:int
, default=1
- number of seconds for which to exclude rows before commencing outlier detection.
spike_seq_method (str, default='exclude'): method for handling spikes, can be either 'interpolate' or 'exclude', which labels data points as NaN.
Returns
trajectory (pandas.DataFrame): movement track with outliers excluded or interpolated. stats (dict): excluded frame count and number of outliers detected count.
Expand source code
def detect_spike_seqs(trajectory, step_col='stepLength',
                      frame_col='globalFRAME', x='x', y='y', thresh=5, fps=5,
                      seconds=1, spike_seq_method='exclude'):
    """
    Handles consecutive sequences of point outliers in a movement track. A
    frame is flagged when its step length reaches the threshold and the next
    thresh-exceeding frame is the directly following frame. Flagged
    positions are either excluded (labelled as NaN) or linearly
    interpolated.

    Args:
        trajectory (pandas.DataFrame): movement track; modified in place.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): unused, kept for a uniform call signature.
        seconds (int, default=1): unused, kept for a uniform call signature.
        spike_seq_method (str, default='exclude'): 'interpolate' replaces
            the flagged coordinates by linear interpolation; 'exclude' labels
            the coordinates and the step length as NaN.

    Returns:
        trajectory (pandas.DataFrame): movement track with flagged positions
            excluded or interpolated and a renumbered frame column.
        stats (dict): 'num_detected_prolonged_spikes' and
            'prolonged_spike_idxs' (sorted frame numbers as they were before
            renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # frames with thresh-exceeding step lengths
    spike_frames = trajectory.loc[trajectory[step_col] >= thresh, frame_col]
    # a spike belongs to a sequence when the next spike is the very next
    # frame; the last spike has no successor (NaN) and never matches
    in_sequence = spike_frames.shift(-1) == (spike_frames + 1)
    prolonged_spikes_idxs = sorted(set(spike_frames[in_sequence].tolist()))

    # handle all flagged rows in one pass; repeating this per spike would
    # produce the same result at a much higher cost
    if prolonged_spikes_idxs:
        flagged_rows = trajectory[frame_col].isin(prolonged_spikes_idxs)
        if spike_seq_method == 'interpolate':
            # label spikes as NaN and then interpolate linearly
            trajectory.loc[flagged_rows, [x, y]] = np.nan
            trajectory[[x, y]] = trajectory[[x, y]].interpolate()
        elif spike_seq_method == 'exclude':
            # label positions and their step lengths as NaN
            trajectory.loc[flagged_rows, [x, y, step_col]] = np.nan

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)

    # return outlier handling statistics
    stats['num_detected_prolonged_spikes'] = len(prolonged_spikes_idxs)
    stats['prolonged_spike_idxs'] = list(prolonged_spikes_idxs)
    return trajectory, stats
def detect_spikes(trajectory, step_col='stepLength', frame_col='globalFRAME', segment_col='chunk_segment', x='x', y='y', thresh=5, fps=5, seconds=1, method='interpolate')
-
Removes point outliers, or spikes, from a movement track by identifying locations with extreme incoming and outgoing step lengths that surpass a user-defined upper threshold. Detected spikes can in turn be either kept or linearly interpolated.
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
- frame_col (str, default='globalFRAME'): DataFrame column containing
- frame count.
- segment_col (str, default='chunk_segment'): DataFrame column
- containing count of trajectory segment.
- x (str, default='x'): DataFrame column containing x-coordinate.
- y (str, default='y'): DataFrame column containing y-coordinate.
thresh
:int, default=5
- Maximum step length between frames.
fps
:int
, default=5
- number of frames per second.
seconds
:int
, default=1
- number of seconds for which to exclude rows before commencing outlier detection.
method (str, default='interpolate'): method for handling spikes, can be either 'interpolate' or 'keep'.
Returns
trajectory (pandas.DataFrame): movement track with outliers excluded or interpolated. stats (dict): excluded frame count and number of outliers detected count.
Expand source code
def detect_spikes(trajectory, step_col='stepLength', frame_col='globalFRAME',
                  segment_col='chunk_segment', x='x', y='y', thresh=5, fps=5,
                  seconds=1, method='interpolate'):
    """
    Handles point outliers, or spikes, in a movement track. Spikes at the
    start of a segment are expected: they are dropped together with all
    preceding frames of that segment. Remaining spikes are locations whose
    incoming and outgoing step lengths both reach the threshold; they are
    either kept or linearly interpolated.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths; it is added if missing.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        segment_col (str, default='chunk_segment'): DataFrame column
            containing count of trajectory segment.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): number of frames per second.
        seconds (int, default=1): number of seconds at the start of each
            segment in which spikes are treated as expected.
        method (str, default='interpolate'): 'interpolate' replaces spike
            coordinates by linear interpolation; 'keep' (or any other value)
            leaves them untouched.

    Returns:
        trajectory (pandas.DataFrame): new DataFrame without the expected
            spikes, with renumbered frame column and updated step lengths.
        stats (dict): 'num_expected_spikes', 'num_detected_spikes' and
            'spike_idxs' (frame numbers as they were before renumbering).
    """
    # instantiate dict to collect stats
    stats = {}
    # make sure the step length column exists, honouring custom column names
    check_step_length_col(trajectory, step_col, x, y)

    # detect expected spikes that occur at beginning of each recording
    idx_sec = seconds * fps
    expected_spikes = []
    for segment in trajectory[segment_col].unique():
        df = trajectory.loc[trajectory[segment_col] == segment]
        if df.empty:
            # a NaN segment label never equals itself, nothing to inspect
            continue
        # work in frame numbers throughout instead of mixing them with
        # index labels, which only agree for a default index
        first_frame = df[frame_col].iloc[0]
        in_window = df[frame_col] <= first_frame + idx_sec
        early_spikes = df.loc[(df[step_col] >= thresh) & in_window, frame_col]
        if early_spikes.empty:
            continue
        # the last early spike and every frame of the segment before it
        to_discard = df[frame_col].between(first_frame, early_spikes.max())
        expected_spikes.extend(df.loc[to_discard, frame_col].tolist())

    # reset start frame by listwise excluding expected spikes and all
    # preceding frames; copy so later assignments never hit a view
    is_expected = trajectory[frame_col].isin(expected_spikes)
    trajectory = trajectory.loc[~is_expected].copy()

    # detect spikes with thresh-exceeding incoming and outgoing step lengths
    incoming = trajectory[step_col]
    outgoing = incoming.shift(-1)
    is_spike = (incoming >= thresh) & (outgoing >= thresh)
    spike_idxs = trajectory.loc[is_spike, frame_col].tolist()

    # label spikes as NaN and then interpolate linearly, or keep them
    if method == 'interpolate':
        trajectory.loc[is_spike, [x, y]] = np.nan
        trajectory[[x, y]] = trajectory[[x, y]].interpolate()

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(
        start=0, stop=len(trajectory), step=1)
    # update step length
    trajectory[step_col] = lbt_metrics.get_step_len(
        trajectory[x], trajectory[y])

    # return outlier handling statistics
    stats['num_expected_spikes'] = len(expected_spikes)
    stats['num_detected_spikes'] = len(spike_idxs)
    stats['spike_idxs'] = spike_idxs
    return trajectory, stats
def exclude_interpolated_points(trajectory, step_col='stepLength', frame_col='globalFRAME', x='x', y='y', thresh=5, method='exclude')
-
Listwise excludes point outliers, or spikes, from a movement track that exceed a predefined step length.
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
- frame_col (str, default='globalFRAME'): DataFrame column containing
- frame count.
- x (str, default='x'): DataFrame column containing x-coordinate.
- y (str, default='y'): DataFrame column containing y-coordinate.
thresh
:int
, default=5
- Maximum step length between frames.
method (str, default='exclude'): 'exclude' labels thresh-exceeding spikes as NaN, 'drop' removes their rows.
Returns
trajectory (pandas.DataFrame): movement track with outliers excluded or interpolated. stats (dict): excluded frame count and number of outliers detected count.
Expand source code
def exclude_interpolated_points(trajectory, step_col='stepLength',
                                frame_col='globalFRAME', x='x', y='y',
                                thresh=5, method='exclude'):
    """
    Listwise excludes point outliers, or spikes, from a movement track
    whose step length reaches a predefined threshold.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames; steps
            greater than or equal to this value are treated as spikes.
        method (str, default='exclude'): 'exclude' sets x, y and step_col
            of the affected rows to NaN (modifying the passed DataFrame in
            place); 'drop' removes the affected rows. Any other value
            leaves the data points untouched.

    Returns:
        trajectory (pandas.DataFrame): movement track with outliers
            excluded or dropped and frame_col renumbered from zero.
        stats (dict): 'num_excess_spikes' (int) and 'excess_spike_idxs'
            (pandas.Series of frame_col values recorded before frame_col
            is renumbered).
    """
    # instantiate dict to collect stats
    stats = {}

    # append step length between frames to column
    # NOTE(review): the return value is ignored, so this helper presumably
    # adds the column in place - confirm against its definition.
    check_step_length_col(trajectory)

    # detect absolute spikes with thresh-reaching incoming step lengths
    steps = trajectory[[frame_col, step_col]]
    excess_spike_idxs = steps.loc[steps[step_col] >= thresh][frame_col]

    # select affected rows by frame value; frame_col values must not be
    # used as index labels because the index is not reset when frame_col
    # is renumbered, so the two can diverge
    is_excess = trajectory[frame_col].isin(excess_spike_idxs)

    # label spikes as NaN or drop their rows
    if method == 'exclude':
        trajectory.loc[is_excess, [x, y, step_col]] = np.nan
    elif method == 'drop':
        trajectory = trajectory.loc[~is_excess].copy()

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(start=0, stop=len(trajectory),
                                          step=1)

    # return outlier handling statistics
    stats['num_excess_spikes'] = len(excess_spike_idxs)
    stats['excess_spike_idxs'] = excess_spike_idxs

    return trajectory, stats
def exclude_step_lengths(trajectory, step_col='stepLength', frame_col='globalFRAME', x='x', y='y', thresh=5, method='exclude')
-
Listwise drops false step length metrics from a movement or excludes them (marks them as NaN).
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
- frame_col (str, default='globalFRAME'): DataFrame column containing
- frame count.
- x (str, default='x'): DataFrame column containing x-coordinate.
- y (str, default='y'): DataFrame column containing y-coordinate.
thresh
:int
, default=5
- Maximum step length between frames.
method (str, default='exclude'): 'exclude' labels the step length of thresh-exceeding spikes as NaN, 'drop' removes their rows.
Returns
trajectory (pandas.DataFrame): movement track with outliers excluded or interpolated. stats (dict): excluded frame count and number of outliers detected count.
Expand source code
def exclude_step_lengths(trajectory, step_col='stepLength',
                         frame_col='globalFRAME', x='x', y='y', thresh=5,
                         method='exclude'):
    """
    Listwise drops false step length metrics from a movement track or
    excludes them (marks them as NaN). A step length is considered false
    when it reaches the threshold and the step length of the directly
    preceding frame is NaN.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        x (str, default='x'): DataFrame column containing x-coordinate
            (not used by this function).
        y (str, default='y'): DataFrame column containing y-coordinate
            (not used by this function).
        thresh (int, default=5): Maximum step length between frames; steps
            greater than or equal to this value are candidates.
        method (str, default='exclude'): 'exclude' sets step_col of the
            affected rows to NaN (modifying the passed DataFrame in place);
            'drop' removes the affected rows. Any other value leaves the
            data untouched.

    Returns:
        trajectory (pandas.DataFrame): movement track with false step
            lengths excluded or dropped and frame_col renumbered from zero.
        stats (dict): always empty; returned for parity with the other
            outlier handling functions.
    """
    # instantiate dict to collect stats
    stats = {}

    # append step length between frames to column
    # NOTE(review): the return value is ignored, so this helper presumably
    # adds the column in place - confirm against its definition.
    check_step_length_col(trajectory)

    # detect absolute spikes with thresh-reaching incoming step lengths
    steps = trajectory[[frame_col, step_col]]
    spike_idxs = steps.loc[steps[step_col] >= thresh][frame_col]

    # keep only spikes whose preceding frame has a NaN step length; a set
    # lookup avoids one boolean scan per spike and does not fail when the
    # preceding frame is absent (e.g. for the very first frame)
    nan_step_frames = set(steps.loc[steps[step_col].isna(), frame_col])
    excess_spike_idxs = [i for i in spike_idxs
                         if (i - 1) in nan_step_frames]

    # select affected rows by frame value; frame_col values must not be
    # used as index labels because the index is not reset when frame_col
    # is renumbered, so the two can diverge
    is_excess = trajectory[frame_col].isin(excess_spike_idxs)

    # label step lengths as NaN or drop their rows
    if method == 'exclude':
        trajectory.loc[is_excess, [step_col]] = np.nan
    elif method == 'drop':
        trajectory = trajectory.loc[~is_excess].copy()

    # reset globalFRAME
    trajectory[frame_col] = pd.RangeIndex(start=0, stop=len(trajectory),
                                          step=1)

    return trajectory, stats
def run_detection(trajectory, step_col='stepLength', frame_col='globalFRAME', segment_col='chunk_segment', x='x', y='y', thresh=5, fps=5, seconds=1, spike_method='interpolate', spike_seq_method='exclude', corrupt_thresh=10)
-
Implements outlier detection by detecting subsequence outliers, point outliers and checking whether the movement track is corrupt, i.e. whether the number of data points labelled as outliers exceeds a user-defined data corruption threshold.
Args
trajectory
:pandas.DataFrame
- movement track.
step_col
:str
, default=stepLength
- DataFrame column containing step lengths.
- frame_col (str, default='globalFRAME'): DataFrame column containing
- frame count.
- segment_col (str, default='chunk_segment'): DataFrame column
- containing count of trajectory segment.
- x (str, default='x'): DataFrame column containing x-coordinate.
- y (str, default='y'): DataFrame column containing y-coordinate.
thresh
:int, default=5
- Maximum step length between frames.
fps
:int
, default=5
- number of frames per second.
seconds
:int
, default=1
- number of seconds for which to exclude rows before commencing outlier detection.
- spike_method (str, default='interpolate'): method for handling spikes,
- can be either 'interpolate' or 'keep'.
- spike_seq_method (str, default='exclude'): method for handling spike
- sequences, can be either 'interpolated' or 'keep'.
corrupt_thresh
:int
, default=10
- the number of data points labelled as outliers as a percentage of all data points that a track is allowed to have, if this value is exceeded the track is labelled as being likely corrupted.
Returns
trajectory (pandas.DataFrame): movement track with outliers excluded or interpolated. stats (dict): excluded frame count and number of outliers detected count.
Expand source code
def run_detection(trajectory, step_col='stepLength', frame_col='globalFRAME',
                  segment_col='chunk_segment', x='x', y='y', thresh=5, fps=5,
                  seconds=1, spike_method='interpolate',
                  spike_seq_method='exclude', corrupt_thresh=10):
    """
    Runs the outlier detection pipeline on a movement track: point outliers
    are handled first, subsequence outliers second, and the track is then
    labelled as likely corrupt when the share of data points flagged as
    outliers reaches a user-defined data corruption threshold.

    Args:
        trajectory (pandas.DataFrame): movement track.
        step_col (str, default='stepLength'): DataFrame column containing
            step lengths.
        frame_col (str, default='globalFRAME'): DataFrame column containing
            frame count.
        segment_col (str, default='chunk_segment'): DataFrame column
            containing count of trajectory segment.
        x (str, default='x'): DataFrame column containing x-coordinate.
        y (str, default='y'): DataFrame column containing y-coordinate.
        thresh (int, default=5): Maximum step length between frames.
        fps (int, default=5): number of frames per second.
        seconds (int, default=1): number of seconds, forwarded to the
            point and subsequence detection steps.
        spike_method (str, default='interpolate'): method for handling
            spikes, forwarded to detect_spikes.
        spike_seq_method (str, default='exclude'): method for handling
            spike sequences, forwarded to the subsequence detection steps.
        corrupt_thresh (int, default=10): the number of data points
            labelled as outliers as a percentage of all data points that a
            track is allowed to have; if this value is reached the track
            is labelled as being likely corrupted.

    Returns:
        trajectory (pandas.DataFrame): movement track as returned by the
            last detection step.
        stats (dict): merged statistics of the detection steps plus
            'total_num_detected_spikes' and 'corruption_likelihood'
            ('positive' or 'negative').
    """
    # point outliers ('spikes') come first
    trajectory, spike_stats = detect_spikes(
        trajectory, step_col, frame_col, segment_col, x, y, thresh, fps,
        seconds, spike_method)

    # subsequence outliers ('prolonged spikes') come second
    # NOTE(review): two detection helpers are called back to back with the
    # same arguments and the stats of the first call are overwritten by the
    # second - confirm that both calls are intended.
    seq_args = (step_col, frame_col, x, y, thresh, fps, seconds,
                spike_seq_method)
    trajectory, prolonged_spike_stats = detect_spike_seqs(
        trajectory, *seq_args)
    trajectory, prolonged_spike_stats = detect_prolonged_spikes(
        trajectory, *seq_args)

    # combine both stats dicts; point outlier entries win on key clashes
    stats = dict(prolonged_spike_stats)
    stats.update(spike_stats)

    # a spike that is already part of a prolonged spike is not counted as
    # a point outlier
    isolated_spikes = []
    for frame in stats['spike_idxs']:
        if frame in stats['prolonged_spike_idxs']:
            continue
        isolated_spikes.append(frame)
    stats['spike_idxs'] = isolated_spikes

    # combined total of data points deemed to be spikes
    stats['num_detected_spikes'] = len(stats['spike_idxs'])
    stats['total_num_detected_spikes'] = (
        prolonged_spike_stats['num_detected_prolonged_spikes']
        + stats['num_detected_spikes']
        + stats['num_expected_spikes'])

    # share of outliers relative to the smaller count of non-null x or y
    # values decides the corruption label
    num_data_points = min(trajectory[[x, y]].notnull().sum())
    outlier_pct = (stats['total_num_detected_spikes'] / num_data_points) * 100
    stats['corruption_likelihood'] = (
        'positive' if outlier_pct >= corrupt_thresh else 'negative')

    return trajectory, stats