Autoformer算法与代码分析
原文链接:https://arxiv.org/pdf/2106.13008.pdf
代码链接:https://github.com/thuml/Autoformer
简介
Autoformer本质上是基于Transformer结构的时序预测模型,因此如果有Transformer基础,对于理解Autoformer以及所有基于Transformer的时序预测模型都比较有帮助。
所有基于Transfomer结构的时序预测模型,本质上都是对于其中最重要的模块:Self-attention进行创新。例如LogsTransformer,reformer,informer,fedformer无一例外都是对于Self-attention模块进行创新。其中LogsTransformer,reformer,informer都是对于Self-attention的计算复杂度方面进行创新,并且提出各自的modified-self-attention。而到了Autoformer首先对于Self-attention的计算方式进行创新。
模型结构
Autoformer相对于普通的Transformer结构无非就是加了时序拆解模块,修改了Self-attention模块。其他的例如Feed Forward模块并没有改动,那么接下来就从这两个新的模块进行详细拆解:
Series Decmop
时序拆解器本质上来自于传统的时序预测算法,例如Arima,Fbprophet。Arima等传统算法本质上思考的就是从统计的角度对于时间序列进行拆解,并赋予拆解的子项以不同的物理意义例如:趋势项,季节项,残差项等。
因此传统的时序拆解的普遍形式是:
X ( t ) = T ( t ) + S ( t ) + R ( t ) X(t) = T(t)+S(t)+R(t) X(t)=T(t)+S(t)+R(t)
其中 X ( t ) X(t) X(t)是待拆解的时间序列, T ( t ) , S ( t ) , R ( t ) T(t),S(t),R(t) T(t),S(t),R(t)是对应拆解后的趋势项,季节项,残差项。每一种算法对于其中每一拆解项的计算方式都千差万别,因为这是它们的主要创新点。以及每个子项的组合方式也各不相同,上述介绍的是最普遍的加法模型,实际上还有乘法模型以及混合模型。
从这个角度看,Autoformer的时序拆解模块其实是最简单的一种:
第一行的式子解释了趋势项 X t X_t Xt的具体算法,第二行的式子解释了Autoformer采用的是最简单的加法模型,并且仅仅拆解两个子项:趋势项和季节项。趋势项的具体算法是:使用一个均值滤波器处理时序,也可以看做使用了一个等长的卷积(same convolution)只不过卷积核的参数是确定的。
时序拆解部分的算法比较简单,没有什么创新点。作者的思路可能来源于传统算法对于时序拆解的使用,试图移植到Transformer结构中。但实际上该部分还有很多可以深挖的部分,例如使用更复杂的混合模型,以及拆解出多个子项。实际上后来的Fedformer也在这个部分作出了相应的优化和改进。
Auto-correlation
和大部分的基于Transformer的时序预测模型一样,Autoformer也是将Self-attention的改进作为主要的创新点。和reformer,informer等在它之前的模型不同的是,Autoformer的注意力不主要在于对于Self-attention计算量的减小上,它更加关注将Self-attention移植到时序计算上,设计一个更贴合时序分析的attention结构。
而时序数据的分析无外乎为:找周期特性,频域分析等,而Auto-correlation关注的就是寻找时序数据的周期特性,它希望模型能够更好的记住时序的周期特性,从而预测时序的未来值。
Attention注意力的计算核心在于:相似性的度量。即通过向量内积来衡量向量之间的相似性。而Autoformer度量时序周期性的方式是,对于时间序列进行平移,度量平移前后的时间序列的相似性,度量的具体方式类似于向量的内积。基于这种方法,Autoformer认为,相似性高的平移序列对应的平移量就是潜在的周期。
当我们每次的平移量为 τ \tau τ时,上述的相似性计算可以表示为:
其中 X t X_t Xt为需要寻找周期性的时间序列, X t − τ X_{t-\tau } Xt−τ表示原时间序列平移 τ \tau τ后的序列。也就是说如果 X t − τ X_{t-\tau} Xt−τ和 X t X_t Xt的相似性比较高,那么我们可以将 τ \tau τ看做 X t X_t Xt的潜在周期。
如果我们迭代计算 X t X_t Xt和所有平移变量之间的相似性,且每次的平移增量为1,那么迭代计算可以用频域计算表示为:
因此在实际的Auto-correlation中,输入时间序列 X t X_t Xt经过 Q , K , V Q,K,V Q,K,V的映射后,先变换到频域,进而在频域中计算平移相似性。
代码讲解
Autoformer的主要创新点以及独特的模块已经在原理层面介绍完毕,接下来笔者将会在实际的代码层面进行细致的讲解。具体的代码细节将会在注释中注明,而需要注意的点将会着重说明
模块初始化
class Autoformer(nn.Module): """ Autoformer is the first method to achieve the series-wise connection, with inherent O(LlogL) complexity """ def __init__(self, configs): super(Autoformer, self).__init__() #利用历史时间序列的时间戳长度,编码器输入的时间维度 self.seq_len = configs.seq_len #解码器输入的历史时间序列的时间戳长度。 self.label_len = configs.label_len self.pred_len = configs.pred_len self.output_attention = False # Decomp,传入参数均值滤波器的核大小 kernel_size = configs.moving_avg self.decomp = series_decomp(kernel_size) # Embedding # embedding操作,由于时间序列天然在时序上具有先后关系,因此这里embedding的作用更多的是为了调整维度 self.enc_embedding = DataEmbedding_wo_pos(configs.d_feature, configs.d_model, configs.embed, configs.freq, configs.dropout) self.dec_embedding = DataEmbedding_wo_pos(configs.d_feature, configs.d_model, configs.embed, configs.freq, configs.dropout) # Encoder,采用的是多编码层堆叠 self.encoder = Encoder( [ EncoderLayer( AutoCorrelationLayer( #这里的第一个False表明是否使用mask机制。 AutoCorrelation(False, configs.factor, attention_dropout=configs.dropout, output_attention=False), configs.d_model, configs.n_heads), #编码过程中的特征维度设置 configs.d_model, configs.d_ff, moving_avg=configs.moving_avg, dropout=configs.dropout, #激活函数 activation=configs.activation ) for l in range(configs.e_layers) ], #时间序列通常采用Layernorm而不适用BN层 norm_layer=my_Layernorm(configs.d_model) ) # Decoder也是才是用多解码器堆叠 self.decoder = Decoder( [ DecoderLayer( #如同传统的Transformer结构,decoder的第一个attention需要mask,保证当前的位置的预测不能看到之前的内容 #这个做法是来源于NLP中的作法,但是换成时序预测,实际上应该是不需要使用mask机制的。 #而在后续的代码中可以看出,这里的attention模块实际上都没有使用mask机制。 #self-attention,输入全部来自于decoder自身 AutoCorrelationLayer( AutoCorrelation(True, configs.factor, attention_dropout=configs.dropout, output_attention=False), configs.d_model, configs.n_heads), #cross-attention,输入一部分来自于decoder,另一部分来自于encoder的输出 AutoCorrelationLayer( AutoCorrelation(False, configs.factor, attention_dropout=configs.dropout, output_attention=False), configs.d_model, configs.n_heads), configs.d_model, #任务要求的输出特征维度 configs.c_out, configs.d_ff, moving_avg=configs.moving_avg, dropout=configs.dropout, activation=configs.activation, ) for l in range(configs.d_layers) ], norm_layer=my_Layernorm(configs.d_model), projection=nn.Linear(configs.d_model, configs.c_out, bias=True) )
各个模块的初始化没有什么特别的,参照Autoformer给出的结构图即可一一对应的找到各个模块的具体含义。值得一提的是,这里的初始化考虑了attention模块是否需要使用mask机制。mask-attention来源于NLP任务中的特殊要求,而在时序预测中,attention其实没有mask的必要。因此在后续的代码也可以看出,实际上所有的attention结构都没用使用mask机制。
Autoformer的前向传播
首先介绍Autoformer的原始输入:
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None): #x_enc表示编码器的输入 #x_mark_enc表示x_enc中各个时间戳的先后关系 #x_dec表示解码器的输入 #x_mark_dec表示x_dec中各个时间戳的先后关系
这里x_mark_enc,x_mark_dec都是为了后续输入的Embedding操作服务的。作者的想法可能来自于原始Transformer的输入Embedding操作。因为在原始Transformer使用的NLP领域,词向量的输入是需要通过Embedding操作来表征词之间的先后关系。但是在时间序列预测领域中,时间序列的时间戳之间天然的具有先后关系,因此笔者认为这里的Embedding操作仅仅起到调整输入的特征维度的作用,对于表明先后关系没有太多作用。
接下来介绍编码器和解码器的输入:
# decomp init # 因为需要使用生成式预测,所以需要用均值和0来占位,占住预测部分的位置。 mean = torch.mean(x_enc, dim=1).unsqueeze(1).repeat(1, self.pred_len, 1) zeros = torch.zeros([x_dec.shape[0], self.pred_len, x_dec.shape[2]], device=x_enc.device) seasonal_init, trend_init = self.decomp(x_enc) # decoder input trend_init = torch.cat([trend_init[:, -self.label_len:, :], mean], dim=1) seasonal_init = torch.cat([seasonal_init[:, -self.label_len:, :], zeros], dim=1)
因为Autoformer沿用了Informer的生成式预测的方式,所以直接将空有预测部分的输入送到模型中。所以需要初始化预测部分的输入,编码器的输入正常,解码器的输入由Trend和Season构成:
- Trend部分:前半部分来自于时序拆解,后半部分即预测部分用均值占位;
- Season部分:前半部分来自于时序拆解,后半部分即预测部分用零值占位;
接下来的代码都没有什么特别的,根据paper中的流程图就可以轻松理解:
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None): # decomp init # 因为需要使用生成式预测,所以需要用均值和0来占位,占住预测部分的位置。 mean = torch.mean(x_enc, dim=1).unsqueeze(1).repeat(1, self.pred_len, 1) zeros = torch.zeros([x_dec.shape[0], self.pred_len, x_dec.shape[2]], device=x_enc.device) seasonal_init, trend_init = self.decomp(x_enc) # decoder input trend_init = torch.cat([trend_init[:, -self.label_len:, :], mean], dim=1) seasonal_init = torch.cat([seasonal_init[:, -self.label_len:, :], zeros], dim=1) # enc enc_out = self.enc_embedding(x_enc, x_mark_enc) enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask) # dec dec_out = self.dec_embedding(seasonal_init, x_mark_dec) seasonal_part, trend_part = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask, trend=trend_init) # final dec_out = trend_part + seasonal_part if self.output_attention: return dec_out[:, -self.pred_len:, :], attns else: return dec_out[:, -self.pred_len:, :]
时序拆解器
时序拆解器的算法原理可以参照上述的讲解以及paper,它的代码如下所示:
class moving_avg(nn.Module): """ Moving average block to highlight the trend of time series """ def __init__(self, kernel_size, stride): super(moving_avg, self).__init__() self.kernel_size = kernel_size self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0) def forward(self, x): # padding on the both ends of time series # 这里的判断语句主要是为了和Fedformer的部分进行区分,如果只考虑autoformer可以默认这里的判断全是True if type(self.kernel_size) == list: if len(self.kernel_size) == 1: self.kernel_size = self.kernel_size[0] front = x[:, 0:1, :].repeat(1, self.kernel_size - 1-math.floor((self.kernel_size - 1) // 2), 1) end = x[:, -1:, :].repeat(1, math.floor((self.kernel_size - 1) // 2), 1) x = torch.cat([front, x, end], dim=1) x = self.avg(x.permute(0, 2, 1)) x = x.permute(0, 2, 1) return xclass series_decomp(nn.Module): """ Series decomposition block """ def __init__(self, kernel_size): super(series_decomp, self).__init__() self.moving_avg = moving_avg(kernel_size, stride=1) def forward(self, x): moving_mean = self.moving_avg(x) res = x - moving_mean return res, moving_mean
时序拆解器就是一个简单的均值滤波器,值得注意的是这里的滤波全部沿用的是same conv 即不改变输入的时间维度大小。其中一个目的是为了后续的维度统一,笔者认为在传统的LSTM或GRU模块中,一维卷积还是valid conv会好一些因为如果传入LSTM的时间戳个数过多,会加剧梯度爆炸和梯度弥散等问题。
Autocorrelation
Autocorrelation是autoformer的最大创新点,算法细节如前所述,这里主要讲解实际的代码细节,一边参照流程图,一边看代码会更好理解:
实际的代码也基本是照着流程图走的,只是有些实现的小细节第一遍看来不太好懂,首先是qkv的处理,需要注意的是encoder和decoder中qkv的来源不太相同,encoder中的qkv全部来源一个输入,而decoder的qkv不知来源于decoder本身还来源于encoder的输出:
#首先对于q,k进行FFT变换 q_fft = torch.fft.rfft(queries.permute(0, 2, 3, 1).contiguous(), dim=-1) # size=[B, H, E, L] k_fft = torch.fft.rfft(keys.permute(0, 2, 3, 1).contiguous(), dim=-1) #对k进行求共轭,并与q点乘 res = q_fft * torch.conj(k_fft) #傅里叶反变换计算得出的值就是每一个时移因子的相似性大小 corr = torch.fft.irfft(res, dim=-1) # size=[B, H, E, L] V = self.time_delay_agg_training(values.permute(0, 2, 3, 1).contiguous(), corr).permute(0, 3, 1, 2)
计算出了每个时移因子的相似性大小,接下来就要基于此对于V进行处理:
def time_delay_agg_full(self, values, corr): """ Standard version of Autocorrelation """ batch = values.shape[0] head = values.shape[1] channel = values.shape[2] length = values.shape[3] # index init,索引值初始化,类似于初始化自变量x,由于是多维处理,因此需要在各个维度上进行repeat init_index = torch.arange(length).unsqueeze(0).unsqueeze(0).unsqueeze(0).repeat(batch, head, channel, 1).cuda() # find top k,选取最大的几个时移因子 top_k = int(self.factor * math.log(length)) #[0]返回的是权重值,[1]返回的是对应的索引d weights = torch.topk(corr, top_k, dim=-1)[0] delay = torch.topk(corr, top_k, dim=-1)[1] # update corr # 使用softmax对权重值进行归一化处理 tmp_corr = torch.softmax(weights, dim=-1) # aggregation # 这里用的操作比较巧,笔者一开始看的也不明所以,这里之所以最后一个维度repeat两次是因为流程图中Roll(V)的操作,也就是将前一部分的序列位置平移拼接到原序列的末尾,因此这里repeat两次是为了后续通过截取直接实现Roll(V)操作 tmp_values = values.repeat(1, 1, 1, 2) delays_agg = torch.zeros_like(values).float() for i in range(top_k): #init_index+delay就相当于自变量给一个平移增量即 x = x+d tmp_delay = init_index + delay[..., i].unsqueeze(-1) #这里就是Roll(V)操作,相当于根据索引x直接对于V进行截取 pattern = torch.gather(tmp_values, dim=-1, index=tmp_delay) delays_agg = delays_agg + pattern * (tmp_corr[..., i].unsqueeze(-1)) return delays_agg
Encoder
编码器和原始的Transformer结构没差,除了将self-attention改为auto-correlation外,采用的多个编码层堆叠的结构。我们直接上代码:
class EncoderLayer(nn.Module): """ Autoformer encoder layer with the progressive decomposition architecture """ def __init__(self, attention, d_model, d_ff=None, moving_avg=25, dropout=0.1, activation="relu"): super(EncoderLayer, self).__init__() d_ff = d_ff or 4 * d_model self.attention = attention #这里的卷积是为了替代feed forward模块 self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1, bias=False) self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1, bias=False) self.decomp1 = series_decomp(moving_avg) self.decomp2 = series_decomp(moving_avg) self.dropout = nn.Dropout(dropout) self.activation = F.relu if activation == "relu" else F.gelu def forward(self, x, attn_mask=None):#Autocorrelation new_x, attn = self.attention( x, x, x, attn_mask=attn_mask ) x = x + self.dropout(new_x) x, _ = self.decomp1(x) y = x y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1)))) y = self.dropout(self.conv2(y).transpose(-1, 1)) res, _ = self.decomp2(x + y) return res, attn
只要理解了Autocorrelation,autoformer的代码就没有什么复杂的地方,完全是按照正常的Transformer模块进行的设计。对着autoformer的流程图就可以很好的理解。
Decoder
class Decoder(nn.Module): """ Autoformer Decoder """ def __init__(self, layers, norm_layer=None, projection=None): super(Decoder, self).__init__() self.layers = nn.ModuleList(layers) self.norm = norm_layer self.projection = projection def forward(self, x, cross, x_mask=None, cross_mask=None, trend=None): for layer in self.layers: #每一次都将趋势项累加,季节项传递给更深的网络进行处理 x, residual_trend = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask) trend = trend + residual_trend if self.norm is not None: x = self.norm(x) if self.projection is not None: #从高维映射回输出维度 x = self.projection(x) return x, trend
Decoder也是多个decoder层进行堆叠,而每一个decoder层都共享encoder的输出
class DecoderLayer(nn.Module): """ Autoformer decoder layer """ def __init__(self, self_attention, cross_attention, d_model, c_out, d_ff=None, moving_avg=25, dropout=0.1, activation="relu"): super(DecoderLayer, self).__init__() d_ff = d_ff or 4 * d_model self.self_attention = self_attention self.cross_attention = cross_attention self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1, bias=False) self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1, bias=False) self.decomp1 = series_decomp(moving_avg) self.decomp2 = series_decomp(moving_avg) self.decomp3 = series_decomp(moving_avg) self.dropout = nn.Dropout(dropout) self.projection = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=3, stride=1, padding=1, padding_mode='circular', bias=False) self.activation = F.relu if activation == "relu" else F.gelu def forward(self, x, cross, x_mask=None, cross_mask=None): #self-attention输入全部来自于decoder自身 x = x + self.dropout(self.self_attention( x, x, x, attn_mask=x_mask )[0]) x, trend1 = self.decomp1(x) #cross-attention:q来自于decoder,k,v则来自于encoder的输出 x = x + self.dropout(self.cross_attention( x, cross, cross, attn_mask=cross_mask )[0]) x, trend2 = self.decomp2(x) y = x y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1)))) y = self.dropout(self.conv2(y).transpose(-1, 1)) x, trend3 = self.decomp3(x + y) #趋势项累加,季节项通过模块进行处理 residual_trend = trend1 + trend2 + trend3 residual_trend = self.projection(residual_trend.permute(0, 2, 1)).transpose(1, 2) return x, residual_trend
Decoder部分的代码也是比较清晰明了的,至此autoformer部分的代码已经介绍完毕,下一篇会介绍基于autoformer进行改进的Fedformer。