PulseFocusPlatform/static/ppdet/modeling/backbones/efficientnet.py

292 lines
8.9 KiB
Python

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
import collections
import math
import re
from paddle import fluid
from paddle.fluid.regularizer import L2Decay
from ppdet.core.workspace import register
__all__ = ['EfficientNet']
GlobalParams = collections.namedtuple('GlobalParams', [
'batch_norm_momentum', 'batch_norm_epsilon', 'width_coefficient',
'depth_coefficient', 'depth_divisor'
])
BlockArgs = collections.namedtuple('BlockArgs', [
'kernel_size', 'num_repeat', 'input_filters', 'output_filters',
'expand_ratio', 'stride', 'se_ratio'
])
GlobalParams.__new__.__defaults__ = (None, ) * len(GlobalParams._fields)
BlockArgs.__new__.__defaults__ = (None, ) * len(BlockArgs._fields)
def _decode_block_string(block_string):
assert isinstance(block_string, str)
ops = block_string.split('_')
options = {}
for op in ops:
splits = re.split(r'(\d.*)', op)
if len(splits) >= 2:
key, value = splits[:2]
options[key] = value
assert (('s' in options and len(options['s']) == 1) or
(len(options['s']) == 2 and options['s'][0] == options['s'][1]))
return BlockArgs(
kernel_size=int(options['k']),
num_repeat=int(options['r']),
input_filters=int(options['i']),
output_filters=int(options['o']),
expand_ratio=int(options['e']),
se_ratio=float(options['se']) if 'se' in options else None,
stride=int(options['s'][0]))
def get_model_params(scale):
block_strings = [
'r1_k3_s11_e1_i32_o16_se0.25',
'r2_k3_s22_e6_i16_o24_se0.25',
'r2_k5_s22_e6_i24_o40_se0.25',
'r3_k3_s22_e6_i40_o80_se0.25',
'r3_k5_s11_e6_i80_o112_se0.25',
'r4_k5_s22_e6_i112_o192_se0.25',
'r1_k3_s11_e6_i192_o320_se0.25',
]
block_args = []
for block_string in block_strings:
block_args.append(_decode_block_string(block_string))
params_dict = {
# width, depth
'b0': (1.0, 1.0),
'b1': (1.0, 1.1),
'b2': (1.1, 1.2),
'b3': (1.2, 1.4),
'b4': (1.4, 1.8),
'b5': (1.6, 2.2),
'b6': (1.8, 2.6),
'b7': (2.0, 3.1),
}
w, d = params_dict[scale]
global_params = GlobalParams(
batch_norm_momentum=0.99,
batch_norm_epsilon=1e-3,
width_coefficient=w,
depth_coefficient=d,
depth_divisor=8)
return block_args, global_params
def round_filters(filters, global_params):
multiplier = global_params.width_coefficient
if not multiplier:
return filters
divisor = global_params.depth_divisor
filters *= multiplier
min_depth = divisor
new_filters = max(min_depth,
int(filters + divisor / 2) // divisor * divisor)
if new_filters < 0.9 * filters: # prevent rounding by more than 10%
new_filters += divisor
return int(new_filters)
def round_repeats(repeats, global_params):
multiplier = global_params.depth_coefficient
if not multiplier:
return repeats
return int(math.ceil(multiplier * repeats))
def conv2d(inputs,
num_filters,
filter_size,
stride=1,
padding='SAME',
groups=1,
use_bias=False,
name='conv2d'):
param_attr = fluid.ParamAttr(name=name + '_weights')
bias_attr = False
if use_bias:
bias_attr = fluid.ParamAttr(
name=name + '_offset', regularizer=L2Decay(0.))
feats = fluid.layers.conv2d(
inputs,
num_filters,
filter_size,
groups=groups,
name=name,
stride=stride,
padding=padding,
param_attr=param_attr,
bias_attr=bias_attr)
return feats
def batch_norm(inputs, momentum, eps, name=None):
param_attr = fluid.ParamAttr(name=name + '_scale', regularizer=L2Decay(0.))
bias_attr = fluid.ParamAttr(name=name + '_offset', regularizer=L2Decay(0.))
return fluid.layers.batch_norm(
input=inputs,
momentum=momentum,
epsilon=eps,
name=name,
moving_mean_name=name + '_mean',
moving_variance_name=name + '_variance',
param_attr=param_attr,
bias_attr=bias_attr)
def mb_conv_block(inputs,
input_filters,
output_filters,
expand_ratio,
kernel_size,
stride,
momentum,
eps,
se_ratio=None,
name=None):
feats = inputs
num_filters = input_filters * expand_ratio
if expand_ratio != 1:
feats = conv2d(feats, num_filters, 1, name=name + '_expand_conv')
feats = batch_norm(feats, momentum, eps, name=name + '_bn0')
feats = fluid.layers.swish(feats)
feats = conv2d(
feats,
num_filters,
kernel_size,
stride,
groups=num_filters,
name=name + '_depthwise_conv')
feats = batch_norm(feats, momentum, eps, name=name + '_bn1')
feats = fluid.layers.swish(feats)
if se_ratio is not None:
filter_squeezed = max(1, int(input_filters * se_ratio))
squeezed = fluid.layers.pool2d(
feats, pool_type='avg', global_pooling=True)
squeezed = conv2d(
squeezed,
filter_squeezed,
1,
use_bias=True,
name=name + '_se_reduce')
squeezed = fluid.layers.swish(squeezed)
squeezed = conv2d(
squeezed, num_filters, 1, use_bias=True, name=name + '_se_expand')
feats = feats * fluid.layers.sigmoid(squeezed)
feats = conv2d(feats, output_filters, 1, name=name + '_project_conv')
feats = batch_norm(feats, momentum, eps, name=name + '_bn2')
if stride == 1 and input_filters == output_filters:
feats = fluid.layers.elementwise_add(feats, inputs)
return feats
@register
class EfficientNet(object):
"""
EfficientNet, see https://arxiv.org/abs/1905.11946
Args:
scale (str): compounding scale factor, 'b0' - 'b7'.
use_se (bool): use squeeze and excite module.
norm_type (str): normalization type, 'bn' and 'sync_bn' are supported
"""
__shared__ = ['norm_type']
def __init__(self, scale='b0', use_se=True, norm_type='bn'):
assert scale in ['b' + str(i) for i in range(8)], \
"valid scales are b0 - b7"
assert norm_type in ['bn', 'sync_bn'], \
"only 'bn' and 'sync_bn' are supported"
super(EfficientNet, self).__init__()
self.norm_type = norm_type
self.scale = scale
self.use_se = use_se
def __call__(self, inputs):
blocks_args, global_params = get_model_params(self.scale)
momentum = global_params.batch_norm_momentum
eps = global_params.batch_norm_epsilon
num_filters = round_filters(32, global_params)
feats = conv2d(
inputs,
num_filters=num_filters,
filter_size=3,
stride=2,
name='_conv_stem')
feats = batch_norm(feats, momentum=momentum, eps=eps, name='_bn0')
feats = fluid.layers.swish(feats)
layer_count = 0
feature_maps = []
for b, block_arg in enumerate(blocks_args):
for r in range(block_arg.num_repeat):
input_filters = round_filters(block_arg.input_filters,
global_params)
output_filters = round_filters(block_arg.output_filters,
global_params)
kernel_size = block_arg.kernel_size
stride = block_arg.stride
se_ratio = None
if self.use_se:
se_ratio = block_arg.se_ratio
if r > 0:
input_filters = output_filters
stride = 1
feats = mb_conv_block(
feats,
input_filters,
output_filters,
block_arg.expand_ratio,
kernel_size,
stride,
momentum,
eps,
se_ratio=se_ratio,
name='_blocks.{}.'.format(layer_count))
layer_count += 1
feature_maps.append(feats)
return list(feature_maps[i] for i in [2, 4, 6])