Autoformer算法与代码分析


原文链接:https://arxiv.org/pdf/2106.13008.pdf
代码链接:https://github.com/thuml/Autoformer

简介

Autoformer本质上是基于Transformer结构的时序预测模型,因此如果有Transformer基础,对于理解Autoformer以及所有基于Transformer的时序预测模型都比较有帮助。

所有基于Transfomer结构的时序预测模型,本质上都是对于其中最重要的模块:Self-attention进行创新。例如LogsTransformer,reformer,informer,fedformer无一例外都是对于Self-attention模块进行创新。其中LogsTransformer,reformer,informer都是对于Self-attention的计算复杂度方面进行创新,并且提出各自的modified-self-attention。而到了Autoformer首先对于Self-attention的计算方式进行创新。

模型结构

Autoformer相对于普通的Transformer结构无非就是加了时序拆解模块,修改了Self-attention模块。其他的例如Feed Forward模块并没有改动,那么接下来就从这两个新的模块进行详细拆解:

Series Decmop

时序拆解器本质上来自于传统的时序预测算法,例如Arima,Fbprophet。Arima等传统算法本质上思考的就是从统计的角度对于时间序列进行拆解,并赋予拆解的子项以不同的物理意义例如:趋势项,季节项,残差项等。

因此传统的时序拆解的普遍形式是:

X ( t ) = T ( t ) + S ( t ) + R ( t ) X(t) = T(t)+S(t)+R(t) X(t)=T(t)+S(t)+R(t)
其中 X ( t ) X(t) X(t)是待拆解的时间序列, T ( t ) , S ( t ) , R ( t ) T(t),S(t),R(t) T(t),S(t),R(t)是对应拆解后的趋势项,季节项,残差项。每一种算法对于其中每一拆解项的计算方式都千差万别,因为这是它们的主要创新点。以及每个子项的组合方式也各不相同,上述介绍的是最普遍的加法模型,实际上还有乘法模型以及混合模型。

从这个角度看,Autoformer的时序拆解模块其实是最简单的一种:

第一行的式子解释了趋势项 X t X_t Xt的具体算法,第二行的式子解释了Autoformer采用的是最简单的加法模型,并且仅仅拆解两个子项:趋势项和季节项。趋势项的具体算法是:使用一个均值滤波器处理时序,也可以看做使用了一个等长的卷积(same convolution)只不过卷积核的参数是确定的。

时序拆解部分的算法比较简单,没有什么创新点。作者的思路可能来源于传统算法对于时序拆解的使用,试图移植到Transformer结构中。但实际上该部分还有很多可以深挖的部分,例如使用更复杂的混合模型,以及拆解出多个子项。实际上后来的Fedformer也在这个部分作出了相应的优化和改进。

Auto-correlation

和大部分的基于Transformer的时序预测模型一样,Autoformer也是将Self-attention的改进作为主要的创新点。和reformer,informer等在它之前的模型不同的是,Autoformer的注意力不主要在于对于Self-attention计算量的减小上,它更加关注将Self-attention移植到时序计算上,设计一个更贴合时序分析的attention结构。

而时序数据的分析无外乎为:找周期特性,频域分析等,而Auto-correlation关注的就是寻找时序数据的周期特性,它希望模型能够更好的记住时序的周期特性,从而预测时序的未来值。

Attention注意力的计算核心在于:相似性的度量。即通过向量内积来衡量向量之间的相似性。而Autoformer度量时序周期性的方式是,对于时间序列进行平移,度量平移前后的时间序列的相似性,度量的具体方式类似于向量的内积。基于这种方法,Autoformer认为,相似性高的平移序列对应的平移量就是潜在的周期。

当我们每次的平移量为 τ \tau τ时,上述的相似性计算可以表示为:

其中 X t X_t Xt为需要寻找周期性的时间序列, X t − τ X_{t-\tau } Xtτ表示原时间序列平移 τ \tau τ后的序列。也就是说如果 X t − τ X_{t-\tau} Xtτ X t X_t Xt的相似性比较高,那么我们可以将 τ \tau τ看做 X t X_t Xt的潜在周期。

如果我们迭代计算 X t X_t Xt和所有平移变量之间的相似性,且每次的平移增量为1,那么迭代计算可以用频域计算表示为:

因此在实际的Auto-correlation中,输入时间序列 X t X_t Xt经过 Q , K , V Q,K,V Q,K,V的映射后,先变换到频域,进而在频域中计算平移相似性。

代码讲解

Autoformer的主要创新点以及独特的模块已经在原理层面介绍完毕,接下来笔者将会在实际的代码层面进行细致的讲解。具体的代码细节将会在注释中注明,而需要注意的点将会着重说明

模块初始化

class Autoformer(nn.Module):    """    Autoformer is the first method to achieve the series-wise connection,    with inherent O(LlogL) complexity    """    def __init__(self, configs):        super(Autoformer, self).__init__()        #利用历史时间序列的时间戳长度,编码器输入的时间维度        self.seq_len = configs.seq_len        #解码器输入的历史时间序列的时间戳长度。        self.label_len = configs.label_len        self.pred_len = configs.pred_len        self.output_attention = False        # Decomp,传入参数均值滤波器的核大小        kernel_size = configs.moving_avg        self.decomp = series_decomp(kernel_size)        # Embedding        # embedding操作,由于时间序列天然在时序上具有先后关系,因此这里embedding的作用更多的是为了调整维度        self.enc_embedding = DataEmbedding_wo_pos(configs.d_feature, configs.d_model, configs.embed, configs.freq,                                                  configs.dropout)        self.dec_embedding = DataEmbedding_wo_pos(configs.d_feature, configs.d_model, configs.embed, configs.freq,                                                  configs.dropout)        # Encoder,采用的是多编码层堆叠        self.encoder = Encoder(            [                EncoderLayer(                    AutoCorrelationLayer(                        #这里的第一个False表明是否使用mask机制。                        AutoCorrelation(False, configs.factor, attention_dropout=configs.dropout,                                        output_attention=False),                        configs.d_model, configs.n_heads),                    #编码过程中的特征维度设置                    configs.d_model,                    configs.d_ff,                    moving_avg=configs.moving_avg,                    dropout=configs.dropout,                    #激活函数                    activation=configs.activation                ) for l in range(configs.e_layers)            ],            #时间序列通常采用Layernorm而不适用BN层            norm_layer=my_Layernorm(configs.d_model)        )        # Decoder也是才是用多解码器堆叠        self.decoder = Decoder(            [                DecoderLayer(                    #如同传统的Transformer结构,decoder的第一个attention需要mask,保证当前的位置的预测不能看到之前的内容                    #这个做法是来源于NLP中的作法,但是换成时序预测,实际上应该是不需要使用mask机制的。                    #而在后续的代码中可以看出,这里的attention模块实际上都没有使用mask机制。                    #self-attention,输入全部来自于decoder自身                    AutoCorrelationLayer(                        AutoCorrelation(True, configs.factor, attention_dropout=configs.dropout,                                        output_attention=False),                        configs.d_model, configs.n_heads),                    #cross-attention,输入一部分来自于decoder,另一部分来自于encoder的输出                    AutoCorrelationLayer(                        AutoCorrelation(False, configs.factor, attention_dropout=configs.dropout,                                        output_attention=False),                        configs.d_model, configs.n_heads),                    configs.d_model,                    #任务要求的输出特征维度                    configs.c_out,                    configs.d_ff,                    moving_avg=configs.moving_avg,                    dropout=configs.dropout,                    activation=configs.activation,                )                for l in range(configs.d_layers)            ],            norm_layer=my_Layernorm(configs.d_model),            projection=nn.Linear(configs.d_model, configs.c_out, bias=True)        )

各个模块的初始化没有什么特别的,参照Autoformer给出的结构图即可一一对应的找到各个模块的具体含义。值得一提的是,这里的初始化考虑了attention模块是否需要使用mask机制。mask-attention来源于NLP任务中的特殊要求,而在时序预测中,attention其实没有mask的必要。因此在后续的代码也可以看出,实际上所有的attention结构都没用使用mask机制。

Autoformer的前向传播

首先介绍Autoformer的原始输入:

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,                enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):        #x_enc表示编码器的输入        #x_mark_enc表示x_enc中各个时间戳的先后关系        #x_dec表示解码器的输入        #x_mark_dec表示x_dec中各个时间戳的先后关系

这里x_mark_enc,x_mark_dec都是为了后续输入的Embedding操作服务的。作者的想法可能来自于原始Transformer的输入Embedding操作。因为在原始Transformer使用的NLP领域,词向量的输入是需要通过Embedding操作来表征词之间的先后关系。但是在时间序列预测领域中,时间序列的时间戳之间天然的具有先后关系,因此笔者认为这里的Embedding操作仅仅起到调整输入的特征维度的作用,对于表明先后关系没有太多作用。

接下来介绍编码器和解码器的输入:

        # decomp init        # 因为需要使用生成式预测,所以需要用均值和0来占位,占住预测部分的位置。        mean = torch.mean(x_enc, dim=1).unsqueeze(1).repeat(1, self.pred_len, 1)        zeros = torch.zeros([x_dec.shape[0], self.pred_len, x_dec.shape[2]], device=x_enc.device)        seasonal_init, trend_init = self.decomp(x_enc)        # decoder input        trend_init = torch.cat([trend_init[:, -self.label_len:, :], mean], dim=1)        seasonal_init = torch.cat([seasonal_init[:, -self.label_len:, :], zeros], dim=1)

因为Autoformer沿用了Informer的生成式预测的方式,所以直接将空有预测部分的输入送到模型中。所以需要初始化预测部分的输入,编码器的输入正常,解码器的输入由Trend和Season构成:

  • Trend部分:前半部分来自于时序拆解,后半部分即预测部分用均值占位;
  • Season部分:前半部分来自于时序拆解,后半部分即预测部分用零值占位;

接下来的代码都没有什么特别的,根据paper中的流程图就可以轻松理解:

def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,            enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):    # decomp init    # 因为需要使用生成式预测,所以需要用均值和0来占位,占住预测部分的位置。    mean = torch.mean(x_enc, dim=1).unsqueeze(1).repeat(1, self.pred_len, 1)    zeros = torch.zeros([x_dec.shape[0], self.pred_len, x_dec.shape[2]], device=x_enc.device)    seasonal_init, trend_init = self.decomp(x_enc)    # decoder input    trend_init = torch.cat([trend_init[:, -self.label_len:, :], mean], dim=1)    seasonal_init = torch.cat([seasonal_init[:, -self.label_len:, :], zeros], dim=1)    # enc    enc_out = self.enc_embedding(x_enc, x_mark_enc)    enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)    # dec    dec_out = self.dec_embedding(seasonal_init, x_mark_dec)    seasonal_part, trend_part = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask,                                             trend=trend_init)    # final    dec_out = trend_part + seasonal_part    if self.output_attention:        return dec_out[:, -self.pred_len:, :], attns    else:        return dec_out[:, -self.pred_len:, :]

时序拆解器

时序拆解器的算法原理可以参照上述的讲解以及paper,它的代码如下所示:

class moving_avg(nn.Module):    """    Moving average block to highlight the trend of time series    """    def __init__(self, kernel_size, stride):        super(moving_avg, self).__init__()        self.kernel_size = kernel_size        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)    def forward(self, x):        # padding on the both ends of time series        # 这里的判断语句主要是为了和Fedformer的部分进行区分,如果只考虑autoformer可以默认这里的判断全是True        if type(self.kernel_size) == list:            if len(self.kernel_size) == 1:                self.kernel_size = self.kernel_size[0]        front = x[:, 0:1, :].repeat(1, self.kernel_size - 1-math.floor((self.kernel_size - 1) // 2), 1)        end = x[:, -1:, :].repeat(1, math.floor((self.kernel_size - 1) // 2), 1)        x = torch.cat([front, x, end], dim=1)        x = self.avg(x.permute(0, 2, 1))        x = x.permute(0, 2, 1)        return xclass series_decomp(nn.Module):    """    Series decomposition block    """    def __init__(self, kernel_size):        super(series_decomp, self).__init__()        self.moving_avg = moving_avg(kernel_size, stride=1)    def forward(self, x):        moving_mean = self.moving_avg(x)        res = x - moving_mean        return res, moving_mean

时序拆解器就是一个简单的均值滤波器,值得注意的是这里的滤波全部沿用的是same conv 即不改变输入的时间维度大小。其中一个目的是为了后续的维度统一,笔者认为在传统的LSTM或GRU模块中,一维卷积还是valid conv会好一些因为如果传入LSTM的时间戳个数过多,会加剧梯度爆炸和梯度弥散等问题。

Autocorrelation

Autocorrelation是autoformer的最大创新点,算法细节如前所述,这里主要讲解实际的代码细节,一边参照流程图,一边看代码会更好理解:

实际的代码也基本是照着流程图走的,只是有些实现的小细节第一遍看来不太好懂,首先是qkv的处理,需要注意的是encoder和decoder中qkv的来源不太相同,encoder中的qkv全部来源一个输入,而decoder的qkv不知来源于decoder本身还来源于encoder的输出:

    #首先对于q,k进行FFT变换    q_fft = torch.fft.rfft(queries.permute(0, 2, 3, 1).contiguous(), dim=-1)  # size=[B, H, E, L]    k_fft = torch.fft.rfft(keys.permute(0, 2, 3, 1).contiguous(), dim=-1)    #对k进行求共轭,并与q点乘    res = q_fft * torch.conj(k_fft)    #傅里叶反变换计算得出的值就是每一个时移因子的相似性大小    corr = torch.fft.irfft(res, dim=-1) # size=[B, H, E, L]    V = self.time_delay_agg_training(values.permute(0, 2, 3, 1).contiguous(), corr).permute(0, 3, 1, 2)

计算出了每个时移因子的相似性大小,接下来就要基于此对于V进行处理:

def time_delay_agg_full(self, values, corr):    """    Standard version of Autocorrelation    """    batch = values.shape[0]    head = values.shape[1]    channel = values.shape[2]    length = values.shape[3]    # index init,索引值初始化,类似于初始化自变量x,由于是多维处理,因此需要在各个维度上进行repeat    init_index = torch.arange(length).unsqueeze(0).unsqueeze(0).unsqueeze(0).repeat(batch, head, channel, 1).cuda()    # find top k,选取最大的几个时移因子    top_k = int(self.factor * math.log(length))    #[0]返回的是权重值,[1]返回的是对应的索引d    weights = torch.topk(corr, top_k, dim=-1)[0]    delay = torch.topk(corr, top_k, dim=-1)[1]    # update corr    # 使用softmax对权重值进行归一化处理    tmp_corr = torch.softmax(weights, dim=-1)    # aggregation    # 这里用的操作比较巧,笔者一开始看的也不明所以,这里之所以最后一个维度repeat两次是因为流程图中Roll(V)的操作,也就是将前一部分的序列位置平移拼接到原序列的末尾,因此这里repeat两次是为了后续通过截取直接实现Roll(V)操作    tmp_values = values.repeat(1, 1, 1, 2)    delays_agg = torch.zeros_like(values).float()    for i in range(top_k):        #init_index+delay就相当于自变量给一个平移增量即 x = x+d        tmp_delay = init_index + delay[..., i].unsqueeze(-1)        #这里就是Roll(V)操作,相当于根据索引x直接对于V进行截取        pattern = torch.gather(tmp_values, dim=-1, index=tmp_delay)        delays_agg = delays_agg + pattern * (tmp_corr[..., i].unsqueeze(-1))    return delays_agg

Encoder

编码器和原始的Transformer结构没差,除了将self-attention改为auto-correlation外,采用的多个编码层堆叠的结构。我们直接上代码:

class EncoderLayer(nn.Module):    """    Autoformer encoder layer with the progressive decomposition architecture    """    def __init__(self, attention, d_model, d_ff=None, moving_avg=25, dropout=0.1, activation="relu"):        super(EncoderLayer, self).__init__()        d_ff = d_ff or 4 * d_model        self.attention = attention        #这里的卷积是为了替代feed forward模块        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1, bias=False)        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1, bias=False)        self.decomp1 = series_decomp(moving_avg)        self.decomp2 = series_decomp(moving_avg)        self.dropout = nn.Dropout(dropout)        self.activation = F.relu if activation == "relu" else F.gelu    def forward(self, x, attn_mask=None):#Autocorrelation        new_x, attn = self.attention(            x, x, x,            attn_mask=attn_mask        )        x = x + self.dropout(new_x)        x, _ = self.decomp1(x)        y = x        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))        y = self.dropout(self.conv2(y).transpose(-1, 1))        res, _ = self.decomp2(x + y)        return res, attn

只要理解了Autocorrelation,autoformer的代码就没有什么复杂的地方,完全是按照正常的Transformer模块进行的设计。对着autoformer的流程图就可以很好的理解。

Decoder

class Decoder(nn.Module):    """    Autoformer Decoder    """    def __init__(self, layers, norm_layer=None, projection=None):        super(Decoder, self).__init__()        self.layers = nn.ModuleList(layers)        self.norm = norm_layer        self.projection = projection    def forward(self, x, cross, x_mask=None, cross_mask=None, trend=None):        for layer in self.layers:            #每一次都将趋势项累加,季节项传递给更深的网络进行处理            x, residual_trend = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)            trend = trend + residual_trend        if self.norm is not None:            x = self.norm(x)        if self.projection is not None:            #从高维映射回输出维度            x = self.projection(x)        return x, trend

Decoder也是多个decoder层进行堆叠,而每一个decoder层都共享encoder的输出

class DecoderLayer(nn.Module):    """    Autoformer decoder layer     """    def __init__(self, self_attention, cross_attention, d_model, c_out, d_ff=None,                 moving_avg=25, dropout=0.1, activation="relu"):        super(DecoderLayer, self).__init__()        d_ff = d_ff or 4 * d_model        self.self_attention = self_attention        self.cross_attention = cross_attention        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1, bias=False)        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1, bias=False)        self.decomp1 = series_decomp(moving_avg)        self.decomp2 = series_decomp(moving_avg)        self.decomp3 = series_decomp(moving_avg)        self.dropout = nn.Dropout(dropout)        self.projection = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=3, stride=1, padding=1,                                    padding_mode='circular', bias=False)        self.activation = F.relu if activation == "relu" else F.gelu    def forward(self, x, cross, x_mask=None, cross_mask=None):        #self-attention输入全部来自于decoder自身        x = x + self.dropout(self.self_attention(            x, x, x,            attn_mask=x_mask        )[0])        x, trend1 = self.decomp1(x)        #cross-attention:q来自于decoder,k,v则来自于encoder的输出        x = x + self.dropout(self.cross_attention(            x, cross, cross,            attn_mask=cross_mask        )[0])        x, trend2 = self.decomp2(x)        y = x        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))        y = self.dropout(self.conv2(y).transpose(-1, 1))        x, trend3 = self.decomp3(x + y)        #趋势项累加,季节项通过模块进行处理        residual_trend = trend1 + trend2 + trend3        residual_trend = self.projection(residual_trend.permute(0, 2, 1)).transpose(1, 2)        return x, residual_trend

Decoder部分的代码也是比较清晰明了的,至此autoformer部分的代码已经介绍完毕,下一篇会介绍基于autoformer进行改进的Fedformer。