tensorflow常用模板

本文记录如何使用tensorflow实现基于transformer的轨迹分类。

在技术实现上包含以下关键细节需要注意:

  • 轨迹点如果超出模型序列长度则进行中间截断,如果不足则进行中间补0
  • 轨迹数据需要对单条数据进行归一化
  • 只使用transformer的encoder部分进行建模,不使用embedding或positional embedding机制,这种方式工程实现简单,但模型可能无法捕获序列前后的区域信息

代码实现

首先实现transformer encoder,内部结构为self-attention和feed forward网络。

def scaled_dot_product_attention(q, k, v, mask=None):
    """Standard scaled dot-product attention.

    Args:
        q, k, v: query/key/value tensors; last dim of k is the scaling depth.
        mask: optional additive mask; masked positions receive -1e9 before softmax.

    Returns:
        (output, attention_weights) where output is (..., seq_len_q, depth_v)
        and attention_weights is (..., seq_len_q, seq_len_k).
    """
    # Raw attention scores, scaled by sqrt(d_k) to stabilize softmax gradients.
    depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(depth)
    if mask is not None:
        # Push masked scores to ~-inf so softmax assigns them ~0 weight.
        logits += mask * -1e9
    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights


class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention: linear q/k/v projections, per-head scaled
    dot-product attention, head merge, dropout, and an output projection.

    Args:
        d_model: total model width; must be evenly divisible by num_heads.
        num_heads: number of parallel attention heads.
        dropout: dropout rate applied to the merged heads before the output dense.
    """

    def __init__(self, d_model, num_heads, dropout):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % self.num_heads == 0
        # Per-head feature width.
        self.depth = d_model // self.num_heads
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout)
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) -> (batch, num_heads, seq, depth)."""
        reshaped = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(reshaped, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        batch_size = tf.shape(q)[0]

        # Project then split into heads: each becomes (batch, heads, seq, depth).
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)

        # attended:          (batch, heads, seq_q, depth)
        # attention_weights: (batch, heads, seq_q, seq_k)
        attended, attention_weights = scaled_dot_product_attention(q, k, v, mask)

        # Undo the head split: (batch, seq_q, heads, depth) -> (batch, seq_q, d_model).
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.d_model))

        output = self.dense(self.dropout(merged))  # (batch, seq_q, d_model)
        return output, attention_weights


class GlobalSelfAttention(tf.keras.layers.Layer):
    """Unmasked self-attention sublayer with residual add and post-layer-norm."""

    def __init__(self, **kwargs):
        super().__init__()
        # kwargs are forwarded verbatim to MultiHeadAttention (d_model, num_heads, dropout).
        self.mha = MultiHeadAttention(**kwargs)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()

    def call(self, x):
        # Query, key and value are all x; attention weights are discarded.
        attended, _ = self.mha(x, k=x, q=x, mask=None)
        return self.layernorm(self.add([x, attended]))


class EncoderLayer(tf.keras.layers.Layer):
    """One transformer encoder block: self-attention then a position-wise FFN.

    Both sublayers apply their own residual connection and layer norm.
    """

    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.self_attention = GlobalSelfAttention(d_model=d_model, num_heads=num_heads, dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)

    def call(self, x):
        return self.ffn(self.self_attention(x))


class FeedForward(tf.keras.layers.Layer):
    """Position-wise feed-forward sublayer (dff -> d_model) with dropout,
    residual connection, and post-layer-norm."""

    def __init__(self, d_model, dff, dropout_rate=0.1):
        super().__init__()
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate),
        ])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        transformed = self.seq(x)
        return self.layer_norm(self.add([x, transformed]))

gps数据中包含“经度、纬度、时间戳”三元素,需要按照时间戳排序之后将一组gps数据合并为一条记录。原始数据从hive中取出。

# Aggregate raw GPS points into one record per (order_id, dt, label):
# 1. serialize each point as "t,lat,lng,v,acc,provider";
# 2. collect the points per order and sort them (sort_array orders the
#    structs by their first field, the timestamp 't');
# 3. join the sorted point strings with '+' into a single 'gps_sorted'
#    column and overwrite `target_table`.
# NOTE(review): the result of this chain is written but not assigned;
# `sdf_gps` itself is unchanged (Spark transformations return new frames).
sdf_gps.withColumn(
    'gps'
    ,F.concat(
        col('t'),lit(',')
        ,col('lat'),lit(','),col('lng'),lit(',')
        ,col('v'),lit(','),col('acc'),lit(','),col('provider')))\
    .groupBy('order_id','dt','label')\
    .agg(F.sort_array(F.collect_list(F.struct('t','gps'))).alias('collected'))\
    .withColumn('gps_sorted',F.col('collected.gps')).drop("collected")\
    .withColumn('gps_sorted',F.concat_ws('+',F.col('gps_sorted')))\
    .write.mode('overwrite').saveAsTable(target_table)

可以使用python生成器的形式定义数据转换,从而实现数据的模型输入。

def create_generator(raw_input, seq_len=512):
    """Build a zero-argument generator function yielding (features, label).

    Each row of ``raw_input`` is indexed positionally (schema inferred from
    usage — confirm against the upstream table): anchor coordinates at
    columns 3-10, integer label at column 12, '+'-separated GPS timeline at
    column 13, comma-separated order timeline at column 14.

    Each yielded sample is a (seq_len, n_features) float array — per-row
    min-max normalized, then middle-truncated (keep head + tail) or
    middle-zero-padded to exactly ``seq_len`` points — plus an int label.

    Suitable for ``tf.data.Dataset.from_generator``.
    """

    def calc_dist(lat_a, lng_a, lat_b, lng_b):
        """Haversine distance in metres between two points given in
        micro-degrees (coordinates are divided by 1e6 — presumably the
        upstream storage scale; confirm). Returns a large sentinel for
        missing (zero) coordinates or numeric failure."""
        distance = 999999999
        pi = math.pi
        if lat_a == 0 or lng_a == 0 or lat_b == 0 or lng_b == 0:
            return distance
        ra = 6378137  # WGS-84 equatorial radius in metres (was 6378138, a typo)
        c1 = math.pow(math.sin((lat_a / float(1000000) * pi / 180 - lat_b / float(1000000) * pi / 180) / 2), 2)
        c2 = math.pow(math.sin((lng_a / float(1000000) * pi / 180 - lng_b / float(1000000) * pi / 180) / 2), 2)
        x1 = math.cos(lat_a / float(1000000) * pi / 180)
        x2 = math.cos(lat_b / float(1000000) * pi / 180)
        try:
            distance = round(ra * 2 * math.asin(math.sqrt(c1 + x1 * x2 * c2)))
        except ValueError:
            # asin/sqrt domain error from float rounding; keep the sentinel.
            # (Previously a bare `except:` that swallowed every exception.)
            pass
        return int(distance)

    def process(df):
        gps_timeline, order_timeline = df[13], df[14]
        rider_fetch_lng, rider_fetch_lat = int(df[3]), int(df[4])
        rider_arrived_lng, rider_arrived_lat = int(df[5]), int(df[6])
        poi_lng, poi_lat = int(df[7]), int(df[8])
        usr_lng, usr_lat = int(df[9]), int(df[10])
        feat_names = ['t', 'lat', 'lng', ]

        elements = {fn: [] for fn in feat_names}
        # The variables below feed the elided "fancy transformation" step; kept as-is.
        provider_map = {'gps': 0.0, 'iOS': 1.0, 'network': 2.0, 'fail': 3.0}
        ts_idx, pre_t, pre_lat, pre_lng = 1, 0, 0, 0
        checkpoints = [-math.inf] + [float(v) for v in order_timeline.split(',')] + [math.inf]

        for idx, point in enumerate(gps_timeline.split('+')):
            p = point.split(',')
            if idx == 0:
                pre_t, pre_lat, pre_lng = float(p[0]), float(p[1]), float(p[2])

            elements['t'].append(float(p[0]))
            elements['lat'].append(float(p[1]))
            elements['lng'].append(float(p[2]))
            # some fancy transformation
            # ...

        ret = [elements[t] for t in feat_names]

        # Per-trajectory min-max normalization; the +0.01 keeps the divisor
        # nonzero and every real value strictly positive (so zero-padding
        # stays distinguishable from data).
        a = np.vstack(ret)  # (n_features, n_points)
        a_min = np.transpose(np.tile(np.min(a, axis=1), (a.shape[1], 1)))
        a_max = np.transpose(np.tile(np.max(a, axis=1), (a.shape[1], 1)))
        b = ((a - a_min + 0.01) / (a_max - a_min + 0.01))

        # Middle truncation / middle zero-padding to exactly seq_len columns.
        # BUGFIX: the previous right slice used `L - k + 1`, which dropped one
        # real point and (for even seq_len) always injected a stray zero
        # column even when the trajectory was long enough.
        m = min(seq_len, b.shape[1])       # number of real points to keep
        half = (m + 1) // 2                # head gets the extra point when m is odd
        left = b[:, :half]
        right = b[:, b.shape[1] - (m - half):]  # empty slice when m == half
        pad = np.zeros([b.shape[0], seq_len - m])
        b_concat = np.concatenate((left, pad, right), axis=1)
        return np.transpose(b_concat)      # (seq_len, n_features)

    def data_generator():
        for row in raw_input:
            yield process(row), int(row[12])

    return data_generator

模型训练主体。

class TraceEncoderNetwork(keras.Model):
    """Binary trajectory classifier: a stack of transformer encoder layers
    (no embedding / positional encoding), flattened and fed to a single
    sigmoid unit.

    Args:
        num_layers: number of stacked EncoderLayer blocks.
        d_model: model width (here the raw per-point feature size).
        num_heads: attention heads per layer; must divide d_model.
        dff: feed-forward inner width.
        dropout_rate: dropout applied inside each encoder layer.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.num_layers = num_layers
        self.enc_layers = [
            EncoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.flatten = layers.Flatten()
        self.logit = layers.Dense(1, activation='sigmoid')

    def call(self, x):
        for encoder in self.enc_layers:
            x = encoder(x)
        return self.logit(self.flatten(x))


# ---- hyper-parameters ----
batch_size = 16
seq_len = 512       # model sequence length; trajectories are middle-truncated/padded to this
feature_size = 14   # per-point feature count, doubles as d_model (no embedding layer)
# NOTE(review): the visible part of create_generator emits only 3 features
# (t/lat/lng); the elided "fancy transformation" presumably produces the
# remaining 11 — confirm feature_size matches the actual generator output.
num_layers = 3
num_heads = 2       # must divide feature_size evenly (asserted in MultiHeadAttention)
dff = 256
dropout_rate = 0.01

# ---- data pipeline ----
df = pd.read_csv('trace.tsv',sep='\t')
gen = create_generator(df.to_numpy(), seq_len=seq_len)
ds = tf.data.Dataset.from_generator(
    gen, output_types=(tf.float32, tf.int32), output_shapes=((seq_len, feature_size), ()))

# Positional 90/10 train/test split (rows are not shuffled before the split).
ds_train = ds.take(int(df.shape[0]*0.9))
ds_test = ds.skip(int(df.shape[0]*0.9))
ds_train = ds_train.repeat().shuffle(buffer_size=50).batch(batch_size)
# NOTE(review): shuffling the validation stream is unnecessary (harmless but wasteful).
ds_test = ds_test.repeat().shuffle(buffer_size=50).batch(batch_size)

# ---- model ----
# Raw (seq_len, feature_size) points feed the encoder directly, so d_model == feature_size.
inputs = layers.Input(shape=(seq_len, feature_size))
outputs = TraceEncoderNetwork(num_layers, feature_size, num_heads, dff, dropout_rate)(inputs)
model = Model(inputs=[inputs], outputs=[outputs])
model.summary()

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[tf.keras.metrics.AUC()],
)

# steps_per_epoch / validation_steps are required because both datasets repeat forever.
model.fit(ds_train, epochs=10, validation_data=ds_test, validation_steps=100, steps_per_epoch=1000)
# NOTE(review): `path_output` is not defined in this file — must come from elsewhere.
model.save_weights(path_output + 'model')

参考代码

Verifying the implementation of Multihead Attention in Transformer