[FIXED] Keras LSTM – looping over variable sequence length

Issue

I want to loop manually over the time steps of input sequences whose lengths vary, but TensorFlow automatically sets the time axis to None once it notices the varying sequence lengths. Is there a workaround for this?

Sample example

import tensorflow as tf
import numpy as np

class MyExample(tf.keras.Model):
    """Reproducer from the question: an LSTM cell unrolled by hand.

    Each slice ``inputs[:, t, :]`` is passed through an ``LSTMCell`` and then
    a ``Dense`` layer, and the per-step results are stacked along axis 1.
    """

    def __init__(self, int_dim, **kwargs):
        """Create the LSTM cell and the Dense layer.

        Args:
            int_dim: Size passed to both ``LSTMCell`` and ``Dense``.
            **kwargs: Forwarded unchanged to the ``tf.keras.Model`` constructor.
        """
        super(MyExample, self).__init__(**kwargs)
        self.int_dim = int_dim
        self.lstm = tf.keras.layers.LSTMCell(self.int_dim)
        self.d2 = tf.keras.layers.Dense(self.int_dim)

    def call(self, inputs):
        """Apply the cell and the Dense layer at every index of axis 1.

        Args:
            inputs: Tensor indexed below as ``inputs[:, t, :]``.

        Returns:
            The per-step Dense outputs stacked along axis 1.
        """
        # Initial pair of LSTM states, all zeros; the leading dimension is
        # hard-coded to 1.
        states = (tf.zeros((1, self.int_dim)), 
                  tf.zeros((1, self.int_dim)))
        outputs = []
        # NOTE: ``inputs.shape[1]`` is the *static* shape. According to the
        # question text, TensorFlow reports this axis as None when sequence
        # lengths vary, which is the problem being asked about.
        for t in range(inputs.shape[1]):
            # The states returned by the cell are fed back in at the next step.
            lstm_out, states = self.lstm(inputs[:, t, :], states)
            d2_out = self.d2(lstm_out)
            outputs.append(d2_out)
        output_stack = tf.stack(outputs, 1)
        return output_stack

def generator():
    """Yield random ``(X, Y)`` pairs forever.

    Each pair shares one shape ``(1, length, 5)``, where ``length`` is drawn
    anew for every pair from ``np.random.randint(2, 10)``.
    """
    while True:
        length = np.random.randint(2, 10)
        pair_shape = (1, length, 5)
        # Tuple elements are evaluated left to right, so X is sampled before Y.
        yield tf.random.uniform(pair_shape), tf.random.uniform(pair_shape)

# Build the model, configure optimizer and loss, and train on the generator.
model = MyExample(int_dim=5)
model.compile(optimizer='adam', loss='BinaryCrossentropy')
model.fit(x=generator(), batch_size=1)

Solution

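Here is a fix for eager execution mode. It sizes the initial states and the loop from the dynamic shape `tf.shape(inputs)` rather than the static `inputs.shape`, and compiles the model with `run_eagerly=True`: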

import tensorflow as tf
import numpy as np

class MyExample(tf.keras.Model):
    """LSTM cell unrolled step by step in a plain Python loop.

    The loop bound and the state sizes are read from ``tf.shape(inputs)``,
    i.e. from the runtime shape of the tensor.
    """

    def __init__(self, int_dim, **kwargs):
        """Create the LSTM cell and the Dense layer.

        Args:
            int_dim: Size passed to both ``LSTMCell`` and ``Dense``.
            **kwargs: Forwarded unchanged to the ``tf.keras.Model`` constructor.
        """
        super().__init__(**kwargs)
        self.int_dim = int_dim
        self.lstm = tf.keras.layers.LSTMCell(int_dim)
        self.d2 = tf.keras.layers.Dense(int_dim)

    def call(self, inputs):
        """Apply the cell and the Dense layer at every index of axis 1.

        Args:
            inputs: Tensor indexed below as ``inputs[:, step, :]``.

        Returns:
            The per-step Dense outputs stacked along axis 1.
        """
        runtime_shape = tf.shape(inputs)
        batch_size = runtime_shape[0]
        num_steps = runtime_shape[1]

        # Two separate all-zero tensors form the initial LSTM state pair.
        carry = (
            tf.zeros((batch_size, self.int_dim)),
            tf.zeros((batch_size, self.int_dim)),
        )

        per_step = []
        for step in range(num_steps):
            cell_out, carry = self.lstm(inputs[:, step, :], carry)
            per_step.append(self.d2(cell_out))
        return tf.stack(per_step, axis=1)

def generator():
    """Yield random ``(X, Y)`` pairs forever.

    Each pair shares one shape ``(1, length, 5)``, where ``length`` is drawn
    anew for every pair from ``np.random.randint(2, 10)``.
    """
    while True:
        length = np.random.randint(2, 10)
        pair_shape = (1, length, 5)
        # Tuple elements are evaluated left to right, so X is sampled before Y.
        yield tf.random.uniform(pair_shape), tf.random.uniform(pair_shape)

model = MyExample(5)
# run_eagerly=True keeps the Python `for` loop in `call` running eagerly.
model.compile('adam', 'BinaryCrossentropy', run_eagerly=True)
# generator() never stops yielding, so without `steps_per_epoch` the first
# epoch would never end. `batch_size` is omitted because the generator already
# yields complete batches.
model.fit(generator(), steps_per_epoch=100)

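A graph mode solution could look like this. It replaces the Python `for` loop with `tf.while_loop` and collects the per-step outputs in a `tf.TensorArray`: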

import tensorflow as tf
import numpy as np

class MyExample(tf.keras.Model):
    """LSTM cell unrolled with ``tf.while_loop`` instead of a Python loop.

    Per-step outputs are collected in a ``tf.TensorArray`` and returned with
    the step axis in position 1, i.e. in the same layout as ``inputs``.
    """

    def __init__(self, int_dim, **kwargs):
        """Create the LSTM cell and the Dense layer.

        Args:
            int_dim: Size passed to both ``LSTMCell`` and ``Dense``.
            **kwargs: Forwarded unchanged to the ``tf.keras.Model`` constructor.
        """
        super(MyExample, self).__init__(**kwargs)
        self.int_dim = int_dim
        self.lstm = tf.keras.layers.LSTMCell(self.int_dim)
        self.d2 = tf.keras.layers.Dense(self.int_dim)

    def some_logic(self, i, inputs, s1, s2, o):
        """Body of the while loop: process step ``i`` and advance the counter.

        Args:
            i: Current index along axis 1 of ``inputs``.
            inputs: Tensor indexed as ``inputs[:, i, :]``; passed through
                unchanged so it stays a loop variable.
            s1: First LSTM state tensor from the previous step.
            s2: Second LSTM state tensor from the previous step.
            o: ``tf.TensorArray`` accumulating the per-step Dense outputs.

        Returns:
            The updated loop variables ``(i + 1, inputs, s1, s2, o)``.
        """
        lstm_out, s = self.lstm(inputs[:, i, :], (s1, s2))
        d2_out = self.d2(lstm_out)
        # Append at the current end of the array; write() returns the array
        # that must be used from here on.
        o = o.write(o.size(), d2_out)
        s1, s2 = s
        return tf.add(i, 1), inputs, s1, s2, o

    def call(self, inputs):
        """Apply the cell and the Dense layer at every index of axis 1.

        Args:
            inputs: Tensor indexed in the loop body as ``inputs[:, i, :]``.

        Returns:
            The per-step Dense outputs with the step axis in position 1.
        """
        batch_size = tf.shape(inputs)[0]
        s1 = tf.zeros((batch_size, self.int_dim))
        s2 = tf.zeros((batch_size, self.int_dim))

        outputs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)

        i = tf.constant(0)
        while_condition = lambda i, inputs, s1, s2, outputs: tf.less(i, tf.shape(inputs)[1])
        _, _, _, _, result = tf.while_loop(while_condition, self.some_logic, loop_vars=(i, inputs, s1, s2, outputs))

        # TensorArray.stack() puts the written elements along axis 0, giving
        # (steps, batch, int_dim). Swap the first two axes so the result is
        # (batch, steps, int_dim), matching the layout of `inputs` and of the
        # targets; otherwise the loss would broadcast mismatched axes.
        return tf.transpose(result.stack(), perm=[1, 0, 2])

def generator():
    """Yield random ``(X, Y)`` pairs forever.

    Each pair shares one shape ``(1, length, 5)``, where ``length`` is drawn
    anew for every pair from ``np.random.randint(2, 10)``.
    """
    while True:
        length = np.random.randint(2, 10)
        pair_shape = (1, length, 5)
        # Tuple elements are evaluated left to right, so X is sampled before Y.
        yield tf.random.uniform(pair_shape), tf.random.uniform(pair_shape)

model = MyExample(5)
model.compile('adam', 'BinaryCrossentropy')
# generator() never stops yielding, so without `steps_per_epoch` the first
# epoch would never end. `batch_size` is omitted because the generator already
# yields complete batches.
model.fit(generator(), steps_per_epoch=100)

Answered By – AloneTogether

Answer Checked By – Mary Flores (Easybugfix Volunteer)

Leave a Reply

(*) Required, Your email will not be published