CondLaneNet: a Top-to-down Lane Detection Framework Based on Conditional Convolution
Paper: https://arxiv.org/pdf/2105.05003.pdf
Code: GitHub – aliyun/conditional-lane-detection
Paper notes:
1. Abstract
- The backbone is an ordinary CNN such as ResNet.
- The neck is a Transformer-FPN: because lanes are long and thin and need global context, a Transformer self-attention operation is applied to the backbone features before the standard FPN builds its pyramid.
- The head has two parts:
- The proposal head detects lane instances and generates dynamic convolution kernel parameters for each instance;
- The conditional shape head uses those dynamic kernel parameters with conditional convolution to predict the point set of each lane; the points are then connected to form the final lane output (a minimal sketch of this dynamic-kernel idea follows this list).
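To make the proposal-head / shape-head split concrete, below is a minimal sketch of the conditional (dynamic) convolution idea: the kernel weights are not fixed model parameters but are predicted per lane instance and then applied with F.conv2d. The helper name dynamic_conv and all shapes here are illustrative assumptions, not the repo's actual API.

```python
import torch
import torch.nn.functional as F

def dynamic_conv(feature, kernel_params, in_ch, out_ch):
    # Apply a 1x1 conv whose weights were *predicted* for one lane instance.
    # feature:       (1, in_ch, H, W) shared feature from the shape-head branch
    # kernel_params: flat vector holding out_ch*in_ch weights + out_ch biases,
    #                produced by the proposal head at this instance's location
    weight = kernel_params[:out_ch * in_ch].view(out_ch, in_ch, 1, 1)
    bias = kernel_params[out_ch * in_ch:out_ch * in_ch + out_ch]
    return F.conv2d(feature, weight, bias)

# toy usage: one instance, 8-channel shared feature, 1-channel mask output
feat = torch.randn(1, 8, 40, 100)
params = torch.randn(8 * 1 + 1)          # would come from the proposal head
mask_logits = dynamic_conv(feat, params, in_ch=8, out_ch=1)
print(mask_logits.shape)                 # torch.Size([1, 1, 40, 100])
```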
Code walkthrough:
The code is built on the mmdetection framework (v2.0.0). Under config/condlanenet/ there are three folders, corresponding to the author's configurations for the three datasets CurveLanes, CULane, and TuSimple. The biggest difference between them is that RIM is designed specifically for CurveLanes. Below I focus on the modules they all share:
backbone
ResNet is used; depending on the model size, anything from ResNet-18 up to ResNet-101 may be chosen.
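For orientation, an mmdetection-style backbone section of the config typically looks like the snippet below; the exact values are illustrative and not copied from config/condlanenet/.

```python
# illustrative mmdetection-style backbone config (example values only)
backbone = dict(
    type='ResNet',
    depth=18,                       # swap to 34 / 50 / 101 for the larger models
    num_stages=4,
    out_indices=(0, 1, 2, 3),       # multi-level features handed to the neck
    norm_cfg=dict(type='BN', requires_grad=True),
    style='pytorch')
```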
neck
The neck is TransConvFPN, defined in mmdet/models/necks/trans_fpn.py.
The main difference from a standard FPN is the extra transformer operation. The motivation is that lanes are long and thin, so a non-local structure such as self-attention is needed.
In other words, a transformer module is inserted between the ResNet and the FPN.
```python
## TransConvFPN (unimportant parts of the code omitted)
def forward(self, src):
    assert len(src) >= len(self.in_channels)
    src = list(src)
    if self.attention:
        trans_feat = self.trans_head(src[self.trans_idx])
    else:
        trans_feat = src[self.trans_idx]
    inputs = src[:-1]
    inputs.append(trans_feat)
    if len(inputs) > len(self.in_channels):
        for _ in range(len(inputs) - len(self.in_channels)):
            del inputs[0]
    ## everything below is the same as the standard FPN
    # build laterals
    laterals = [
        lateral_conv(inputs[i + self.start_level])
        for i, lateral_conv in enumerate(self.lateral_convs)
    ]
    ## omitted
```
```python
## In TransConvFPN.__init__:
if self.attention:
    self.trans_head = TransConvEncoderModule(**trans_cfg)


class TransConvEncoderModule(nn.Module):
    def __init__(self, in_dim, attn_in_dims, attn_out_dims, strides, ratios, downscale=True, pos_shape=None):
        super(TransConvEncoderModule, self).__init__()
        if downscale:
            stride = 2
        else:
            stride = 1
        # self.first_conv = ConvModule(in_dim, 2*in_dim, kernel_size=3, stride=stride, padding=1)
        # self.final_conv = ConvModule(attn_out_dims[-1], attn_out_dims[-1], kernel_size=3, stride=1, padding=1)
        attn_layers = []
        for dim1, dim2, stride, ratio in zip(attn_in_dims, attn_out_dims, strides, ratios):
            attn_layers.append(AttentionLayer(dim1, dim2, ratio, stride))
        if pos_shape is not None:
            self.attn_layers = nn.ModuleList(attn_layers)
        else:
            self.attn_layers = nn.Sequential(*attn_layers)
        self.pos_shape = pos_shape
        self.pos_embeds = []
        if pos_shape is not None:
            for dim in attn_out_dims:
                pos_embed = build_position_encoding(dim, pos_shape).cuda()
                self.pos_embeds.append(pos_embed)

    def forward(self, src):
        # src = self.first_conv(src)
        if self.pos_shape is None:
            src = self.attn_layers(src)
        else:
            for layer, pos in zip(self.attn_layers, self.pos_embeds):
                src = layer(src, pos.to(src.device))
        # src = self.final_conv(src)
        return src


class AttentionLayer(nn.Module):
    """Position attention module"""

    def __init__(self, in_dim, out_dim, ratio=4, stride=1):
        super(AttentionLayer, self).__init__()
        self.chanel_in = in_dim
        norm_cfg = dict(type='BN', requires_grad=True)
        act_cfg = dict(type='ReLU')
        self.pre_conv = ConvModule(
            in_dim,
            out_dim,
            kernel_size=3,
            stride=stride,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg,
            inplace=False)
        self.query_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim // ratio, kernel_size=1)
        self.key_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim // ratio, kernel_size=1)
        self.value_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim, kernel_size=1)
        self.final_conv = ConvModule(
            out_dim,
            out_dim,
            kernel_size=3,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.softmax = nn.Softmax(dim=-1)
        self.gamma = nn.Parameter(torch.zeros(1))

    def forward(self, x, pos=None):
        """
        inputs:
            x: input feature maps (B x C x H x W)
        returns:
            out: attention value + input feature
            attention: B x (HxW) x (HxW)
        """
        x = self.pre_conv(x)
        m_batchsize, _, height, width = x.size()
        if pos is not None:
            x += pos
        proj_query = self.query_conv(x).view(m_batchsize, -1,
                                             width * height).permute(0, 2, 1)
        proj_key = self.key_conv(x).view(m_batchsize, -1, width * height)
        energy = torch.bmm(proj_query, proj_key)
        attention = self.softmax(energy)
        attention = attention.permute(0, 2, 1)
        proj_value = self.value_conv(x).view(m_batchsize, -1, width * height)
        out = torch.bmm(proj_value, attention)
        out = out.view(m_batchsize, -1, height, width)
        proj_value = proj_value.view(m_batchsize, -1, height, width)
        out_feat = self.gamma * out + x
        out_feat = self.final_conv(out_feat)
        return out_feat
```
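The core of AttentionLayer is flattening the spatial dimensions and building a (HW) x (HW) attention matrix. Here is a stripped-down, plain-PyTorch sketch of just that computation (no ConvModule, positional embedding, or residual gamma) to make the shape bookkeeping easier to follow; it is an illustration, not the repo's module.

```python
import torch
import torch.nn as nn

class TinyPositionAttention(nn.Module):
    """Core math of a position-attention layer, shapes annotated."""

    def __init__(self, dim, ratio=4):
        super().__init__()
        self.q = nn.Conv2d(dim, dim // ratio, 1)
        self.k = nn.Conv2d(dim, dim // ratio, 1)
        self.v = nn.Conv2d(dim, dim, 1)

    def forward(self, x):                                    # x: (B, C, H, W)
        b, c, h, w = x.size()
        q = self.q(x).view(b, -1, h * w).permute(0, 2, 1)    # (B, HW, C/r)
        k = self.k(x).view(b, -1, h * w)                     # (B, C/r, HW)
        attn = torch.softmax(torch.bmm(q, k), dim=-1)        # (B, HW, HW)
        v = self.v(x).view(b, -1, h * w)                     # (B, C, HW)
        out = torch.bmm(v, attn.permute(0, 2, 1))            # (B, C, HW)
        return out.view(b, c, h, w)

# the attention runs on a low-resolution feature level, so HW x HW stays manageable
y = TinyPositionAttention(64)(torch.randn(2, 64, 10, 25))
print(y.shape)  # torch.Size([2, 64, 10, 25])
```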
head
CondLaneHead is used, defined in mmdet/models/dense_heads/condlanenet_head.py.
This part deserves close attention, since it differs a lot from an ordinary detection head:
First, the forward method of the CondLaneHead class simply calls forward_test, so you have to look at the detector to see which head functions are actually called on the neck output.
```python
# mmdet/models/detectors/condlanenet.py
def forward(self, img, img_metas=None, return_loss=True, **kwargs):
    ...
    if img_metas is None:
        return self.test_inference(img)
    elif return_loss:
        return self.forward_train(img, img_metas, **kwargs)
    else:
        return self.forward_test(img, img_metas, **kwargs)

def forward_train(self, img, img_metas, **kwargs):
    ...
    if self.head:
        outputs = self.bbox_head.forward_train(output, poses, num_ins)
    ...

def forward_test(self,
                 img,
                 img_metas,
                 benchmark=False,
                 hack_seeds=None,
                 **kwargs):
    ...
    if self.head:
        seeds, hm = self.bbox_head.forward_test(output, hack_seeds,
                                                kwargs['thr'])
    ...
```
So the head's own forward is effectively unused; just read the head's forward_train and forward_test directly.
forward_train
```python
# mmdet/models/dense_heads/condlanenet_head.py
def forward_train(self, inputs, pos, num_ins):
    # x_list is the multi-level feature maps output by backbone + neck
    x_list = list(inputs)
    # pick one level's feature map according to hm_idx and use it to
    # generate the heatmap; mask_idx works the same way for the mask branch
    f_hm = x_list[self.hm_idx]
    f_mask = x_list[self.mask_idx]
    m_batchsize = f_hm.size()[0]

    # f_mask
    z = self.ctnet_head(f_hm)
    hm, params = z['hm'], z['params']
    h_hm, w_hm = hm.size()[2:]
    h_mask, w_mask = f_mask.size()[2:]
    params = params.view(m_batchsize, self.num_classes, -1, h_hm, w_hm)
    mask_branch = self.mask_branch(f_mask)
    reg_branch = mask_branch
    # reg_branch = self.reg_branch(f_mask)
    params = params.permute(0, 1, 3, 4,
                            2).contiguous().view(-1, self.num_gen_params)

    pos_tensor = torch.from_numpy(np.array(pos)).long().to(
        params.device).unsqueeze(1)

    pos_tensor = pos_tensor.expand(-1, self.num_gen_params)
    mask_pos_tensor = pos_tensor[:, :self.num_mask_params]
    reg_pos_tensor = pos_tensor[:, self.num_mask_params:]
    if pos_tensor.size()[0] == 0:
        masks = None
        feat_range = None
    else:
        mask_params = params[:, :self.num_mask_params].gather(
            0, mask_pos_tensor)
        masks = self.mask_head(mask_branch, mask_params, num_ins)
        if self.regression:
            reg_params = params[:, self.num_mask_params:].gather(
                0, reg_pos_tensor)
            regs = self.reg_head(reg_branch, reg_params, num_ins)
        else:
            regs = masks
        # regs = regs.view(sum(num_ins), 1, h_mask, w_mask)
        feat_range = masks.permute(0, 1, 3,
                                   2).view(sum(num_ins), w_mask, h_mask)
        feat_range = self.mlp(feat_range)
    return hm, regs, masks, feat_range, [mask_branch, reg_branch]
```
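The least obvious step above is how each instance fetches its own kernel parameters: pos encodes (class, row, col) as a flat index label * H * W + r * W + c, and gather(0, ...) then picks that row out of the flattened params tensor. A tiny self-contained example of that indexing (all sizes made up):

```python
import torch

# say the heatmap is 1 class x 2 rows x 3 cols and each location predicts 5 params
num_classes, h, w, num_gen_params = 1, 2, 3, 5
params = torch.arange(num_classes * h * w * num_gen_params, dtype=torch.float32)
params = params.view(-1, num_gen_params)         # (num_classes*h*w, num_gen_params)

# one instance whose heatmap peak is at class 0, row 1, col 2
label, r, c = 0, 1, 2
pos = label * h * w + r * w + c                  # same flat-index formula as parse_pos
pos_tensor = torch.tensor([[pos]]).expand(-1, num_gen_params)

inst_params = params.gather(0, pos_tensor)       # (1, num_gen_params) for this instance
print(inst_params)                               # the 5 params predicted at that location
```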
forward_test
```python
# mmdet/models/dense_heads/condlanenet_head.py
def forward_test(
        self,
        inputs,
        hack_seeds=None,
        hm_thr=0.3,
):

    def parse_pos(seeds, batchsize, num_classes, h, w, device):
        pos_list = [[p['coord'], p['id_class'] - 1] for p in seeds]
        poses = []
        for p in pos_list:
            [c, r], label = p
            pos = label * h * w + r * w + c
            poses.append(pos)
        poses = torch.from_numpy(np.array(
            poses, np.long)).long().to(device).unsqueeze(1)
        return poses

    # with Timer("Elapsed time in stage1: %f"):  # ignore
    x_list = list(inputs)
    f_hm = x_list[self.hm_idx]

    f_mask = x_list[self.mask_idx]
    m_batchsize = f_hm.size()[0]

    f_deep = f_mask
    m_batchsize = f_deep.size()[0]
    # with Timer("Elapsed time in ctnet_head: %f"):  # 0.3ms
    z = self.ctnet_head(f_hm)
    h_hm, w_hm = f_hm.size()[2:]
    h_mask, w_mask = f_mask.size()[2:]
    hm, params = z['hm'], z['params']
    hm = torch.clamp(hm.sigmoid(), min=1e-4, max=1 - 1e-4)
    params = params.view(m_batchsize, self.num_classes, -1, h_hm, w_hm)

    # with Timer("Elapsed time in two branch: %f"):  # 0.6ms
    mask_branch = self.mask_branch(f_mask)
    reg_branch = mask_branch
    # reg_branch = self.reg_branch(f_mask)
    params = params.permute(0, 1, 3, 4,
                            2).contiguous().view(-1, self.num_gen_params)

    batch_size, num_classes, h, w = hm.size()
    # with Timer("Elapsed time in ct decode: %f"):  # 0.2ms
    seeds = self.ctdet_decode(hm, thr=hm_thr)
    if hack_seeds is not None:
        seeds = hack_seeds
    # with Timer("Elapsed time in stage2: %f"):  # 0.08ms
    pos_tensor = parse_pos(seeds, batch_size, num_classes, h, w, hm.device)
    pos_tensor = pos_tensor.expand(-1, self.num_gen_params)
    num_ins = [pos_tensor.size()[0]]
    mask_pos_tensor = pos_tensor[:, :self.num_mask_params]
    if self.regression:
        reg_pos_tensor = pos_tensor[:, self.num_mask_params:]
    # with Timer("Elapsed time in stage3: %f"):  # 0.8ms
    if pos_tensor.size()[0] == 0:
        return [], hm
    else:
        mask_params = params[:, :self.num_mask_params].gather(
            0, mask_pos_tensor)
        # with Timer("Elapsed time in mask_head: %f"):  # 0.3ms
        masks = self.mask_head(mask_branch, mask_params, num_ins)
        if self.regression:
            reg_params = params[:, self.num_mask_params:].gather(
                0, reg_pos_tensor)
            # with Timer("Elapsed time in reg_head: %f"):  # 0.25ms
            regs = self.reg_head(reg_branch, reg_params, num_ins)
        else:
            regs = masks
        feat_range = masks.permute(0, 1, 3,
                                   2).view(sum(num_ins), w_mask, h_mask)
        feat_range = self.mlp(feat_range)
        for i in range(len(seeds)):
            seeds[i]['reg'] = regs[0, i:i + 1, :, :]
            m = masks[0, i:i + 1, :, :]
            seeds[i]['mask'] = m
            seeds[i]['range'] = feat_range[i:i + 1]
        return seeds, hm
```
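ctdet_decode itself is not shown above; for reference, a generic CenterNet-style heatmap decode (local-maximum filtering plus a score threshold) looks roughly like the sketch below. This is an assumption-labelled stand-in to explain where the seeds come from, not the repo's implementation.

```python
import torch
import torch.nn.functional as F

def simple_heatmap_decode(hm, thr=0.3):
    """Generic CenterNet-style peak extraction: keep a point if it is a local
    maximum in its 3x3 neighbourhood and its score exceeds thr.
    Returns a list of dicts shaped roughly like the 'seeds' used above."""
    hmax = F.max_pool2d(hm, 3, stride=1, padding=1)
    keep = (hmax == hm) & (hm > thr)
    seeds = []
    for cls, r, c in torch.nonzero(keep[0], as_tuple=False).tolist():
        seeds.append({'coord': [c, r],
                      'id_class': cls + 1,
                      'score': hm[0, cls, r, c].item()})
    return seeds

# usage: hm is the clamped, sigmoided heatmap of shape (1, num_classes, H, W)
seeds = simple_heatmap_decode(torch.rand(1, 1, 20, 50), thr=0.9)
```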
As you can see, these operations line up closely with what the paper describes.
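For context on how the per-seed outputs (mask, reg, range) turn into lane points: the paper's row-wise formulation takes, for each valid row, the expected column under a softmax over the location map and refines it with the offset map. The sketch below is a simplified illustration of that idea under those assumptions, not the repo's exact post-processing.

```python
import torch

def decode_lane(mask, reg, row_range):
    # mask:      (H, W) per-instance location logits from the conditional mask head
    # reg:       (H, W) per-instance horizontal offset map (assumed role)
    # row_range: (start_row, end_row) rows where this lane is predicted to exist
    points = []
    for y in range(row_range[0], row_range[1]):
        prob = torch.softmax(mask[y], dim=0)                        # distribution over columns
        x = (prob * torch.arange(mask.size(1), dtype=torch.float32)).sum()
        col = int(x.round().clamp(0, mask.size(1) - 1))
        points.append((float(x + reg[y, col]), y))                  # refine with the offset
    return points
```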
(I'll go through the remaining details when I have more time; I've been busy lately.)