"""
Modified from https://github.com/pytorch/vision/blob/main/torchvision/models/mobilenetv3.py
"""
from functools import partial
from typing import Any, Callable, List, Optional, Sequence
import oneflow as flow
from oneflow import nn, Tensor
from .utils import load_state_dict_from_url
from .registry import ModelCreator
from .helpers import make_divisible
__all__ = ["MobileNetV3", "mobilenet_v3_large", "mobilenet_v3_small"]
model_urls = {
"mobilenet_v3_large": "https://oneflow-public.oss-cn-beijing.aliyuncs.com/model_zoo/flowvision/classification/MobileNet/mobilenet_v3_large.zip",
"mobilenet_v3_small": "https://oneflow-public.oss-cn-beijing.aliyuncs.com/model_zoo/flowvision/classification/MobileNet/mobilenet_v3_small.zip",
}
class ConvBNActivation(nn.Sequential):
def __init__(
self,
in_planes: int,
out_planes: int,
kernel_size: int = 3,
stride: int = 1,
groups: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
activation_layer: Optional[Callable[..., nn.Module]] = None,
dilation: int = 1,
) -> None:
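        # "same" padding for an odd kernel size, scaled by the dilation rate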
padding = (kernel_size - 1) // 2 * dilation
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if activation_layer is None:
activation_layer = nn.ReLU6
super().__init__(
nn.Conv2d(
in_planes,
out_planes,
kernel_size,
stride,
padding,
dilation=dilation,
groups=groups,
bias=False,
),
norm_layer(out_planes),
activation_layer(inplace=True),
)
self.out_channels = out_planes
class SqueezeExcitation(nn.Module):
    # Implemented as described in Figure 4 of the MobileNetV3 paper
def __init__(self, input_channels: int, squeeze_factor: int = 4):
super().__init__()
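        # squeeze the channel dimension to 1/4 of the input, rounded to a multiple of 8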
squeeze_channels = make_divisible(input_channels // squeeze_factor, 8)
self.fc1 = nn.Conv2d(input_channels, squeeze_channels, 1)
self.relu = nn.ReLU(inplace=True)
self.fc2 = nn.Conv2d(squeeze_channels, input_channels, 1)
self.adaptive_avg_pool2d = nn.AdaptiveAvgPool2d(1)
self.hardsigmoid = nn.Hardsigmoid(inplace=True)
    def _scale(self, input: Tensor) -> Tensor:
scale = self.adaptive_avg_pool2d(input)
scale = self.fc1(scale)
scale = self.relu(scale)
scale = self.fc2(scale)
return self.hardsigmoid(scale)
def forward(self, input: Tensor) -> Tensor:
        scale = self._scale(input)
return scale * input
class InvertedResidualConfig:
    # Stores information listed in Tables 1 and 2 of the MobileNetV3 paper
def __init__(
self,
input_channels: int,
kernel: int,
expanded_channels: int,
out_channels: int,
use_se: bool,
activation: str,
stride: int,
dilation: int,
width_mult: float,
):
self.input_channels = self.adjust_channels(input_channels, width_mult)
self.kernel = kernel
self.expanded_channels = self.adjust_channels(expanded_channels, width_mult)
self.out_channels = self.adjust_channels(out_channels, width_mult)
self.use_se = use_se
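        # activation is "HS" (Hardswish) or "RE" (ReLU) in the paper's tables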
self.use_hs = activation == "HS"
self.stride = stride
self.dilation = dilation
@staticmethod
def adjust_channels(channels: int, width_mult: float):
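        # scale the channel count by width_mult and round to a multiple of 8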
return make_divisible(channels * width_mult, 8)
class InvertedResidual(nn.Module):
    # Implemented as described in section 5 of the MobileNetV3 paper
def __init__(
self,
cnf: InvertedResidualConfig,
norm_layer: Callable[..., nn.Module],
se_layer: Callable[..., nn.Module] = SqueezeExcitation,
):
super().__init__()
if not (1 <= cnf.stride <= 2):
raise ValueError("illegal stride value")
self.use_res_connect = (
cnf.stride == 1 and cnf.input_channels == cnf.out_channels
)
layers: List[nn.Module] = []
activation_layer = nn.Hardswish if cnf.use_hs else nn.ReLU
# expand
if cnf.expanded_channels != cnf.input_channels:
layers.append(
ConvBNActivation(
cnf.input_channels,
cnf.expanded_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# depthwise
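        # when dilation replaces striding (dilated backbones), keep stride at 1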
stride = 1 if cnf.dilation > 1 else cnf.stride
layers.append(
ConvBNActivation(
cnf.expanded_channels,
cnf.expanded_channels,
kernel_size=cnf.kernel,
stride=stride,
dilation=cnf.dilation,
groups=cnf.expanded_channels,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
if cnf.use_se:
layers.append(se_layer(cnf.expanded_channels))
# project
layers.append(
ConvBNActivation(
cnf.expanded_channels,
cnf.out_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.Identity,
)
)
self.block = nn.Sequential(*layers)
self.out_channels = cnf.out_channels
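        # strided blocks mark stage boundaries (used by downstream feature extractors)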
self._is_cn = cnf.stride > 1
def forward(self, input: Tensor) -> Tensor:
result = self.block(input)
if self.use_res_connect:
result += input
return result
class MobileNetV3(nn.Module):
def __init__(
self,
inverted_residual_setting: List[InvertedResidualConfig],
last_channel: int,
num_classes: int = 1000,
block: Optional[Callable[..., nn.Module]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
**kwargs: Any
) -> None:
"""
MobileNet V3 main class
Args:
inverted_residual_setting (List[InvertedResidualConfig]): Network structure
last_channel (int): The number of channels on the penultimate layer
num_classes (int): Number of classes
            block (Optional[Callable[..., nn.Module]]): Module specifying the inverted residual building block for MobileNet
norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
"""
super().__init__()
if not inverted_residual_setting:
raise ValueError("The inverted_residual_setting should not be empty")
elif not (
isinstance(inverted_residual_setting, Sequence)
            and all(
                isinstance(s, InvertedResidualConfig)
                for s in inverted_residual_setting
            )
):
raise TypeError(
"The inverted_residual_setting should be List[InvertedResidualConfig]"
)
if block is None:
block = InvertedResidual
if norm_layer is None:
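            # BatchNorm defaults (eps=0.001, momentum=0.01) follow the reference implementation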
norm_layer = partial(nn.BatchNorm2d, eps=0.001, momentum=0.01)
layers: List[nn.Module] = []
# building first layer
firstconv_output_channels = inverted_residual_setting[0].input_channels
layers.append(
ConvBNActivation(
3,
firstconv_output_channels,
kernel_size=3,
stride=2,
norm_layer=norm_layer,
activation_layer=nn.Hardswish,
)
)
# building inverted residual blocks
for cnf in inverted_residual_setting:
layers.append(block(cnf, norm_layer))
# building last several layers
lastconv_input_channels = inverted_residual_setting[-1].out_channels
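        # the final 1x1 conv expands channels 6x: 960 for Large, 576 for Small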
lastconv_output_channels = 6 * lastconv_input_channels
layers.append(
ConvBNActivation(
lastconv_input_channels,
lastconv_output_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.Hardswish,
)
)
self.features = nn.Sequential(*layers)
self.avgpool = nn.AdaptiveAvgPool2d(1)
self.classifier = nn.Sequential(
nn.Linear(lastconv_output_channels, last_channel),
nn.Hardswish(inplace=True),
nn.Dropout(p=0.2, inplace=True),
nn.Linear(last_channel, num_classes),
)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.zeros_(m.bias)
def _forward_impl(self, x: Tensor) -> Tensor:
x = self.features(x)
x = self.avgpool(x)
x = flow.flatten(x, 1)
x = self.classifier(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
def _mobilenet_v3_conf(
arch: str,
width_mult: float = 1.0,
reduced_tail: bool = False,
dilated: bool = False,
**kwargs: Any
):
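    # reduced_tail halves the channels of the last blocks; dilated trades the last
    # strides for dilation (both target dense-prediction backbones)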
reduce_divider = 2 if reduced_tail else 1
dilation = 2 if dilated else 1
bneck_conf = partial(InvertedResidualConfig, width_mult=width_mult)
adjust_channels = partial(
InvertedResidualConfig.adjust_channels, width_mult=width_mult
)
if arch == "mobilenet_v3_large":
inverted_residual_setting = [
bneck_conf(16, 3, 16, 16, False, "RE", 1, 1),
bneck_conf(16, 3, 64, 24, False, "RE", 2, 1), # C1
bneck_conf(24, 3, 72, 24, False, "RE", 1, 1),
bneck_conf(24, 5, 72, 40, True, "RE", 2, 1), # C2
bneck_conf(40, 5, 120, 40, True, "RE", 1, 1),
bneck_conf(40, 5, 120, 40, True, "RE", 1, 1),
bneck_conf(40, 3, 240, 80, False, "HS", 2, 1), # C3
bneck_conf(80, 3, 200, 80, False, "HS", 1, 1),
bneck_conf(80, 3, 184, 80, False, "HS", 1, 1),
bneck_conf(80, 3, 184, 80, False, "HS", 1, 1),
bneck_conf(80, 3, 480, 112, True, "HS", 1, 1),
bneck_conf(112, 3, 672, 112, True, "HS", 1, 1),
bneck_conf(
112, 5, 672, 160 // reduce_divider, True, "HS", 2, dilation
), # C4
bneck_conf(
160 // reduce_divider,
5,
960 // reduce_divider,
160 // reduce_divider,
True,
"HS",
1,
dilation,
),
bneck_conf(
160 // reduce_divider,
5,
960 // reduce_divider,
160 // reduce_divider,
True,
"HS",
1,
dilation,
),
]
last_channel = adjust_channels(1280 // reduce_divider) # C5
elif arch == "mobilenet_v3_small":
inverted_residual_setting = [
bneck_conf(16, 3, 16, 16, True, "RE", 2, 1), # C1
bneck_conf(16, 3, 72, 24, False, "RE", 2, 1), # C2
bneck_conf(24, 3, 88, 24, False, "RE", 1, 1),
bneck_conf(24, 5, 96, 40, True, "HS", 2, 1), # C3
bneck_conf(40, 5, 240, 40, True, "HS", 1, 1),
bneck_conf(40, 5, 240, 40, True, "HS", 1, 1),
bneck_conf(40, 5, 120, 48, True, "HS", 1, 1),
bneck_conf(48, 5, 144, 48, True, "HS", 1, 1),
bneck_conf(48, 5, 288, 96 // reduce_divider, True, "HS", 2, dilation), # C4
bneck_conf(
96 // reduce_divider,
5,
576 // reduce_divider,
96 // reduce_divider,
True,
"HS",
1,
dilation,
),
bneck_conf(
96 // reduce_divider,
5,
576 // reduce_divider,
96 // reduce_divider,
True,
"HS",
1,
dilation,
),
]
last_channel = adjust_channels(1024 // reduce_divider) # C5
else:
raise ValueError("Unsupported model type {}".format(arch))
return inverted_residual_setting, last_channel
def _mobilenet_v3_model(
arch: str,
inverted_residual_setting: List[InvertedResidualConfig],
last_channel: int,
pretrained: bool,
progress: bool,
**kwargs: Any
):
model = MobileNetV3(inverted_residual_setting, last_channel, **kwargs)
if pretrained:
if model_urls.get(arch, None) is None:
raise ValueError(
"No checkpoint is available for model type {}".format(arch)
)
state_dict = load_state_dict_from_url(model_urls[arch], progress=progress)
model.load_state_dict(state_dict)
return model
@ModelCreator.register_model
def mobilenet_v3_large(
pretrained: bool = False, progress: bool = True, **kwargs: Any
) -> MobileNetV3:
"""
Constructs the MobileNetV3-Large model.
.. note::
MobileNetV3-Large model architecture from the `Searching for MobileNetV3 <https://arxiv.org/abs/1905.02244>`_ paper.
Args:
pretrained (bool): Whether to download the pre-trained model on ImageNet. Default: ``False``
progress (bool): If True, displays a progress bar of the download to stderr. Default: ``True``
For example:
.. code-block:: python
>>> import flowvision
>>> mobilenet_v3_large = flowvision.models.mobilenet_v3_large(pretrained=False, progress=True)
"""
arch = "mobilenet_v3_large"
inverted_residual_setting, last_channel = _mobilenet_v3_conf(arch, **kwargs)
return _mobilenet_v3_model(
arch, inverted_residual_setting, last_channel, pretrained, progress, **kwargs
)
@ModelCreator.register_model
def mobilenet_v3_small(
pretrained: bool = False, progress: bool = True, **kwargs: Any
) -> MobileNetV3:
"""
Constructs the MobileNetV3-Small model.
.. note::
MobileNetV3-Small model architecture from the `Searching for MobileNetV3 <https://arxiv.org/abs/1905.02244>`_ paper.
Args:
pretrained (bool): Whether to download the pre-trained model on ImageNet. Default: ``False``
progress (bool): If True, displays a progress bar of the download to stderr. Default: ``True``
For example:
.. code-block:: python
>>> import flowvision
>>> mobilenet_v3_small = flowvision.models.mobilenet_v3_small(pretrained=False, progress=True)
"""
arch = "mobilenet_v3_small"
inverted_residual_setting, last_channel = _mobilenet_v3_conf(arch, **kwargs)
return _mobilenet_v3_model(
arch, inverted_residual_setting, last_channel, pretrained, progress, **kwargs
)
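

# A minimal smoke test (a sketch, not part of the library API): build both
# variants without pretrained weights and run a dummy forward pass. Assumes
# oneflow is installed and uses the standard 1x3x224x224 ImageNet input.
if __name__ == "__main__":
    for builder in (mobilenet_v3_large, mobilenet_v3_small):
        net = builder(pretrained=False)
        net.eval()
        with flow.no_grad():
            logits = net(flow.randn(1, 3, 224, 224))
        # the default classification head has 1000 ImageNet classes
        assert logits.shape == (1, 1000)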