# Source code for flowvision.data.mixup

"""OneFlow implementation of Mixup and Cutmix
Modified from https://github.com/rwightman/pytorch-image-models/blob/master/timm/data/mixup.py
"""

import numpy as np

import oneflow as flow


def one_hot(x, num_classes, on_value=1.0, off_value=0.0, device="cuda"):
    """Encode class indices as a dense ``(N, num_classes)`` matrix.

    Args:
        x: Tensor of class indices; it is cast to long and reshaped to a
            single column before use.
        num_classes (int): Number of columns of the encoded matrix.
        on_value (float): Value scattered at each row's class index.
        off_value (float): Value every entry is initialised with.
        device (str): Device on which the matrix is allocated.

    Returns:
        The ``off_value``-filled matrix with ``on_value`` scattered along
        dim 1 at the positions given by ``x``.
    """
    class_index = x.long().view(-1, 1)
    num_rows = class_index.size()[0]
    canvas = flow.full((num_rows, num_classes), off_value, device=device)
    # TODO: switch to tensor.scatter method
    return flow.scatter(canvas, dim=1, index=class_index, src=on_value)


def mixup_target(target, num_classes, lam=1.0, smoothing=0.0, device="cuda"):
    """Build label-smoothed, mixed targets for a batch.

    The targets and their batch-reversed counterpart (``target.flip(0)``)
    are both one-hot encoded with label smoothing and then blended with
    weights ``lam`` and ``1 - lam`` respectively.

    Args:
        target: Tensor of class indices.
        num_classes (int): Number of classes.
        lam: Weight given to the un-flipped targets.
        smoothing (float): Label-smoothing amount; ``smoothing / num_classes``
            becomes the "off" value and ``1 - smoothing`` is added to it for
            the "on" value.
        device (str): Device passed on to ``one_hot``.

    Returns:
        The blended target matrix ``y1 * lam + y2 * (1 - lam)``.
    """
    off_value = smoothing / num_classes
    on_value = 1.0 - smoothing + off_value
    encode_kwargs = dict(on_value=on_value, off_value=off_value, device=device)

    y1 = one_hot(target, num_classes, **encode_kwargs)
    # The mixing partner of sample i is sample (batch_size - 1 - i).
    y2 = one_hot(target.flip(0), num_classes, **encode_kwargs)
    return y1 * lam + y2 * (1.0 - lam)
def rand_bbox(img_shape, lam, margin=0.0, count=None):
    """Standard CutMix bounding-box.

    Generates a random box whose sides are ``sqrt(1 - lam)`` times the image
    sides, centred on a random point and clipped to the image. A border
    margin, given as a fraction of the box dimensions, can be enforced to
    reduce how much of the box falls outside the image.

    Args:
        img_shape (tuple): Image shape as tuple; the last two entries are
            read as height and width.
        lam (float): Cutmix lambda value.
        margin (float): Percentage of bbox dimension to enforce as margin
            (reduce amount of box outside image).
        count (int): Number of bbox to generate (``None`` yields scalars).

    Returns:
        Tuple ``(yl, yh, xl, xh)`` of clipped box bounds.
    """
    height, width = img_shape[-2:]
    side_ratio = np.sqrt(1 - lam)
    box_h = int(height * side_ratio)
    box_w = int(width * side_ratio)
    pad_y = int(margin * box_h)
    pad_x = int(margin * box_w)

    # Draw the centre row first, then the centre column (order fixes the RNG stream).
    center_y = np.random.randint(0 + pad_y, height - pad_y, size=count)
    center_x = np.random.randint(0 + pad_x, width - pad_x, size=count)

    half_h = box_h // 2
    half_w = box_w // 2
    y_low = np.clip(center_y - half_h, 0, height)
    y_high = np.clip(center_y + half_h, 0, height)
    x_low = np.clip(center_x - half_w, 0, width)
    x_high = np.clip(center_x + half_w, 0, width)
    return y_low, y_high, x_low, x_high
def rand_bbox_minmax(img_shape, minmax, count=None):
    """Min-Max CutMix bounding-box.

    Inspired by Darknet cutmix impl, generates a random rectangular bbox
    based on min/max percent values applied to each dimension of the input
    image.

    Typical defaults for minmax are usually in the .2-.3 for min and .8-.9
    range for max.

    Args:
        img_shape (tuple): Image shape as tuple; the last two entries are
            read as height and width.
        minmax (tuple or list): Min and max bbox ratios (as percent of image
            size).
        count (int): Number of bbox to generate (``None`` yields scalars).

    Returns:
        Tuple ``(yl, yu, xl, xu)`` of box bounds.
    """
    assert len(minmax) == 2
    height, width = img_shape[-2:]
    low_ratio = minmax[0]
    high_ratio = minmax[1]

    # Box sizes are drawn first (height, then width), then the top-left corner.
    box_h = np.random.randint(
        int(height * low_ratio), int(height * high_ratio), size=count
    )
    box_w = np.random.randint(
        int(width * low_ratio), int(width * high_ratio), size=count
    )
    top = np.random.randint(0, height - box_h, size=count)
    left = np.random.randint(0, width - box_w, size=count)
    return top, top + box_h, left, left + box_w
def cutmix_bbox_and_lam(
    img_shape, lam, ratio_minmax=None, correct_lam=True, count=None
):
    """Generate bbox and apply lambda correction.

    Args:
        img_shape (tuple): Image shape as tuple; the last two entries are
            read as height and width.
        lam (float): Cutmix lambda value used when ``ratio_minmax`` is None.
        ratio_minmax (tuple or list): If given, the box is drawn with
            ``rand_bbox_minmax`` instead of ``rand_bbox``.
        correct_lam (bool): Recompute lambda from the area of the box that
            was actually produced.
        count (int): Number of bbox to generate.

    Returns:
        ``((yl, yu, xl, xu), lam)`` where ``lam`` is the corrected value when
        ``correct_lam`` is true or ``ratio_minmax`` is given, otherwise the
        value passed in.
    """
    use_minmax = ratio_minmax is not None
    if use_minmax:
        bounds = rand_bbox_minmax(img_shape, ratio_minmax, count=count)
    else:
        bounds = rand_bbox(img_shape, lam, count=count)
    yl, yu, xl, xu = bounds

    # lam may not be the correct one due to the clip func
    if correct_lam or use_minmax:
        image_area = float(img_shape[-2] * img_shape[-1])
        box_area = (yu - yl) * (xu - xl)
        lam = 1.0 - box_area / image_area
    return (yl, yu, xl, xu), lam
class Mixup:
    """Mixup/Cutmix that applies different params to each element or whole batch

    Args:
        mixup_alpha (float): Mixup alpha value, mixup is active if > 0
        cutmix_alpha (float): Cutmix alpha value, cutmix is active if > 0
        cutmix_minmax (List[float]): Cutmix min/max image ratio, cutmix is
            active and uses this vs alpha if not None
        prob (float): Probability of applying mixup or cutmix per batch or
            element
        switch_prob (float): Probability of switching to cutmix instead of
            mixup when both are active
        mode (str): How to apply mixup/cutmix params (per 'batch', 'pair'
            (pair of elements), 'elem' (element)); any value other than
            'elem' or 'pair' is handled as 'batch'
        correct_lam (bool): Apply lambda correction when cutmix bbox clipped
            by image borders
        label_smoothing (float): Apply label smoothing to the mixed target
            tensor
        num_classes (int): Number of classes for target
    """

    _NO_MIX_MSG = "One of mixup_alpha > 0., cutmix_alpha > 0., cutmix_minmax not None should be true."

    def __init__(
        self,
        mixup_alpha=1.0,
        cutmix_alpha=0.0,
        cutmix_minmax=None,
        prob=1.0,
        switch_prob=0.5,
        mode="batch",
        correct_lam=True,
        label_smoothing=0.1,
        num_classes=1000,
    ):
        self.mixup_alpha = mixup_alpha
        self.cutmix_alpha = cutmix_alpha
        self.cutmix_minmax = cutmix_minmax
        if self.cutmix_minmax is not None:
            assert len(self.cutmix_minmax) == 2
            # force cutmix alpha == 1.0 when minmax active to keep logic simple & safe
            self.cutmix_alpha = 1.0
        self.mix_prob = prob
        self.switch_prob = switch_prob
        self.label_smoothing = label_smoothing
        self.num_classes = num_classes
        self.mode = mode
        # correct lambda based on clipped area for cutmix
        self.correct_lam = correct_lam
        # set to false to disable mixing (intended to be set by train loop)
        self.mixup_enabled = True

    def _params_per_elem(self, batch_size):
        """Sample a lambda and a cutmix flag for each of ``batch_size`` items.

        Returns:
            ``(lam, use_cutmix)``: a float32 array of mixing weights (1.0
            where no mixing is applied) and a boolean array telling which
            items use cutmix rather than mixup.
        """
        lam = np.ones(batch_size, dtype=np.float32)
        # np.bool_ instead of the np.bool alias, which NumPy 1.24 removed.
        use_cutmix = np.zeros(batch_size, dtype=np.bool_)
        if self.mixup_enabled:
            if self.mixup_alpha > 0.0 and self.cutmix_alpha > 0.0:
                use_cutmix = np.random.rand(batch_size) < self.switch_prob
                lam_mix = np.where(
                    use_cutmix,
                    np.random.beta(
                        self.cutmix_alpha, self.cutmix_alpha, size=batch_size
                    ),
                    np.random.beta(
                        self.mixup_alpha, self.mixup_alpha, size=batch_size
                    ),
                )
            elif self.mixup_alpha > 0.0:
                lam_mix = np.random.beta(
                    self.mixup_alpha, self.mixup_alpha, size=batch_size
                )
            elif self.cutmix_alpha > 0.0:
                use_cutmix = np.ones(batch_size, dtype=np.bool_)
                lam_mix = np.random.beta(
                    self.cutmix_alpha, self.cutmix_alpha, size=batch_size
                )
            else:
                # Raised explicitly so the check survives ``python -O``.
                raise AssertionError(self._NO_MIX_MSG)
            # Each item keeps lam == 1 (no mixing) with probability 1 - mix_prob.
            lam = np.where(
                np.random.rand(batch_size) < self.mix_prob,
                lam_mix.astype(np.float32),
                lam,
            )
        return lam, use_cutmix

    def _params_per_batch(self):
        """Sample one lambda and one cutmix flag for the whole batch.

        Returns:
            ``(lam, use_cutmix)``; ``(1.0, False)`` when mixing is disabled
            or skipped by the ``mix_prob`` draw.
        """
        lam = 1.0
        use_cutmix = False
        if self.mixup_enabled and np.random.rand() < self.mix_prob:
            if self.mixup_alpha > 0.0 and self.cutmix_alpha > 0.0:
                use_cutmix = np.random.rand() < self.switch_prob
                lam_mix = (
                    np.random.beta(self.cutmix_alpha, self.cutmix_alpha)
                    if use_cutmix
                    else np.random.beta(self.mixup_alpha, self.mixup_alpha)
                )
            elif self.mixup_alpha > 0.0:
                lam_mix = np.random.beta(self.mixup_alpha, self.mixup_alpha)
            elif self.cutmix_alpha > 0.0:
                use_cutmix = True
                lam_mix = np.random.beta(self.cutmix_alpha, self.cutmix_alpha)
            else:
                # Raised explicitly so the check survives ``python -O``.
                raise AssertionError(self._NO_MIX_MSG)
            lam = float(lam_mix)
        return lam, use_cutmix

    def _mix_elem(self, x):
        """Mix every sample in place with its mirror-index partner.

        Sample ``i`` is mixed with sample ``batch_size - i - 1`` taken from an
        unmodified copy of the batch, using per-sample parameters.

        Returns:
            Tensor of per-sample lambdas with a trailing unit dimension.
        """
        batch_size = len(x)
        lam_batch, use_cutmix = self._params_per_elem(batch_size)
        # Keep an unmodified copy as the mixing source.
        x_orig = x.clone()
        for i in range(batch_size):
            j = batch_size - i - 1
            lam = lam_batch[i]
            if lam != 1:
                if use_cutmix[i]:
                    (yl, yh, xl, xh), lam = cutmix_bbox_and_lam(
                        x[i].shape,
                        lam,
                        ratio_minmax=self.cutmix_minmax,
                        correct_lam=self.correct_lam,
                    )
                    x[i][:, yl:yh, xl:xh] = x_orig[j][:, yl:yh, xl:xh]
                    lam_batch[i] = lam
                else:
                    lam = flow.tensor(lam, device=x.device, dtype=x.dtype)
                    # Blend with the partner sample j; blending with x_orig[i]
                    # would leave the image unchanged while targets are mixed.
                    x[i] = x[i] * lam + x_orig[j] * (1 - lam)
        return flow.tensor(lam_batch, device=x.device, dtype=x.dtype).unsqueeze(1)

    def _mix_pair(self, x):
        """Mix samples in place pairwise, sharing parameters within a pair.

        Parameters are sampled for the first half of the batch; sample ``i``
        and sample ``batch_size - i - 1`` are mixed with each other using the
        same lambda (and the same box for cutmix).

        Returns:
            Tensor of per-sample lambdas with a trailing unit dimension.
        """
        batch_size = len(x)
        lam_batch, use_cutmix = self._params_per_elem(batch_size // 2)
        # Keep an unmodified copy as the mixing source.
        x_orig = x.clone()
        for i in range(batch_size // 2):
            j = batch_size - i - 1
            lam = lam_batch[i]
            if lam != 1:
                if use_cutmix[i]:
                    (yl, yh, xl, xh), lam = cutmix_bbox_and_lam(
                        x[i].shape,
                        lam,
                        ratio_minmax=self.cutmix_minmax,
                        correct_lam=self.correct_lam,
                    )
                    x[i][:, yl:yh, xl:xh] = x_orig[j][:, yl:yh, xl:xh]
                    x[j][:, yl:yh, xl:xh] = x_orig[i][:, yl:yh, xl:xh]
                    lam_batch[i] = lam
                else:
                    # TODO: support tensor * numpy
                    lam = flow.tensor(lam, device=x.device, dtype=x.dtype)
                    x[i] = x[i] * lam + x_orig[j] * (1 - lam)
                    x[j] = x[j] * lam + x_orig[i] * (1 - lam)
        # Second half mirrors the first so lambda k pairs with lambda (N - 1 - k).
        lam_batch = np.concatenate((lam_batch, lam_batch[::-1]))
        return flow.tensor(lam_batch, device=x.device, dtype=x.dtype).unsqueeze(1)

    def _mix_batch(self, x):
        """Mix the whole batch in place with its flipped self using one lambda.

        Returns:
            The lambda that was applied (``1.0`` when nothing was mixed).
        """
        lam, use_cutmix = self._params_per_batch()
        if lam == 1.0:
            return 1.0
        if use_cutmix:
            (yl, yh, xl, xh), lam = cutmix_bbox_and_lam(
                x.shape,
                lam,
                ratio_minmax=self.cutmix_minmax,
                correct_lam=self.correct_lam,
            )
            x[:, :, yl:yh, xl:xh] = x.flip(0)[:, :, yl:yh, xl:xh]
        else:
            x_flipped = x.flip(0).mul(1.0 - lam)
            x.mul_(lam).add_(x_flipped)
        return lam

    def __call__(self, x, target):
        """Mix a batch in place and build the matching mixed targets.

        Args:
            x: Batch of inputs; modified in place. Its length must be even.
            target: Class indices for the batch.

        Returns:
            ``(x, target)`` with ``target`` replaced by the label-smoothed,
            mixed target matrix from ``mixup_target``.
        """
        assert len(x) % 2 == 0, "Batch size should be even when using this"
        if self.mode == "elem":
            lam = self._mix_elem(x)
        elif self.mode == "pair":
            lam = self._mix_pair(x)
        else:
            lam = self._mix_batch(x)
        target = mixup_target(
            target, self.num_classes, lam, self.label_smoothing, x.device
        )
        return x, target