"""
Code for PhiNets (https://doi.org/10.1145/3510832).
Authors:
- Francesco Paissan, 2023
- Alberto Ancilotto, 2023
- Matteo Beltrami, 2023
- Matteo Tremonti, 2023
"""
from typing import List
import torch
import torch.ao.nn.quantized as nnq
import torch.nn as nn
import torch.nn.functional as F
from torchinfo import summary
def _make_divisible(v, divisor=8, min_value=None):
"""
This function is taken from the original tf repo. It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
It ensures that all layers have a channel number that is divisible by divisor.
Arguments
---------
v : int
The original number of channels.
divisor : int, optional
The divisor to ensure divisibility (default is 8).
min_value : int or None, optional
The minimum value for the divisible channels (default is None).
Returns
-------
int
The adjusted number of channels.
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
def correct_pad(input_shape, kernel_size):
    """Return zero-padding for a 2D convolution with downsampling.

    Arguments
    ---------
    input_shape : tuple or list
        Shape of the input tensor (height, width). A ``None`` height is
        treated as an odd dimension.
    kernel_size : int or tuple
        Size of the convolution kernel.

    Returns
    -------
    tuple
        Zero-padding in the format (left, right, top, bottom).
    """
    if isinstance(kernel_size, int):
        kh = kw = kernel_size
    else:
        kh, kw = kernel_size

    # Even input dimensions need one pixel less on the leading side.
    if input_shape[0] is None:
        adj_h = adj_w = 1
    else:
        adj_h = 1 - input_shape[0] % 2
        adj_w = 1 - input_shape[1] % 2

    half_h = kh // 2
    half_w = kw // 2
    return (
        int(half_w - adj_w),
        int(half_w),
        int(half_h - adj_h),
        int(half_h),
    )
def get_xpansion_factor(t_zero, beta, block_id, num_blocks):
    """Compute the expansion factor of a block, as defined in the paper.

    The factor interpolates linearly from ``t_zero`` (first block) to
    ``t_zero * beta`` (last block).

    Arguments
    ---------
    t_zero : float
        The base expansion factor.
    beta : float
        The shape factor.
    block_id : int
        The identifier of the current block.
    num_blocks : int
        The total number of blocks.

    Returns
    -------
    float
        The computed expansion factor.
    """
    increasing = (t_zero * beta) * block_id / num_blocks
    decreasing = t_zero * (num_blocks - block_id) / num_blocks
    return increasing + decreasing
class ReLUMax(torch.nn.Module):
    """ReLU clipped from above (a "ReLU-N" nonlinearity).

    Arguments
    ---------
    max : float
        The maximum value for the clamp operation. (Parameter name shadows
        the builtin, but it is caller-visible and kept for compatibility.)
    """

    def __init__(self, max):
        super(ReLUMax, self).__init__()
        self.max = max

    def forward(self, x):
        """Apply ReLU and clip the result at ``self.max``.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            ``x`` clamped element-wise into ``[0, self.max]``.
        """
        return x.clamp(min=0, max=self.max)
class SEBlock(torch.nn.Module):
    """Squeeze-and-excitation block.

    Globally average-pools the input, squeezes it through a 1x1 bottleneck,
    and rescales the input channels by the resulting sigmoid gate.

    Arguments
    ---------
    in_channels : int
        Input number of channels.
    out_channels : int
        Number of channels of the squeezed (bottleneck) representation.
    h_swish : bool, optional
        Whether to use Hardswish in the bottleneck (default is True);
        otherwise ReLUMax(6) is used.
    """

    def __init__(self, in_channels, out_channels, h_swish=True):
        super(SEBlock, self).__init__()
        self.se_conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            padding=0,
            bias=False,
        )
        self.se_conv2 = nn.Conv2d(
            out_channels, in_channels, kernel_size=1, bias=False, padding=0
        )
        self.activation = nn.Hardswish(inplace=True) if h_swish else ReLUMax(6)
        # FloatFunctional keeps the multiply quantization-friendly; for
        # unquantized models it behaves exactly like ``torch.mul``.
        self.mult = nnq.FloatFunctional()

    def forward(self, x):
        """Run the squeeze-and-excitation block.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            Input rescaled channel-wise by the learned gate.
        """
        gate = F.adaptive_avg_pool2d(x, (1, 1))
        gate = self.activation(self.se_conv(gate))
        gate = torch.sigmoid(self.se_conv2(gate))
        return self.mult.mul(x, gate)
class DepthwiseConv2d(torch.nn.Conv2d):
    """Depthwise 2D convolution layer.

    A thin wrapper around ``torch.nn.Conv2d`` that sets
    ``groups=in_channels`` (one filter group per input channel) and derives
    the output width from ``depth_multiplier``.

    Arguments
    ---------
    in_channels : int
        Number of input channels.
    depth_multiplier : int, optional
        The channel multiplier for the output channels (default is 1).
    kernel_size : int or tuple, optional
        Size of the convolution kernel (default is 3).
    stride : int or tuple, optional
        Stride of the convolution (default is 1).
    padding : int or tuple, optional
        Zero-padding added to both sides of the input (default is 0).
    dilation : int or tuple, optional
        Spacing between kernel elements (default is 1).
    bias : bool, optional
        If True, adds a learnable bias to the output (default is False).
    padding_mode : str, optional
        'zeros' or 'circular'. Padding mode for convolution (default is 'zeros').
    """

    def __init__(
        self,
        in_channels,
        depth_multiplier=1,
        kernel_size=3,
        stride=1,
        padding=0,
        dilation=1,
        bias=False,
        padding_mode="zeros",
    ):
        super().__init__(
            in_channels=in_channels,
            out_channels=in_channels * depth_multiplier,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=in_channels,  # depthwise: each input channel convolved independently
            bias=bias,
            padding_mode=padding_mode,
        )
class SeparableConv2d(torch.nn.Module):
    """Depthwise-separable 2D convolution.

    Runs a depthwise convolution, a 1x1 pointwise convolution, batch
    normalization, and an activation, in that order.

    Arguments
    ---------
    in_channels : int
        Input number of channels.
    out_channels : int
        Output number of channels.
    activation : torch.nn.Module or callable, optional
        Activation applied after batch norm (default is
        torch.nn.functional.relu). Plain callables are wrapped into a
        Module so they can live inside the ModuleList.
    kernel_size : int, optional
        Kernel size of the depthwise convolution (default is 3).
    stride : int, optional
        Stride of the depthwise convolution (default is 1).
    padding : int, optional
        Padding of the depthwise convolution (default is 0).
    dilation : int, optional
        Dilation factor for the pointwise convolution (default is 1).
    bias : bool, optional
        If True, adds a learnable bias to both convolutions (default is True).
    padding_mode : str, optional
        Padding mode for convolution (default is 'zeros').
    depth_multiplier : int, optional
        Channel multiplier of the depthwise stage (default is 1).
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        activation=torch.nn.functional.relu,
        kernel_size=3,
        stride=1,
        padding=0,
        dilation=1,
        bias=True,
        padding_mode="zeros",
        depth_multiplier=1,
    ):
        super().__init__()
        self._layers = torch.nn.ModuleList()

        dw_out_channels = in_channels * depth_multiplier
        depthwise = torch.nn.Conv2d(
            in_channels=in_channels,
            out_channels=dw_out_channels,
            # Bug fix: kernel_size and padding were previously hardcoded to
            # 3 and 0, silently ignoring the constructor arguments.
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=1,
            groups=in_channels,
            bias=bias,
            padding_mode=padding_mode,
        )
        spatialConv = torch.nn.Conv2d(
            # Bug fix: honor depth_multiplier (the depthwise stage may now
            # widen the tensor before the pointwise projection).
            in_channels=dw_out_channels,
            out_channels=out_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            dilation=dilation,
            bias=bias,
            padding_mode=padding_mode,
        )
        bn = torch.nn.BatchNorm2d(out_channels, eps=1e-3, momentum=0.999)

        # Bug fix: ModuleList.append raises TypeError on plain callables,
        # so the old default activation (F.relu) crashed at construction.
        # Wrap non-Module callables in a minimal Module.
        if not isinstance(activation, torch.nn.Module):
            act_fn = activation

            class _Activation(torch.nn.Module):
                """Stateless wrapper turning a callable into a Module."""

                def forward(self, x):
                    return act_fn(x)

            activation = _Activation()

        self._layers.append(depthwise)
        self._layers.append(spatialConv)
        self._layers.append(bn)
        self._layers.append(activation)

    def forward(self, x):
        """Execute the SeparableConv2d block.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            Output of the convolution stack.
        """
        for layer in self._layers:
            x = layer(x)
        return x
class PhiNetConvBlock(nn.Module):
    """PhiNet's convolutional block (inverted residual with optional SE).

    Arguments
    ---------
    in_shape : tuple
        Input shape of the conv block; only in_shape[0] (channels) is used.
    expansion : float
        Expansion coefficient for this convolutional block.
    stride : int
        Stride for the depthwise convolution.
    filters : int
        Output channels of the convolutional block.
    has_se : bool
        Whether to include a Squeeze-and-Excite block.
    block_id : int or None
        ID of the block; a falsy value skips the expansion stage.
    res : bool
        Whether to use the residual connection.
    h_swish : bool
        Whether to use Hardswish (otherwise ReLUMax(6)).
    k_size : int
        Kernel size for the depthwise convolution.
    dp_rate : float
        Dropout2d probability.
    divisor : int
        Channel counts are rounded to a multiple of this value.
    """

    def __init__(
        self,
        in_shape,
        expansion,
        stride,
        filters,
        has_se,
        block_id=None,
        res=True,
        h_swish=True,
        k_size=3,
        dp_rate=0.05,
        divisor=1,
    ):
        super(PhiNetConvBlock, self).__init__()
        self.param_count = 0
        self.skip_conn = False
        self._layers = torch.nn.ModuleList()
        in_channels = in_shape[0]

        def _new_activation():
            # Each slot gets its own instance: Module.children() does not
            # return repeated layers, so sharing one would hide entries.
            return nn.Hardswish(inplace=True) if h_swish else ReLUMax(6)

        if block_id:
            # Expansion stage: 1x1 conv widening the representation.
            expanded = _make_divisible(int(expansion * in_channels), divisor=divisor)
            self._layers.append(
                nn.Conv2d(in_channels, expanded, kernel_size=1, padding=0, bias=False)
            )
            self._layers.append(nn.BatchNorm2d(expanded, eps=1e-3, momentum=0.999))
            self._layers.append(_new_activation())

        if stride == 2:
            # NOTE(review): `res` here is the boolean residual flag, not a
            # spatial resolution, so correct_pad([res, res], 3) always
            # yields (1, 1, 1, 1). Kept as-is to preserve the architecture;
            # confirm against the upstream implementation.
            pad = correct_pad([res, res], 3)

        self._layers.append(nn.Dropout2d(dp_rate))

        dw_channels = (
            _make_divisible(int(expansion * in_channels), divisor=divisor)
            if block_id
            else in_channels
        )
        # Depth multiplier is fixed to 1, so the depthwise stage keeps width.
        self._layers.append(
            DepthwiseConv2d(
                in_channels=dw_channels,
                depth_multiplier=1,
                kernel_size=k_size,
                stride=stride,
                bias=False,
                padding=k_size // 2 if stride == 1 else (pad[1], pad[3]),
            )
        )
        self._layers.append(nn.BatchNorm2d(dw_channels, eps=1e-3, momentum=0.999))
        self._layers.append(_new_activation())

        if has_se:
            squeezed = _make_divisible(max(1, int(dw_channels / 6)), divisor=divisor)
            self._layers.append(SEBlock(dw_channels, squeezed, h_swish=h_swish))

        # Projection stage: 1x1 conv back to `filters` channels, no activation.
        self._layers.append(
            nn.Conv2d(
                in_channels=dw_channels,
                out_channels=filters,
                kernel_size=1,
                padding=0,
                bias=False,
            )
        )
        self._layers.append(nn.BatchNorm2d(filters, eps=1e-3, momentum=0.999))

        if res and in_channels == filters and stride == 1:
            self.skip_conn = True
            # FloatFunctional keeps the residual add quantization-friendly;
            # for unquantized models it behaves exactly like ``torch.add``.
            self.op = nnq.FloatFunctional()

    def forward(self, x):
        """Execute the PhiNet convolutional block.

        Arguments
        ---------
        x : torch.Tensor
            Input to the convolutional block.

        Returns
        -------
        torch.Tensor
            Output of the block, with the residual added when enabled.
        """
        residual = x if self.skip_conn else None
        for layer in self._layers:
            x = layer(x)
        if self.skip_conn:
            return self.op.add(x, residual)
        return x
class PhiNet(nn.Module):
    """
    This class implements the PhiNet architecture.

    Arguments
    ---------
    input_shape : tuple
        Input resolution as (C, H, W).
    num_layers : int
        Number of convolutional blocks.
    alpha : float
        Width multiplier for PhiNet architecture.
    beta : float
        Shape factor of PhiNet.
    t_zero : float
        Base expansion factor for PhiNet.
    include_top : bool
        Whether to include classification head or not.
    num_classes : int
        Number of classes for the classification head.
    compatibility : bool
        `True` to maximise compatibility among embedded platforms (changes network).
    downsampling_layers : list of int or None
        Block ids where spatial downsampling occurs (defaults to [5, 7]).
    conv5_percent : float
        Fraction of the last blocks that use a 5x5 depthwise kernel.
    first_conv_stride : int
        Stride of the stem convolution.
    residuals : bool
        Whether blocks use residual connections.
    conv2d_input : bool
        Use a plain Conv2d stem instead of the separable stem.
    pool : bool
        Downsample via max-pooling instead of strided convolutions.
    h_swish : bool
        Whether to use Hardswish activations.
    squeeze_excite : bool
        Whether to use squeeze-and-excitation blocks.
    divisor : int
        Channel counts are rounded to a multiple of this value.
    return_layers : list of int or None
        Indices of internal layers whose outputs `forward` also returns.
    """

    def __init__(
        self,
        input_shape: List[int],
        num_layers: int = 7,  # num_layers
        alpha: float = 0.2,
        beta: float = 1.0,
        t_zero: float = 6,
        include_top: bool = False,
        num_classes: int = 10,
        compatibility: bool = False,
        downsampling_layers: List[int] = None,  # S2; effective default [5, 7]
        conv5_percent: float = 0.0,  # S2
        first_conv_stride: int = 2,  # S2
        residuals: bool = True,  # S2
        conv2d_input: bool = False,  # S2
        pool: bool = False,  # S2
        h_swish: bool = True,  # S1
        squeeze_excite: bool = True,  # S1
        divisor: int = 1,
        return_layers=None,
    ) -> None:
        super(PhiNet, self).__init__()
        # Bug fix: avoid the mutable-default-argument anti-pattern; the
        # effective default is unchanged.
        if downsampling_layers is None:
            downsampling_layers = [5, 7]

        self.alpha = alpha
        self.beta = beta
        self.t_zero = t_zero
        self.num_layers = num_layers
        self.num_classes = num_classes
        self.return_layers = return_layers

        if compatibility:  # disables operations hard for some platforms
            h_swish = False
            squeeze_excite = False

        # These hyperparameters are hard-coded. Defined here as variables
        # just so you can play with them.
        first_conv_filters = 48
        b1_filters = 24
        b2_filters = 48

        if not isinstance(num_layers, int):
            num_layers = round(num_layers)

        assert len(input_shape) == 3, "Expected 3 elements list as input_shape."
        in_channels = input_shape[0]
        res = max(input_shape[1], input_shape[2])  # assumes squared input
        self.input_shape = input_shape

        self.classify = include_top
        self._layers = torch.nn.ModuleList()

        # Define activation function
        if h_swish:
            activation = nn.Hardswish(inplace=True)
        else:
            activation = ReLUMax(6)

        mp = nn.MaxPool2d((2, 2))

        if not conv2d_input:
            # Separable stem: pad -> separable conv -> first PhiNet block.
            pad = nn.ZeroPad2d(
                padding=correct_pad([res, res], 3),
            )
            self._layers.append(pad)

            sep1 = SeparableConv2d(
                in_channels,
                _make_divisible(int(first_conv_filters * alpha), divisor=divisor),
                kernel_size=3,
                stride=(first_conv_stride, first_conv_stride),
                padding=0,
                bias=False,
                activation=activation,
            )
            self._layers.append(sep1)

            block1 = PhiNetConvBlock(
                in_shape=(
                    _make_divisible(int(first_conv_filters * alpha), divisor=divisor),
                    res / first_conv_stride,
                    res / first_conv_stride,
                ),
                filters=_make_divisible(int(b1_filters * alpha), divisor=divisor),
                stride=1,
                expansion=1,
                has_se=False,
                res=residuals,
                h_swish=h_swish,
                divisor=divisor,
            )
            self._layers.append(block1)
        else:
            # Plain Conv2d stem (kept for embedded-platform compatibility).
            c1 = nn.Conv2d(
                in_channels, int(b1_filters * alpha), kernel_size=(3, 3), bias=False
            )
            bn_c1 = nn.BatchNorm2d(int(b1_filters * alpha))
            self._layers.append(c1)
            self._layers.append(activation)
            self._layers.append(bn_c1)

        block2 = PhiNetConvBlock(
            (
                _make_divisible(int(b1_filters * alpha), divisor=divisor),
                res / first_conv_stride,
                res / first_conv_stride,
            ),
            filters=_make_divisible(int(b1_filters * alpha), divisor=divisor),
            stride=2 if (not pool) else 1,
            expansion=get_xpansion_factor(t_zero, beta, 1, num_layers),
            block_id=1,
            has_se=squeeze_excite,
            res=residuals,
            h_swish=h_swish,
            divisor=divisor,
        )

        block3 = PhiNetConvBlock(
            (
                _make_divisible(int(b1_filters * alpha), divisor=divisor),
                res / first_conv_stride / 2,
                res / first_conv_stride / 2,
            ),
            filters=_make_divisible(int(b1_filters * alpha), divisor=divisor),
            stride=1,
            expansion=get_xpansion_factor(t_zero, beta, 2, num_layers),
            block_id=2,
            has_se=squeeze_excite,
            res=residuals,
            h_swish=h_swish,
            divisor=divisor,
        )

        block4 = PhiNetConvBlock(
            (
                _make_divisible(int(b1_filters * alpha), divisor=divisor),
                res / first_conv_stride / 2,
                res / first_conv_stride / 2,
            ),
            filters=_make_divisible(int(b2_filters * alpha), divisor=divisor),
            stride=2 if (not pool) else 1,
            expansion=get_xpansion_factor(t_zero, beta, 3, num_layers),
            block_id=3,
            has_se=squeeze_excite,
            res=residuals,
            h_swish=h_swish,
            divisor=divisor,
        )

        self._layers.append(block2)
        if pool:
            self._layers.append(mp)
        self._layers.append(block3)
        self._layers.append(block4)
        if pool:
            self._layers.append(mp)

        # Remaining blocks: widths double at downsampling layers; spatial
        # resolution is tracked only as bookkeeping for in_shape.
        block_id = 4
        block_filters = b2_filters
        spatial_res = res / first_conv_stride / 4
        in_channels_next = _make_divisible(int(b2_filters * alpha), divisor=divisor)
        while num_layers >= block_id:
            if block_id in downsampling_layers:
                block_filters *= 2
                if pool:
                    self._layers.append(mp)

            pn_block = PhiNetConvBlock(
                (in_channels_next, spatial_res, spatial_res),
                filters=_make_divisible(int(block_filters * alpha), divisor=divisor),
                stride=(2 if (block_id in downsampling_layers) and (not pool) else 1),
                expansion=get_xpansion_factor(t_zero, beta, block_id, num_layers),
                block_id=block_id,
                has_se=squeeze_excite,
                res=residuals,
                h_swish=h_swish,
                k_size=(5 if (block_id / num_layers) > (1 - conv5_percent) else 3),
                divisor=divisor,
            )

            self._layers.append(pn_block)
            in_channels_next = _make_divisible(
                int(block_filters * alpha), divisor=divisor
            )
            spatial_res = (
                spatial_res / 2 if block_id in downsampling_layers else spatial_res
            )
            block_id += 1

        if include_top:
            # Includes classification head if required
            self.classifier = nn.Sequential(
                nn.AdaptiveAvgPool2d((1, 1)),
                nn.Flatten(),
                nn.Linear(
                    _make_divisible(int(block_filters * alpha), divisor=divisor),
                    num_classes,
                    bias=True,
                ),
            )

        if self.return_layers is not None:
            print(f"PhiNet configured to return layers {self.return_layers}:")
            for i in self.return_layers:
                print(f"Layer {i} - {self._layers[i].__class__}")

    def forward(self, x):
        """Executes PhiNet network

        Arguments
        -------
        x : torch.Tensor
            Network input.

        Returns
        ------
        Logits if `include_top=True`, otherwise embeddings : torch.Tensor
            When `return_layers` is set, a `(output, intermediates)` tuple
            is returned instead.
        """
        ret = []
        for i, layers in enumerate(self._layers):
            x = layers(x)
            if self.return_layers is not None:
                if i in self.return_layers:
                    ret.append(x)

        if self.classify:
            x = self.classifier(x)

        if self.return_layers is not None:
            return x, ret

        return x

    def get_complexity(self):
        """Returns MAC and number of parameters of initialized architecture.

        Returns
        -------
        Dictionary with complexity characterization of the network. : dict

        Example
        -------
        .. doctest::

            >>> from micromind.networks import PhiNet
            >>> model = PhiNet((3, 224, 224))
            >>> model.get_complexity()
            {'MAC': 9817670, 'params': 30917}
        """
        # torchinfo traces the model on a zero input of the stored shape.
        temp = summary(
            self, input_data=torch.zeros([1] + list(self.input_shape)), verbose=0
        )
        return {"MAC": temp.total_mult_adds, "params": temp.total_params}

    def get_MAC(self):
        """Returns number of MACs for this architecture.

        Returns
        -------
        Number of MAC for this network. : int

        Example
        -------
        .. doctest::

            >>> from micromind.networks import PhiNet
            >>> model = PhiNet((3, 224, 224))
            >>> model.get_MAC()
            9817670
        """
        return self.get_complexity()["MAC"]

    def get_params(self):
        """Returns number of params for this architecture.

        Returns
        -------
        Number of parameters for this network. : int

        Example
        -------
        .. doctest::

            >>> from micromind.networks import PhiNet
            >>> model = PhiNet((3, 224, 224))
            >>> model.get_params()
            30917
        """
        return self.get_complexity()["params"]