samgeo2 module
SamGeo2
The main class for segmenting geospatial data with the Segment Anything Model 2 (SAM2). See https://github.com/facebookresearch/segment-anything-2 for details.
Source code in samgeo/samgeo2.py
class SamGeo2:
"""The main class for segmenting geospatial data with the Segment Anything Model 2 (SAM2). See
https://github.com/facebookresearch/segment-anything-2 for details.
"""
def __init__(
self,
model_id: str = "sam2-hiera-large",
device: Optional[str] = None,
empty_cache: bool = True,
automatic: bool = True,
video: bool = False,
mode: str = "eval",
hydra_overrides_extra: Optional[List[str]] = None,
apply_postprocessing: bool = False,
points_per_side: Optional[int] = 32,
points_per_batch: int = 64,
pred_iou_thresh: float = 0.8,
stability_score_thresh: float = 0.95,
stability_score_offset: float = 1.0,
mask_threshold: float = 0.0,
box_nms_thresh: float = 0.7,
crop_n_layers: int = 0,
crop_nms_thresh: float = 0.7,
crop_overlap_ratio: float = 512 / 1500,
crop_n_points_downscale_factor: int = 1,
point_grids: Optional[List[np.ndarray]] = None,
min_mask_region_area: int = 0,
output_mode: str = "binary_mask",
use_m2m: bool = False,
multimask_output: bool = False,
max_hole_area: float = 0.0,
max_sprinkle_area: float = 0.0,
**kwargs: Any,
) -> None:
"""
Initializes the SamGeo2 class.
Args:
model_id (str): The model ID to use. Can be one of the following: "sam2-hiera-tiny",
"sam2-hiera-small", "sam2-hiera-base-plus", "sam2-hiera-large".
Defaults to "sam2-hiera-large".
device (Optional[str]): The device to use (e.g., "cpu", "cuda", "mps"). Defaults to None.
empty_cache (bool): Whether to empty the cache. Defaults to True.
automatic (bool): Whether to use automatic mask generation. Defaults to True.
video (bool): Whether to use video prediction. Defaults to False.
mode (str): The mode to use. Defaults to "eval".
hydra_overrides_extra (Optional[List[str]]): Additional Hydra overrides. Defaults to None.
apply_postprocessing (bool): Whether to apply postprocessing. Defaults to False.
points_per_side (int or None): The number of points to be sampled
along one side of the image. The total number of points is
points_per_side**2. If None, 'point_grids' must provide explicit
point sampling.
points_per_batch (int): Sets the number of points run simultaneously
by the model. Higher numbers may be faster but use more GPU memory.
pred_iou_thresh (float): A filtering threshold in [0,1], using the
model's predicted mask quality.
stability_score_thresh (float): A filtering threshold in [0,1], using
the stability of the mask under changes to the cutoff used to binarize
the model's mask predictions.
stability_score_offset (float): The amount to shift the cutoff when
calculating the stability score.
mask_threshold (float): Threshold for binarizing the mask logits.
box_nms_thresh (float): The box IoU cutoff used by non-maximal
suppression to filter duplicate masks.
crop_n_layers (int): If >0, mask prediction will be run again on
crops of the image. Sets the number of layers to run, where each
layer has 2**i_layer number of image crops.
crop_nms_thresh (float): The box IoU cutoff used by non-maximal
suppression to filter duplicate masks between different crops.
crop_overlap_ratio (float): Sets the degree to which crops overlap.
In the first crop layer, crops will overlap by this fraction of
the image length. Later layers with more crops scale down this overlap.
crop_n_points_downscale_factor (int): The number of points-per-side
sampled in layer n is scaled down by crop_n_points_downscale_factor**n.
point_grids (list(np.ndarray) or None): A list over explicit grids
of points used for sampling, normalized to [0,1]. The nth grid in the
list is used in the nth crop layer. Exclusive with points_per_side.
min_mask_region_area (int): If >0, postprocessing will be applied
to remove disconnected regions and holes in masks with area smaller
than min_mask_region_area. Requires opencv.
output_mode (str): The form masks are returned in. Can be 'binary_mask',
'uncompressed_rle', or 'coco_rle'. 'coco_rle' requires pycocotools.
For large resolutions, 'binary_mask' may consume large amounts of
memory.
use_m2m (bool): Whether to add a one step refinement using previous mask predictions.
multimask_output (bool): Whether to output multimask at each point of the grid.
Defaults to False.
max_hole_area (float): If max_hole_area > 0, we fill small holes in up to
the maximum area of max_hole_area in low_res_masks.
max_sprinkle_area (float): If max_sprinkle_area > 0, we remove small sprinkles up to
the maximum area of max_sprinkle_area in low_res_masks.
**kwargs (Any): Additional keyword arguments to pass to
SAM2AutomaticMaskGenerator.from_pretrained() or SAM2ImagePredictor.from_pretrained().
"""
if isinstance(model_id, str):
if not model_id.startswith("facebook/"):
model_id = f"facebook/{model_id}"
else:
raise ValueError("model_id must be a string")
allowed_models = [
"facebook/sam2-hiera-tiny",
"facebook/sam2-hiera-small",
"facebook/sam2-hiera-base-plus",
"facebook/sam2-hiera-large",
]
if model_id not in allowed_models:
raise ValueError(
f"model_id must be one of the following: {', '.join(allowed_models)}"
)
if device is None:
device = common.choose_device(empty_cache=empty_cache)
if hydra_overrides_extra is None:
hydra_overrides_extra = []
self.model_id = model_id
self.model_version = "sam2"
self.device = device
if video:
automatic = False
if automatic:
self.mask_generator = SAM2AutomaticMaskGenerator.from_pretrained(
model_id,
device=device,
mode=mode,
hydra_overrides_extra=hydra_overrides_extra,
apply_postprocessing=apply_postprocessing,
points_per_side=points_per_side,
points_per_batch=points_per_batch,
pred_iou_thresh=pred_iou_thresh,
stability_score_thresh=stability_score_thresh,
stability_score_offset=stability_score_offset,
mask_threshold=mask_threshold,
box_nms_thresh=box_nms_thresh,
crop_n_layers=crop_n_layers,
crop_nms_thresh=crop_nms_thresh,
crop_overlap_ratio=crop_overlap_ratio,
crop_n_points_downscale_factor=crop_n_points_downscale_factor,
point_grids=point_grids,
min_mask_region_area=min_mask_region_area,
output_mode=output_mode,
use_m2m=use_m2m,
multimask_output=multimask_output,
**kwargs,
)
elif video:
self.predictor = SAM2VideoPredictor.from_pretrained(
model_id,
device=device,
mode=mode,
hydra_overrides_extra=hydra_overrides_extra,
apply_postprocessing=apply_postprocessing,
**kwargs,
)
else:
self.predictor = SAM2ImagePredictor.from_pretrained(
model_id,
device=device,
mode=mode,
hydra_overrides_extra=hydra_overrides_extra,
apply_postprocessing=apply_postprocessing,
mask_threshold=mask_threshold,
max_hole_area=max_hole_area,
max_sprinkle_area=max_sprinkle_area,
**kwargs,
)
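# Example (an illustrative sketch, not part of the class source): create a
# SamGeo2 instance for automatic mask generation. The checkpoint is pulled
# from the Hugging Face Hub ("facebook/sam2-hiera-large") on first use.
from samgeo import SamGeo2
sam = SamGeo2(model_id="sam2-hiera-large", automatic=True)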
def generate(
self,
source: Union[str, np.ndarray],
output: Optional[str] = None,
foreground: bool = True,
erosion_kernel: Optional[Tuple[int, int]] = None,
mask_multiplier: int = 255,
unique: bool = True,
min_size: int = 0,
max_size: Optional[int] = None,
**kwargs: Any,
) -> List[Dict[str, Any]]:
"""
Generate masks for the input image.
Args:
source (Union[str, np.ndarray]): The path to the input image or the
input image as a numpy array.
output (Optional[str]): The path to the output image. Defaults to None.
foreground (bool): Whether to generate the foreground mask. Defaults
to True.
erosion_kernel (Optional[Tuple[int, int]]): The erosion kernel for
filtering object masks and extracting borders,
such as (3, 3) or (5, 5). Set to None to disable it. Defaults to None.
mask_multiplier (int): The mask multiplier for the output mask,
which is usually a binary mask [0, 1].
You can use this parameter to scale the mask to a larger range,
for example [0, 255]. Defaults to 255.
The parameter is ignored if unique is True.
unique (bool): Whether to assign a unique value to each object.
Defaults to True.
The unique value increases from 1 to the number of objects. The
larger the number, the larger the object area.
min_size (int): The minimum size of the object. Defaults to 0.
max_size (int): The maximum size of the object. Defaults to None.
**kwargs (Any): Additional keyword arguments.
Returns:
List[Dict[str, Any]]: A list of dictionaries containing the generated masks.
"""
if isinstance(source, str):
if source.startswith("http"):
source = common.download_file(source)
if not os.path.exists(source):
raise ValueError(f"Input path {source} does not exist.")
image = cv2.imread(source)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif isinstance(source, np.ndarray):
image = source
source = None
else:
raise ValueError("Input source must be either a path or a numpy array.")
self.source = source # Store the input image path
self.image = image # Store the input image as a numpy array
mask_generator = self.mask_generator # The automatic mask generator
masks = mask_generator.generate(image) # Segment the input image
self.masks = masks # Store the masks as a list of dictionaries
self._min_size = min_size
self._max_size = max_size
if output is not None:
# Save the masks to the output path. The output is either a binary mask or a mask of objects with unique values.
self.save_masks(
output,
foreground,
unique,
erosion_kernel,
mask_multiplier,
min_size,
max_size,
**kwargs,
)
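# Example (sketch; "satellite.tif" is a hypothetical local GeoTIFF):
# segment the image and write objects with unique values to "masks.tif".
sam.generate("satellite.tif", output="masks.tif", foreground=True, unique=True)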
def save_masks(
self,
output: Optional[str] = None,
foreground: bool = True,
unique: bool = True,
erosion_kernel: Optional[Tuple[int, int]] = None,
mask_multiplier: int = 255,
min_size: int = 0,
max_size: Optional[int] = None,
**kwargs: Any,
) -> None:
"""Save the masks to the output path. The output is either a binary mask
or a mask of objects with unique values.
Args:
output (str, optional): The path to the output image. Defaults to
None, saving the masks to SamGeo.objects.
foreground (bool, optional): Whether to generate the foreground mask.
Defaults to True.
unique (bool, optional): Whether to assign a unique value to each
object. Defaults to True.
erosion_kernel (tuple, optional): The erosion kernel for filtering
object masks and extracting borders,
such as (3, 3) or (5, 5). Set to None to disable it. Defaults to
None.
mask_multiplier (int, optional): The mask multiplier for the output
mask, which is usually a binary mask [0, 1]. You can use this
parameter to scale the mask to a larger range, for example
[0, 255]. Defaults to 255.
min_size (int, optional): The minimum size of the object. Defaults to 0.
max_size (int, optional): The maximum size of the object. Defaults to None.
**kwargs: Additional keyword arguments for common.array_to_image().
"""
if self.masks is None:
raise ValueError("No masks found. Please run generate() first.")
h, w, _ = self.image.shape
masks = self.masks
# Set output image data type based on the number of objects
if len(masks) < 255:
dtype = np.uint8
elif len(masks) < 65535:
dtype = np.uint16
else:
dtype = np.uint32
# Generate a mask of objects with unique values
if unique:
# Sort the masks by area in descending order
sorted_masks = sorted(masks, key=(lambda x: x["area"]), reverse=True)
# Create an output image with the same size as the input image
objects = np.zeros(
(
sorted_masks[0]["segmentation"].shape[0],
sorted_masks[0]["segmentation"].shape[1],
)
)
# Assign a unique value to each object
count = len(sorted_masks)
for index, ann in enumerate(sorted_masks):
m = ann["segmentation"]
if min_size > 0 and ann["area"] < min_size:
continue
if max_size is not None and ann["area"] > max_size:
continue
objects[m] = count - index
# Generate a binary mask
else:
if foreground: # Extract foreground objects only
resulting_mask = np.zeros((h, w), dtype=dtype)
else:
resulting_mask = np.ones((h, w), dtype=dtype)
resulting_borders = np.zeros((h, w), dtype=dtype)
for m in masks:
if min_size > 0 and m["area"] < min_size:
continue
if max_size is not None and m["area"] > max_size:
continue
mask = (m["segmentation"] > 0).astype(dtype)
resulting_mask += mask
# Apply erosion to the mask
if erosion_kernel is not None:
mask_erode = cv2.erode(mask, erosion_kernel, iterations=1)
mask_erode = (mask_erode > 0).astype(dtype)
edge_mask = mask - mask_erode
resulting_borders += edge_mask
resulting_mask = (resulting_mask > 0).astype(dtype)
resulting_borders = (resulting_borders > 0).astype(dtype)
objects = resulting_mask - resulting_borders
objects = objects * mask_multiplier
objects = objects.astype(dtype)
self.objects = objects
if output is not None: # Save the output image
common.array_to_image(self.objects, output, self.source, **kwargs)
def show_masks(
self,
figsize: Tuple[int, int] = (12, 10),
cmap: str = "binary_r",
axis: str = "off",
foreground: bool = True,
**kwargs: Any,
) -> None:
"""Show the binary mask or the mask of objects with unique values.
Args:
figsize (tuple, optional): The figure size. Defaults to (12, 10).
cmap (str, optional): The colormap. Defaults to "binary_r".
axis (str, optional): Whether to show the axis. Defaults to "off".
foreground (bool, optional): Whether to show the foreground mask only.
Defaults to True.
**kwargs: Other arguments for save_masks().
"""
import matplotlib.pyplot as plt
if self.objects is None:
self.save_masks(foreground=foreground, **kwargs)
plt.figure(figsize=figsize)
plt.imshow(self.objects, cmap=cmap)
plt.axis(axis)
plt.show()
def show_anns(
self,
figsize: Tuple[int, int] = (12, 10),
axis: str = "off",
alpha: float = 0.35,
output: Optional[str] = None,
blend: bool = True,
**kwargs: Any,
) -> None:
"""Show the annotations (objects with random color) on the input image.
Args:
figsize (tuple, optional): The figure size. Defaults to (12, 10).
axis (str, optional): Whether to show the axis. Defaults to "off".
alpha (float, optional): The alpha value for the annotations. Defaults to 0.35.
output (str, optional): The path to the output image. Defaults to None.
blend (bool, optional): Whether to blend the annotations with the input
image when saving. Defaults to True.
"""
import matplotlib.pyplot as plt
anns = self.masks
if self.image is None:
print("Please run generate() first.")
return
if anns is None or len(anns) == 0:
return
plt.figure(figsize=figsize)
plt.imshow(self.image)
sorted_anns = sorted(anns, key=(lambda x: x["area"]), reverse=True)
ax = plt.gca()
ax.set_autoscale_on(False)
img = np.ones(
(
sorted_anns[0]["segmentation"].shape[0],
sorted_anns[0]["segmentation"].shape[1],
4,
)
)
img[:, :, 3] = 0
for ann in sorted_anns:
if hasattr(self, "_min_size") and (ann["area"] < self._min_size):
continue
if (
hasattr(self, "_max_size")
and isinstance(self._max_size, int)
and ann["area"] > self._max_size
):
continue
m = ann["segmentation"]
color_mask = np.concatenate([np.random.random(3), [alpha]])
img[m] = color_mask
ax.imshow(img)
if "dpi" not in kwargs:
kwargs["dpi"] = 100
if "bbox_inches" not in kwargs:
kwargs["bbox_inches"] = "tight"
plt.axis(axis)
self.annotations = (img[:, :, 0:3] * 255).astype(np.uint8)
if output is not None:
if blend:
array = common.blend_images(
self.annotations, self.image, alpha=alpha, show=False
)
else:
array = self.annotations
common.array_to_image(array, output, self.source)
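# Example (sketch; follows a generate() call, the output path is hypothetical):
# display the object mask, then save a blended annotation overlay.
sam.show_masks(cmap="binary_r")
sam.show_anns(axis="off", alpha=0.7, output="annotations.tif")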
@torch.no_grad()
def set_image(
self,
image: Union[str, np.ndarray, Image],
) -> None:
"""Set the input image as a numpy array.
Args:
image (Union[str, np.ndarray, Image]): The input image as a path,
a numpy array, or an Image.
"""
if isinstance(image, str):
if image.startswith("http"):
image = common.download_file(image)
if not os.path.exists(image):
raise ValueError(f"Input path {image} does not exist.")
self.source = image
image = cv2.imread(image)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
self.image = image
elif isinstance(image, np.ndarray) or isinstance(image, Image):
self.image = np.array(image) if isinstance(image, Image) else image
else:
raise ValueError("Input image must be either a path or a numpy array.")
self.predictor.set_image(image)
@torch.no_grad()
def set_image_batch(
self,
image_list: List[Union[np.ndarray, str, Image]],
) -> None:
"""Set a batch of images for prediction.
Args:
image_list (List[Union[np.ndarray, str, Image]]): A list of images,
which can be numpy arrays, file paths, or PIL images.
Raises:
ValueError: If an input image path does not exist or if the input
image type is not supported.
"""
images = []
for image in image_list:
if isinstance(image, str):
if image.startswith("http"):
image = common.download_file(image)
if not os.path.exists(image):
raise ValueError(f"Input path {image} does not exist.")
image = cv2.imread(image)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif isinstance(image, Image):
image = np.array(image)
elif isinstance(image, np.ndarray):
pass
else:
raise ValueError("Input image must be either a path or a numpy array.")
images.append(image)
self.predictor.set_image_batch(images)
def predict(
self,
point_coords: Optional[np.ndarray] = None,
point_labels: Optional[np.ndarray] = None,
boxes: Optional[np.ndarray] = None,
mask_input: Optional[np.ndarray] = None,
multimask_output: bool = False,
return_logits: bool = False,
normalize_coords: bool = True,
point_crs: Optional[str] = None,
output: Optional[str] = None,
index: Optional[int] = None,
mask_multiplier: int = 255,
dtype: str = "float32",
return_results: bool = False,
**kwargs: Any,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Predict the mask for the input image.
Args:
point_coords (np.ndarray, optional): The point coordinates. Defaults to None.
point_labels (np.ndarray, optional): The point labels. Defaults to None.
boxes (list | np.ndarray, optional): A length-4 array giving a box prompt to the
model, in XYXY format. A vector file path or a GeoJSON dict of boxes is also accepted.
mask_input (np.ndarray, optional): A low resolution mask input to the model, typically
coming from a previous prediction iteration. Has form 1xHxW, where for SAM, H=W=256.
multimask_output (bool, optional): If true, the model will return three masks.
For ambiguous input prompts (such as a single click), this will often
produce better masks than a single prediction. If only a single
mask is needed, the model's predicted quality score can be used
to select the best mask. For non-ambiguous prompts, such as multiple
input prompts, multimask_output=False can give better results.
Defaults to False.
return_logits (bool, optional): If true, returns un-thresholded mask logits
instead of a binary mask.
normalize_coords (bool, optional): Whether to normalize the coordinates.
Defaults to True.
point_crs (str, optional): The coordinate reference system (CRS) of the point prompts.
output (str, optional): The path to the output image. Defaults to None.
index (int, optional): The index of the mask to save. Defaults to None,
which will save the mask with the highest score.
mask_multiplier (int, optional): The mask multiplier for the output mask,
which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults to "float32".
return_results (bool, optional): Whether to return the predicted masks,
scores, and logits. Defaults to False.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: The masks, scores, and logits.
"""
import geopandas as gpd
out_of_bounds = []
if isinstance(boxes, str):
gdf = gpd.read_file(boxes)
if gdf.crs is not None:
gdf = gdf.to_crs("epsg:4326")
boxes = gdf.geometry.bounds.values.tolist()
elif isinstance(boxes, dict):
import json
geojson = json.dumps(boxes)
gdf = gpd.read_file(geojson, driver="GeoJSON")
boxes = gdf.geometry.bounds.values.tolist()
if isinstance(point_coords, str):
point_coords = common.vector_to_geojson(point_coords)
if isinstance(point_coords, dict):
point_coords = common.geojson_to_coords(point_coords)
if hasattr(self, "point_coords"):
point_coords = self.point_coords
if hasattr(self, "point_labels"):
point_labels = self.point_labels
if (point_crs is not None) and (point_coords is not None):
point_coords, out_of_bounds = common.coords_to_xy(
self.source, point_coords, point_crs, return_out_of_bounds=True
)
if isinstance(point_coords, list):
point_coords = np.array(point_coords)
if point_coords is not None:
if point_labels is None:
point_labels = [1] * len(point_coords)
elif isinstance(point_labels, int):
point_labels = [point_labels] * len(point_coords)
if isinstance(point_labels, list):
if len(point_labels) != len(point_coords):
if len(point_labels) == 1:
point_labels = point_labels * len(point_coords)
elif len(out_of_bounds) > 0:
print(f"Removing {len(out_of_bounds)} out-of-bound points.")
point_labels_new = []
for i, p in enumerate(point_labels):
if i not in out_of_bounds:
point_labels_new.append(p)
point_labels = point_labels_new
else:
raise ValueError(
"The length of point_labels must be equal to the length of point_coords."
)
point_labels = np.array(point_labels)
predictor = self.predictor
input_boxes = None
if isinstance(boxes, list) and (point_crs is not None):
coords = common.bbox_to_xy(self.source, boxes, point_crs)
input_boxes = np.array(coords)
elif isinstance(boxes, list) and (point_crs is None):
input_boxes = np.array(boxes)
self.boxes = input_boxes
masks, scores, logits = predictor.predict(
point_coords=point_coords,
point_labels=point_labels,
box=input_boxes,
mask_input=mask_input,
multimask_output=multimask_output,
return_logits=return_logits,
normalize_coords=normalize_coords,
)
self.masks = masks
self.scores = scores
self.logits = logits
if output is not None:
if boxes is None or (not isinstance(boxes[0], list)):
self.save_prediction(output, index, mask_multiplier, dtype, **kwargs)
else:
self.tensor_to_numpy(
index, output, mask_multiplier, dtype, save_args=kwargs
)
if return_results:
return masks, scores, logits
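# Example (sketch; the point coordinates and file names are hypothetical):
# prompt-based prediction with a single lon/lat point. Requires
# automatic=False so that the image predictor is used.
sam = SamGeo2(model_id="sam2-hiera-large", automatic=False)
sam.set_image("satellite.tif")
sam.predict(
point_coords=[[-122.1497, 37.6311]],
point_labels=1,
point_crs="EPSG:4326",
output="mask.tif",
)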
def predict_by_points(
self,
point_coords_batch: List[np.ndarray] = None,
point_labels_batch: List[np.ndarray] = None,
box_batch: List[np.ndarray] = None,
mask_input_batch: List[np.ndarray] = None,
multimask_output: bool = False,
return_logits: bool = False,
normalize_coords=True,
point_crs: Optional[str] = None,
output: Optional[str] = None,
index: Optional[int] = None,
unique: bool = True,
mask_multiplier: int = 255,
dtype: str = "int32",
return_results: bool = False,
**kwargs: Any,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Predict the mask for the input image.
Args:
point_coords_batch (List[np.ndarray] | str | dict | GeoDataFrame, optional):
The batch of point prompts, given as pixel coordinates, a vector file path,
a GeoJSON dict, or a GeoDataFrame of point geometries. Defaults to None.
point_labels_batch (List[np.ndarray], optional): The batch of point labels.
Defaults to None, which treats every point as a foreground prompt.
box_batch (List[np.ndarray], optional): The batch of box prompts, in XYXY
format. Defaults to None.
mask_input_batch (List[np.ndarray], optional): The batch of low resolution mask
inputs, typically coming from a previous prediction iteration. Each has
form 1xHxW, where for SAM, H=W=256. Defaults to None.
multimask_output (bool, optional): If true, the model will return three masks.
For ambiguous input prompts (such as a single click), this will often
produce better masks than a single prediction. If only a single
mask is needed, the model's predicted quality score can be used
to select the best mask. For non-ambiguous prompts, such as multiple
input prompts, multimask_output=False can give better results.
Defaults to False.
return_logits (bool, optional): If true, returns un-thresholded mask logits
instead of a binary mask.
normalize_coords (bool, optional): Whether to normalize the coordinates.
Defaults to True.
point_crs (str, optional): The coordinate reference system (CRS) of the point prompts.
output (str, optional): The path to the output image. Defaults to None.
index (int, optional): The index of the mask to save. Defaults to None,
which will save the mask with the highest score.
unique (bool, optional): Whether to assign a unique value to each object.
Defaults to True.
mask_multiplier (int, optional): The mask multiplier for the output mask,
which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults to "int32".
return_results (bool, optional): Whether to return the predicted masks,
scores, and logits. Defaults to False.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: The masks, scores, and logits.
"""
import geopandas as gpd
if hasattr(self, "image_batch") and self.image_batch is not None:
pass
elif self.image is not None:
self.predictor.set_image_batch([self.image])
setattr(self, "image_batch", [self.image])
else:
raise ValueError("Please set the input image first using set_image().")
if isinstance(point_coords_batch, dict):
point_coords_batch = gpd.GeoDataFrame.from_features(point_coords_batch)
if isinstance(point_coords_batch, str) or isinstance(
point_coords_batch, gpd.GeoDataFrame
):
if isinstance(point_coords_batch, str):
gdf = gpd.read_file(point_coords_batch)
else:
gdf = point_coords_batch
if gdf.crs is None and (point_crs is not None):
gdf.crs = point_crs
points = gdf.geometry.apply(lambda geom: [geom.x, geom.y])
coordinates_array = np.array([[point] for point in points])
points = common.coords_to_xy(self.source, coordinates_array, point_crs)
num_points = points.shape[0]
if point_labels_batch is None:
labels = np.array([[1] for i in range(num_points)])
else:
labels = point_labels_batch
elif isinstance(point_coords_batch, list):
if point_crs is not None:
point_coords_batch_crs = common.coords_to_xy(
self.source, point_coords_batch, point_crs
)
else:
point_coords_batch_crs = point_coords_batch
num_points = len(point_coords_batch)
points = []
points.append([[point] for point in point_coords_batch_crs])
if point_labels_batch is None:
labels = np.array([[1] for i in range(num_points)])
elif isinstance(point_labels_batch, list):
labels = []
labels.append([[label] for label in point_labels_batch])
labels = labels[0]
else:
labels = point_labels_batch
points = np.array(points[0])
labels = np.array(labels)
elif isinstance(point_coords_batch, np.ndarray):
points = point_coords_batch
labels = point_labels_batch
else:
raise ValueError("point_coords must be a list, a GeoDataFrame, or a path.")
predictor = self.predictor
masks_batch, scores_batch, logits_batch = predictor.predict_batch(
point_coords_batch=[points],
point_labels_batch=[labels],
box_batch=box_batch,
mask_input_batch=mask_input_batch,
multimask_output=multimask_output,
return_logits=return_logits,
normalize_coords=normalize_coords,
)
masks = masks_batch[0]
scores = scores_batch[0]
logits = logits_batch[0]
if multimask_output and (index is not None):
masks = masks[:, index, :, :]
if masks.ndim > 3:
masks = masks.squeeze()
output_masks = []
sums = np.sum(masks, axis=(1, 2))
for index, mask in enumerate(masks):
item = {"segmentation": mask.astype("bool"), "area": sums[index]}
output_masks.append(item)
self.masks = output_masks
self.scores = scores
self.logits = logits
if output is not None:
self.save_masks(
output,
foreground=True,
unique=unique,
mask_multiplier=mask_multiplier,
dtype=dtype,
**kwargs,
)
if return_results:
return output_masks, scores, logits
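# Example (sketch; "points.geojson" is a hypothetical vector file of point
# geometries in the given CRS): batch prediction from point prompts.
sam.set_image("satellite.tif")
sam.predict_by_points(
point_coords_batch="points.geojson",
point_crs="EPSG:4326",
output="mask.tif",
dtype="uint8",
)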
def predict_batch(
self,
point_coords_batch: List[np.ndarray] = None,
point_labels_batch: List[np.ndarray] = None,
box_batch: List[np.ndarray] = None,
mask_input_batch: List[np.ndarray] = None,
multimask_output: bool = False,
return_logits: bool = False,
normalize_coords=True,
) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
"""Predict masks for a batch of images.
Args:
point_coords_batch (Optional[List[np.ndarray]]): A batch of point
coordinates. Defaults to None.
point_labels_batch (Optional[List[np.ndarray]]): A batch of point
labels. Defaults to None.
box_batch (Optional[List[np.ndarray]]): A batch of bounding boxes.
Defaults to None.
mask_input_batch (Optional[List[np.ndarray]]): A batch of mask inputs.
Defaults to None.
multimask_output (bool): Whether to output multimask at each point
of the grid. Defaults to False.
return_logits (bool): Whether to return the logits. Defaults to False.
normalize_coords (bool): Whether to normalize the coordinates.
Defaults to True.
Returns:
Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]: Lists
of masks, multimasks, and logits.
"""
return self.predictor.predict_batch(
point_coords_batch=point_coords_batch,
point_labels_batch=point_labels_batch,
box_batch=box_batch,
mask_input_batch=mask_input_batch,
multimask_output=multimask_output,
return_logits=return_logits,
normalize_coords=normalize_coords,
)
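# Example (sketch; file names and pixel coordinates are hypothetical):
# predict masks for two images at once with one positive point each.
# Requires automatic=False so the image predictor is available.
import numpy as np
sam.set_image_batch(["image1.tif", "image2.tif"])
masks, scores, logits = sam.predict_batch(
point_coords_batch=[np.array([[100, 200]]), np.array([[50, 60]])],
point_labels_batch=[np.array([1]), np.array([1])],
multimask_output=False,
)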
@torch.inference_mode()
def init_state(
self,
video_path: str,
offload_video_to_cpu: bool = False,
offload_state_to_cpu: bool = False,
async_loading_frames: bool = False,
) -> Any:
"""Initialize an inference state.
Args:
video_path (str): The path to the video file.
offload_video_to_cpu (bool): Whether to offload the video to CPU.
Defaults to False.
offload_state_to_cpu (bool): Whether to offload the state to CPU.
Defaults to False.
async_loading_frames (bool): Whether to load frames asynchronously.
Defaults to False.
Returns:
Any: The initialized inference state.
"""
return self.predictor.init_state(
video_path,
offload_video_to_cpu=offload_video_to_cpu,
offload_state_to_cpu=offload_state_to_cpu,
async_loading_frames=async_loading_frames,
)
@torch.inference_mode()
def reset_state(self, inference_state: Any) -> None:
"""Remove all input points or masks in all frames throughout the video.
Args:
inference_state (Any): The current inference state.
"""
self.predictor.reset_state(inference_state)
@torch.inference_mode()
def add_new_points_or_box(
self,
inference_state: Any,
frame_idx: int,
obj_id: int,
points: Optional[np.ndarray] = None,
labels: Optional[np.ndarray] = None,
clear_old_points: bool = True,
normalize_coords: bool = True,
box: Optional[np.ndarray] = None,
) -> Any:
"""Add new points or a box to the inference state.
Args:
inference_state (Any): The current inference state.
frame_idx (int): The frame index.
obj_id (int): The object ID.
points (Optional[np.ndarray]): The points to add. Defaults to None.
labels (Optional[np.ndarray]): The labels for the points. Defaults to None.
clear_old_points (bool): Whether to clear old points. Defaults to True.
normalize_coords (bool): Whether to normalize the coordinates. Defaults to True.
box (Optional[np.ndarray]): The bounding box to add. Defaults to None.
Returns:
Any: The updated inference state.
"""
return self.predictor.add_new_points_or_box(
inference_state,
frame_idx,
obj_id,
points=points,
labels=labels,
clear_old_points=clear_old_points,
normalize_coords=normalize_coords,
box=box,
)
@torch.inference_mode()
def add_new_mask(
self,
inference_state: Any,
frame_idx: int,
obj_id: int,
mask: np.ndarray,
) -> Any:
"""Add a new mask to the inference state.
Args:
inference_state (Any): The current inference state.
frame_idx (int): The frame index.
obj_id (int): The object ID.
mask (np.ndarray): The mask to add.
Returns:
Any: The updated inference state.
"""
return self.predictor.add_new_mask(inference_state, frame_idx, obj_id, mask)
@torch.inference_mode()
def propagate_in_video_preflight(self, inference_state: Any) -> Any:
"""Propagate the inference state in video preflight.
Args:
inference_state (Any): The current inference state.
Returns:
Any: The propagated inference state.
"""
return self.predictor.propagate_in_video_preflight(inference_state)
@torch.inference_mode()
def propagate_in_video(
self,
inference_state: Any,
start_frame_idx: Optional[int] = None,
max_frame_num_to_track: Optional[int] = None,
reverse: bool = False,
) -> Any:
"""Propagate the inference state in video.
Args:
inference_state (Any): The current inference state.
start_frame_idx (Optional[int]): The starting frame index. Defaults to None.
max_frame_num_to_track (Optional[int]): The maximum number of frames
to track. Defaults to None.
reverse (bool): Whether to propagate in reverse. Defaults to False.
Returns:
Any: The propagated inference state.
"""
return self.predictor.propagate_in_video(
inference_state,
start_frame_idx=start_frame_idx,
max_frame_num_to_track=max_frame_num_to_track,
reverse=reverse,
)
def tensor_to_numpy(
self,
index: Optional[int] = None,
output: Optional[str] = None,
mask_multiplier: int = 255,
dtype: str = "uint8",
save_args: Optional[Dict[str, Any]] = None,
) -> Optional[np.ndarray]:
"""Convert the predicted masks from tensors to numpy arrays.
Args:
index (Optional[int], optional): The index of the mask to save.
Defaults to None, which will save the mask with the highest score.
output (Optional[str], optional): The path to the output image.
Defaults to None.
mask_multiplier (int, optional): The mask multiplier for the output
mask, which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults
to "uint8".
save_args (Optional[Dict[str, Any]], optional): Optional arguments
for saving the output image. Defaults to None.
Returns:
Optional[np.ndarray]: The predicted mask as a numpy array, or None
if output is specified.
"""
if save_args is None:
save_args = {}
boxes = self.boxes
masks = self.masks
image_pil = self.image
image_np = np.array(image_pil)
if index is None:
index = 0
masks = masks[:, index, :, :]
if len(masks.shape) == 4 and masks.shape[1] == 1:
masks = masks.squeeze(1)
if boxes is None or (len(boxes) == 0): # No "object" instances found
print("No objects found in the image.")
return
else:
# Create an empty image to store the mask overlays
mask_overlay = np.zeros_like(
image_np[..., 0], dtype=dtype
) # Adjusted for single channel
for i, (_, mask) in enumerate(zip(boxes, masks)):
# Convert tensor to numpy array if necessary and ensure it contains integers
if isinstance(mask, torch.Tensor):
mask = (
mask.cpu().numpy().astype(dtype)
) # If mask is on GPU, use .cpu() before .numpy()
mask_overlay += ((mask > 0) * (i + 1)).astype(
dtype
) # Assign a unique value for each mask
# Normalize mask_overlay to be in [0, 255]
mask_overlay = (
mask_overlay > 0
) * mask_multiplier # Binary mask in [0, 255]
if output is not None:
common.array_to_image(
mask_overlay, output, self.source, dtype=dtype, **save_args
)
else:
return mask_overlay
def save_prediction(
self,
output: str,
index: Optional[int] = None,
mask_multiplier: int = 255,
dtype: str = "float32",
vector: Optional[str] = None,
simplify_tolerance: Optional[float] = None,
**kwargs: Any,
) -> None:
"""Save the predicted mask to the output path.
Args:
output (str): The path to the output image.
index (Optional[int], optional): The index of the mask to save.
Defaults to None, which will save the mask with the highest score.
mask_multiplier (int, optional): The mask multiplier for the output
mask, which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults
to "float32".
vector (Optional[str], optional): The path to the output vector file.
Defaults to None.
simplify_tolerance (Optional[float], optional): The maximum allowed
geometry displacement. The higher this value, the smaller the
number of vertices in the resulting geometry.
**kwargs (Any): Additional keyword arguments.
"""
if self.scores is None:
raise ValueError("No predictions found. Please run predict() first.")
if index is None:
index = self.scores.argmax(axis=0)
array = self.masks[index] * mask_multiplier
self.prediction = array
common.array_to_image(array, output, self.source, dtype=dtype, **kwargs)
if vector is not None:
common.raster_to_vector(
output, vector, simplify_tolerance=simplify_tolerance
)
def show_map(
self,
basemap: str = "SATELLITE",
repeat_mode: bool = True,
out_dir: Optional[str] = None,
**kwargs: Any,
) -> Any:
"""Show the interactive map.
Args:
basemap (str, optional): The basemap. It can be one of the following:
SATELLITE, ROADMAP, TERRAIN, HYBRID.
repeat_mode (bool, optional): Whether to use the repeat mode for
draw control. Defaults to True.
out_dir (Optional[str], optional): The path to the output directory.
Defaults to None.
Returns:
Any: The map object.
"""
return common.sam_map_gui(
self, basemap=basemap, repeat_mode=repeat_mode, out_dir=out_dir, **kwargs
)
def show_canvas(
self,
fg_color: Tuple[int, int, int] = (0, 255, 0),
bg_color: Tuple[int, int, int] = (0, 0, 255),
radius: int = 5,
) -> Tuple[list, list]:
"""Show a canvas to collect foreground and background points.
Args:
fg_color (Tuple[int, int, int], optional): The color for the foreground points.
Defaults to (0, 255, 0).
bg_color (Tuple[int, int, int], optional): The color for the background points.
Defaults to (0, 0, 255).
radius (int, optional): The radius of the points. Defaults to 5.
Returns:
Tuple[list, list]: A tuple of two lists of foreground and background points.
"""
if self.image is None:
raise ValueError("Please run set_image() first.")
image = self.image
fg_points, bg_points = common.show_canvas(image, fg_color, bg_color, radius)
self.fg_points = fg_points
self.bg_points = bg_points
point_coords = fg_points + bg_points
point_labels = [1] * len(fg_points) + [0] * len(bg_points)
self.point_coords = point_coords
self.point_labels = point_labels
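# Example (sketch; continues the interactive setup with automatic=False):
# collect foreground/background clicks on a canvas; predict() then picks
# up the stored point prompts automatically.
sam.set_image("satellite.tif")
sam.show_canvas()
sam.predict(output="mask.tif")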
def _convert_prompts(self, prompts: Dict[int, Any]) -> Dict[int, Any]:
"""Convert the points and labels in the prompts to numpy arrays with specific data types.
Args:
prompts (Dict[int, Any]): A dictionary containing the prompts with points and labels.
Returns:
Dict[int, Any]: The updated dictionary with points and labels converted to numpy arrays.
"""
for _, value in prompts.items():
# Convert points to np.float32 array
if "points" in value:
value["points"] = np.array(value["points"], dtype=np.float32)
# Convert labels to np.int32 array
if "labels" in value:
value["labels"] = np.array(value["labels"], dtype=np.int32)
# Convert box to np.float32 array
if "box" in value:
value["box"] = np.array(value["box"], dtype=np.float32)
return prompts
def set_video(
self,
video_path: str,
output_dir: str = None,
frame_rate: Optional[int] = None,
prefix: str = "",
) -> None:
"""Set the video path and parameters.
Args:
video_path (str): The path to the video file.
start_frame (int, optional): The starting frame index. Defaults to 0.
end_frame (Optional[int], optional): The ending frame index. Defaults to None.
step (int, optional): The step size. Defaults to 1.
frame_rate (Optional[int], optional): The frame rate. Defaults to None.
"""
if isinstance(video_path, str):
if video_path.startswith("http"):
video_path = common.download_file(video_path)
if os.path.isfile(video_path):
if output_dir is None:
output_dir = common.make_temp_dir()
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"Output directory: {output_dir}")
common.video_to_images(
video_path, output_dir, frame_rate=frame_rate, prefix=prefix
)
elif os.path.isdir(video_path):
files = sorted(os.listdir(video_path))
if len(files) == 0:
raise ValueError(f"No files found in {video_path}.")
elif files[0].endswith(".tif"):
self._tif_source = os.path.join(video_path, files[0])
self._tif_dir = video_path
self._tif_names = files
video_path = common.geotiff_to_jpg_batch(video_path)
output_dir = video_path
if not os.path.exists(video_path):
raise ValueError(f"Input path {video_path} does not exist.")
else:
raise ValueError("Input video_path must be a string.")
self.video_path = output_dir
self._num_images = len(os.listdir(output_dir))
self._frame_names = sorted(os.listdir(output_dir))
self.inference_state = self.predictor.init_state(video_path=output_dir)
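# Example (sketch; "video.mp4" is a hypothetical input): initialize the
# video predictor. Frames are extracted to a temporary directory when
# output_dir is None.
sam = SamGeo2(model_id="sam2-hiera-large", video=True)
sam.set_video("video.mp4")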
def predict_video(
self,
prompts: Dict[int, Any] = None,
point_crs: Optional[str] = None,
output_dir: Optional[str] = None,
img_ext: str = "png",
) -> None:
"""Predict masks for the video.
Args:
prompts (Dict[int, Any]): A dictionary containing the prompts with points and labels.
point_crs (Optional[str]): The coordinate reference system (CRS) of the point prompts.
output_dir (Optional[str]): The directory to save the output images. Defaults to None.
img_ext (str): The file extension for the output images. Defaults to "png".
"""
from PIL import Image
def save_image_from_dict(data, output_path="output_image.png"):
# Find the shape of the first array in the dictionary (assuming all arrays have the same shape)
array_shape = next(iter(data.values())).shape[1:]
# Initialize an empty array with the same shape as the arrays in the dictionary, filled with zeros
output_array = np.zeros(array_shape, dtype=np.uint8)
# Iterate over each key and array in the dictionary
for key, array in data.items():
# Assign the key value wherever the boolean array is True
output_array[array[0]] = key
# Convert the output array to a PIL image
image = Image.fromarray(output_array)
# Save the image
image.save(output_path)
if prompts is None:
if hasattr(self, "prompts"):
prompts = self.prompts
else:
raise ValueError("Please provide prompts.")
if point_crs is not None and getattr(self, "_tif_source", None) is not None:
for prompt in prompts.values():
points = prompt.get("points", None)
if points is not None:
points = common.coords_to_xy(self._tif_source, points, point_crs)
prompt["points"] = points
box = prompt.get("box", None)
if box is not None:
box = common.bbox_to_xy(self._tif_source, box, point_crs)
prompt["box"] = box
prompts = self._convert_prompts(prompts)
predictor = self.predictor
inference_state = self.inference_state
for obj_id, prompt in prompts.items():
points = prompt.get("points", None)
labels = prompt.get("labels", None)
box = prompt.get("box", None)
frame_idx = prompt.get("frame_idx", None)
_, out_obj_ids, out_mask_logits = predictor.add_new_points_or_box(
inference_state=inference_state,
frame_idx=frame_idx,
obj_id=obj_id,
points=points,
labels=labels,
box=box,
)
video_segments = {}
num_frames = self._num_images
num_digits = len(str(num_frames))
if output_dir is not None:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for out_frame_idx, out_obj_ids, out_mask_logits in predictor.propagate_in_video(
inference_state
):
video_segments[out_frame_idx] = {
out_obj_id: (out_mask_logits[i] > 0.0).cpu().numpy()
for i, out_obj_id in enumerate(out_obj_ids)
}
if output_dir is not None:
output_path = os.path.join(
output_dir, f"{str(out_frame_idx).zfill(num_digits)}.{img_ext}"
)
save_image_from_dict(video_segments[out_frame_idx], output_path)
self.video_segments = video_segments
# if output_dir is not None:
# self.save_video_segments(output_dir, img_ext)
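# Example (sketch; the pixel coordinates are hypothetical): prompt object 1
# with one positive point on frame 0, then propagate through the video.
prompts = {
1: {"points": [[335, 203]], "labels": [1], "frame_idx": 0},
}
sam.show_prompts(prompts, frame_idx=0)
sam.predict_video(prompts)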
def save_video_segments(self, output_dir: str, img_ext: str = "png") -> None:
"""Save the video segments to the output directory.
Args:
output_dir (str): The path to the output directory.
img_ext (str): The file extension for the output images. Defaults to "png".
"""
from PIL import Image
def save_image_from_dict(
data, output_path="output_image.png", crs_source=None, **kwargs
):
# Find the shape of the first array in the dictionary (assuming all arrays have the same shape)
array_shape = next(iter(data.values())).shape[1:]
# Initialize an empty array with the same shape as the arrays in the dictionary, filled with zeros
output_array = np.zeros(array_shape, dtype=np.uint8)
# Iterate over each key and array in the dictionary
for key, array in data.items():
# Assign the key value wherever the boolean array is True
output_array[array[0]] = key
if crs_source is None:
# Convert the output array to a PIL image
image = Image.fromarray(output_array)
# Save the image
image.save(output_path)
else:
output_path = output_path.replace(".png", ".tif")
common.array_to_image(output_array, output_path, crs_source, **kwargs)
num_frames = len(self.video_segments)
num_digits = len(str(num_frames))
if hasattr(self, "_tif_source") and self._tif_source.endswith(".tif"):
crs_source = self._tif_source
filenames = self._tif_names
else:
crs_source = None
filenames = None
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Initialize the tqdm progress bar
for frame_idx, video_segment in tqdm(
self.video_segments.items(), desc="Rendering frames", total=num_frames
):
if filenames is None:
output_path = os.path.join(
output_dir, f"{str(frame_idx).zfill(num_digits)}.{img_ext}"
)
else:
output_path = os.path.join(output_dir, filenames[frame_idx])
save_image_from_dict(video_segment, output_path, crs_source)
def save_video_segments_blended(
self,
output_dir: str,
img_ext: str = "png",
alpha: float = 0.6,
dpi: int = 200,
frame_stride: int = 1,
output_video: Optional[str] = None,
fps: int = 30,
) -> None:
"""Save blended video segments to the output directory and optionally create a video.
Args:
output_dir (str): The directory to save the output images.
img_ext (str): The file extension for the output images. Defaults to "png".
alpha (float): The alpha value for the blended masks. Defaults to 0.6.
dpi (int): The DPI (dots per inch) for the output images. Defaults to 200.
frame_stride (int): The stride for selecting frames to save. Defaults to 1.
output_video (Optional[str]): The path to the output video file. Defaults to None.
fps (int): The frames per second for the output video. Defaults to 30.
"""
from PIL import Image
def show_mask(mask, ax, obj_id=None, random_color=False):
if random_color:
color = np.concatenate([np.random.random(3), np.array([alpha])], axis=0)
else:
cmap = plt.get_cmap("tab10")
cmap_idx = 0 if obj_id is None else obj_id
color = np.array([*cmap(cmap_idx)[:3], alpha])
h, w = mask.shape[-2:]
mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
ax.imshow(mask_image)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
plt.close("all")
video_segments = self.video_segments
video_dir = self.video_path
frame_names = self._frame_names
num_frames = len(frame_names)
num_digits = len(str(num_frames))
# Initialize the tqdm progress bar
for out_frame_idx in tqdm(
range(0, len(frame_names), frame_stride), desc="Rendering frames"
):
image = Image.open(os.path.join(video_dir, frame_names[out_frame_idx]))
# Get original image dimensions
w, h = image.size
# Set DPI and calculate figure size based on the original image dimensions
figsize = (
w / dpi,
h / dpi,
)
figsize = (
figsize[0] * 1.3,
figsize[1] * 1.3,
)
# Create a figure with the exact size and DPI
fig = plt.figure(figsize=figsize, dpi=dpi)
# Disable axis to prevent whitespace
plt.axis("off")
# Display the original image
plt.imshow(image)
# Overlay masks for each object ID
for out_obj_id, out_mask in video_segments[out_frame_idx].items():
show_mask(out_mask, plt.gca(), obj_id=out_obj_id)
# Save the figure with no borders or extra padding
filename = f"{str(out_frame_idx).zfill(num_digits)}.{img_ext}"
filepath = os.path.join(output_dir, filename)
plt.savefig(filepath, dpi=dpi, pad_inches=0, bbox_inches="tight")
plt.close(fig)
if output_video is not None:
common.images_to_video(output_dir, output_video, fps=fps)
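# Example (sketch; output locations are hypothetical): render blended
# frames and assemble them into an MP4 at 25 frames per second.
sam.save_video_segments_blended(
"blended_frames", output_video="segments.mp4", fps=25
)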
def show_images(self, path: str = None) -> None:
"""Show the images in the video.
Args:
path (str, optional): The path to the images. Defaults to None.
"""
if path is None:
path = self.video_path
if path is not None:
common.show_image_gui(path)
def show_prompts(
self,
prompts: Dict[int, Any],
frame_idx: int = 0,
mask: Any = None,
random_color: bool = False,
point_crs: Optional[str] = None,
figsize: Tuple[int, int] = (9, 6),
) -> None:
"""Show the prompts on the image.
Args:
prompts (Dict[int, Any]): A dictionary containing the prompts with
points and labels.
frame_idx (int, optional): The frame index. Defaults to 0.
mask (Any, optional): The mask. Defaults to None.
random_color (bool, optional): Whether to use random colors for the
masks. Defaults to False.
point_crs (Optional[str], optional): The coordinate reference system
(CRS) of the point prompts. Defaults to None.
figsize (Tuple[int, int], optional): The figure size. Defaults to (9, 6).
"""
from PIL import Image
def show_mask(mask, ax, obj_id=None, random_color=random_color):
if random_color:
color = np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
else:
cmap = plt.get_cmap("tab10")
cmap_idx = 0 if obj_id is None else obj_id
color = np.array([*cmap(cmap_idx)[:3], 0.6])
h, w = mask.shape[-2:]
mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
ax.imshow(mask_image)
def show_points(coords, labels, ax, marker_size=200):
pos_points = coords[labels == 1]
neg_points = coords[labels == 0]
ax.scatter(
pos_points[:, 0],
pos_points[:, 1],
color="green",
marker="*",
s=marker_size,
edgecolor="white",
linewidth=1.25,
)
ax.scatter(
neg_points[:, 0],
neg_points[:, 1],
color="red",
marker="*",
s=marker_size,
edgecolor="white",
linewidth=1.25,
)
def show_box(box, ax):
x0, y0 = box[0], box[1]
w, h = box[2] - box[0], box[3] - box[1]
ax.add_patch(
plt.Rectangle(
(x0, y0), w, h, edgecolor="green", facecolor=(0, 0, 0, 0), lw=2
)
)
if point_crs is not None and getattr(self, "_tif_source", None) is not None:
for prompt in prompts.values():
points = prompt.get("points", None)
if points is not None:
points = common.coords_to_xy(self._tif_source, points, point_crs)
prompt["points"] = points
box = prompt.get("box", None)
if box is not None:
box = common.bbox_to_xy(self._tif_source, box, point_crs)
prompt["box"] = box
prompts = self._convert_prompts(prompts)
self.prompts = prompts
video_dir = self.video_path
frame_names = self._frame_names
fig = plt.figure(figsize=figsize)
fig.canvas.toolbar_visible = True
fig.canvas.header_visible = False
fig.canvas.footer_visible = True
plt.title(f"frame {frame_idx}")
plt.imshow(Image.open(os.path.join(video_dir, frame_names[frame_idx])))
for obj_id, prompt in prompts.items():
points = prompt.get("points", None)
labels = prompt.get("labels", None)
box = prompt.get("box", None)
anno_frame_idx = prompt.get("frame_idx", None)
if anno_frame_idx == frame_idx:
if points is not None:
show_points(points, labels, plt.gca())
if box is not None:
show_box(box, plt.gca())
if mask is not None:
show_mask(mask, plt.gca(), obj_id=obj_id)
plt.show()
def raster_to_vector(self, raster, vector, simplify_tolerance=None, **kwargs):
"""Convert a raster image file to a vector dataset.
Args:
raster (str): The path to the raster image.
vector (str): The path to the output vector file.
simplify_tolerance (float, optional): The maximum allowed geometry displacement.
The higher this value, the smaller the number of vertices in the resulting geometry.
"""
common.raster_to_vector(
raster, vector, simplify_tolerance=simplify_tolerance, **kwargs
)
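# Example (sketch; assumes "masks.tif" was produced by generate()):
# vectorize the raster masks into a GeoPackage of polygons.
sam.raster_to_vector("masks.tif", "masks.gpkg")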
def region_groups(
self,
image: Union[str, "xr.DataArray", np.ndarray],
connectivity: int = 1,
min_size: int = 10,
max_size: Optional[int] = None,
threshold: Optional[int] = None,
properties: Optional[List[str]] = None,
intensity_image: Optional[Union[str, "xr.DataArray", np.ndarray]] = None,
out_csv: Optional[str] = None,
out_vector: Optional[str] = None,
out_image: Optional[str] = None,
**kwargs: Any,
) -> Union[
Tuple[np.ndarray, "pd.DataFrame"], Tuple["xr.DataArray", "pd.DataFrame"]
]:
"""
Segment regions in an image and filter them based on size.
Args:
image (Union[str, xr.DataArray, np.ndarray]): Input image, can be a file
path, xarray DataArray, or numpy array.
connectivity (int, optional): Connectivity for labeling. Defaults to 1
for 4-connectivity. Use 2 for 8-connectivity.
min_size (int, optional): Minimum size of regions to keep. Defaults to 10.
max_size (Optional[int], optional): Maximum size of regions to keep.
Defaults to None.
threshold (Optional[int], optional): Threshold for filling holes.
Defaults to None, which is equal to min_size.
properties (Optional[List[str]], optional): List of properties to measure.
See https://scikit-image.org/docs/stable/api/skimage.measure.html#skimage.measure.regionprops
Defaults to None.
intensity_image (Optional[Union[str, xr.DataArray, np.ndarray]], optional):
Intensity image to use for properties. Defaults to None.
out_csv (Optional[str], optional): Path to save the properties as a CSV file.
Defaults to None.
out_vector (Optional[str], optional): Path to save the vector file.
Defaults to None.
out_image (Optional[str], optional): Path to save the output image.
Defaults to None.
Returns:
Union[Tuple[np.ndarray, pd.DataFrame], Tuple[xr.DataArray, pd.DataFrame]]: Labeled image and properties DataFrame.
"""
return common.region_groups(
image,
connectivity=connectivity,
min_size=min_size,
max_size=max_size,
threshold=threshold,
properties=properties,
intensity_image=intensity_image,
out_csv=out_csv,
out_vector=out_vector,
out_image=out_image,
**kwargs,
)
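A minimal sketch of region_groups usage (the file names are hypothetical): label the objects in a mask produced by generate(), drop regions smaller than 10 pixels, and export the region properties as a CSV, a vector file, and a labeled image.
labeled, props = sam.region_groups(
"masks.tif",
connectivity=1,
min_size=10,
out_csv="props.csv",
out_vector="objects.gpkg",
out_image="labeled.tif",
)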
__init__(self, model_id='sam2-hiera-large', device=None, empty_cache=True, automatic=True, video=False, mode='eval', hydra_overrides_extra=None, apply_postprocessing=False, points_per_side=32, points_per_batch=64, pred_iou_thresh=0.8, stability_score_thresh=0.95, stability_score_offset=1.0, mask_threshold=0.0, box_nms_thresh=0.7, crop_n_layers=0, crop_nms_thresh=0.7, crop_overlap_ratio=0.3413333333333333, crop_n_points_downscale_factor=1, point_grids=None, min_mask_region_area=0, output_mode='binary_mask', use_m2m=False, multimask_output=False, max_hole_area=0.0, max_sprinkle_area=0.0, **kwargs)
special
Initializes the SamGeo2 class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_id | str | The model ID to use. Can be one of the following: "sam2-hiera-tiny", "sam2-hiera-small", "sam2-hiera-base-plus", "sam2-hiera-large". Defaults to "sam2-hiera-large". | 'sam2-hiera-large' |
device | Optional[str] | The device to use (e.g., "cpu", "cuda", "mps"). Defaults to None. | None |
empty_cache | bool | Whether to empty the cache. Defaults to True. | True |
automatic | bool | Whether to use automatic mask generation. Defaults to True. | True |
video | bool | Whether to use video prediction. Defaults to False. | False |
mode | str | The mode to use. Defaults to "eval". | 'eval' |
hydra_overrides_extra | Optional[List[str]] | Additional Hydra overrides. Defaults to None. | None |
apply_postprocessing | bool | Whether to apply postprocessing. Defaults to False. | False |
points_per_side | int or None | The number of points to be sampled along one side of the image. The total number of points is points_per_side**2. If None, 'point_grids' must provide explicit point sampling. | 32 |
points_per_batch | int | Sets the number of points run simultaneously by the model. Higher numbers may be faster but use more GPU memory. | 64 |
pred_iou_thresh | float | A filtering threshold in [0,1], using the model's predicted mask quality. | 0.8 |
stability_score_thresh | float | A filtering threshold in [0,1], using the stability of the mask under changes to the cutoff used to binarize the model's mask predictions. | 0.95 |
stability_score_offset | float | The amount to shift the cutoff when calculating the stability score. | 1.0 |
mask_threshold | float | Threshold for binarizing the mask logits. | 0.0 |
box_nms_thresh | float | The box IoU cutoff used by non-maximal suppression to filter duplicate masks. | 0.7 |
crop_n_layers | int | If >0, mask prediction will be run again on crops of the image. Sets the number of layers to run, where each layer has 2**i_layer number of image crops. | 0 |
crop_nms_thresh | float | The box IoU cutoff used by non-maximal suppression to filter duplicate masks between different crops. | 0.7 |
crop_overlap_ratio | float | Sets the degree to which crops overlap. In the first crop layer, crops will overlap by this fraction of the image length. Later layers with more crops scale down this overlap. | 0.3413333333333333 |
crop_n_points_downscale_factor | int | The number of points-per-side sampled in layer n is scaled down by crop_n_points_downscale_factor**n. | 1 |
point_grids | list(np.ndarray) or None | A list over explicit grids of points used for sampling, normalized to [0,1]. The nth grid in the list is used in the nth crop layer. Exclusive with points_per_side. | None |
min_mask_region_area | int | If >0, postprocessing will be applied to remove disconnected regions and holes in masks with area smaller than min_mask_region_area. Requires opencv. | 0 |
output_mode | str | The form masks are returned in. Can be 'binary_mask', 'uncompressed_rle', or 'coco_rle'. 'coco_rle' requires pycocotools. For large resolutions, 'binary_mask' may consume large amounts of memory. | 'binary_mask' |
use_m2m | bool | Whether to add a one step refinement using previous mask predictions. | False |
multimask_output | bool | Whether to output multimask at each point of the grid. Defaults to False. | False |
max_hole_area | float | If max_hole_area > 0, we fill small holes in up to the maximum area of max_hole_area in low_res_masks. | 0.0 |
max_sprinkle_area | float | If max_sprinkle_area > 0, we remove small sprinkles up to the maximum area of max_sprinkle_area in low_res_masks. | 0.0 |
**kwargs | Any | Additional keyword arguments to pass to SAM2AutomaticMaskGenerator.from_pretrained() or SAM2ImagePredictor.from_pretrained(). | {} |
Source code in samgeo/samgeo2.py
def __init__(
self,
model_id: str = "sam2-hiera-large",
device: Optional[str] = None,
empty_cache: bool = True,
automatic: bool = True,
video: bool = False,
mode: str = "eval",
hydra_overrides_extra: Optional[List[str]] = None,
apply_postprocessing: bool = False,
points_per_side: Optional[int] = 32,
points_per_batch: int = 64,
pred_iou_thresh: float = 0.8,
stability_score_thresh: float = 0.95,
stability_score_offset: float = 1.0,
mask_threshold: float = 0.0,
box_nms_thresh: float = 0.7,
crop_n_layers: int = 0,
crop_nms_thresh: float = 0.7,
crop_overlap_ratio: float = 512 / 1500,
crop_n_points_downscale_factor: int = 1,
point_grids: Optional[List[np.ndarray]] = None,
min_mask_region_area: int = 0,
output_mode: str = "binary_mask",
use_m2m: bool = False,
multimask_output: bool = False,
max_hole_area: float = 0.0,
max_sprinkle_area: float = 0.0,
**kwargs: Any,
) -> None:
"""
Initializes the SamGeo2 class.
Args:
model_id (str): The model ID to use. Can be one of the following: "sam2-hiera-tiny",
"sam2-hiera-small", "sam2-hiera-base-plus", "sam2-hiera-large".
Defaults to "sam2-hiera-large".
device (Optional[str]): The device to use (e.g., "cpu", "cuda", "mps"). Defaults to None.
empty_cache (bool): Whether to empty the cache. Defaults to True.
automatic (bool): Whether to use automatic mask generation. Defaults to True.
video (bool): Whether to use video prediction. Defaults to False.
mode (str): The mode to use. Defaults to "eval".
hydra_overrides_extra (Optional[List[str]]): Additional Hydra overrides. Defaults to None.
apply_postprocessing (bool): Whether to apply postprocessing. Defaults to False.
points_per_side (int or None): The number of points to be sampled
along one side of the image. The total number of points is
points_per_side**2. If None, 'point_grids' must provide explicit
point sampling.
points_per_batch (int): Sets the number of points run simultaneously
by the model. Higher numbers may be faster but use more GPU memory.
pred_iou_thresh (float): A filtering threshold in [0,1], using the
model's predicted mask quality.
stability_score_thresh (float): A filtering threshold in [0,1], using
the stability of the mask under changes to the cutoff used to binarize
the model's mask predictions.
stability_score_offset (float): The amount to shift the cutoff when
calculating the stability score.
mask_threshold (float): Threshold for binarizing the mask logits.
box_nms_thresh (float): The box IoU cutoff used by non-maximal
suppression to filter duplicate masks.
crop_n_layers (int): If >0, mask prediction will be run again on
crops of the image. Sets the number of layers to run, where each
layer has 2**i_layer number of image crops.
crop_nms_thresh (float): The box IoU cutoff used by non-maximal
suppression to filter duplicate masks between different crops.
crop_overlap_ratio (float): Sets the degree to which crops overlap.
In the first crop layer, crops will overlap by this fraction of
the image length. Later layers with more crops scale down this overlap.
crop_n_points_downscale_factor (int): The number of points-per-side
sampled in layer n is scaled down by crop_n_points_downscale_factor**n.
point_grids (list(np.ndarray) or None): A list over explicit grids
of points used for sampling, normalized to [0,1]. The nth grid in the
list is used in the nth crop layer. Exclusive with points_per_side.
min_mask_region_area (int): If >0, postprocessing will be applied
to remove disconnected regions and holes in masks with area smaller
than min_mask_region_area. Requires opencv.
output_mode (str): The form masks are returned in. Can be 'binary_mask',
'uncompressed_rle', or 'coco_rle'. 'coco_rle' requires pycocotools.
For large resolutions, 'binary_mask' may consume large amounts of
memory.
use_m2m (bool): Whether to add a one-step refinement using previous mask predictions.
multimask_output (bool): Whether to output multimask at each point of the grid.
Defaults to False.
max_hole_area (float): If max_hole_area > 0, we fill small holes up to
the maximum area of max_hole_area in low_res_masks.
max_sprinkle_area (float): If max_sprinkle_area > 0, we remove small sprinkles up to
the maximum area of max_sprinkle_area in low_res_masks.
**kwargs (Any): Additional keyword arguments to pass to
SAM2AutomaticMaskGenerator.from_pretrained() or SAM2ImagePredictor.from_pretrained().
"""
if isinstance(model_id, str):
if not model_id.startswith("facebook/"):
model_id = f"facebook/{model_id}"
else:
raise ValueError("model_id must be a string")
allowed_models = [
"facebook/sam2-hiera-tiny",
"facebook/sam2-hiera-small",
"facebook/sam2-hiera-base-plus",
"facebook/sam2-hiera-large",
]
if model_id not in allowed_models:
raise ValueError(
f"model_id must be one of the following: {', '.join(allowed_models)}"
)
if device is None:
device = common.choose_device(empty_cache=empty_cache)
if hydra_overrides_extra is None:
hydra_overrides_extra = []
self.model_id = model_id
self.model_version = "sam2"
self.device = device
if video:
automatic = False
if automatic:
self.mask_generator = SAM2AutomaticMaskGenerator.from_pretrained(
model_id,
device=device,
mode=mode,
hydra_overrides_extra=hydra_overrides_extra,
apply_postprocessing=apply_postprocessing,
points_per_side=points_per_side,
points_per_batch=points_per_batch,
pred_iou_thresh=pred_iou_thresh,
stability_score_thresh=stability_score_thresh,
stability_score_offset=stability_score_offset,
mask_threshold=mask_threshold,
box_nms_thresh=box_nms_thresh,
crop_n_layers=crop_n_layers,
crop_nms_thresh=crop_nms_thresh,
crop_overlap_ratio=crop_overlap_ratio,
crop_n_points_downscale_factor=crop_n_points_downscale_factor,
point_grids=point_grids,
min_mask_region_area=min_mask_region_area,
output_mode=output_mode,
use_m2m=use_m2m,
multimask_output=multimask_output,
**kwargs,
)
elif video:
self.predictor = SAM2VideoPredictor.from_pretrained(
model_id,
device=device,
mode=mode,
hydra_overrides_extra=hydra_overrides_extra,
apply_postprocessing=apply_postprocessing,
**kwargs,
)
else:
self.predictor = SAM2ImagePredictor.from_pretrained(
model_id,
device=device,
mode=mode,
hydra_overrides_extra=hydra_overrides_extra,
apply_postprocessing=apply_postprocessing,
mask_threshold=mask_threshold,
max_hole_area=max_hole_area,
max_sprinkle_area=max_sprinkle_area,
**kwargs,
)
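For quick orientation, here is a minimal, hypothetical usage sketch (it assumes the samgeo package is installed and that the model checkpoint is downloaded on first use):

```python
from samgeo.samgeo2 import SamGeo2

# Automatic mask generation (the default); the device is picked automatically.
sam = SamGeo2(model_id="sam2-hiera-large", automatic=True)

# Interactive prompting instead: disable automatic mode.
predictor = SamGeo2(model_id="sam2-hiera-small", automatic=False)

# Video tracking: video=True forces automatic=False internally.
tracker = SamGeo2(model_id="sam2-hiera-large", video=True)
```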
add_new_mask(self, inference_state, frame_idx, obj_id, mask)
¶
Add a new mask to the inference state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inference_state | Any | The current inference state. | required |
frame_idx | int | The frame index. | required |
obj_id | int | The object ID. | required |
mask | np.ndarray | The mask to add. | required |
Returns:
Type | Description |
---|---|
Any | The updated inference state. |
Source code in samgeo/samgeo2.py
@torch.inference_mode()
def add_new_mask(
self,
inference_state: Any,
frame_idx: int,
obj_id: int,
mask: np.ndarray,
) -> Any:
"""Add a new mask to the inference state.
Args:
inference_state (Any): The current inference state.
frame_idx (int): The frame index.
obj_id (int): The object ID.
mask (np.ndarray): The mask to add.
Returns:
Any: The updated inference state.
"""
return self.predictor.add_new_mask(inference_state, frame_idx, obj_id, mask)
add_new_points_or_box(self, inference_state, frame_idx, obj_id, points=None, labels=None, clear_old_points=True, normalize_coords=True, box=None)
¶
Add new points or a box to the inference state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inference_state | Any | The current inference state. | required |
frame_idx | int | The frame index. | required |
obj_id | int | The object ID. | required |
points | Optional[np.ndarray] | The points to add. Defaults to None. | None |
labels | Optional[np.ndarray] | The labels for the points. Defaults to None. | None |
clear_old_points | bool | Whether to clear old points. Defaults to True. | True |
normalize_coords | bool | Whether to normalize the coordinates. Defaults to True. | True |
box | Optional[np.ndarray] | The bounding box to add. Defaults to None. | None |
Returns:
Type | Description |
---|---|
Any | The (frame_idx, object_ids, mask_logits) tuple returned by the underlying predictor, as consumed by predict_video(). |
Source code in samgeo/samgeo2.py
@torch.inference_mode()
def add_new_points_or_box(
self,
inference_state: Any,
frame_idx: int,
obj_id: int,
points: Optional[np.ndarray] = None,
labels: Optional[np.ndarray] = None,
clear_old_points: bool = True,
normalize_coords: bool = True,
box: Optional[np.ndarray] = None,
) -> Any:
"""Add new points or a box to the inference state.
Args:
inference_state (Any): The current inference state.
frame_idx (int): The frame index.
obj_id (int): The object ID.
points (Optional[np.ndarray]): The points to add. Defaults to None.
labels (Optional[np.ndarray]): The labels for the points. Defaults to None.
clear_old_points (bool): Whether to clear old points. Defaults to True.
normalize_coords (bool): Whether to normalize the coordinates. Defaults to True.
box (Optional[np.ndarray]): The bounding box to add. Defaults to None.
Returns:
Any: The (frame_idx, object_ids, mask_logits) tuple returned by the underlying predictor.
"""
return self.predictor.add_new_points_or_box(
inference_state,
frame_idx,
obj_id,
points=points,
labels=labels,
clear_old_points=clear_old_points,
normalize_coords=normalize_coords,
box=box,
)
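A short sketch of seeding a tracked object with one positive click (the inference state would come from init_state() below; the pixel coordinate is hypothetical, and the tuple unpacking follows how predict_video() consumes this wrapper further down):

```python
import numpy as np

# Assumes `sam` was built with video=True and `state = sam.init_state(...)`.
points = np.array([[450, 600]], dtype=np.float32)  # one (x, y) pixel coordinate
labels = np.array([1], dtype=np.int32)             # 1 = foreground, 0 = background

_, obj_ids, mask_logits = sam.add_new_points_or_box(
    state, frame_idx=0, obj_id=1, points=points, labels=labels
)
```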
generate(self, source, output=None, foreground=True, erosion_kernel=None, mask_multiplier=255, unique=True, min_size=0, max_size=None, **kwargs)
¶
Generate masks for the input image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | Union[str, np.ndarray] | The path to the input image or the input image as a numpy array. | required |
output | Optional[str] | The path to the output image. Defaults to None. | None |
foreground | bool | Whether to generate the foreground mask. Defaults to True. | True |
erosion_kernel | Optional[Tuple[int, int]] | The erosion kernel for filtering object masks and extracting borders, such as (3, 3) or (5, 5). Set to None to disable it. Defaults to None. | None |
mask_multiplier | int | The mask multiplier for the output mask, which is usually a binary mask [0, 1]. You can use this parameter to scale the mask to a larger range, for example [0, 255]. Defaults to 255. The parameter is ignored if unique is True. | 255 |
unique | bool | Whether to assign a unique value to each object. Defaults to True. The unique value increases from 1 to the number of objects. The larger the number, the larger the object area. | True |
min_size | int | The minimum size of the object. Defaults to 0. | 0 |
max_size | Optional[int] | The maximum size of the object. Defaults to None. | None |
**kwargs | Any | Additional keyword arguments. | {} |
Returns:
Type | Description |
---|---|
List[Dict[str, Any]] | A list of dictionaries containing the generated masks. |
Source code in samgeo/samgeo2.py
def generate(
self,
source: Union[str, np.ndarray],
output: Optional[str] = None,
foreground: bool = True,
erosion_kernel: Optional[Tuple[int, int]] = None,
mask_multiplier: int = 255,
unique: bool = True,
min_size: int = 0,
max_size: Optional[int] = None,
**kwargs: Any,
) -> List[Dict[str, Any]]:
"""
Generate masks for the input image.
Args:
source (Union[str, np.ndarray]): The path to the input image or the
input image as a numpy array.
output (Optional[str]): The path to the output image. Defaults to None.
foreground (bool): Whether to generate the foreground mask. Defaults
to True.
erosion_kernel (Optional[Tuple[int, int]]): The erosion kernel for
filtering object masks and extracting borders,
such as (3, 3) or (5, 5). Set to None to disable it. Defaults to None.
mask_multiplier (int): The mask multiplier for the output mask,
which is usually a binary mask [0, 1].
You can use this parameter to scale the mask to a larger range,
for example [0, 255]. Defaults to 255.
The parameter is ignored if unique is True.
unique (bool): Whether to assign a unique value to each object.
Defaults to True.
The unique value increases from 1 to the number of objects. The
larger the number, the larger the object area.
min_size (int): The minimum size of the object. Defaults to 0.
max_size (Optional[int]): The maximum size of the object. Defaults to None.
**kwargs (Any): Additional keyword arguments.
Returns:
List[Dict[str, Any]]: A list of dictionaries containing the generated masks.
"""
if isinstance(source, str):
if source.startswith("http"):
source = common.download_file(source)
if not os.path.exists(source):
raise ValueError(f"Input path {source} does not exist.")
image = cv2.imread(source)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif isinstance(source, np.ndarray):
image = source
source = None
else:
raise ValueError("Input source must be either a path or a numpy array.")
self.source = source # Store the input image path
self.image = image # Store the input image as a numpy array
mask_generator = self.mask_generator # The automatic mask generator
masks = mask_generator.generate(image) # Segment the input image
self.masks = masks # Store the masks as a list of dictionaries
self._min_size = min_size
self._max_size = max_size
if output is not None:
# Save the masks to the output path. The output is either a binary mask or a mask of objects with unique values.
self.save_masks(
output,
foreground,
unique,
erosion_kernel,
mask_multiplier,
min_size,
max_size,
**kwargs,
)
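A minimal sketch of automatic segmentation end to end (the file names are hypothetical):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(model_id="sam2-hiera-large", automatic=True)

# Segment a GeoTIFF and write each object with a unique integer value.
sam.generate("satellite.tif", output="masks.tif", unique=True)

# Quick visual check of the annotations over the input image.
sam.show_anns(alpha=0.5)
```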
init_state(self, video_path, offload_video_to_cpu=False, offload_state_to_cpu=False, async_loading_frames=False)
¶
Initialize an inference state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video_path | str | The path to the video file. | required |
offload_video_to_cpu | bool | Whether to offload the video to CPU. Defaults to False. | False |
offload_state_to_cpu | bool | Whether to offload the state to CPU. Defaults to False. | False |
async_loading_frames | bool | Whether to load frames asynchronously. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Any | The initialized inference state. |
Source code in samgeo/samgeo2.py
@torch.inference_mode()
def init_state(
self,
video_path: str,
offload_video_to_cpu: bool = False,
offload_state_to_cpu: bool = False,
async_loading_frames: bool = False,
) -> Any:
"""Initialize an inference state.
Args:
video_path (str): The path to the video file.
offload_video_to_cpu (bool): Whether to offload the video to CPU.
Defaults to False.
offload_state_to_cpu (bool): Whether to offload the state to CPU.
Defaults to False.
async_loading_frames (bool): Whether to load frames asynchronously.
Defaults to False.
Returns:
Any: The initialized inference state.
"""
return self.predictor.init_state(
video_path,
offload_video_to_cpu=offload_video_to_cpu,
offload_state_to_cpu=offload_state_to_cpu,
async_loading_frames=async_loading_frames,
)
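A sketch of initializing the state by hand; in practice set_video() below does this for you (the frame directory is hypothetical):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(model_id="sam2-hiera-large", video=True)

# Offloading to CPU trades speed for GPU memory on long videos.
state = sam.init_state("frames_dir", offload_video_to_cpu=True)
```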
predict(self, point_coords=None, point_labels=None, boxes=None, mask_input=None, multimask_output=False, return_logits=False, normalize_coords=True, point_crs=None, output=None, index=None, mask_multiplier=255, dtype='float32', return_results=False, **kwargs)
¶
Predict the mask for the input image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point_coords | np.ndarray | The point coordinates. Defaults to None. | None |
point_labels | np.ndarray | The point labels. Defaults to None. | None |
boxes | list or np.ndarray | A length-4 array giving a box prompt to the model, in XYXY format. Defaults to None. | None |
mask_input | np.ndarray | A low-resolution mask input to the model, typically coming from a previous prediction iteration. Has form 1xHxW, where for SAM, H=W=256. | None |
multimask_output | bool | If true, the model will return three masks. For ambiguous input prompts (such as a single click), this will often produce better masks than a single prediction. If only a single mask is needed, the model's predicted quality score can be used to select the best mask. For non-ambiguous prompts, such as multiple input prompts, multimask_output=False can give better results. Defaults to False. | False |
return_logits | bool | If true, returns un-thresholded mask logits instead of a binary mask. | False |
normalize_coords | bool | Whether to normalize the coordinates. Defaults to True. | True |
point_crs | str | The coordinate reference system (CRS) of the point prompts. | None |
output | str | The path to the output image. Defaults to None. | None |
index | Optional[int] | The index of the mask to save. Defaults to None, which will save the mask with the highest score. | None |
mask_multiplier | int | The mask multiplier for the output mask, which is usually a binary mask [0, 1]. | 255 |
dtype | str | The data type of the output image. Defaults to "float32". | 'float32' |
return_results | bool | Whether to return the predicted masks, scores, and logits. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Tuple[np.ndarray, np.ndarray, np.ndarray] | The masks, scores, and logits, returned only when return_results is True. |
Source code in samgeo/samgeo2.py
def predict(
self,
point_coords: Optional[np.ndarray] = None,
point_labels: Optional[np.ndarray] = None,
boxes: Optional[np.ndarray] = None,
mask_input: Optional[np.ndarray] = None,
multimask_output: bool = False,
return_logits: bool = False,
normalize_coords: bool = True,
point_crs: Optional[str] = None,
output: Optional[str] = None,
index: Optional[int] = None,
mask_multiplier: int = 255,
dtype: str = "float32",
return_results: bool = False,
**kwargs: Any,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Predict the mask for the input image.
Args:
point_coords (np.ndarray, optional): The point coordinates. Defaults to None.
point_labels (np.ndarray, optional): The point labels. Defaults to None.
boxes (list | np.ndarray, optional): A length-4 array giving a box prompt to the
model, in XYXY format. Defaults to None.
mask_input (np.ndarray, optional): A low-resolution mask input to the model, typically
coming from a previous prediction iteration. Has form 1xHxW, where for SAM, H=W=256.
multimask_output (bool, optional): If true, the model will return three masks.
For ambiguous input prompts (such as a single click), this will often
produce better masks than a single prediction. If only a single
mask is needed, the model's predicted quality score can be used
to select the best mask. For non-ambiguous prompts, such as multiple
input prompts, multimask_output=False can give better results. Defaults to False.
return_logits (bool, optional): If true, returns un-thresholded masks logits
instead of a binary mask.
normalize_coords (bool, optional): Whether to normalize the coordinates.
Defaults to True.
point_crs (str, optional): The coordinate reference system (CRS) of the point prompts.
output (str, optional): The path to the output image. Defaults to None.
index (int, optional): The index of the mask to save. Defaults to None,
which will save the mask with the highest score.
mask_multiplier (int, optional): The mask multiplier for the output mask,
which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults to "float32".
return_results (bool, optional): Whether to return the predicted masks,
scores, and logits. Defaults to False.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: The masks, scores, and logits,
returned only when return_results is True.
"""
import geopandas as gpd
out_of_bounds = []
if isinstance(boxes, str):
gdf = gpd.read_file(boxes)
if gdf.crs is not None:
gdf = gdf.to_crs("epsg:4326")
boxes = gdf.geometry.bounds.values.tolist()
elif isinstance(boxes, dict):
import json
geojson = json.dumps(boxes)
gdf = gpd.read_file(geojson, driver="GeoJSON")
boxes = gdf.geometry.bounds.values.tolist()
if isinstance(point_coords, str):
point_coords = common.vector_to_geojson(point_coords)
if isinstance(point_coords, dict):
point_coords = common.geojson_to_coords(point_coords)
if hasattr(self, "point_coords"):
point_coords = self.point_coords
if hasattr(self, "point_labels"):
point_labels = self.point_labels
if (point_crs is not None) and (point_coords is not None):
point_coords, out_of_bounds = common.coords_to_xy(
self.source, point_coords, point_crs, return_out_of_bounds=True
)
if isinstance(point_coords, list):
point_coords = np.array(point_coords)
if point_coords is not None:
if point_labels is None:
point_labels = [1] * len(point_coords)
elif isinstance(point_labels, int):
point_labels = [point_labels] * len(point_coords)
if isinstance(point_labels, list):
if len(point_labels) != len(point_coords):
if len(point_labels) == 1:
point_labels = point_labels * len(point_coords)
elif len(out_of_bounds) > 0:
print(f"Removing {len(out_of_bounds)} out-of-bound points.")
point_labels_new = []
for i, p in enumerate(point_labels):
if i not in out_of_bounds:
point_labels_new.append(p)
point_labels = point_labels_new
else:
raise ValueError(
"The length of point_labels must be equal to the length of point_coords."
)
point_labels = np.array(point_labels)
predictor = self.predictor
input_boxes = None
if isinstance(boxes, list) and (point_crs is not None):
coords = common.bbox_to_xy(self.source, boxes, point_crs)
input_boxes = np.array(coords)
elif isinstance(boxes, list) and (point_crs is None):
input_boxes = np.array(boxes)
self.boxes = input_boxes
masks, scores, logits = predictor.predict(
point_coords=point_coords,
point_labels=point_labels,
box=input_boxes,
mask_input=mask_input,
multimask_output=multimask_output,
return_logits=return_logits,
normalize_coords=normalize_coords,
)
self.masks = masks
self.scores = scores
self.logits = logits
if output is not None:
if boxes is None or (not isinstance(boxes[0], list)):
self.save_prediction(output, index, mask_multiplier, dtype, **kwargs)
else:
self.tensor_to_numpy(
index, output, mask_multiplier, dtype, save_args=kwargs
)
if return_results:
return masks, scores, logits
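A sketch of prompted prediction with a georeferenced box (the coordinates and paths are hypothetical; point_crs triggers reprojection of the prompt into pixel coordinates):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(model_id="sam2-hiera-large", automatic=False)
sam.set_image("satellite.tif")

# One lon/lat box in XYXY order, reprojected from EPSG:4326 to pixels.
sam.predict(
    boxes=[[-122.26, 37.86, -122.25, 37.87]],
    point_crs="EPSG:4326",
    output="building.tif",
    dtype="uint8",
)
```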
predict_batch(self, point_coords_batch=None, point_labels_batch=None, box_batch=None, mask_input_batch=None, multimask_output=False, return_logits=False, normalize_coords=True)
¶
Predict masks for a batch of images.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point_coords_batch | Optional[List[np.ndarray]] | A batch of point coordinates. Defaults to None. | None |
point_labels_batch | Optional[List[np.ndarray]] | A batch of point labels. Defaults to None. | None |
box_batch | Optional[List[np.ndarray]] | A batch of bounding boxes. Defaults to None. | None |
mask_input_batch | Optional[List[np.ndarray]] | A batch of mask inputs. Defaults to None. | None |
multimask_output | bool | Whether to output multimask at each point of the grid. Defaults to False. | False |
return_logits | bool | Whether to return the logits. Defaults to False. | False |
normalize_coords | bool | Whether to normalize the coordinates. Defaults to True. | True |
Returns:
Type | Description |
---|---|
Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]] | Lists of masks, scores, and logits. |
Source code in samgeo/samgeo2.py
def predict_batch(
self,
point_coords_batch: Optional[List[np.ndarray]] = None,
point_labels_batch: Optional[List[np.ndarray]] = None,
box_batch: Optional[List[np.ndarray]] = None,
mask_input_batch: Optional[List[np.ndarray]] = None,
multimask_output: bool = False,
return_logits: bool = False,
normalize_coords: bool = True,
) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
"""Predict masks for a batch of images.
Args:
point_coords_batch (Optional[List[np.ndarray]]): A batch of point
coordinates. Defaults to None.
point_labels_batch (Optional[List[np.ndarray]]): A batch of point
labels. Defaults to None.
box_batch (Optional[List[np.ndarray]]): A batch of bounding boxes.
Defaults to None.
mask_input_batch (Optional[List[np.ndarray]]): A batch of mask inputs.
Defaults to None.
multimask_output (bool): Whether to output multimask at each point
of the grid. Defaults to False.
return_logits (bool): Whether to return the logits. Defaults to False.
normalize_coords (bool): Whether to normalize the coordinates.
Defaults to True.
Returns:
Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]: Lists
of masks, scores, and logits.
"""
return self.predictor.predict_batch(
point_coords_batch=point_coords_batch,
point_labels_batch=point_labels_batch,
box_batch=box_batch,
mask_input_batch=mask_input_batch,
multimask_output=multimask_output,
return_logits=return_logits,
normalize_coords=normalize_coords,
)
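A sketch of batched prediction over in-memory arrays (synthetic placeholders stand in for real imagery):

```python
import numpy as np
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(model_id="sam2-hiera-tiny", automatic=False)

# Two dummy RGB images with one foreground click each.
images = [np.zeros((256, 256, 3), dtype=np.uint8) for _ in range(2)]
sam.set_image_batch(images)

masks, scores, logits = sam.predict_batch(
    point_coords_batch=[np.array([[128, 128]]) for _ in range(2)],
    point_labels_batch=[np.array([1]) for _ in range(2)],
)
```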
predict_by_points(self, point_coords_batch=None, point_labels_batch=None, box_batch=None, mask_input_batch=None, multimask_output=False, return_logits=False, normalize_coords=True, point_crs=None, output=None, index=None, unique=True, mask_multiplier=255, dtype='int32', return_results=False, **kwargs)
¶
Predict masks for the input image using batches of point prompts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point_coords_batch | Optional[List[np.ndarray]] | The batch of point coordinates; can also be a GeoDataFrame, a vector file path, or a GeoJSON dict. Defaults to None. | None |
point_labels_batch | Optional[List[np.ndarray]] | The batch of point labels. Defaults to None. | None |
box_batch | Optional[List[np.ndarray]] | The batch of box prompts, each a length-4 array in XYXY format. Defaults to None. | None |
mask_input_batch | Optional[List[np.ndarray]] | A batch of low-resolution mask inputs to the model, typically coming from a previous prediction iteration. Each has form 1xHxW, where for SAM, H=W=256. Defaults to None. | None |
multimask_output | bool | Whether to output multimask at each point of the grid. Defaults to False. | False |
return_logits | bool | If true, returns un-thresholded mask logits instead of a binary mask. | False |
normalize_coords | bool | Whether to normalize the coordinates. Defaults to True. | True |
point_crs | Optional[str] | The coordinate reference system (CRS) of the point prompts. | None |
output | Optional[str] | The path to the output image. Defaults to None. | None |
index | Optional[int] | The index of the mask to save. Defaults to None, which will save the mask with the highest score. | None |
unique | bool | Whether to assign a unique value to each object. Defaults to True. | True |
mask_multiplier | int | The mask multiplier for the output mask, which is usually a binary mask [0, 1]. | 255 |
dtype | str | The data type of the output image. Defaults to "int32". | 'int32' |
return_results | bool | Whether to return the predicted masks, scores, and logits. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Tuple[np.ndarray, np.ndarray, np.ndarray] | The masks, scores, and logits, returned only when return_results is True. |
Source code in samgeo/samgeo2.py
def predict_by_points(
self,
point_coords_batch: Optional[List[np.ndarray]] = None,
point_labels_batch: Optional[List[np.ndarray]] = None,
box_batch: Optional[List[np.ndarray]] = None,
mask_input_batch: Optional[List[np.ndarray]] = None,
multimask_output: bool = False,
return_logits: bool = False,
normalize_coords: bool = True,
point_crs: Optional[str] = None,
output: Optional[str] = None,
index: Optional[int] = None,
unique: bool = True,
mask_multiplier: int = 255,
dtype: str = "int32",
return_results: bool = False,
**kwargs: Any,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Predict the mask for the input image.
Args:
point_coords_batch (Optional[List[np.ndarray]]): The batch of point coordinates;
can also be a GeoDataFrame, a vector file path, or a GeoJSON dict. Defaults to None.
point_labels_batch (Optional[List[np.ndarray]]): The batch of point labels. Defaults to None.
box_batch (Optional[List[np.ndarray]]): The batch of box prompts, each a length-4 array
in XYXY format. Defaults to None.
mask_input_batch (Optional[List[np.ndarray]]): A batch of low-resolution mask inputs to
the model, typically coming from a previous prediction iteration. Each has form
1xHxW, where for SAM, H=W=256. Defaults to None.
multimask_output (bool, optional): Whether to output multimask at each
point of the grid. Defaults to False.
return_logits (bool, optional): If true, returns un-thresholded masks logits
instead of a binary mask.
normalize_coords (bool, optional): Whether to normalize the coordinates.
Defaults to True.
point_crs (str, optional): The coordinate reference system (CRS) of the point prompts.
output (str, optional): The path to the output image. Defaults to None.
index (int, optional): The index of the mask to save. Defaults to None,
which will save the mask with the highest score.
unique (bool, optional): Whether to assign a unique value to each object. Defaults to True.
mask_multiplier (int, optional): The mask multiplier for the output mask,
which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults to "int32".
return_results (bool, optional): Whether to return the predicted masks,
scores, and logits. Defaults to False.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: The masks, scores, and logits,
returned only when return_results is True.
"""
import geopandas as gpd
if hasattr(self, "image_batch") and self.image_batch is not None:
pass
elif self.image is not None:
self.predictor.set_image_batch([self.image])
setattr(self, "image_batch", [self.image])
else:
raise ValueError("Please set the input image first using set_image().")
if isinstance(point_coords_batch, dict):
point_coords_batch = gpd.GeoDataFrame.from_features(point_coords_batch)
if isinstance(point_coords_batch, str) or isinstance(
point_coords_batch, gpd.GeoDataFrame
):
if isinstance(point_coords_batch, str):
gdf = gpd.read_file(point_coords_batch)
else:
gdf = point_coords_batch
if gdf.crs is None and (point_crs is not None):
gdf.crs = point_crs
points = gdf.geometry.apply(lambda geom: [geom.x, geom.y])
coordinates_array = np.array([[point] for point in points])
points = common.coords_to_xy(self.source, coordinates_array, point_crs)
num_points = points.shape[0]
if point_labels_batch is None:
labels = np.array([[1] for i in range(num_points)])
else:
labels = point_labels_batch
elif isinstance(point_coords_batch, list):
if point_crs is not None:
point_coords_batch_crs = common.coords_to_xy(
self.source, point_coords_batch, point_crs
)
else:
point_coords_batch_crs = point_coords_batch
num_points = len(point_coords_batch)
points = []
points.append([[point] for point in point_coords_batch_crs])
if point_labels_batch is None:
labels = np.array([[1] for i in range(num_points)])
elif isinstance(point_labels_batch, list):
labels = []
labels.append([[label] for label in point_labels_batch])
labels = labels[0]
else:
labels = point_labels_batch
points = np.array(points[0])
labels = np.array(labels)
elif isinstance(point_coords_batch, np.ndarray):
points = point_coords_batch
labels = point_labels_batch
else:
raise ValueError("point_coords must be a list, a GeoDataFrame, or a path.")
predictor = self.predictor
masks_batch, scores_batch, logits_batch = predictor.predict_batch(
point_coords_batch=[points],
point_labels_batch=[labels],
box_batch=box_batch,
mask_input_batch=mask_input_batch,
multimask_output=multimask_output,
return_logits=return_logits,
normalize_coords=normalize_coords,
)
masks = masks_batch[0]
scores = scores_batch[0]
logits = logits_batch[0]
if multimask_output and (index is not None):
masks = masks[:, index, :, :]
if masks.ndim > 3:
masks = masks.squeeze()
output_masks = []
sums = np.sum(masks, axis=(1, 2))
for index, mask in enumerate(masks):
item = {"segmentation": mask.astype("bool"), "area": sums[index]}
output_masks.append(item)
self.masks = output_masks
self.scores = scores
self.logits = logits
if output is not None:
self.save_masks(
output,
foreground=True,
unique=unique,
mask_multiplier=mask_multiplier,
dtype=dtype,
**kwargs,
)
if return_results:
return output_masks, scores, logits
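A sketch of point-driven segmentation where the prompts live in a vector file (the paths are hypothetical; every feature becomes a foreground prompt unless labels are supplied):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(model_id="sam2-hiera-large", automatic=False)
sam.set_image("satellite.tif")

# Each point in trees.geojson is reprojected to pixels and used as a prompt.
sam.predict_by_points(
    point_coords_batch="trees.geojson",
    point_crs="EPSG:4326",
    output="tree_masks.tif",
    dtype="uint16",
)
```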
predict_video(self, prompts=None, point_crs=None, output_dir=None, img_ext='png')
¶
Predict masks for the video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prompts | Dict[int, Any] | A dictionary containing the prompts with points and labels. | None |
point_crs | Optional[str] | The coordinate reference system (CRS) of the point prompts. | None |
output_dir | Optional[str] | The directory to save the output images. Defaults to None. | None |
img_ext | str | The file extension for the output images. Defaults to "png". | 'png' |
Source code in samgeo/samgeo2.py
def predict_video(
self,
prompts: Dict[int, Any] = None,
point_crs: Optional[str] = None,
output_dir: Optional[str] = None,
img_ext: str = "png",
) -> None:
"""Predict masks for the video.
Args:
prompts (Dict[int, Any]): A dictionary containing the prompts with points and labels.
point_crs (Optional[str]): The coordinate reference system (CRS) of the point prompts.
output_dir (Optional[str]): The directory to save the output images. Defaults to None.
img_ext (str): The file extension for the output images. Defaults to "png".
"""
from PIL import Image
def save_image_from_dict(data, output_path="output_image.png"):
# Find the shape of the first array in the dictionary (assuming all arrays have the same shape)
array_shape = next(iter(data.values())).shape[1:]
# Initialize an empty array with the same shape as the arrays in the dictionary, filled with zeros
output_array = np.zeros(array_shape, dtype=np.uint8)
# Iterate over each key and array in the dictionary
for key, array in data.items():
# Assign the key value wherever the boolean array is True
output_array[array[0]] = key
# Convert the output array to a PIL image
image = Image.fromarray(output_array)
# Save the image
image.save(output_path)
if prompts is None:
if hasattr(self, "prompts"):
prompts = self.prompts
else:
raise ValueError("Please provide prompts.")
if point_crs is not None and self._tif_source is not None:
for prompt in prompts.values():
points = prompt.get("points", None)
if points is not None:
points = common.coords_to_xy(self._tif_source, points, point_crs)
prompt["points"] = points
box = prompt.get("box", None)
if box is not None:
box = common.bbox_to_xy(self._tif_source, box, point_crs)
prompt["box"] = box
prompts = self._convert_prompts(prompts)
predictor = self.predictor
inference_state = self.inference_state
for obj_id, prompt in prompts.items():
points = prompt.get("points", None)
labels = prompt.get("labels", None)
box = prompt.get("box", None)
frame_idx = prompt.get("frame_idx", None)
_, out_obj_ids, out_mask_logits = predictor.add_new_points_or_box(
inference_state=inference_state,
frame_idx=frame_idx,
obj_id=obj_id,
points=points,
labels=labels,
box=box,
)
video_segments = {}
num_frames = self._num_images
num_digits = len(str(num_frames))
if output_dir is not None:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for out_frame_idx, out_obj_ids, out_mask_logits in predictor.propagate_in_video(
inference_state
):
video_segments[out_frame_idx] = {
out_obj_id: (out_mask_logits[i] > 0.0).cpu().numpy()
for i, out_obj_id in enumerate(out_obj_ids)
}
if output_dir is not None:
output_path = os.path.join(
output_dir, f"{str(out_frame_idx).zfill(num_digits)}.{img_ext}"
)
save_image_from_dict(video_segments[out_frame_idx], output_path)
self.video_segments = video_segments
# if output_dir is not None:
# self.save_video_segments(output_dir, img_ext)
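A sketch of tracking one object through an image time series (the prompt dictionary uses the points/labels/frame_idx keys read in the source above; paths and coordinates are hypothetical):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(model_id="sam2-hiera-large", video=True)
sam.set_video("geotiff_frames_dir")  # a directory of GeoTIFF frames

# Object 1 is seeded with one positive lon/lat click on the first frame.
prompts = {
    1: {"points": [[-74.30, -20.92]], "labels": [1], "frame_idx": 0},
}
sam.predict_video(prompts=prompts, point_crs="EPSG:4326", output_dir="segments")
```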
propagate_in_video(self, inference_state, start_frame_idx=None, max_frame_num_to_track=None, reverse=False)
¶
Propagate the inference state in video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inference_state | Any | The current inference state. | required |
start_frame_idx | Optional[int] | The starting frame index. Defaults to None. | None |
max_frame_num_to_track | Optional[int] | The maximum number of frames to track. Defaults to None. | None |
reverse | bool | Whether to propagate in reverse. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Any | The propagated inference state. |
Source code in samgeo/samgeo2.py
@torch.inference_mode()
def propagate_in_video(
self,
inference_state: Any,
start_frame_idx: Optional[int] = None,
max_frame_num_to_track: Optional[int] = None,
reverse: bool = False,
) -> Any:
"""Propagate the inference state in video.
Args:
inference_state (Any): The current inference state.
start_frame_idx (Optional[int]): The starting frame index. Defaults to None.
max_frame_num_to_track (Optional[int]): The maximum number of frames
to track. Defaults to None.
reverse (bool): Whether to propagate in reverse. Defaults to False.
Returns:
Any: The propagated inference state.
"""
return self.predictor.propagate_in_video(
inference_state,
start_frame_idx=start_frame_idx,
max_frame_num_to_track=max_frame_num_to_track,
reverse=reverse,
)
propagate_in_video_preflight(self, inference_state)
¶
Propagate the inference state in video preflight.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inference_state | Any | The current inference state. | required |
Returns:
Type | Description |
---|---|
Any | The propagated inference state. |
Source code in samgeo/samgeo2.py
@torch.inference_mode()
def propagate_in_video_preflight(self, inference_state: Any) -> Any:
"""Propagate the inference state in video preflight.
Args:
inference_state (Any): The current inference state.
Returns:
Any: The propagated inference state.
"""
return self.predictor.propagate_in_video_preflight(inference_state)
raster_to_vector(self, raster, vector, simplify_tolerance=None, **kwargs)
¶
Convert a raster image file to a vector dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raster | str | The path to the raster image. | required |
vector | str | The path to the output vector file. | required |
simplify_tolerance | Optional[float] | The maximum allowed geometry displacement. The higher this value, the smaller the number of vertices in the resulting geometry. | None |
Source code in samgeo/samgeo2.py
def raster_to_vector(self, raster, vector, simplify_tolerance=None, **kwargs):
"""Convert a raster image file to a vector dataset.
Args:
raster (str): The path to the raster image.
vector (str): The path to the output vector file.
simplify_tolerance (float, optional): The maximum allowed geometry displacement.
The higher this value, the smaller the number of vertices in the resulting geometry.
"""
common.raster_to_vector(
raster, vector, simplify_tolerance=simplify_tolerance, **kwargs
)
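A sketch of polygonizing a saved mask raster (the paths are hypothetical; simplify_tolerance is expressed in the units of the raster's CRS, and the method simply delegates to samgeo.common.raster_to_vector):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(automatic=True)
sam.raster_to_vector("masks.tif", "masks.gpkg", simplify_tolerance=0.5)
```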
region_groups(self, image, connectivity=1, min_size=10, max_size=None, threshold=None, properties=None, intensity_image=None, out_csv=None, out_vector=None, out_image=None, **kwargs)
¶
Segment regions in an image and filter them based on size.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | Union[str, xr.DataArray, np.ndarray] | Input image, can be a file path, xarray DataArray, or numpy array. | required |
connectivity | int | Connectivity for labeling. Defaults to 1 for 4-connectivity. Use 2 for 8-connectivity. | 1 |
min_size | int | Minimum size of regions to keep. Defaults to 10. | 10 |
max_size | Optional[int] | Maximum size of regions to keep. Defaults to None. | None |
threshold | Optional[int] | Threshold for filling holes. Defaults to None, which is equal to min_size. | None |
properties | Optional[List[str]] | List of properties to measure. See https://scikit-image.org/docs/stable/api/skimage.measure.html#skimage.measure.regionprops. Defaults to None. | None |
intensity_image | Optional[Union[str, xr.DataArray, np.ndarray]] | Intensity image to use for properties. Defaults to None. | None |
out_csv | Optional[str] | Path to save the properties as a CSV file. Defaults to None. | None |
out_vector | Optional[str] | Path to save the vector file. Defaults to None. | None |
out_image | Optional[str] | Path to save the output image. Defaults to None. | None |
Returns:
Type | Description |
---|---|
Union[Tuple[np.ndarray, pd.DataFrame], Tuple[xr.DataArray, pd.DataFrame]] | Labeled image and properties DataFrame. |
Source code in samgeo/samgeo2.py
def region_groups(
self,
image: Union[str, "xr.DataArray", np.ndarray],
connectivity: int = 1,
min_size: int = 10,
max_size: Optional[int] = None,
threshold: Optional[int] = None,
properties: Optional[List[str]] = None,
intensity_image: Optional[Union[str, "xr.DataArray", np.ndarray]] = None,
out_csv: Optional[str] = None,
out_vector: Optional[str] = None,
out_image: Optional[str] = None,
**kwargs: Any,
) -> Union[
Tuple[np.ndarray, "pd.DataFrame"], Tuple["xr.DataArray", "pd.DataFrame"]
]:
"""
Segment regions in an image and filter them based on size.
Args:
image (Union[str, xr.DataArray, np.ndarray]): Input image, can be a file
path, xarray DataArray, or numpy array.
connectivity (int, optional): Connectivity for labeling. Defaults to 1
for 4-connectivity. Use 2 for 8-connectivity.
min_size (int, optional): Minimum size of regions to keep. Defaults to 10.
max_size (Optional[int], optional): Maximum size of regions to keep.
Defaults to None.
threshold (Optional[int], optional): Threshold for filling holes.
Defaults to None, which is equal to min_size.
properties (Optional[List[str]], optional): List of properties to measure.
See https://scikit-image.org/docs/stable/api/skimage.measure.html#skimage.measure.regionprops
Defaults to None.
intensity_image (Optional[Union[str, xr.DataArray, np.ndarray]], optional):
Intensity image to use for properties. Defaults to None.
out_csv (Optional[str], optional): Path to save the properties as a CSV file.
Defaults to None.
out_vector (Optional[str], optional): Path to save the vector file.
Defaults to None.
out_image (Optional[str], optional): Path to save the output image.
Defaults to None.
Returns:
Union[Tuple[np.ndarray, pd.DataFrame], Tuple[xr.DataArray, pd.DataFrame]]: Labeled image and properties DataFrame.
"""
return common.region_groups(
image,
connectivity=connectivity,
min_size=min_size,
max_size=max_size,
threshold=threshold,
properties=properties,
intensity_image=intensity_image,
out_csv=out_csv,
out_vector=out_vector,
out_image=out_image,
**kwargs,
)
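A sketch of filtering labeled regions by size and exporting their properties (the property names follow skimage.measure.regionprops; the paths are hypothetical):

```python
from samgeo.samgeo2 import SamGeo2

sam = SamGeo2(automatic=True)
labeled, props = sam.region_groups(
    "masks.tif",
    connectivity=2,          # 8-connectivity
    min_size=50,
    properties=["label", "area", "eccentricity"],
    out_csv="regions.csv",
    out_vector="regions.gpkg",
)
```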
reset_state(self, inference_state)
¶
Remove all input points or masks in all frames throughout the video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inference_state | Any | The current inference state. | required |
Source code in samgeo/samgeo2.py
@torch.inference_mode()
def reset_state(self, inference_state: Any) -> None:
"""Remove all input points or masks in all frames throughout the video.
Args:
inference_state (Any): The current inference state.
"""
self.predictor.reset_state(inference_state)
save_masks(self, output=None, foreground=True, unique=True, erosion_kernel=None, mask_multiplier=255, min_size=0, max_size=None, **kwargs)
¶
Save the masks to the output path. The output is either a binary mask or a mask of objects with unique values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output | Optional[str] | The path to the output image. Defaults to None, saving the masks to SamGeo.objects. | None |
foreground | bool | Whether to generate the foreground mask. Defaults to True. | True |
unique | bool | Whether to assign a unique value to each object. Defaults to True. | True |
erosion_kernel | Optional[Tuple[int, int]] | The erosion kernel for filtering object masks and extracting borders, such as (3, 3) or (5, 5). Set to None to disable it. Defaults to None. | None |
mask_multiplier | int | The mask multiplier for the output mask, which is usually a binary mask [0, 1]. You can use this parameter to scale the mask to a larger range, for example [0, 255]. Defaults to 255. | 255 |
min_size | int | The minimum size of the object. Defaults to 0. | 0 |
max_size | Optional[int] | The maximum size of the object. Defaults to None. | None |
**kwargs | Any | Additional keyword arguments for common.array_to_image(). | {} |
Source code in samgeo/samgeo2.py
def save_masks(
self,
output: Optional[str] = None,
foreground: bool = True,
unique: bool = True,
erosion_kernel: Optional[Tuple[int, int]] = None,
mask_multiplier: int = 255,
min_size: int = 0,
max_size: Optional[int] = None,
**kwargs: Any,
) -> None:
"""Save the masks to the output path. The output is either a binary mask
or a mask of objects with unique values.
Args:
output (str, optional): The path to the output image. Defaults to
None, saving the masks to SamGeo.objects.
foreground (bool, optional): Whether to generate the foreground mask.
Defaults to True.
unique (bool, optional): Whether to assign a unique value to each
object. Defaults to True.
erosion_kernel (tuple, optional): The erosion kernel for filtering
object masks and extracting borders, such as (3, 3) or (5, 5).
Set to None to disable it. Defaults to None.
mask_multiplier (int, optional): The mask multiplier for the output
mask, which is usually a binary mask [0, 1]. You can use this
parameter to scale the mask to a larger range, for example
[0, 255]. Defaults to 255.
min_size (int, optional): The minimum size of the object. Defaults to 0.
max_size (int, optional): The maximum size of the object. Defaults to None.
**kwargs: Additional keyword arguments for common.array_to_image().
"""
if self.masks is None:
raise ValueError("No masks found. Please run generate() first.")
h, w, _ = self.image.shape
masks = self.masks
# Set output image data type based on the number of objects
if len(masks) < 255:
dtype = np.uint8
elif len(masks) < 65535:
dtype = np.uint16
else:
dtype = np.uint32
# Generate a mask of objects with unique values
if unique:
# Sort the masks by area in descending order
sorted_masks = sorted(masks, key=(lambda x: x["area"]), reverse=True)
# Create an output image with the same size as the input image
objects = np.zeros(
(
sorted_masks[0]["segmentation"].shape[0],
sorted_masks[0]["segmentation"].shape[1],
)
)
# Assign a unique value to each object
count = len(sorted_masks)
for index, ann in enumerate(sorted_masks):
m = ann["segmentation"]
if min_size > 0 and ann["area"] < min_size:
continue
if max_size is not None and ann["area"] > max_size:
continue
objects[m] = count - index
# Generate a binary mask
else:
if foreground: # Extract foreground objects only
resulting_mask = np.zeros((h, w), dtype=dtype)
else:
resulting_mask = np.ones((h, w), dtype=dtype)
resulting_borders = np.zeros((h, w), dtype=dtype)
for m in masks:
if min_size > 0 and m["area"] < min_size:
continue
if max_size is not None and m["area"] > max_size:
continue
mask = (m["segmentation"] > 0).astype(dtype)
resulting_mask += mask
# Apply erosion to the mask
if erosion_kernel is not None:
mask_erode = cv2.erode(mask, erosion_kernel, iterations=1)
mask_erode = (mask_erode > 0).astype(dtype)
edge_mask = mask - mask_erode
resulting_borders += edge_mask
resulting_mask = (resulting_mask > 0).astype(dtype)
resulting_borders = (resulting_borders > 0).astype(dtype)
objects = resulting_mask - resulting_borders
objects = objects * mask_multiplier
objects = objects.astype(dtype)
self.objects = objects
if output is not None: # Save the output image
common.array_to_image(self.objects, output, self.source, **kwargs)
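A sketch of re-saving previously generated masks as a single binary mask with object borders removed (assumes generate() has already run; the kernel size follows the (3, 3) example in the docstring, and the paths are hypothetical):

```python
# `sam` is an automatic SamGeo2 instance on which generate() has run.
sam.save_masks(
    output="binary_mask.tif",
    unique=False,            # binary mask instead of unique object values
    erosion_kernel=(3, 3),   # strip object borders
    mask_multiplier=255,     # scale [0, 1] to [0, 255]
)
```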
save_prediction(self, output, index=None, mask_multiplier=255, dtype='float32', vector=None, simplify_tolerance=None, **kwargs)
¶
Save the predicted mask to the output path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output | str | The path to the output image. | required |
index | Optional[int] | The index of the mask to save. Defaults to None, which will save the mask with the highest score. | None |
mask_multiplier | int | The mask multiplier for the output mask, which is usually a binary mask [0, 1]. | 255 |
dtype | str | The data type of the output image. Defaults to "float32". | 'float32' |
vector | Optional[str] | The path to the output vector file. Defaults to None. | None |
simplify_tolerance | Optional[float] | The maximum allowed geometry displacement. The higher this value, the smaller the number of vertices in the resulting geometry. | None |
**kwargs | Any | Additional keyword arguments. | {} |
Source code in samgeo/samgeo2.py
def save_prediction(
self,
output: str,
index: Optional[int] = None,
mask_multiplier: int = 255,
dtype: str = "float32",
vector: Optional[str] = None,
simplify_tolerance: Optional[float] = None,
**kwargs: Any,
) -> None:
"""Save the predicted mask to the output path.
Args:
output (str): The path to the output image.
index (Optional[int], optional): The index of the mask to save.
Defaults to None, which will save the mask with the highest score.
mask_multiplier (int, optional): The mask multiplier for the output
mask, which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults
to "float32".
vector (Optional[str], optional): The path to the output vector file.
Defaults to None.
simplify_tolerance (Optional[float], optional): The maximum allowed
geometry displacement. The higher this value, the smaller the
number of vertices in the resulting geometry.
**kwargs (Any): Additional keyword arguments.
"""
if self.scores is None:
raise ValueError("No predictions found. Please run predict() first.")
if index is None:
index = self.scores.argmax(axis=0)
array = self.masks[index] * mask_multiplier
self.prediction = array
common.array_to_image(array, output, self.source, dtype=dtype, **kwargs)
if vector is not None:
common.raster_to_vector(
output, vector, simplify_tolerance=simplify_tolerance
)
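A sketch of saving a prompted prediction and polygonizing it in the same call (assumes predict() has already run; the paths are hypothetical):

```python
# `sam` is an interactive SamGeo2 instance on which predict() has run.
sam.save_prediction("mask.tif", dtype="uint8", vector="mask.gpkg")
```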
save_video_segments(self, output_dir, img_ext='png')
¶
Save the video segments to the output directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_dir | str | The path to the output directory. | required |
img_ext | str | The file extension for the output images. Defaults to "png". | 'png' |
Source code in samgeo/samgeo2.py
def save_video_segments(self, output_dir: str, img_ext: str = "png") -> None:
"""Save the video segments to the output directory.
Args:
output_dir (str): The path to the output directory.
img_ext (str): The file extension for the output images. Defaults to "png".
"""
from PIL import Image
def save_image_from_dict(
data, output_path="output_image.png", crs_source=None, **kwargs
):
# Find the shape of the first array in the dictionary (assuming all arrays have the same shape)
array_shape = next(iter(data.values())).shape[1:]
# Initialize an empty array with the same shape as the arrays in the dictionary, filled with zeros
output_array = np.zeros(array_shape, dtype=np.uint8)
# Iterate over each key and array in the dictionary
for key, array in data.items():
# Assign the key value wherever the boolean array is True
output_array[array[0]] = key
if crs_source is None:
# Convert the output array to a PIL image
image = Image.fromarray(output_array)
# Save the image
image.save(output_path)
else:
output_path = output_path.replace(".png", ".tif")
common.array_to_image(output_array, output_path, crs_source, **kwargs)
num_frames = len(self.video_segments)
num_digits = len(str(num_frames))
if hasattr(self, "_tif_source") and self._tif_source.endswith(".tif"):
crs_source = self._tif_source
filenames = self._tif_names
else:
crs_source = None
filenames = None
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Initialize the tqdm progress bar
for frame_idx, video_segment in tqdm(
self.video_segments.items(), desc="Rendering frames", total=num_frames
):
if filenames is None:
output_path = os.path.join(
output_dir, f"{str(frame_idx).zfill(num_digits)}.{img_ext}"
)
else:
output_path = os.path.join(output_dir, filenames[frame_idx])
save_image_from_dict(video_segment, output_path, crs_source)
save_video_segments_blended(self, output_dir, img_ext='png', alpha=0.6, dpi=200, frame_stride=1, output_video=None, fps=30)
¶
Save blended video segments to the output directory and optionally create a video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_dir | str | The directory to save the output images. | required |
img_ext | str | The file extension for the output images. Defaults to "png". | 'png' |
alpha | float | The alpha value for the blended masks. Defaults to 0.6. | 0.6 |
dpi | int | The DPI (dots per inch) for the output images. Defaults to 200. | 200 |
frame_stride | int | The stride for selecting frames to save. Defaults to 1. | 1 |
output_video | Optional[str] | The path to the output video file. Defaults to None. | None |
fps | int | The frames per second for the output video. Defaults to 30. | 30 |
Source code in samgeo/samgeo2.py
def save_video_segments_blended(
self,
output_dir: str,
img_ext: str = "png",
alpha: float = 0.6,
dpi: int = 200,
frame_stride: int = 1,
output_video: Optional[str] = None,
fps: int = 30,
) -> None:
"""Save blended video segments to the output directory and optionally create a video.
Args:
output_dir (str): The directory to save the output images.
img_ext (str): The file extension for the output images. Defaults to "png".
alpha (float): The alpha value for the blended masks. Defaults to 0.6.
dpi (int): The DPI (dots per inch) for the output images. Defaults to 200.
frame_stride (int): The stride for selecting frames to save. Defaults to 1.
output_video (Optional[str]): The path to the output video file. Defaults to None.
fps (int): The frames per second for the output video. Defaults to 30.
"""
from PIL import Image
def show_mask(mask, ax, obj_id=None, random_color=False):
if random_color:
color = np.concatenate([np.random.random(3), np.array([alpha])], axis=0)
else:
cmap = plt.get_cmap("tab10")
cmap_idx = 0 if obj_id is None else obj_id
color = np.array([*cmap(cmap_idx)[:3], alpha])
h, w = mask.shape[-2:]
mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
ax.imshow(mask_image)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
plt.close("all")
video_segments = self.video_segments
video_dir = self.video_path
frame_names = self._frame_names
num_frames = len(frame_names)
num_digits = len(str(num_frames))
# Initialize the tqdm progress bar
for out_frame_idx in tqdm(
range(0, len(frame_names), frame_stride), desc="Rendering frames"
):
image = Image.open(os.path.join(video_dir, frame_names[out_frame_idx]))
# Get original image dimensions
w, h = image.size
# Set DPI and calculate figure size based on the original image dimensions
figsize = (
w / dpi,
h / dpi,
)
figsize = (
figsize[0] * 1.3,
figsize[1] * 1.3,
)
# Create a figure with the exact size and DPI
fig = plt.figure(figsize=figsize, dpi=dpi)
# Disable axis to prevent whitespace
plt.axis("off")
# Display the original image
plt.imshow(image)
# Overlay masks for each object ID
for out_obj_id, out_mask in video_segments[out_frame_idx].items():
show_mask(out_mask, plt.gca(), obj_id=out_obj_id)
# Save the figure with no borders or extra padding
filename = f"{str(out_frame_idx).zfill(num_digits)}.{img_ext}"
filepath = os.path.join(output_dir, filename)
plt.savefig(filepath, dpi=dpi, pad_inches=0, bbox_inches="tight")
plt.close(fig)
if output_video is not None:
common.images_to_video(output_dir, output_video, fps=fps)
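A sketch of rendering blended overlays and stitching them into a video after predict_video() has run (the paths are hypothetical):

```python
# `sam` holds video_segments from a previous predict_video() call.
sam.save_video_segments_blended(
    "blended_frames",
    alpha=0.5,
    frame_stride=2,          # render every other frame
    output_video="tracking.mp4",
    fps=10,
)
```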
set_image(self, image)
¶
Set the input image as a numpy array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | Union[str, np.ndarray, Image] | The input image as a path, a numpy array, or an Image. | required |
Source code in samgeo/samgeo2.py
@torch.no_grad()
def set_image(
self,
image: Union[str, np.ndarray, Image],
) -> None:
"""Set the input image as a numpy array.
Args:
image (Union[str, np.ndarray, Image]): The input image as a path,
a numpy array, or an Image.
"""
if isinstance(image, str):
if image.startswith("http"):
image = common.download_file(image)
if not os.path.exists(image):
raise ValueError(f"Input path {image} does not exist.")
self.source = image
image = cv2.imread(image)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
self.image = image
elif isinstance(image, np.ndarray) or isinstance(image, Image):
pass
else:
raise ValueError("Input image must be either a path or a numpy array.")
self.predictor.set_image(image)
set_image_batch(self, image_list)
¶
Set a batch of images for prediction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image_list | List[Union[np.ndarray, str, Image]] | A list of images, which can be numpy arrays, file paths, or PIL images. | required |
Exceptions:
Type | Description |
---|---|
ValueError | If an input image path does not exist or if the input image type is not supported. |
Source code in samgeo/samgeo2.py
@torch.no_grad()
def set_image_batch(
self,
image_list: List[Union[np.ndarray, str, Image]],
) -> None:
"""Set a batch of images for prediction.
Args:
image_list (List[Union[np.ndarray, str, Image]]): A list of images,
which can be numpy arrays, file paths, or PIL images.
Raises:
ValueError: If an input image path does not exist or if the input
image type is not supported.
"""
images = []
for image in image_list:
if isinstance(image, str):
if image.startswith("http"):
image = common.download_file(image)
if not os.path.exists(image):
raise ValueError(f"Input path {image} does not exist.")
image = cv2.imread(image)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif isinstance(image, Image):
image = np.array(image)
elif isinstance(image, np.ndarray):
pass
else:
raise ValueError("Input image must be either a path or a numpy array.")
images.append(image)
self.predictor.set_image_batch(images)
set_video(self, video_path, output_dir=None, frame_rate=None, prefix='')
¶
Set the video path and parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video_path | str | The path to the video file or a directory of frames. | required |
output_dir | Optional[str] | The directory to save the extracted video frames. Defaults to None, which creates a temporary directory. | None |
frame_rate | Optional[int] | The frame rate to use when extracting frames. Defaults to None. | None |
prefix | str | The filename prefix for the extracted frames. Defaults to "". | '' |
Source code in samgeo/samgeo2.py
def set_video(
self,
video_path: str,
output_dir: str = None,
frame_rate: Optional[int] = None,
prefix: str = "",
) -> None:
"""Set the video path and parameters.
Args:
video_path (str): The path to the video file, a URL, or a directory of frames.
output_dir (str, optional): The directory to save the extracted frames.
Defaults to None, which creates a temporary directory.
frame_rate (Optional[int], optional): The frame rate to use when extracting
frames. Defaults to None.
prefix (str, optional): The filename prefix for the extracted frames. Defaults to "".
"""
if isinstance(video_path, str):
if video_path.startswith("http"):
video_path = common.download_file(video_path)
if os.path.isfile(video_path):
if output_dir is None:
output_dir = common.make_temp_dir()
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"Output directory: {output_dir}")
common.video_to_images(
video_path, output_dir, frame_rate=frame_rate, prefix=prefix
)
elif os.path.isdir(video_path):
files = sorted(os.listdir(video_path))
if len(files) == 0:
raise ValueError(f"No files found in {video_path}.")
elif files[0].endswith(".tif"):
self._tif_source = os.path.join(video_path, files[0])
self._tif_dir = video_path
self._tif_names = files
video_path = common.geotiff_to_jpg_batch(video_path)
output_dir = video_path
if not os.path.exists(video_path):
raise ValueError(f"Input path {video_path} does not exist.")
else:
raise ValueError("Input video_path must be a string.")
self.video_path = output_dir
self._num_images = len(os.listdir(output_dir))
self._frame_names = sorted(os.listdir(output_dir))
self.inference_state = self.predictor.init_state(video_path=output_dir)
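A usage sketch, assuming the class was initialized with video=True (the video file name is a placeholder):

```python
sam = SamGeo2(model_id="sam2-hiera-large", video=True)

# A video file is split into frames under output_dir (a temporary directory
# by default); a directory of GeoTIFFs is converted to JPEG frames first
sam.set_video("timeseries.mp4", frame_rate=5)  # placeholder path
```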
show_anns(self, figsize=(12, 10), axis='off', alpha=0.35, output=None, blend=True, **kwargs)
¶
Show the annotations (objects with random color) on the input image.
Parameters:
Name | Type | Description | Default
---|---|---|---
figsize | tuple | The figure size. Defaults to (12, 10). | (12, 10)
axis | str | Whether to show the axis ("on" or "off"). Defaults to "off". | 'off'
alpha | float | The alpha value for the annotations. Defaults to 0.35. | 0.35
output | str | The path to the output image. Defaults to None. | None
blend | bool | Whether to blend the annotations with the input image when saving. Defaults to True. | True
Source code in samgeo/samgeo2.py
def show_anns(
self,
figsize: Tuple[int, int] = (12, 10),
axis: str = "off",
alpha: float = 0.35,
output: Optional[str] = None,
blend: bool = True,
**kwargs: Any,
) -> None:
"""Show the annotations (objects with random color) on the input image.
Args:
figsize (tuple, optional): The figure size. Defaults to (12, 10).
axis (str, optional): Whether to show the axis ("on" or "off"). Defaults to "off".
alpha (float, optional): The alpha value for the annotations. Defaults to 0.35.
output (str, optional): The path to the output image. Defaults to None.
blend (bool, optional): Whether to blend the annotations with the input image when saving. Defaults to True.
"""
import matplotlib.pyplot as plt
anns = self.masks
if self.image is None:
print("Please run generate() first.")
return
if anns is None or len(anns) == 0:
return
plt.figure(figsize=figsize)
plt.imshow(self.image)
sorted_anns = sorted(anns, key=(lambda x: x["area"]), reverse=True)
ax = plt.gca()
ax.set_autoscale_on(False)
img = np.ones(
(
sorted_anns[0]["segmentation"].shape[0],
sorted_anns[0]["segmentation"].shape[1],
4,
)
)
img[:, :, 3] = 0
for ann in sorted_anns:
if hasattr(self, "_min_size") and (ann["area"] < self._min_size):
continue
if (
hasattr(self, "_max_size")
and isinstance(self._max_size, int)
and ann["area"] > self._max_size
):
continue
m = ann["segmentation"]
color_mask = np.concatenate([np.random.random(3), [alpha]])
img[m] = color_mask
ax.imshow(img)
if "dpi" not in kwargs:
kwargs["dpi"] = 100
if "bbox_inches" not in kwargs:
kwargs["bbox_inches"] = "tight"
plt.axis(axis)
self.annotations = (img[:, :, 0:3] * 255).astype(np.uint8)
if output is not None:
if blend:
array = common.blend_images(
self.annotations, self.image, alpha=alpha, show=False
)
else:
array = self.annotations
common.array_to_image(array, output, self.source)
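A usage sketch for the automatic workflow (generate() is documented earlier on this page; file names are placeholders, and the generate() call signature is assumed from that documentation):

```python
sam = SamGeo2(model_id="sam2-hiera-large", automatic=True)

# generate() populates self.image and self.masks, which show_anns() requires
sam.generate("satellite.tif")  # placeholder path
sam.show_anns(alpha=0.5, output="annotations.tif")  # blended overlay saved to disk
```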
show_canvas(self, fg_color=(0, 255, 0), bg_color=(0, 0, 255), radius=5)
¶
Show a canvas to collect foreground and background points.
Parameters:
Name | Type | Description | Default
---|---|---|---
fg_color | Tuple[int, int, int] | The color for the foreground points. Defaults to (0, 255, 0). | (0, 255, 0)
bg_color | Tuple[int, int, int] | The color for the background points. Defaults to (0, 0, 255). | (0, 0, 255)
radius | int | The radius of the points. Defaults to 5. | 5
Returns:
Type | Description
---|---
Tuple[list, list] | A tuple of two lists of foreground and background points.
Source code in samgeo/samgeo2.py
def show_canvas(
self,
fg_color: Tuple[int, int, int] = (0, 255, 0),
bg_color: Tuple[int, int, int] = (0, 0, 255),
radius: int = 5,
) -> Tuple[list, list]:
"""Show a canvas to collect foreground and background points.
Args:
fg_color (Tuple[int, int, int], optional): The color for the foreground points.
Defaults to (0, 255, 0).
bg_color (Tuple[int, int, int], optional): The color for the background points.
Defaults to (0, 0, 255).
radius (int, optional): The radius of the points. Defaults to 5.
Returns:
Tuple[list, list]: A tuple of two lists of foreground and background points.
"""
if self.image is None:
raise ValueError("Please run set_image() first.")
image = self.image
fg_points, bg_points = common.show_canvas(image, fg_color, bg_color, radius)
self.fg_points = fg_points
self.bg_points = bg_points
point_coords = fg_points + bg_points
point_labels = [1] * len(fg_points) + [0] * len(bg_points)
self.point_coords = point_coords
self.point_labels = point_labels
return fg_points, bg_points
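A sketch of the interactive flow (requires a prior set_image() call, per the check above; the file name is a placeholder):

```python
sam.set_image("satellite.tif")  # placeholder path
fg, bg = sam.show_canvas()      # click to collect foreground/background points
# The prompts are also stored on the instance for a later prediction call
print(sam.point_coords, sam.point_labels)
```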
show_images(self, path=None)
¶
Show the images in the video.
Parameters:
Name | Type | Description | Default
---|---|---|---
path | str | The path to the images. Defaults to None. | None
Source code in samgeo/samgeo2.py
def show_images(self, path: str = None) -> None:
"""Show the images in the video.
Args:
path (str, optional): The path to the images. Defaults to None.
"""
if path is None:
path = self.video_path
if path is not None:
common.show_image_gui(path)
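A sketch (assumes set_video() was called first, or pass a directory explicitly; the directory name is a placeholder):

```python
sam.show_images()                # browse the frames extracted by set_video()
sam.show_images(path="frames/")  # or any directory of images (placeholder path)
```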
show_map(self, basemap='SATELLITE', repeat_mode=True, out_dir=None, **kwargs)
¶
Show the interactive map.
Parameters:
Name | Type | Description | Default
---|---|---|---
basemap | str | The basemap. It can be one of the following: SATELLITE, ROADMAP, TERRAIN, HYBRID. | 'SATELLITE'
repeat_mode | bool | Whether to use the repeat mode for the draw control. Defaults to True. | True
out_dir | Optional[str] | The path to the output directory. Defaults to None. | None
Returns:
Type | Description
---|---
Any | The map object.
Source code in samgeo/samgeo2.py
def show_map(
self,
basemap: str = "SATELLITE",
repeat_mode: bool = True,
out_dir: Optional[str] = None,
**kwargs: Any,
) -> Any:
"""Show the interactive map.
Args:
basemap (str, optional): The basemap. It can be one of the following:
SATELLITE, ROADMAP, TERRAIN, HYBRID.
repeat_mode (bool, optional): Whether to use the repeat mode for
draw control. Defaults to True.
out_dir (Optional[str], optional): The path to the output directory.
Defaults to None.
Returns:
Any: The map object.
"""
return common.sam_map_gui(
self, basemap=basemap, repeat_mode=repeat_mode, out_dir=out_dir, **kwargs
)
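A sketch for notebook use (the returned map renders as the cell output):

```python
m = sam.show_map(basemap="SATELLITE")
m  # display the interactive map and draw prompts on it
```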
show_masks(self, figsize=(12, 10), cmap='binary_r', axis='off', foreground=True, **kwargs)
¶
Show the binary mask or the mask of objects with unique values.
Parameters:
Name | Type | Description | Default
---|---|---|---
figsize | tuple | The figure size. Defaults to (12, 10). | (12, 10)
cmap | str | The colormap. Defaults to "binary_r". | 'binary_r'
axis | str | Whether to show the axis ("on" or "off"). Defaults to "off". | 'off'
foreground | bool | Whether to show the foreground mask only. Defaults to True. | True
**kwargs | Any | Other arguments for save_masks(). | {}
Source code in samgeo/samgeo2.py
def show_masks(
self,
figsize: Tuple[int, int] = (12, 10),
cmap: str = "binary_r",
axis: str = "off",
foreground: bool = True,
**kwargs: Any,
) -> None:
"""Show the binary mask or the mask of objects with unique values.
Args:
figsize (tuple, optional): The figure size. Defaults to (12, 10).
cmap (str, optional): The colormap. Defaults to "binary_r".
axis (str, optional): Whether to show the axis ("on" or "off"). Defaults to "off".
foreground (bool, optional): Whether to show the foreground mask only.
Defaults to True.
**kwargs: Other arguments for save_masks().
"""
import matplotlib.pyplot as plt
if self.objects is None:
self.save_masks(foreground=foreground, **kwargs)
plt.figure(figsize=figsize)
plt.imshow(self.objects, cmap=cmap)
plt.axis(axis)
plt.show()
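A sketch (assumes masks have been generated; keyword arguments are forwarded to save_masks(), which builds self.objects on first use):

```python
sam.show_masks(figsize=(12, 10), cmap="tab20", foreground=True)
```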
show_prompts(self, prompts, frame_idx=0, mask=None, random_color=False, point_crs=None, figsize=(9, 6))
¶
Show the prompts on the image.
Parameters:
Name | Type | Description | Default
---|---|---|---
prompts | Dict[int, Any] | A dictionary containing the prompts with points and labels. | required
frame_idx | int | The frame index. Defaults to 0. | 0
mask | Any | The mask. Defaults to None. | None
random_color | bool | Whether to use random colors for the masks. Defaults to False. | False
point_crs | Optional[str] | The coordinate reference system of the input points. Defaults to None. | None
figsize | Tuple[int, int] | The figure size. Defaults to (9, 6). | (9, 6)
Source code in samgeo/samgeo2.py
def show_prompts(
self,
prompts: Dict[int, Any],
frame_idx: int = 0,
mask: Any = None,
random_color: bool = False,
point_crs: Optional[str] = None,
figsize: Tuple[int, int] = (9, 6),
) -> None:
"""Show the prompts on the image.
Args:
prompts (Dict[int, Any]): A dictionary containing the prompts with
points and labels.
frame_idx (int, optional): The frame index. Defaults to 0.
mask (Any, optional): The mask. Defaults to None.
random_color (bool, optional): Whether to use random colors for the
masks. Defaults to False.
point_crs (Optional[str], optional): The coordinate reference system of the input points. Defaults to None.
figsize (Tuple[int, int], optional): The figure size. Defaults to (9, 6).
"""
from PIL import Image
def show_mask(mask, ax, obj_id=None, random_color=random_color):
if random_color:
color = np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
else:
cmap = plt.get_cmap("tab10")
cmap_idx = 0 if obj_id is None else obj_id
color = np.array([*cmap(cmap_idx)[:3], 0.6])
h, w = mask.shape[-2:]
mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
ax.imshow(mask_image)
def show_points(coords, labels, ax, marker_size=200):
pos_points = coords[labels == 1]
neg_points = coords[labels == 0]
ax.scatter(
pos_points[:, 0],
pos_points[:, 1],
color="green",
marker="*",
s=marker_size,
edgecolor="white",
linewidth=1.25,
)
ax.scatter(
neg_points[:, 0],
neg_points[:, 1],
color="red",
marker="*",
s=marker_size,
edgecolor="white",
linewidth=1.25,
)
def show_box(box, ax):
x0, y0 = box[0], box[1]
w, h = box[2] - box[0], box[3] - box[1]
ax.add_patch(
plt.Rectangle(
(x0, y0), w, h, edgecolor="green", facecolor=(0, 0, 0, 0), lw=2
)
)
if point_crs is not None and self._tif_source is not None:
for prompt in prompts.values():
points = prompt.get("points", None)
if points is not None:
points = common.coords_to_xy(self._tif_source, points, point_crs)
prompt["points"] = points
box = prompt.get("box", None)
if box is not None:
box = common.bbox_to_xy(self._tif_source, box, point_crs)
prompt["box"] = box
prompts = self._convert_prompts(prompts)
self.prompts = prompts
video_dir = self.video_path
frame_names = self._frame_names
fig = plt.figure(figsize=figsize)
fig.canvas.toolbar_visible = True
fig.canvas.header_visible = False
fig.canvas.footer_visible = True
plt.title(f"frame {frame_idx}")
plt.imshow(Image.open(os.path.join(video_dir, frame_names[frame_idx])))
for obj_id, prompt in prompts.items():
points = prompt.get("points", None)
labels = prompt.get("labels", None)
box = prompt.get("box", None)
anno_frame_idx = prompt.get("frame_idx", None)
if anno_frame_idx == frame_idx:
if points is not None:
show_points(points, labels, plt.gca())
if box is not None:
show_box(box, plt.gca())
if mask is not None:
show_mask(mask, plt.gca(), obj_id=obj_id)
plt.show()
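The prompt dictionary structure can be read off the code above: each object ID maps to optional "points"/"labels" arrays, an optional "box" in (x0, y0, x1, y1) order, and the "frame_idx" the prompt applies to. A sketch (the coordinates are made up):

```python
import numpy as np

prompts = {
    1: {  # object ID 1: point prompts
        "points": np.array([[520, 300]], dtype=np.float32),
        "labels": np.array([1], dtype=np.int32),  # 1 = foreground, 0 = background
        "frame_idx": 0,
    },
    2: {  # object ID 2: a box prompt (x0, y0, x1, y1)
        "box": np.array([100, 150, 220, 260], dtype=np.float32),
        "frame_idx": 0,
    },
}
sam.show_prompts(prompts, frame_idx=0)
```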
tensor_to_numpy(self, index=None, output=None, mask_multiplier=255, dtype='uint8', save_args=None)
¶
Convert the predicted masks from tensors to numpy arrays.
Parameters:
Name | Type | Description | Default
---|---|---|---
index | Optional[int] | The index of the mask to save. Defaults to None, which will save the mask with the highest score. | None
output | Optional[str] | The path to the output image. Defaults to None. | None
mask_multiplier | int | The mask multiplier for the output mask, which is usually a binary mask [0, 1]. | 255
dtype | str | The data type of the output image. Defaults to "uint8". | 'uint8'
save_args | Optional[Dict[str, Any]] | Optional arguments for saving the output image. Defaults to None. | None
Returns:
Type | Description
---|---
Optional[np.ndarray] | The predicted mask as a numpy array, or None if output is specified.
Source code in samgeo/samgeo2.py
def tensor_to_numpy(
self,
index: Optional[int] = None,
output: Optional[str] = None,
mask_multiplier: int = 255,
dtype: str = "uint8",
save_args: Optional[Dict[str, Any]] = None,
) -> Optional[np.ndarray]:
"""Convert the predicted masks from tensors to numpy arrays.
Args:
index (Optional[int], optional): The index of the mask to save.
Defaults to None, which will save the mask with the highest score.
output (Optional[str], optional): The path to the output image.
Defaults to None.
mask_multiplier (int, optional): The mask multiplier for the output
mask, which is usually a binary mask [0, 1].
dtype (str, optional): The data type of the output image. Defaults
to "uint8".
save_args (Optional[Dict[str, Any]], optional): Optional arguments
for saving the output image. Defaults to None.
Returns:
Optional[np.ndarray]: The predicted mask as a numpy array, or None
if output is specified.
"""
if save_args is None:
save_args = {}
boxes = self.boxes
masks = self.masks
image_pil = self.image
image_np = np.array(image_pil)
if index is None:
index = 0
masks = masks[:, index, :, :]
if len(masks.shape) == 4 and masks.shape[1] == 1:
masks = masks.squeeze(1)
if boxes is None or (len(boxes) == 0): # No "object" instances found
print("No objects found in the image.")
return
else:
# Create an empty image to store the mask overlays
mask_overlay = np.zeros_like(
image_np[..., 0], dtype=dtype
) # Adjusted for single channel
for i, (_, mask) in enumerate(zip(boxes, masks)):
# Convert tensor to numpy array if necessary and ensure it contains integers
if isinstance(mask, torch.Tensor):
mask = (
mask.cpu().numpy().astype(dtype)
) # If mask is on GPU, use .cpu() before .numpy()
mask_overlay += ((mask > 0) * (i + 1)).astype(
dtype
) # Assign a unique value for each mask
# Normalize mask_overlay to be in [0, 255]
mask_overlay = (
mask_overlay > 0
) * mask_multiplier # Binary mask in [0, 255]
if output is not None:
common.array_to_image(
mask_overlay, output, self.source, dtype=dtype, **save_args
)
else:
return mask_overlay
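A sketch, assuming a prior prompted prediction has populated self.masks and self.boxes (the output file name is a placeholder):

```python
mask = sam.tensor_to_numpy()  # highest-score mask as a numpy array
# Or write a georeferenced mask alongside the source raster
sam.tensor_to_numpy(output="mask.tif", mask_multiplier=255, dtype="uint8")
```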