forked from PulseFocusPlatform/PulseFocusPlatform
282 lines
10 KiB
Python
282 lines
10 KiB
Python
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import paddle
|
|
import paddle.nn as nn
|
|
import paddle.nn.functional as F
|
|
from paddle.nn.initializer import Normal
|
|
|
|
from ppdet.core.workspace import register
|
|
from .bbox_head import BBoxHead, TwoFCHead, XConvNormHead
|
|
from .roi_extractor import RoIAlign
|
|
from ..shape_spec import ShapeSpec
|
|
from ..bbox_utils import delta2bbox, clip_bbox, nonempty_bbox
|
|
|
|
__all__ = ['CascadeTwoFCHead', 'CascadeXConvNormHead', 'CascadeHead']
|
|
|
|
|
|
@register
|
|
class CascadeTwoFCHead(nn.Layer):
|
|
__shared__ = ['num_cascade_stage']
|
|
"""
|
|
Cascade RCNN bbox head with Two fc layers to extract feature
|
|
|
|
Args:
|
|
in_channel (int): Input channel which can be derived by from_config
|
|
out_channel (int): Output channel
|
|
resolution (int): Resolution of input feature map, default 7
|
|
num_cascade_stage (int): The number of cascade stage, default 3
|
|
"""
|
|
|
|
def __init__(self,
|
|
in_channel=256,
|
|
out_channel=1024,
|
|
resolution=7,
|
|
num_cascade_stage=3):
|
|
super(CascadeTwoFCHead, self).__init__()
|
|
|
|
self.in_channel = in_channel
|
|
self.out_channel = out_channel
|
|
|
|
self.head_list = []
|
|
for stage in range(num_cascade_stage):
|
|
head_per_stage = self.add_sublayer(
|
|
str(stage), TwoFCHead(in_channel, out_channel, resolution))
|
|
self.head_list.append(head_per_stage)
|
|
|
|
@classmethod
|
|
def from_config(cls, cfg, input_shape):
|
|
s = input_shape
|
|
s = s[0] if isinstance(s, (list, tuple)) else s
|
|
return {'in_channel': s.channels}
|
|
|
|
@property
|
|
def out_shape(self):
|
|
return [ShapeSpec(channels=self.out_channel, )]
|
|
|
|
def forward(self, rois_feat, stage=0):
|
|
out = self.head_list[stage](rois_feat)
|
|
return out
|
|
|
|
|
|
@register
|
|
class CascadeXConvNormHead(nn.Layer):
|
|
__shared__ = ['norm_type', 'freeze_norm', 'num_cascade_stage']
|
|
"""
|
|
Cascade RCNN bbox head with serveral convolution layers
|
|
|
|
Args:
|
|
in_channel (int): Input channels which can be derived by from_config
|
|
num_convs (int): The number of conv layers
|
|
conv_dim (int): The number of channels for the conv layers
|
|
out_channel (int): Output channels
|
|
resolution (int): Resolution of input feature map
|
|
norm_type (string): Norm type, bn, gn, sync_bn are available,
|
|
default `gn`
|
|
freeze_norm (bool): Whether to freeze the norm
|
|
num_cascade_stage (int): The number of cascade stage, default 3
|
|
"""
|
|
|
|
def __init__(self,
|
|
in_channel=256,
|
|
num_convs=4,
|
|
conv_dim=256,
|
|
out_channel=1024,
|
|
resolution=7,
|
|
norm_type='gn',
|
|
freeze_norm=False,
|
|
num_cascade_stage=3):
|
|
super(CascadeXConvNormHead, self).__init__()
|
|
self.in_channel = in_channel
|
|
self.out_channel = out_channel
|
|
|
|
self.head_list = []
|
|
for stage in range(num_cascade_stage):
|
|
head_per_stage = self.add_sublayer(
|
|
str(stage),
|
|
XConvNormHead(
|
|
in_channel,
|
|
num_convs,
|
|
conv_dim,
|
|
out_channel,
|
|
resolution,
|
|
norm_type,
|
|
freeze_norm,
|
|
stage_name='stage{}_'.format(stage)))
|
|
self.head_list.append(head_per_stage)
|
|
|
|
@classmethod
|
|
def from_config(cls, cfg, input_shape):
|
|
s = input_shape
|
|
s = s[0] if isinstance(s, (list, tuple)) else s
|
|
return {'in_channel': s.channels}
|
|
|
|
@property
|
|
def out_shape(self):
|
|
return [ShapeSpec(channels=self.out_channel, )]
|
|
|
|
def forward(self, rois_feat, stage=0):
|
|
out = self.head_list[stage](rois_feat)
|
|
return out
|
|
|
|
|
|
@register
|
|
class CascadeHead(BBoxHead):
|
|
__shared__ = ['num_classes', 'num_cascade_stages']
|
|
__inject__ = ['bbox_assigner', 'bbox_loss']
|
|
"""
|
|
Cascade RCNN bbox head
|
|
|
|
Args:
|
|
head (nn.Layer): Extract feature in bbox head
|
|
in_channel (int): Input channel after RoI extractor
|
|
roi_extractor (object): The module of RoI Extractor
|
|
bbox_assigner (object): The module of Box Assigner, label and sample the
|
|
box.
|
|
num_classes (int): The number of classes
|
|
bbox_weight (List[List[float]]): The weight to get the decode box and the
|
|
length of weight is the number of cascade stage
|
|
num_cascade_stages (int): THe number of stage to refine the box
|
|
"""
|
|
|
|
def __init__(self,
|
|
head,
|
|
in_channel,
|
|
roi_extractor=RoIAlign().__dict__,
|
|
bbox_assigner='BboxAssigner',
|
|
num_classes=80,
|
|
bbox_weight=[[10., 10., 5., 5.], [20.0, 20.0, 10.0, 10.0],
|
|
[30.0, 30.0, 15.0, 15.0]],
|
|
num_cascade_stages=3,
|
|
bbox_loss=None):
|
|
nn.Layer.__init__(self, )
|
|
self.head = head
|
|
self.roi_extractor = roi_extractor
|
|
if isinstance(roi_extractor, dict):
|
|
self.roi_extractor = RoIAlign(**roi_extractor)
|
|
self.bbox_assigner = bbox_assigner
|
|
|
|
self.num_classes = num_classes
|
|
self.bbox_weight = bbox_weight
|
|
self.num_cascade_stages = num_cascade_stages
|
|
self.bbox_loss = bbox_loss
|
|
|
|
self.bbox_score_list = []
|
|
self.bbox_delta_list = []
|
|
for i in range(num_cascade_stages):
|
|
score_name = 'bbox_score_stage{}'.format(i)
|
|
delta_name = 'bbox_delta_stage{}'.format(i)
|
|
bbox_score = self.add_sublayer(
|
|
score_name,
|
|
nn.Linear(
|
|
in_channel,
|
|
self.num_classes + 1,
|
|
weight_attr=paddle.ParamAttr(initializer=Normal(
|
|
mean=0.0, std=0.01))))
|
|
|
|
bbox_delta = self.add_sublayer(
|
|
delta_name,
|
|
nn.Linear(
|
|
in_channel,
|
|
4,
|
|
weight_attr=paddle.ParamAttr(initializer=Normal(
|
|
mean=0.0, std=0.001))))
|
|
self.bbox_score_list.append(bbox_score)
|
|
self.bbox_delta_list.append(bbox_delta)
|
|
self.assigned_label = None
|
|
self.assigned_rois = None
|
|
|
|
def forward(self, body_feats=None, rois=None, rois_num=None, inputs=None):
|
|
"""
|
|
body_feats (list[Tensor]): Feature maps from backbone
|
|
rois (Tensor): RoIs generated from RPN module
|
|
rois_num (Tensor): The number of RoIs in each image
|
|
inputs (dict{Tensor}): The ground-truth of image
|
|
"""
|
|
targets = []
|
|
if self.training:
|
|
rois, rois_num, targets = self.bbox_assigner(rois, rois_num, inputs)
|
|
targets_list = [targets]
|
|
self.assigned_rois = (rois, rois_num)
|
|
self.assigned_targets = targets
|
|
|
|
pred_bbox = None
|
|
head_out_list = []
|
|
for i in range(self.num_cascade_stages):
|
|
if i > 0:
|
|
rois, rois_num = self._get_rois_from_boxes(pred_bbox,
|
|
inputs['im_shape'])
|
|
if self.training:
|
|
rois, rois_num, targets = self.bbox_assigner(
|
|
rois, rois_num, inputs, i, is_cascade=True)
|
|
targets_list.append(targets)
|
|
|
|
rois_feat = self.roi_extractor(body_feats, rois, rois_num)
|
|
bbox_feat = self.head(rois_feat, i)
|
|
scores = self.bbox_score_list[i](bbox_feat)
|
|
deltas = self.bbox_delta_list[i](bbox_feat)
|
|
head_out_list.append([scores, deltas, rois])
|
|
pred_bbox = self._get_pred_bbox(deltas, rois, self.bbox_weight[i])
|
|
|
|
if self.training:
|
|
loss = {}
|
|
for stage, value in enumerate(zip(head_out_list, targets_list)):
|
|
(scores, deltas, rois), targets = value
|
|
loss_stage = self.get_loss(scores, deltas, targets, rois,
|
|
self.bbox_weight[stage])
|
|
for k, v in loss_stage.items():
|
|
loss[k + "_stage{}".format(
|
|
stage)] = v / self.num_cascade_stages
|
|
|
|
return loss, bbox_feat
|
|
else:
|
|
scores, deltas, self.refined_rois = self.get_prediction(
|
|
head_out_list)
|
|
return (deltas, scores), self.head
|
|
|
|
def _get_rois_from_boxes(self, boxes, im_shape):
|
|
rois = []
|
|
for i, boxes_per_image in enumerate(boxes):
|
|
clip_box = clip_bbox(boxes_per_image, im_shape[i])
|
|
if self.training:
|
|
keep = nonempty_bbox(clip_box)
|
|
if keep.shape[0] == 0:
|
|
keep = paddle.zeros([1], dtype='int32')
|
|
clip_box = paddle.gather(clip_box, keep)
|
|
rois.append(clip_box)
|
|
rois_num = paddle.concat([paddle.shape(r)[0] for r in rois])
|
|
return rois, rois_num
|
|
|
|
def _get_pred_bbox(self, deltas, proposals, weights):
|
|
pred_proposals = paddle.concat(proposals) if len(
|
|
proposals) > 1 else proposals[0]
|
|
pred_bbox = delta2bbox(deltas, pred_proposals, weights)
|
|
pred_bbox = paddle.reshape(pred_bbox, [-1, deltas.shape[-1]])
|
|
num_prop = [p.shape[0] for p in proposals]
|
|
return pred_bbox.split(num_prop)
|
|
|
|
def get_prediction(self, head_out_list):
|
|
"""
|
|
head_out_list(List[Tensor]): scores, deltas, rois
|
|
"""
|
|
pred_list = []
|
|
scores_list = [F.softmax(head[0]) for head in head_out_list]
|
|
scores = paddle.add_n(scores_list) / self.num_cascade_stages
|
|
# Get deltas and rois from the last stage
|
|
_, deltas, rois = head_out_list[-1]
|
|
return scores, deltas, rois
|
|
|
|
def get_refined_rois(self, ):
|
|
return self.refined_rois
|