
AutoDataset

AutoDataset #

Source code in focoos/data/auto_dataset.py
class AutoDataset:
    def __init__(
        self,
        dataset_name: str,
        task: Task,
        layout: DatasetLayout,
        datasets_dir: str = DATASETS_DIR,
    ):
        self.task = task
        self.layout = layout
        self.datasets_dir = datasets_dir
        self.dataset_name = dataset_name

        if self.layout is not DatasetLayout.CATALOG:
            dataset_path = os.path.join(self.datasets_dir, dataset_name)
        else:
            dataset_path = self.datasets_dir

        if dataset_path.endswith(".zip") or dataset_path.endswith(".gz"):
            # compressed path: datasets_root_dir/dataset_compressed/{dataset_name}.zip
            # _dest_path = os.path.join(self.datasets_root_dir, dataset_name.split(".")[0])
            assert not (self.layout == DatasetLayout.CATALOG and not is_inside_sagemaker()), (
                "Catalog layout does not support compressed datasets externally to Sagemaker."
            )
            if self.layout == DatasetLayout.CATALOG:
                dataset_path = extract_archive(dataset_path)
                logger.info(f"Extracted archive: {dataset_path}, {os.listdir(dataset_path)}")
            else:
                dataset_name = dataset_name.split(".")[0]
                _dest_path = os.path.join(self.datasets_dir, dataset_name)
                dataset_path = extract_archive(dataset_path, _dest_path)
                logger.info(f"Extracted archive: {dataset_path}, {os.listdir(dataset_path)}")

        self.dataset_path = str(dataset_path)
        self.dataset_name = dataset_name
        logger.info(
            f"🔄 Loading dataset {self.dataset_name}, 📁 Dataset Path: {self.dataset_path}, 🗂️ Dataset Layout: {self.layout}"
        )

    def _load_split(self, dataset_name: str, split: DatasetSplitType) -> DictDataset:
        if self.layout == DatasetLayout.CATALOG:
            return DictDataset.from_catalog(ds_name=dataset_name, split_type=split, root=self.dataset_path)
        else:
            ds_root = self.dataset_path
            if not check_folder_exists(ds_root):
                raise FileNotFoundError(f"Dataset {ds_root} not found")
            split_path = self._get_split_path(dataset_root=ds_root, split_type=split)
            if self.layout == DatasetLayout.ROBOFLOW_SEG:
                return DictDataset.from_roboflow_seg(ds_dir=split_path, task=self.task, split_type=split)
            elif self.layout == DatasetLayout.CLS_FOLDER:
                return DictDataset.from_folder(root_dir=split_path, split_type=split)
            elif self.layout == DatasetLayout.ROBOFLOW_COCO:
                return DictDataset.from_roboflow_coco(ds_dir=split_path, task=self.task, split_type=split)
            else:  # Focoos
                raise NotImplementedError(f"Dataset layout {self.layout} not implemented")

    def _load_mapper(
        self,
        augs: List[Union[A.Augmentation, T.Transform]],
        is_validation_split: bool,
        resolution: Optional[Union[int, Tuple[int, int]]] = None,
    ) -> DatasetMapper:
        if self.task == Task.SEMSEG:
            return SemanticDatasetMapper(
                image_format="RGB",
                ignore_label=255,
                augmentations=augs,
                is_train=not is_validation_split,
                resolution=resolution,
            )
        elif self.task == Task.DETECTION:
            return DetectionDatasetMapper(
                image_format="RGB",
                is_train=not is_validation_split,
                augmentations=augs,
                resolution=resolution,
            )
        elif self.task == Task.INSTANCE_SEGMENTATION:
            return DetectionDatasetMapper(
                image_format="RGB",
                is_train=not is_validation_split,
                augmentations=augs,
                use_instance_mask=True,
                resolution=resolution,
            )
        elif self.task == Task.CLASSIFICATION:
            return ClassificationDatasetMapper(
                image_format="RGB",
                is_train=not is_validation_split,
                augmentations=augs,
                resolution=resolution,
            )
        elif self.task == Task.KEYPOINT:
            return KeypointDatasetMapper(
                image_format="RGB",
                augmentations=augs,
                is_train=not is_validation_split,
                resolution=resolution,
                # keypoint_hflip_indices=np.array(keypoint_hflip_indices),
            )
        else:
            raise NotImplementedError(f"Task {self.task} not found in autodataset _load_mapper()")

    def _get_split_path(self, dataset_root: str, split_type: DatasetSplitType) -> str:
        if split_type == DatasetSplitType.TRAIN:
            possible_names = ["train", "training"]
            for name in possible_names:
                split_path = os.path.join(dataset_root, name)
                if check_folder_exists(split_path):
                    return split_path
            raise FileNotFoundError(f"Train split not found in {dataset_root}")
        elif split_type == DatasetSplitType.VAL:
            possible_names = ["valid", "val", "validation"]
            for name in possible_names:
                split_path = os.path.join(dataset_root, name)
                if check_folder_exists(split_path):
                    return split_path
            raise FileNotFoundError(f"Validation split not found in {dataset_root}")
        else:
            raise ValueError(f"Invalid split type: {split_type}")

    def get_split(
        self,
        augs: DatasetAugmentations,
        split: DatasetSplitType = DatasetSplitType.TRAIN,
    ) -> MapDataset:
        """
        Generate a dataset for a given dataset name with optional augmentations.

        Parameters:
            augs (DatasetAugmentations): Augmentations configuration.
                Resolution will be automatically extracted from this object.
            split (DatasetSplitType): Dataset split type (TRAIN or VAL).

        Returns:
            MapDataset: A DictDataset with DatasetMapper for training.
        """
        dict_split = self._load_split(dataset_name=self.dataset_name, split=split)
        assert dict_split.metadata.num_classes > 0, "Number of dataset classes must be greater than 0"

        # Extract resolution and augmentations from DatasetAugmentations
        resolution = augs.resolution
        augs_list = augs.get_augmentations()

        return MapDataset(
            dataset=dict_split,
            mapper=self._load_mapper(
                augs=augs_list,
                is_validation_split=(split == DatasetSplitType.VAL),
                resolution=resolution,
            ),
        )  # type: ignore
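
A minimal usage sketch (the dataset name is hypothetical, and the import paths for the Task, DatasetLayout and DatasetSplitType enums are assumptions; adjust them to your focoos installation):

from focoos.data.auto_dataset import AutoDataset
from focoos.data.default_aug import DatasetAugmentations
from focoos.ports import Task, DatasetLayout, DatasetSplitType  # assumed import location

# Hypothetical dataset folder: <DATASETS_DIR>/my-cars-detection/{train,valid}
auto_dataset = AutoDataset(
    dataset_name="my-cars-detection",
    task=Task.DETECTION,
    layout=DatasetLayout.ROBOFLOW_COCO,
)

train_augs = DatasetAugmentations(resolution=640, horizontal_flip=0.5, color_augmentation=0.3)
train_split = auto_dataset.get_split(augs=train_augs, split=DatasetSplitType.TRAIN)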

get_split(augs, split=DatasetSplitType.TRAIN) #

Generate a dataset for a given dataset name with optional augmentations.

Parameters:

augs (DatasetAugmentations): Augmentations configuration. Resolution will be automatically extracted from this object. Required.

split (DatasetSplitType): Dataset split type (TRAIN or VAL). Default: TRAIN.

Returns:

MapDataset: A DictDataset with DatasetMapper for training.

Source code in focoos/data/auto_dataset.py
def get_split(
    self,
    augs: DatasetAugmentations,
    split: DatasetSplitType = DatasetSplitType.TRAIN,
) -> MapDataset:
    """
    Generate a dataset for a given dataset name with optional augmentations.

    Parameters:
        augs (DatasetAugmentations): Augmentations configuration.
            Resolution will be automatically extracted from this object.
        split (DatasetSplitType): Dataset split type (TRAIN or VAL).

    Returns:
        MapDataset: A DictDataset with DatasetMapper for training.
    """
    dict_split = self._load_split(dataset_name=self.dataset_name, split=split)
    assert dict_split.metadata.num_classes > 0, "Number of dataset classes must be greater than 0"

    # Extract resolution and augmentations from DatasetAugmentations
    resolution = augs.resolution
    augs_list = augs.get_augmentations()

    return MapDataset(
        dataset=dict_split,
        mapper=self._load_mapper(
            augs=augs_list,
            is_validation_split=(split == DatasetSplitType.VAL),
            resolution=resolution,
        ),
    )  # type: ignore
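
For validation, the same call is typically made with split=DatasetSplitType.VAL and an augmentation configuration that only resizes (a sketch, reusing the auto_dataset object from the example above):

val_augs = DatasetAugmentations(resolution=640)  # deterministic resize, no random augmentations
valid_split = auto_dataset.get_split(augs=val_augs, split=DatasetSplitType.VAL)
# valid_split is a MapDataset; each item is a mapped sample ready for a DataLoader.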

DatasetAugmentations dataclass #

Configuration class for dataset augmentations.

This class defines parameters for various image transformations used in training and validation pipelines for computer vision tasks. It provides a comprehensive set of options for both color and geometric augmentations.

Attributes:

resolution (Union[int, Tuple[int, int]]): Target image size for resizing operations. If int, treated as square (size, size). If tuple, treated as (height, width). Range [256, 1024] for int. Default: 640.

color_augmentation (float): Strength of color augmentations. Range [0, 1]. Default: 0.0.

horizontal_flip (float): Probability of applying horizontal flip. Range [0, 1]. Default: 0.0.

vertical_flip (float): Probability of applying vertical flip. Range [0, 1]. Default: 0.0.

zoom_out (float): Probability of applying RandomZoomOut. Range [0, 1]. Default: 0.0.

zoom_out_side (float): Upper bound of the zoom-out side range. Range [1, 5]. Default: 4.0.

rotation (float): Rotation strength for RandomRotation; the angle is sampled in (-rotation * 180, rotation * 180) degrees. Range [0, 1]. Default: 0.0.

square (float): Probability of resizing the image to a square of the target resolution. Range [0, 1]. Default: 0.0.

aspect_ratio (float): Strength of RandomAspectRatio (only applied when square is 0). Range [0, 1]. Default: 0.0.

scale_ratio (float): Scale factor for resizing (the actual scale range is (2 ** -scale_ratio, 2 ** scale_ratio)). Range [0, 1]. Default: 0.0.

max_size (int): Maximum allowed dimension after resizing. Range [256, sys.maxsize]. Default: 4096.

crop (bool): Whether to apply RandomCrop. Default: False.

crop_size (Optional[int]): Crop size for RandomCrop; if None, the target resolution is used. Default: None.
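
For example, a typical split between training-time and validation-time configurations might look like this (values are illustrative):

train_augs = DatasetAugmentations(
    resolution=640,
    color_augmentation=0.5,   # moderate photometric jitter
    horizontal_flip=0.5,
    zoom_out=0.3,
    scale_ratio=0.25,         # shortest edge sampled in roughly [538, 761] for resolution 640
    crop=True,                # random 640x640 crops (crop_size defaults to the resolution)
)
val_augs = DatasetAugmentations(resolution=640)  # resize only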

Source code in focoos/data/default_aug.py
@dataclass
class DatasetAugmentations:
    """
    Configuration class for dataset augmentations.

    This class defines parameters for various image transformations used in training and validation
    pipelines for computer vision tasks. It provides a comprehensive set of options for both
    color and geometric augmentations.

    Attributes:
        resolution (Union[int, Tuple[int, int]]): Target image size for resizing operations.
            If int, treated as square (size, size). If tuple, treated as (height, width).
            Range [256, 1024] for int. Default: 640.
        ==
        color_augmentation (float): Strength of color augmentations.
            Range [0,1]. Default: 0.0.
        ==
        horizontal_flip (float): Probability of applying horizontal flip.
            Range [0,1]. Default: 0.0.
        vertical_flip (float): Probability of applying vertical flip.
            Range [0,1]. Default: 0.0.
        zoom_out (float): Probability of applying RandomZoomOut.
            Range [0,1]. Default: 0.0.
        zoom_out_side (float): Zoom out side range.
            Range [1,5]. Default: 4.0.
        rotation (float): Rotation strength for RandomRotation; the angle is sampled in
            (-rotation * 180, rotation * 180) degrees. Range [0,1]. Default: 0.0.
        ==
        square (float): Probability of resizing the image to a square of the target resolution.
            Range [0,1]. Default: 0.0.
        aspect_ratio (float): Strength of RandomAspectRatio (only applied when square is 0).
            Range [0,1]. Default: 0.0.
        scale_ratio (float): Scale factor for resizing (actual scale range is (2 ** -scale_ratio, 2 ** scale_ratio)).
            Range [0,1]. Default: 0.0.
        max_size (int): Maximum allowed dimension after resizing.
            Range [256, sys.maxsize]. Default: 4096.
        ==
        crop (bool): Whether to apply RandomCrop.
            Default: False.
        crop_size (Optional[int]): Crop size for RandomCrop; if None, the target resolution is used.
            Default: None.
    """

    # Resolution for resizing
    resolution: Union[int, Tuple[int, int]] = 640

    # Color augmentation parameters
    color_augmentation: float = 0.0
    color_base_brightness: int = 32
    color_base_saturation: float = 0.5
    color_base_contrast: float = 0.5
    color_base_hue: float = 18
    # blur: float = 0.0
    # noise: float = 0.0

    # Geometric augmentation
    horizontal_flip: float = 0.0
    vertical_flip: float = 0.0
    zoom_out: float = 0.0
    zoom_out_side: float = 4.0
    rotation: float = 0.0
    aspect_ratio: float = 0.0

    ## Rescaling
    square: float = 0.0
    scale_ratio: float = 0.0
    max_size: int = 4096

    # Cropping
    crop: bool = False
    crop_size: Optional[int] = None

    # TODO: Add more augmentations like:
    # - GaussianBlur
    # - RandomNoise
    # - RandomResizedCrop

    def override(self, args):
        if not isinstance(args, dict):
            args = vars(args)
        for key, value in args.items():
            if hasattr(self, key) and value is not None:
                setattr(self, key, value)
        return self

    def get_augmentations(self, img_format="RGB", task: Optional[Task] = None) -> List[T.Transform]:
        """Generate augmentation pipeline based on configuration."""
        augs = []
        self.max_size = self.max_size if self.max_size else sys.maxsize

        # Normalize resolution to tuple format for easier handling
        if isinstance(self.resolution, int):
            resolution_tuple = (self.resolution, self.resolution)
            resolution_value = self.resolution  # For scalar operations
        else:
            resolution_tuple = self.resolution
            resolution_value = min(self.resolution)  # Use min for scalar operations (shortest edge)

        ### Add color augmentation if configured
        if self.color_augmentation > 0:
            brightness_delta = int(self.color_base_brightness * self.color_augmentation)
            contrast_delta = self.color_base_contrast * self.color_augmentation
            saturation_delta = self.color_base_saturation * self.color_augmentation
            hue_delta = int(self.color_base_hue * self.color_augmentation)
            augs.append(
                T.ColorAugSSDTransform(
                    img_format=img_format,
                    brightness_delta=brightness_delta,
                    contrast_low=(1 - contrast_delta),
                    contrast_high=(1 + contrast_delta),
                    saturation_low=(1 - saturation_delta),
                    saturation_high=(1 + saturation_delta),
                    hue_delta=hue_delta,
                ),
            )

        ### Add geometric augmentations
        # Add flipping augmentations if configured
        if self.horizontal_flip > 0:
            augs.append(A.RandomFlip(prob=self.horizontal_flip, horizontal=True))
        if self.vertical_flip > 0:
            augs.append(A.RandomFlip(prob=self.vertical_flip, horizontal=False, vertical=True))

        # Add zoom out augmentations if configured
        if self.zoom_out > 0.0:
            seg_pad_value = 255 if task == Task.SEMSEG else 0
            augs.append(
                A.RandomApply(
                    A.RandomZoomOut(side_range=(1.0, self.zoom_out_side), pad_value=0, seg_pad_value=seg_pad_value),
                    prob=self.zoom_out,
                )
            )

        ### Add AspectRatio augmentations based on configuration
        if self.square > 0.0:
            augs.append(A.RandomApply(A.Resize(shape=resolution_tuple), prob=self.square))
        elif self.aspect_ratio > 0.0:
            augs.append(A.RandomAspectRatio(aspect_ratio=self.aspect_ratio))

        ### Add Resizing augmentations based on configuration
        # For non-square resolutions, use Resize directly to get exact dimensions
        # For square resolutions, use ResizeShortestEdge with scale augmentation
        is_non_square = isinstance(self.resolution, tuple) and self.resolution[0] != self.resolution[1]

        if is_non_square:
            # Non-square resolution: use direct Resize to get exact (height, width)
            # Note: scale_ratio is ignored for non-square resolutions
            augs.append(A.Resize(shape=resolution_tuple))
        else:
            # Square resolution: use ResizeShortestEdge with scale augmentation
            min_scale, max_scale = 2 ** (-self.scale_ratio), 2**self.scale_ratio
            augs.append(
                A.ResizeShortestEdge(
                    short_edge_length=[int(x * resolution_value) for x in [min_scale, max_scale]],
                    sample_style="range",
                    max_size=self.max_size,
                )
            )

        ### Add rotation augmentations if configured
        if self.rotation > 0:
            angle = self.rotation * 180
            augs.append(A.RandomRotation(angle=(-angle, angle), expand=False))

        # Add cropping if configured
        if self.crop:
            if self.crop_size:
                crop_range = (self.crop_size, self.crop_size)
            else:
                crop_range = resolution_tuple
            augs.append(A.RandomCrop(crop_type="absolute", crop_size=crop_range))

        return augs
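
A short sketch of how the dataclass is typically used together with override() and get_augmentations() (the override dict and the Task import location are illustrative):

from focoos.data.default_aug import DatasetAugmentations
from focoos.ports import Task  # assumed import location

augs = DatasetAugmentations(resolution=(512, 768), horizontal_flip=0.5, zoom_out=0.2)
# override() accepts a dict (or an argparse-style namespace) and applies only non-None values
augs = augs.override({"rotation": 0.1, "crop": None})  # crop stays False because the value is None
pipeline = augs.get_augmentations(img_format="RGB", task=Task.DETECTION)
# pipeline: horizontal flip, zoom-out, exact (512, 768) resize, then random rotation up to +/-18 degrees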

get_augmentations(img_format='RGB', task=None) #

Generate augmentation pipeline based on configuration.

Source code in focoos/data/default_aug.py
def get_augmentations(self, img_format="RGB", task: Optional[Task] = None) -> List[T.Transform]:
    """Generate augmentation pipeline based on configuration."""
    augs = []
    self.max_size = self.max_size if self.max_size else sys.maxsize

    # Normalize resolution to tuple format for easier handling
    if isinstance(self.resolution, int):
        resolution_tuple = (self.resolution, self.resolution)
        resolution_value = self.resolution  # For scalar operations
    else:
        resolution_tuple = self.resolution
        resolution_value = min(self.resolution)  # Use min for scalar operations (shortest edge)

    ### Add color augmentation if configured
    if self.color_augmentation > 0:
        brightness_delta = int(self.color_base_brightness * self.color_augmentation)
        contrast_delta = self.color_base_contrast * self.color_augmentation
        saturation_delta = self.color_base_saturation * self.color_augmentation
        hue_delta = int(self.color_base_hue * self.color_augmentation)
        augs.append(
            T.ColorAugSSDTransform(
                img_format=img_format,
                brightness_delta=brightness_delta,
                contrast_low=(1 - contrast_delta),
                contrast_high=(1 + contrast_delta),
                saturation_low=(1 - saturation_delta),
                saturation_high=(1 + saturation_delta),
                hue_delta=hue_delta,
            ),
        )

    ### Add geometric augmentations
    # Add flipping augmentations if configured
    if self.horizontal_flip > 0:
        augs.append(A.RandomFlip(prob=self.horizontal_flip, horizontal=True))
    if self.vertical_flip > 0:
        augs.append(A.RandomFlip(prob=self.vertical_flip, horizontal=False, vertical=True))

    # Add zoom out augmentations if configured
    if self.zoom_out > 0.0:
        seg_pad_value = 255 if task == Task.SEMSEG else 0
        augs.append(
            A.RandomApply(
                A.RandomZoomOut(side_range=(1.0, self.zoom_out_side), pad_value=0, seg_pad_value=seg_pad_value),
                prob=self.zoom_out,
            )
        )

    ### Add AspectRatio augmentations based on configuration
    if self.square > 0.0:
        augs.append(A.RandomApply(A.Resize(shape=resolution_tuple), prob=self.square))
    elif self.aspect_ratio > 0.0:
        augs.append(A.RandomAspectRatio(aspect_ratio=self.aspect_ratio))

    ### Add Resizing augmentations based on configuration
    # For non-square resolutions, use Resize directly to get exact dimensions
    # For square resolutions, use ResizeShortestEdge with scale augmentation
    is_non_square = isinstance(self.resolution, tuple) and self.resolution[0] != self.resolution[1]

    if is_non_square:
        # Non-square resolution: use direct Resize to get exact (height, width)
        # Note: scale_ratio is ignored for non-square resolutions
        augs.append(A.Resize(shape=resolution_tuple))
    else:
        # Square resolution: use ResizeShortestEdge with scale augmentation
        min_scale, max_scale = 2 ** (-self.scale_ratio), 2**self.scale_ratio
        augs.append(
            A.ResizeShortestEdge(
                short_edge_length=[int(x * resolution_value) for x in [min_scale, max_scale]],
                sample_style="range",
                max_size=self.max_size,
            )
        )

    ### Add rotation augmentations if configured
    if self.rotation > 0:
        angle = self.rotation * 180
        augs.append(A.RandomRotation(angle=(-angle, angle), expand=False))

    # Add cropping if configured
    if self.crop:
        if self.crop_size:
            crop_range = (self.crop_size, self.crop_size)
        else:
            crop_range = resolution_tuple
        augs.append(A.RandomCrop(crop_type="absolute", crop_size=crop_range))

    return augs
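
To make the ResizeShortestEdge branch concrete, here is the arithmetic for a square resolution with scale jitter (a sketch; the numbers are illustrative):

resolution = 640
scale_ratio = 0.5
min_scale, max_scale = 2 ** (-scale_ratio), 2 ** scale_ratio              # ~0.707, ~1.414
short_edge_length = [int(x * resolution) for x in (min_scale, max_scale)]  # [452, 905]
# ResizeShortestEdge samples a shortest-edge length uniformly in [452, 905]
# and rescales the image, capping the longer side at max_size.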