# This file is part of idtracker.ai a multiple animals tracking system
# described in [1].
# Copyright (C) 2017- Francisco Romero Ferrero, Mattia G. Bergomi,
# Francisco J.H. Heras, Robert Hinz, Gonzalo G. de Polavieja and the
# Champalimaud Foundation.
#
# idtracker.ai is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details. In addition, we require
# derivatives or applications to acknowledge the authors by citing [1].
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# For more information please send an email (idtrackerai@gmail.com) or
# use the tools available at https://gitlab.com/polavieja_lab/idtrackerai.git.
#
# [1] Romero-Ferrero, F., Bergomi, M.G., Hinz, R.C., Heras, F.J.H.,
# de Polavieja, G.G., Nature Methods, 2019.
# idtracker.ai: tracking all individuals in small or large collectives of
# unmarked animals.
# (F.R.-F. and M.G.B. contributed equally to this work.
# Correspondence should be addressed to G.G.d.P:
# gonzalo.polavieja@neuro.fchampalimaud.org)
import logging
import os
import cv2
import h5py
import numpy as np
from sklearn.decomposition import PCA
logger = logging.getLogger("__main__.blob")
class Blob(object):
"""Represents a segmented blob (collection of pixels) from a given frame.
A blob can represent a single animal or multiple animals during an
occlusion or crossing.
Parameters
----------
centroid : tuple
Tuple (float, float) with the pixels coordinates of the blob center
of mass.
contour : numpy array
Array with the points that define the contour of the blob. The
array is of the form [[[x1,y1]],[[x2,y2]],...,[[xn,yn]]].
area : int
Number of pixels that conform the blob.
bounding_box_in_frame_coordinates : list
Coordinates of the bounding box that encloses the segmented blob
[(x, y), (x + bounding_box_width, y + bounding_box_height)]. Note
that this bounding box is expanded by some pixels with respect to the
original bounding box obtained with OpenCV.
bounding_box_image : numpy array or None, optional
Bounding box image that encloses the blob, by default None.
bounding_box_images_path : str, optional
Path to the file where the bounding box images are stored, by
default None.
estimated_body_length : float, optional
Body length of the animal estimated from the diagonal of the
original bounding box, by default None.
pixels : list, optional
List of pixels that define the blob, by default None.
number_of_animals : int, optional
Number of animals in the video as defined by the user,
by default None.
frame_number : int, optional
Frame number in the video from which the blob was segmented,
by default None.
frame_number_in_video_path : int, optional
Frame number in the video path from which the blob was segmented,
by default None.
in_frame_index : int, optional
Index of the blob in the frame where it was segmented,
by default None. This index comes from OpenCV and is defined by the
hierarchy of the contours found in the frame.
pixels_path : str, optional
Path to the file where the pixels of the blob are stored,
by default None.
video_height : int, optional
Height of the video considering the resolution reduction factor,
by default None.
video_width : int, optional
Width of the video considering the resolution reduction factor,
by default None.
video_path : str, optional
Path to the video file from which the blob was segmented,
by default None.
pixels_are_from_eroded_blob : bool, optional
Flag to indicate if the pixels of the blob come from an
eroded blob, by default False.
resolution_reduction : float, optional
Resolution reduction factor as defined by the user, by default 1.0.
"""
def __init__(
self,
centroid,
contour,
area,
bounding_box_in_frame_coordinates,
bounding_box_image=None,
bounding_box_images_path=None,
estimated_body_length=None,
pixels=None,
number_of_animals=None,
frame_number=None,
frame_number_in_video_path=None,
in_frame_index=None,
pixels_path=None,
video_height=None,
video_width=None,
video_path=None,
pixels_are_from_eroded_blob=False,
resolution_reduction=1.0,
):
# Attributes set from the input arguments
self.centroid = centroid
self.contour = contour
self.area = area
self.bounding_box_in_frame_coordinates = (
bounding_box_in_frame_coordinates
)
self._bounding_box_image = bounding_box_image
self.bounding_box_images_path = bounding_box_images_path
self.estimated_body_length = estimated_body_length
self._pixels = pixels
self.number_of_animals = number_of_animals
self.frame_number = frame_number
self.frame_number_in_video_path = frame_number_in_video_path
self.in_frame_index = in_frame_index
self._pixels_path = pixels_path
self.video_height = video_height
self.video_width = video_width
self.video_path = video_path
self.pixels_are_from_eroded_blob = pixels_are_from_eroded_blob
self._resolution_reduction = resolution_reduction
# Attributes populated at different points of the tracking
# During crossing detection
self.identification_image_index = None
self.next = []
self.previous = []
self._is_an_individual = False
self._is_a_crossing = False
# During fragmentation
self._fragment_identifier = None
self._blob_index = None
# During the cascade of training and identification protocols
self._used_for_training = None
self._accumulation_step = None
self._identity = None
# During postprocessing and interpolation of crossings
self.interpolated_centroids = None
self._was_a_crossing = False
self._identities_corrected_closing_gaps = None
self._identity_corrected_solving_jumps = None
self.has_eroded_pixels = False
self._eroded_pixels = None
# During validation
self._user_generated_identities = None
self._user_generated_centroids = None
self._pixels_set = None
@property
def bounding_box_image(self):
"""Image cropped from the original video that contains the blob.
This image is used later to extract the `image_for_identification` that
will be used to train and evaluate the crossing detector CNN and the
identification CNN. This image can either be stored in the object
(in RAM), or be stored in a file (in DISK), or not be stored at all,
in which case it is recomputed from the original video.
Returns
-------
numpy array (uint8)
Image cropped from the video containing the pixels that represent
the blob.
"""
if self._bounding_box_image is not None:
return self._bounding_box_image
elif self.bounding_box_images_path is not None and os.path.isfile(
self.bounding_box_images_path
):
with h5py.File(self.bounding_box_images_path, "r") as f:
return f[
str(self.frame_number) + "-" + str(self.in_frame_index)
][:]
else:
cap = cv2.VideoCapture(self.video_path)
cap.set(cv2.CAP_PROP_POS_FRAMES, self.frame_number_in_video_path)
ret, frame = cap.read()
cap.release()
bb = self.bounding_box_in_frame_coordinates
return frame[bb[0][1] : bb[1][1], bb[0][0] : bb[1][0], 0]
@property
def pixels(self):
"""List of pixels that define the blob as determined from the
segmentation parameters defined by the user.
Note that pixels are represented as linearized coordinates of the
frame (considering the resolution reduction factor), i.e. each pixel
is a single integer.
The pixels can either be stored in the object (in RAM), or be stored
in a file (in DISK), or not be stored at all, in which case they are
recomputed from the contour and the original frame of the video.
Returns
-------
List
List of integers indicating the linearized indices of the pixels
that represent the blob.
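Examples
--------
A minimal sketch of the linearization convention (the frame shape is
made up for illustration):
>>> import numpy as np
>>> int(np.ravel_multi_index((2, 3), (10, 20)))  # row 2, col 3 of a 10x20 frame
43
>>> tuple(int(i) for i in np.unravel_index(43, (10, 20)))
(2, 3)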
"""
if self._pixels is not None:
return self._pixels
elif self._pixels_path is not None and os.path.isfile(
self._pixels_path
):
with h5py.File(self._pixels_path, "r") as f:
if not self.pixels_are_from_eroded_blob:
dataset_name = (
str(self.frame_number) + "-" + str(self.in_frame_index)
)
else:
dataset_name = (
str(self.frame_number)
+ "-"
+ str(self.in_frame_index)
+ "-eroded"
)
return f[dataset_name][:]
else:
cimg = np.zeros((self.video_height, self.video_width))
cv2.drawContours(cimg, [self.contour], -1, color=255, thickness=-1)
pts = np.where(cimg == 255)
pixels = np.asarray(list(zip(pts[0], pts[1])))
pixels = np.ravel_multi_index(
[pixels[:, 0], pixels[:, 1]],
(self.video_height, self.video_width),
)
return pixels
@property
def pixels_set(self):
"""A set() version of blob.pixels, used in `blob.overlaps_with()`
Returns
-------
Set
Set of integers indicating the linearized indices of the pixels
that represent the blob.
"""
if self._pixels_set is None:
self._pixels_set = set(self.pixels)
return self._pixels_set
@pixels_set.deleter
def pixels_set(self):
self._pixels_set = None
@property
def eroded_pixels(self):
"""Pixels of the blob after erosion.
The erosion is performed during the crossings interpolation process.
The pixels can either be stored in the object (in RAM), or be stored
in a file (in DISK), or not be stored at all, in which case they are
recomputed from the contour and the original frame of the video.
Returns
-------
List
List of integers indicating the linearized indices of the pixels
that represent the blob after the erosion process.
"""
if self._eroded_pixels is not None:
return self._eroded_pixels
elif self._pixels_path is not None and os.path.isfile(
self._pixels_path
):
with h5py.File(self._pixels_path, "r") as f:
return f[
str(self.frame_number)
+ "-"
+ str(self.in_frame_index)
+ "-eroded"
][:]
else:
# TODO: Check that if the blob is eroded the contour is updated.
# Otherwise these pixels will be the original ones and not the
# eroded ones.
cimg = np.zeros((self.video_height, self.video_width))
cv2.drawContours(cimg, [self.contour], -1, color=255, thickness=-1)
pts = np.where(cimg == 255)
pixels = np.asarray(list(zip(pts[0], pts[1])))
pixels = np.ravel_multi_index(
[pixels[:, 0], pixels[:, 1]],
(self.video_height, self.video_width),
)
return pixels
@eroded_pixels.setter
def eroded_pixels(self, eroded_pixels):
if self._pixels_path is not None: # is saving in disk
with h5py.File(self._pixels_path, "a") as f:
dataset_name = (
str(self.frame_number)
+ "-"
+ str(self.in_frame_index)
+ "-eroded"
)
if dataset_name in f:
del f[dataset_name]
f.create_dataset(dataset_name, data=eroded_pixels)
else:
self._eroded_pixels = eroded_pixels
self.has_eroded_pixels = True
@property
def fragment_identifier(self):
"""
Integer indicating the index of the fragment that contains the blob
See Also
--------
:class:`~fragment.Fragment`
"""
return self._fragment_identifier
@property
def is_an_individual(self):
"""Flag indicating whether the blob represent a single animal.
The private attribute `_is_an_individual` is assigned during the
crossings detection process.
"""
return self._is_an_individual
@is_an_individual.setter
def is_an_individual(self, value: bool):
assert isinstance(value, bool)
self._is_an_individual = value
self._is_a_crossing = not value
# TODO: consider removing this property, as it is redundant.
@property
def is_a_crossing(self):
"""Flag indicating whether the blob represents two or more animals
together.
This attribute is the negation of `is_an_individual` and is set at
the same time as that attribute.
"""
return self._is_a_crossing
@property
def was_a_crossing(self):
"""Flag indicating whether the blob was created after splitting a
crossing blob during the crossings interpolation process
"""
return self._was_a_crossing
# TODO: consider adding a feature in the validation GUI to add a contour.
@property
def is_a_generated_blob(self):
"""Flag indicating whether the blob was created by the user.
Blobs created by the user during the validation process do not have
a contour, as only the centroid is created.
"""
return self.contour is None
def check_for_multiple_next_or_previous(self, direction=None):
"""Flag indicating if the blob has multiple blobs in its past or future
overlapping history
This method is used to check whether the blob is a crossing.
Parameters
----------
direction : str
"previous" or "next". If "previous" the past overlapping history
will be checked in order to find out if the blob will split in the
past.
Symmetrically, if "next" the future overlapping history of the blob
will be checked.
Returns
-------
bool
If True the blob splits into two or multiple overlapping blobs in
its "past" or "future" history, depending on the parameter
"direction".
"""
current = getattr(self, direction)[0]
while len(getattr(current, direction)) == 1:
current = getattr(current, direction)[0]
if len(getattr(current, direction)) > 1:
return True
return False
def check_for_crossing_in_next_or_previous(self, direction=None):
"""Flag indicating if the blob has a crossing in its past or future
overlapping history
This method is used to check whether the blob is an individual.
Parameters
----------
direction : str
"previous" or "next". If "previous" the past overlapping history
will be checked in order to find out if the blob ends up in a
crossing.
Symmetrically, if "next" the future overlapping history of the blob
will be checked.
Returns
-------
bool
If True the blob has a crossing in its "past" or "future" history,
depending on the parameter `direction`.
"""
opposite_direction = "next" if direction == "previous" else "previous"
current = getattr(self, direction)[0]
while len(getattr(current, direction)) == 1:
current = getattr(current, direction)[0]
if (
len(getattr(current, opposite_direction)) > 1
and current.is_a_crossing
):
return True
return False
def is_a_sure_individual(self):
"""Flag indicating that the blob is a sure individual according to
some heuristics and it can be used to train the crossing detector CNN.
Returns
-------
bool
"""
if (
self.is_an_individual # assigned in _apply_area_and_unicity_heuristics
and len(self.previous) == 1
and len(self.next) == 1
and len(self.next[0].previous) == 1
and len(self.previous[0].next) == 1
):
has_crossing_in_past = self.check_for_crossing_in_next_or_previous(
"previous"
)
has_crossing_in_future = (
self.check_for_crossing_in_next_or_previous("next")
)
return has_crossing_in_past and has_crossing_in_future
# Explicit fallback: blobs that fail the heuristics above are not
# sure individuals.
return False
def is_a_sure_crossing(self):
"""Flag indicating that the blob is a sure crossing according to
some heuristics and it can be used to train the crossing detector CNN.
Returns
-------
bool
"""
if self.is_a_crossing and (
len(self.previous) > 1 or len(self.next) > 1
):
return True
elif (
self.is_a_crossing
and len(self.previous) == 1
and len(self.next) == 1
):
has_multiple_previous = self.check_for_multiple_next_or_previous(
"previous"
)
has_multiple_next = self.check_for_multiple_next_or_previous(
"next"
)
return has_multiple_previous and has_multiple_next
# Explicit fallback: blobs that fail the heuristics above are not
# sure crossings.
return False
def overlaps_with(self, other):
"""Computes whether the pixels in `self` intersect with the pixels in
`other`
Parameters
----------
other : <Blob object>
An instance of the class Blob
Returns
-------
bool
True if the lists of pixels of both blobs have non-empty
intersection
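Examples
--------
A minimal sketch with hand-made linearized pixel lists (real blobs
get their pixels from segmentation):
>>> a = Blob((0.5, 0.5), None, 3, [(0, 0), (2, 2)], pixels=[1, 2, 3])
>>> b = Blob((1.5, 1.5), None, 3, [(1, 1), (3, 3)], pixels=[3, 4, 5])
>>> a.overlaps_with(b)
True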
"""
# Check bounding box overlapping between blobs S (self) and O (other)
(S_xmin, S_ymin) = self.bounding_box_in_frame_coordinates[0]
(S_xmax, S_ymax) = self.bounding_box_in_frame_coordinates[1]
(O_xmin, O_ymin) = other.bounding_box_in_frame_coordinates[0]
(O_xmax, O_ymax) = other.bounding_box_in_frame_coordinates[1]
x_overlap = S_xmax >= O_xmin and O_xmax >= S_xmin
y_overlap = S_ymax >= O_ymin and O_ymax >= S_ymin
bbox_overlaps = x_overlap and y_overlap
# If bounding boxes overlap, then compute pixel-to-pixel overlapping check
if bbox_overlaps:
# check whether the two pixel sets are disjoint
overlaps = not self.pixels_set.isdisjoint(other.pixels_set)
else:
overlaps = False
return overlaps
def now_points_to(self, other):
"""Given two consecutive blob objects updates their respective
overlapping histories
Parameters
----------
other : <Blob object>
An instance of the class Blob
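Examples
--------
A minimal sketch linking two hypothetical consecutive blobs:
>>> past = Blob((0.0, 0.0), None, 1, [(0, 0), (1, 1)])
>>> future = Blob((1.0, 0.0), None, 1, [(0, 0), (1, 1)])
>>> past.now_points_to(future)
>>> past.next == [future] and future.previous == [past]
True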
"""
self.next.append(other)
other.previous.append(self)
def squared_distance_to(self, other):
"""Returns the squared distance from the centroid of self to the
centroid of `other`
Parameters
----------
other : <Blob object> or tuple
An instance of the class Blob or a tuple (x,y)
Returns
-------
float
Squared distance between centroids
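Examples
--------
A minimal sketch with a plain (x, y) tuple (the other constructor
arguments are placeholders):
>>> blob = Blob((0.0, 0.0), None, 1, [(0, 0), (1, 1)])
>>> float(blob.squared_distance_to((3.0, 4.0)))
25.0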
"""
if isinstance(other, Blob):
return np.sum(
(np.asarray(self.centroid) - np.asarray(other.centroid)) ** 2
)
elif isinstance(other, (tuple, list, np.ndarray)):
return np.sum((np.asarray(self.centroid) - np.asarray(other)) ** 2)
def distance_from_countour_to(self, point):
r"""Returns the distance between `point` and the closest
point in the contour of the blob.
Parameters
----------
point : tuple
(x,y)
Returns
-------
float
Smallest distance between `point` and the contour of the blob.
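Examples
--------
A minimal sketch with a hand-made square contour:
>>> import numpy as np
>>> square = np.array([[[0, 0]], [[10, 0]], [[10, 10]], [[0, 10]]], dtype=np.int32)
>>> blob = Blob((5.0, 5.0), square, 100, [(0, 0), (10, 10)])
>>> float(blob.distance_from_countour_to((5.0, 5.0)))
5.0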
"""
return np.abs(cv2.pointPolygonTest(self.contour, point, True))
@property
def resolution_reduction(self):
"""Resolution reduction factor defined by the user"""
if hasattr(self, "_resolution_reduction"):
return self._resolution_reduction
else:
return 1.0
@property
def used_for_training(self):
"""Flag indicating if the blob has been used to train the
identification CNN
"""
return self._used_for_training
@property
def accumulation_step(self):
"""Integer indicating the accumulation step in which the blob
was assigned by the cascade of training and identification protocols
"""
return self._accumulation_step
@property
def is_in_a_fragment(self):
"""Boolean indicating if the blob is in a fragment, i.e. is not an
isolated blob or is in the extreme of a fragment
"""
# TODO: check if it also indicates that it in an extreme of a fragment
return len(self.previous) == len(self.next) == 1
@property
def is_an_individual_in_a_fragment(self):
"""Boolean indicating if the blob is in an individual fragment"""
return self.is_an_individual and self.is_in_a_fragment
@property
def blob_index(self):
"""Blob index at the segmentation step (comes from the find contours
function of OpenCV)
"""
return self._blob_index
@blob_index.setter
def blob_index(self, new_blob_index):
if self.is_an_individual_in_a_fragment:
self._blob_index = new_blob_index
def in_a_global_fragment_core(self, blobs_in_frame):
"""Boolean indicating if a blob is in the core of a globalfragment.
A blob in a frame is in the core of a global fragment if in
that frame there are as many blobs as number of animals to track
Parameters
----------
blobs_in_frame : list
List of Blob objects representing the animals segmented in the
frame self.frame_number
Returns
-------
bool
True if the blob is in the core of a global fragment
"""
return len(blobs_in_frame) == self.number_of_animals
@property
def identity(self):
"""Identity of the blob assigned during the identification process"""
return self._identity
@property
def user_generated_identities(self):
"""List of identities of the blob some of which might have been give
by a user during the validation process
"""
return self._user_generated_identities
@property
def identity_corrected_solving_jumps(self):
"""Identity of the blob after correcting impossible velocity jumps"""
return self._identity_corrected_solving_jumps
@property
def identities_corrected_closing_gaps(self):
"""Identity of the blob after crossings interpolation"""
return self._identities_corrected_closing_gaps
@property
def assigned_identities(self):
"""Identities assigned to the blob during the tracking process"""
if self.identities_corrected_closing_gaps is not None:
return self.identities_corrected_closing_gaps
elif self.identity_corrected_solving_jumps is not None:
return [self.identity_corrected_solving_jumps]
return [self.identity]
@property
def final_identities(self):
"""Identities of the blob after the tracking process and after
potential modifications by the users during the validation procedure.
"""
if self.user_generated_identities is not None:
# Note that sometimes len(user_generated_identities)
# > len(assigned_identities)
final_identities = []
for i, user_generated_identity in enumerate(
self.user_generated_identities
):
if user_generated_identity is None and i < len(
self.assigned_identities
):
final_identities.append(self.assigned_identities[i])
elif (
user_generated_identity is not None
and user_generated_identity >= 0
):
final_identities.append(user_generated_identity)
return final_identities
return self.assigned_identities
def save_image_for_identification(
self, identification_image_size, height, width, file_path
):
"""Saves in disk the image that will be used to train and evaluate the
crossing detector CNN and the identification CNN.
This also updates the `identification_image_index` and the `episode`
attributes. This helps to load the image from the correct `file_path`.
Parameters
----------
identification_image_size : tuple
Tuple of integers (height, width, channels).
height : int
Video height considering the resolution reduction factor.
width : int
Video width considering the resolution reduction factor.
file_path : str
Path to the hdf5 file where the images will be stored.
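Examples
--------
A sketch of how the target resizable dataset might be created
beforehand (the shape, dtype and file name here are assumptions for
illustration, not the setup code used by idtracker.ai):
>>> import h5py
>>> with h5py.File("identification_images_0.hdf5", "w") as f:  # doctest: +SKIP
...     f.create_dataset(
...         "identification_images",
...         shape=(0, 52, 52),
...         maxshape=(None, 52, 52),
...         dtype="uint8",
...     )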
"""
image_for_identification = self.get_image_for_identification(
identification_image_size,
height,
width,
)
# For RAM optimization
with h5py.File(file_path, "a") as f:
dset = f["identification_images"]
i = dset.shape[0]
dset.resize(
(
i + 1,
image_for_identification.shape[0],
image_for_identification.shape[1],
)
)
dset[i, ...] = image_for_identification
self.identification_image_index = i
self.episode = int(
os.path.basename(file_path).split(".")[0].split("_")[-1]
)
def get_image_for_identification(
self,
identification_image_size,
height,
width,
):
"""Gets the image used to train and evaluate the crossing detector CNN
and the identification CNN.
Parameters
----------
identification_image_size : tuple
Dimensions of the identification image (height, width, channels).
Channels is always 1 as images in color are still not considered.
height : int
Video height considering resolution reduction factor.
width : int
Video width considering resolution reduction factor.
Returns
-------
ndarray
Square image with black background used to train the crossings
detector CNN and the identification CNN.
"""
return self._get_image_for_identification(
height,
width,
self.bounding_box_image,
self.pixels,
self.bounding_box_in_frame_coordinates,
identification_image_size[0],
)
# TODO: Consider changing the name of image_for_identification
# it is also the image for the crossing detector.
@staticmethod
def _get_image_for_identification(
height,
width,
bounding_box_image,
pixels,
bounding_box_in_frame_coordinates,
image_size,
):
"""It generates the image that will be used to train and evaluate the
crossings detector CNN and the identification CNN.
Parameters
----------
height : int
Frame height
width : int
Frame width
bounding_box_image : ndarray
Images cropped from the frame by considering the bounding box
associated to a blob
pixels : list
List of pixels associated to a blob
bounding_box_in_frame_coordinates : list
[(x, y), (x + bounding_box_width, y + bounding_box_height)]
image_size : int
Size of the width and height of the square identification image
Returns
-------
ndarray
Square image with black background used to train the crossings
detector CNN and the identification CNN.
"""
bounding_box_image = _mask_background_pixels(
height,
width,
bounding_box_image,
pixels,
bounding_box_in_frame_coordinates,
)
pca = PCA()
pxs = np.unravel_index(pixels, (height, width))
pxs1 = np.asarray(list(zip(pxs[1], pxs[0])))
pca.fit(pxs1)
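# The first principal component of the pixel cloud points along the
# animal's main axis; its angle with the x-axis gives the rotation
# applied below.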
rot_ang = (
np.arctan(pca.components_[0][1] / pca.components_[0][0])
* 180
/ np.pi
+ 45
)
# we add 45 degrees so that the animal is aligned with the diagonal.
# This way we can use smaller frames
center = (pca.mean_[0], pca.mean_[1])
center = _transform_to_bbox_coordinates(
center, bounding_box_in_frame_coordinates
)
center = np.array([int(center[0]), int(center[1])])
# rotate
diag = np.sqrt(
np.sum(np.asarray(bounding_box_image.shape) ** 2)
).astype(int)
diag = (diag, diag)
M = cv2.getRotationMatrix2D(tuple(center), rot_ang, 1)
minif_rot = cv2.warpAffine(
bounding_box_image,
M,
diag,
borderMode=cv2.BORDER_CONSTANT,
flags=cv2.INTER_CUBIC,
)
crop_distance = int(image_size / 2)
x_range = range(center[0] - crop_distance, center[0] + crop_distance)
y_range = range(center[1] - crop_distance, center[1] + crop_distance)
image_for_identification = minif_rot.take(
y_range, mode="wrap", axis=0
).take(x_range, mode="wrap", axis=1)
return image_for_identification
@property
def contour_full_resolution(self):
"""Blob contour coordinates in pixels without considering the
resolution reduction factor, i.e. in the full resolution of the video.
Returns
-------
numpy array.
Coordinates in pixels of the points in the blob contour
"""
if self.contour is not None:
return (self.contour / self.resolution_reduction).astype(np.int32)
else:
return None
@property
def bounding_box_full_resolution(self):
"""Bounding box cordinates without considering the resolution reduction
factor, i.e. in the full resolution of the video.
Returns
-------
numpy array, or None
Bounding box coordinates of the blob in full resolution of the
video, [(x, y), (x + bounding_box_width, y + bounding_box_height)].
"""
if self.bounding_box_in_frame_coordinates is not None:
bounding_box_full_resolution = (
np.asarray(self.bounding_box_in_frame_coordinates)
/ self.resolution_reduction
).astype(int)
return tuple(map(tuple, bounding_box_full_resolution))
else:
# TODO: Check when this happens and document.
return None
# Centroids
@property
def user_generated_centroids(self):
"""List of centroids generated by the user during the validation
processes.
Returns
-------
list or None
If the user has not generated any centroid for this blob, it
returns None.
"""
if hasattr(self, "_user_generated_centroids"):
return self._user_generated_centroids
return None
@property
def assigned_centroids(self):
"""Centroids assigned to the blob during the tracking process.
It considers the default centroid of the blob at segmentation time
or new centroids added to the blob during the interpolation of the
crossings.
Returns
-------
list
List of pairs (x, y) indicating the position of each individual
in the blob.
"""
if (
hasattr(self, "interpolated_centroids")
and self.interpolated_centroids is not None
):
assert isinstance(self.interpolated_centroids, list)
return self.interpolated_centroids
return [self.centroid]
@property
def final_centroids(self):
"""List of the animal/s centroid/s in the blob, considering the
potential centroids that might have been added by the user during
the validation.
By default the centroid will be the center of mass of the blob of
pixels defined by the blob. It can be different if the user modified
the default centroid during validation or generated more centroids.
Returns
-------
list
List of tuples (x, y) indicating the centroids of the blob.
"""
if self.user_generated_centroids is not None:
# Note that sometimes len(user_generated_centroids) >
# len(assigned_centroids)
final_centroids = []
for i, user_generated_centroid in enumerate(
self.user_generated_centroids
):
# TODO: missing else
if user_generated_centroid[0] is None and i < len(
self.assigned_centroids
):
final_centroids.append(self.assigned_centroids[i])
elif (
user_generated_centroid[0] is not None
and user_generated_centroid[0] > 0
):
final_centroids.append(user_generated_centroid)
return final_centroids
return self.assigned_centroids
@property
def final_centroids_full_resolution(self):
"""Returns the centroids considering the full resolution of the frame.
Note that after validation a blob can have more than one centroid.
For example, this can happen in a crossing blob with missing
centroids.
Returns
-------
List of tuples (x, y), where x, and y are coordinates in the full
resolution of the video frame.
"""
return [
(
centroid[0] / self.resolution_reduction,
centroid[1] / self.resolution_reduction,
)
for centroid in self.final_centroids
]
# Methods used to modify the blob attributes during the validation of the
# trajectories obtained after tracking.
# TODO: Consider removing this from this class. Maybe move to validation.
def removable_identity(self, identity_to_remove, blobs_in_frame):
"""[Validation] Checks if the identity can be removed.
Parameters
----------
identity_to_remove : int
Identity to be removed
blobs_in_frame : list
List of Blob objects in the frame where the identity is going to
be removed
Returns
-------
bool
True if the identity can be removed.
"""
for blob in blobs_in_frame:
if blob != self:
if (
identity_to_remove in blob.final_identities
): # Is duplicated in another blob
return True
else:
if (
blob.final_identities.count(identity_to_remove) > 1
): # Is duplicated in the same blob
return True
return False
def update_centroid(self, video, old_centroid, new_centroid, identity):
"""[Validation] Updates the centroid of the blob.
Parameters
----------
video : idtrackerai.video.Video
Instance of Video object
old_centroid : tuple
Centroid to be updated
new_centroid : tuple
Coordinates of the new centroid
identity : int
Identity of the centroid to be updated
"""
logger.info("Calling update_centroid")
video.is_centroid_updated = True
# Validate the inputs before scaling them; the scaling always
# produces tuples, which would make the checks below pass trivially.
if not (isinstance(old_centroid, tuple) and len(old_centroid) == 2):
raise Exception("The old_centroid must be a tuple of length 2")
if not (isinstance(new_centroid, tuple) and len(new_centroid) == 2):
raise Exception("The new centroid must be a tuple of length 2")
old_centroid = (
old_centroid[0] * self.resolution_reduction,
old_centroid[1] * self.resolution_reduction,
)
new_centroid = (
new_centroid[0] * self.resolution_reduction,
new_centroid[1] * self.resolution_reduction,
)
if self.user_generated_centroids is None:
self._user_generated_centroids = [(None, None)] * len(
self.final_centroids
)
if self.user_generated_identities is None:
self._user_generated_identities = [None] * len(
self.final_centroids
)
try:
if old_centroid in self.user_generated_centroids:
centroid_index = self.user_generated_centroids.index(
old_centroid
)
identity = self.user_generated_identities[centroid_index]
elif old_centroid in self.assigned_centroids:
if self.assigned_centroids.count(old_centroid) > 1:
centroid_index = self.assigned_identities.index(identity)
else:
centroid_index = self.assigned_centroids.index(
old_centroid
)
identity = self.assigned_identities[centroid_index]
else:
raise Exception(
"There is no centroid with the values of old_centroid"
)
except ValueError:
raise Exception(
"There is no centroid with the values of old_centroid"
)
self.user_generated_centroids[centroid_index] = new_centroid
self.user_generated_identities[centroid_index] = identity
video.is_centroid_updated = True
def delete_centroid(
self,
video,
identity,
centroid,
blobs_in_frame,
apply_resolution_reduction=True,
):
"""[Validation] Deletes a centroid of the blob.
Parameters
----------
video : idtrackerai.video.Video
Instance of Video object
identity : int
Identity of the centroid to be deleted
centroid : tuple
Centroid to be deleted from the blob
blobs_in_frame : List
List of the blobs in the frame
apply_resolution_reduction : bool, optional
Whether to consider the resolution reduction factor when
adding the centroid, by default True. Note that the video is shown
at full resolution in the validation GUI, but all centroids of
the blobs consider the resolution reduction factor.
Raises
------
Exception
If the centroid is not a tuple of length 2
Exception
If the identity is unique in the frame
Exception
If it is the last centroid of the blob
"""
logger.info("Calling delete_centroid")
if not (isinstance(centroid, tuple) and len(centroid) == 2):
raise Exception("The centroid must be a tuple of length 2")
if not self.removable_identity(identity, blobs_in_frame):
raise Exception(
"The centroid cannot be remove beucase it belongs to a"
"unique identity."
"Only centroids of duplicated identities can be deleted"
)
if len(self.final_centroids) == 1:
raise Exception(
"The centroid cannot be removed because if the last "
"centroid of the blob"
)
if apply_resolution_reduction:
centroid = (
centroid[0] * self.resolution_reduction,
centroid[1] * self.resolution_reduction,
)
if (
self.user_generated_centroids is None
): # removing a centroid from a crossing
self._user_generated_centroids = [(None, None)] * len(
self.final_centroids
)
if self.user_generated_identities is None:
self._user_generated_identities = [None] * len(
self.final_identities
)
try:
if centroid in self.user_generated_centroids:
centroid_index = self.user_generated_centroids.index(centroid)
elif centroid in self.assigned_centroids:
if self.assigned_centroids.count(centroid) > 1:
centroid_index = self.assigned_identities.index(identity)
else:
centroid_index = self.assigned_centroids.index(centroid)
else:
raise Exception(
"There is no centroid with the values of centroid"
)
except ValueError:
raise Exception("There is no centroid with the values of centroid")
self._user_generated_centroids[centroid_index] = (-1, -1)
self._user_generated_identities[centroid_index] = -1
video.is_centroid_updated = True
def add_centroid(
self, video, centroid, identity, apply_resolution_reduction=True
):
"""[Validation] Adds a centroid with a given identity to the blob.
This method is used in the validation GUI. It is useful to add
centroids for crossing blobs that are missing some centroids, or to
individual blobs that should have been classified as crossings and
are also missing some centroids.
Parameters
----------
video : idtrackerai.video.Video
Instance of Video object
centroid : tuple
Centroid to be added to the blob
identity : int
Identity of the centroid to be added
apply_resolution_reduction : bool, optional
Whether to consider the resolution reduction factor when
adding the centroid, by default True. Note that the video is shown
at full resolution in the validation GUI, but all centroids of
the blobs consider the resolution reduction factor.
Raises
------
Exception
If the centroid is not a tuple of length 2
Exception
If the identity is not an integer between 1 and number of animals
Exception
If there is already another centroid with the same identity
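Examples
--------
A minimal sketch: the `SimpleNamespace` stands in for the real
`Video` object, and poking the private `_identity` attribute is
purely for illustration:
>>> from types import SimpleNamespace
>>> blob = Blob((5.0, 5.0), None, 1, [(0, 0), (2, 2)], number_of_animals=2)
>>> blob._identity = 1
>>> blob.add_centroid(SimpleNamespace(), (8.0, 8.0), 2,
...                   apply_resolution_reduction=False)
>>> blob.final_centroids
[(5.0, 5.0), (8.0, 8.0)]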
"""
logger.info("Calling add_centroid")
if not (isinstance(centroid, tuple) and len(centroid) == 2):
raise Exception("The centroid must be a tuple of length 2")
if not (
isinstance(identity, int)
and identity > 0
and identity <= self.number_of_animals
):
raise Exception(
"The identity must be an integer between 1 and the number of "
"animals in the video"
)
if identity in self.final_identities:
raise Exception(
"The identity of the centroid to be created already exist in "
"this blob"
)
if apply_resolution_reduction:
centroid = (
centroid[0] * self.resolution_reduction,
centroid[1] * self.resolution_reduction,
)
if self.user_generated_centroids is None:
self._user_generated_centroids = [(None, None)] * len(
self.final_centroids
)
if self.user_generated_identities is None:
self._user_generated_identities = [None] * len(
self.final_identities
)
self._user_generated_centroids.append(centroid)
self._user_generated_identities.append(identity)
video.is_centroid_updated = True
def update_identity(self, old_identity, new_identity, centroid):
"""[Validation] Updates the identity of the blob.
This method is used during the validation GUI.
It populates the private attributes `_user_generated_identities`
and `_user_generated_centroids`.
Parameters
----------
new_identity : int
new value for the identity of the blob
old_identity : int
old value of the identity of the blob. It must be specified when the
blob has multiple identities already assigned.
centroid : tuple
centroid which identity must be updated.
"""
if not (
isinstance(new_identity, int)
and new_identity >= 0
and new_identity <= self.number_of_animals
):
raise Exception(
"The new identity must be an integer between 0 and the number "
"of animals in the video. Blobs with 0 identity will be ommited "
"for the generation of the trajectories"
)
# We prepare to also modify the centroid
if self.user_generated_centroids is None:
self._user_generated_centroids = [(None, None)] * len(
self.final_centroids
)
if self.user_generated_identities is None:
self._user_generated_identities = [None] * len(
self.final_identities
)
# TODO: review this piece of code, not sure I need the try/except
try:
if centroid in self.user_generated_centroids:
centroid_index = self.user_generated_centroids.index(centroid)
elif centroid in self.assigned_centroids:
if self.assigned_centroids.count(centroid) > 1:
centroid_index = self.assigned_identities.index(
old_identity
)
else:
centroid_index = self.assigned_centroids.index(centroid)
else:
raise Exception(
"There is no centroid with the values of centroid"
)
except ValueError:
raise Exception("There is no centroid with the values of centroid")
self._user_generated_identities[centroid_index] = new_identity
self._user_generated_centroids[centroid_index] = (
centroid[0],
centroid[1],
)
def propagate_identity(self, old_identity, new_identity, centroid):
"""[Validation] Propagates the new identity to next and previous blobs.
This method is called in the validation GUI when the user updates the
identity of a given blob.
Parameters
----------
old_identity : int
Previous identity of the blob
new_identity : int
New identity of the blob
centroid : tuple
[description]
"""
count_past_corrections = 1 # to take into account the modification
# already done in the current frame
count_future_corrections = 0
current = self
current_centroid = np.asarray(centroid)
while (
len(current.next) == 1
and current.next[0].fragment_identifier == self.fragment_identifier
):
# There is only one next blob and we are in the same fragment
if len(current.next[0].final_centroids) > 1:
# The next blob has multiple centroids, i.e. a crossing.
# Find the index of the centroid that correspond to the
# identity that we want to modify
index_same_identities = np.where(
np.asarray(current.next[0].final_identities)
== old_identity
)[0]
if index_same_identities.size == 1:
# there is only one centroid with the old identity
next_centroid = current.next[0].final_centroids[
index_same_identities[0]
]
else:
# there are several centroids in the blob with the same
# identity
next_centroids = np.asarray(
current.next[0].final_centroids
)
index_centroid = np.argmin(
np.sqrt(
np.sum(
(current_centroid - next_centroids) ** 2, axis=1
)
)
)
next_centroid = current.next[0].final_centroids[
index_centroid
]
else:
# The next blob has a single centroid, i.e. an individual.
next_centroid = current.next[0].final_centroids[0]
current.next[0].update_identity(
old_identity, new_identity, next_centroid
)
current = current.next[0]
current_centroid = np.asarray(next_centroid)
count_future_corrections += 1
current = self
while (
len(current.previous) == 1
and current.previous[0].fragment_identifier
== self.fragment_identifier
):
# There is only one previous blob and we are in the same fragment
if len(current.previous[0].final_centroids) > 1:
# There are multiple centroids, i.e. a crossing.
index_same_identities = np.where(
np.asarray(current.previous[0].final_identities)
== old_identity
)[0]
if index_same_identities.size == 1:
# there is only one centroid with the old identity
previous_centroid = current.previous[0].final_centroids[
index_same_identities[0]
]
else:
# there are several centroids in the blob with the same
# identity
previous_centroids = np.asarray(
current.previous[0].final_centroids
)
index_centroid = np.argmin(
np.sqrt(
np.sum(
(current_centroid - previous_centroids) ** 2, axis=1
)
)
)
previous_centroid = current.previous[0].final_centroids[
index_centroid
]
else:
# There is a single centroid, i.e. an individual blob
previous_centroid = current.previous[0].final_centroids[0]
current.previous[0].update_identity(
old_identity, new_identity, previous_centroid
)
current = current.previous[0]
current_centroid = np.asarray(previous_centroid)
count_past_corrections += 1
@property
def summary(self):
"""[Validation] Returns a summary string for some blob attributes.
Returns
-------
str
Summary description of the blob
"""
blob_name = f"{self}\n"
used_for_training = f"used for training: {self.used_for_training}\n"
fragment_id = f"fragment id: {self.fragment_identifier}\n"
previous_blob = f"previous blob(s): {self.previous}\n"
next_blob = f"next blob(s): {self.next}\n"
sure_individual_crossing = (
f"sure individual-crossing: "
f"{self.is_a_sure_individual()}-"
f"{self.is_a_sure_crossing()}\n"
)
individual_crossing = (
f"individual-crossing: "
f"{self.is_an_individual}-"
f"{self.is_a_crossing}\n"
)
was_a_crossing = f"was_a_crossing: {self.was_a_crossing}\n"
id = f"identity: {self.identity}\n"
id_correcting_jumps = (
f"identity correcting jumps "
f"{self.identity_corrected_solving_jumps}\n"
)
correcting_gaps_id = (
f"id correcting gaps: {self.identities_corrected_closing_gaps}\n"
)
assigned_identities = (
f"assigned identities: {self.assigned_identities}\n"
)
assigned_centroids = f"assigned centroids: {self.assigned_centroids}\n"
user_identities = (
f"user identities: {self.user_generated_identities}\n"
)
user_centroids = f"user centroids: {self.user_generated_centroids}\n"
final_identities = f"final identities: {self.final_identities}\n"
final_centroids = f"final centroids: {self.final_centroids}\n"
summary_str = (
blob_name
+ used_for_training
+ fragment_id
+ previous_blob
+ next_blob
+ sure_individual_crossing
+ individual_crossing
+ was_a_crossing
+ id
+ id_correcting_jumps
+ correcting_gaps_id
+ assigned_identities
+ assigned_centroids
+ user_identities
+ user_centroids
+ final_identities
+ final_centroids
)
return summary_str
def draw(
self, frame, colors_lst=None, selected_id=None, is_selected=False
):
"""[Validation] Draw the blob in a given frame of the video.
Parameters
----------
frame : numpy.array
Image where the blob should be drawn.
colors_lst : list, optional
List of colors used to draw the blobs, by default None
selected_id : int, optional
Identity of the selected blob., by default None
is_selected : bool, optional
Flag indicating whether the blob has been selected by the user,
by default False
"""
contour = self.contour_full_resolution
bounding_box = self.bounding_box_full_resolution
for i, (identity, centroid) in enumerate(
zip(self.final_identities, self.final_centroids_full_resolution)
):
pos = int(round(centroid[0], 0)), int(round(centroid[1], 0))
if colors_lst:
color = (
colors_lst[identity]
if identity is not None
else colors_lst[0]
)
else:
color = (0, 0, 255)
if contour is not None:
if not is_selected:
cv2.polylines(
frame, np.array([contour]), True, (0, 255, 0), 1
)
else:
cv2.polylines(
frame, np.array([contour]), True, (0, 255, 0), 2
)
# cv2.circle(frame, pos, 8, (255, 255, 255), -1, lineType=cv2.LINE_AA)
cv2.circle(frame, pos, 6, color, -1, lineType=cv2.LINE_AA)
if identity is not None:
if identity == selected_id:
cv2.circle(
frame, pos, 10, (0, 0, 255), 2, lineType=cv2.LINE_AA
)
idroot = ""
if (
self.user_generated_identities is not None
and identity in self.user_generated_identities
) and (
self.user_generated_centroids is not None
and centroid in self.user_generated_centroids
):
idroot = "u-"
elif (
self.identities_corrected_closing_gaps is not None
and not self.is_an_individual
):
idroot = "c-"
idstr = idroot + str(identity)
text_size = cv2.getTextSize(
idstr, cv2.FONT_HERSHEY_SIMPLEX, 1.0, thickness=2
)
text_width = text_size[0][0]
str_pos = pos[0] - text_width // 2, pos[1] - 12
# cv2.putText(frame, idstr, str_pos, cv2.FONT_HERSHEY_SIMPLEX,
# 1.0, (0, 0, 0), thickness=3,
# lineType=cv2.LINE_AA)
cv2.putText(
frame,
idstr,
str_pos,
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
color,
thickness=3,
lineType=cv2.LINE_AA,
)
if idroot == "c-" and bounding_box is not None:
rect_color = (
self.rect_color
if hasattr(self, "rect_color")
else (255, 0, 0)
)
cv2.rectangle(
frame, bounding_box[0], bounding_box[1], rect_color, 2
)
elif bounding_box is not None:
rect_color = (
self.rect_color
if hasattr(self, "rect_color")
else (255, 0, 0)
)
cv2.rectangle(
frame, bounding_box[0], bounding_box[1], rect_color, 2
)
idstr = "0"
text_size = cv2.getTextSize(
idstr, cv2.FONT_HERSHEY_SIMPLEX, 1.0, thickness=2
)
text_width = text_size[0][0]
str_pos = pos[0] - text_width // 2, pos[1] - 12
cv2.putText(
frame,
idstr,
str_pos,
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
(0, 0, 0),
thickness=3,
lineType=cv2.LINE_AA,
)
def _mask_background_pixels(
height,
width,
bounding_box_image,
pixels,
bounding_box_in_frame_coordinates,
):
"""Masks to black the pixels in bounding_box_images that belong to the
background.
Note that in this step the blob of pixels defined by `pixels` according
to the segmentation parameters is dilated to get more information from the
border of the animal.
Parameters
----------
height : int
Frame height
width : int
Frame width
bounding_box_image : ndarray
Images cropped from the frame by considering the bounding box
associated to a blob
pixels : list
List of pixels associated to a blob
bounding_box_in_frame_coordinates : list
[(x, y), (x + bounding_box_width, y + bounding_box_height)]
Returns
-------
ndarray
Image with black background pixels
"""
pxs = np.array(np.unravel_index(pixels, (height, width))).T
pxs = np.array(
[
pxs[:, 0] - bounding_box_in_frame_coordinates[0][1],
pxs[:, 1] - bounding_box_in_frame_coordinates[0][0],
]
)
temp_image = np.zeros_like(bounding_box_image).astype("uint8")
temp_image[pxs[0, :], pxs[1, :]] = 255
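# Dilate the binary mask one iteration so that a thin rim of pixels
# around the segmented blob survives the masking and keeps some
# information from the border of the animal.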
temp_image = cv2.dilate(
temp_image, np.ones((3, 3)).astype("uint8"), iterations=1
)
rows, columns = np.where(temp_image == 255)
dilated_pixels = np.array([rows, columns])
temp_image[
dilated_pixels[0, :], dilated_pixels[1, :]
] = bounding_box_image[dilated_pixels[0, :], dilated_pixels[1, :]]
return temp_image
def _transform_to_bbox_coordinates(point, bounding_box):
"""Transforms a point in full-frame coordinates to boinding_box
coordinates.
Parameters
----------
point : tuple
point (px, py) in full-frame coordinates
bounding_box : list
bounding box coordinates following the convention
[(x, y), (x + bounding_box_width, y + bounding_box_height)]
Returns
-------
tuple
point (px', py') in bounding box coordinates
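Examples
--------
A minimal sketch with made-up coordinates:
>>> tuple(
...     int(c)
...     for c in _transform_to_bbox_coordinates((10, 20), [(3, 4), (30, 40)])
... )
(7, 16)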
"""
return tuple(
np.asarray(point)
- np.asarray([bounding_box[0][0], bounding_box[0][1]])
)