Tacotron2 for Automatic Image Captioning

Models used here on Tacotron2 were trained on LJSpeech dataset.

Notice: The waveform generation is super slow since it implements naive autoregressive generation. It doesn't use parallel generation method described in Parallel WaveNet.

Estimated time to complete: 1 hours.


Setup Image captioning

% cd /content/
! git clone https://github.com/glh3025/ml-art-project3.git
% cd /content/ml-art-project3/data

import requests
import zipfile
import io

if not os.path.exists("/content/ml-art-project3/data/Flickr8k_Dataset"):
  url = 'https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip'
  r = requests.get(url)
  z = zipfile.ZipFile(io.BytesIO(r.content))

if not os.path.exists("/content/ml-art-project3/data/Flickr8k_Dataset"):
  url = 'http://nlp.stanford.edu/data/glove.6B.zip'
  r = requests.get(url)
  z = zipfile.ZipFile(io.BytesIO(r.content))
import numpy as np
from numpy import array
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import string
import os
from PIL import Image
import glob
import pickle
# from pickle #import dump, load
from time import time
from keras.preprocessing import sequence
from keras.models import Sequential
from keras.layers import LSTM, Embedding, TimeDistributed, Dense, RepeatVector,\
                         Activation, Flatten, Reshape, concatenate, Dropout, BatchNormalization
from keras.optimizers import Adam, RMSprop
from keras.layers.wrappers import Bidirectional
from keras.layers.merge import add
from keras.applications.inception_v3 import InceptionV3
from keras.preprocessing import image
from keras.models import Model
from keras import Input, layers
from keras import optimizers
from keras.applications.inception_v3 import preprocess_input
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical

def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    return text

def load_set(filename):
    doc = load_doc(filename)
    dataset = list()
    for line in doc.split('\n'):
        if len(line) < 1:
        identifier = line.split('.')[0]
    return set(dataset)

filename = '/content/ml-art-project3/data/Flickr_8k.trainImages.txt'
train = load_set(filename)

def to_lines(descriptions):
    all_desc = list()
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc

def load_clean_descriptions(filename, dataset):
    doc = load_doc(filename)
    descriptions = dict()
    for line in doc.split('\n'):
        tokens = line.split()
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in dataset:
            if image_id not in descriptions:
                descriptions[image_id] = list()
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
    return descriptions

train_descriptions = load_clean_descriptions('/content/ml-art-project3/data/descriptions.txt', train)

def max_length(descriptions):
    lines = to_lines(descriptions)
    return max(len(d.split()) for d in lines)

max_length = max_length(train_descriptions)

def preprocess(image_path):
    img = image.load_img(image_path, target_size=(299, 299))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    return x

model = InceptionV3(weights='imagenet')
model_new = Model(model.input, model.layers[-2].output)

def encode(image):
    image = preprocess(image) # preprocess the image
    fea_vec = model_new.predict(image) # Get the encoding vector for the image
    fea_vec = np.reshape(fea_vec, fea_vec.shape[1]) # reshape from (1, 2048) to (2048, )
    return fea_vec

all_train_captions = []
for key, val in train_descriptions.items():
    for cap in val:

word_count_threshold = 10
word_counts = {}
nsents = 0
for sent in all_train_captions:
    nsents += 1
    for w in sent.split(' '):
        word_counts[w] = word_counts.get(w, 0) + 1
vocab = [w for w in word_counts if word_counts[w] >= word_count_threshold]
print('preprocessed words %d -> %d' % (len(word_counts), len(vocab)))

ixtoword = {}
wordtoix = {}

ix = 1
for w in vocab:
    wordtoix[w] = ix
    ixtoword[ix] = w
    ix += 1
vocab_size = len(ixtoword) + 1     

def data_generator(descriptions, photos, wordtoix, max_length, num_photos_per_batch):
    X1, X2, y = list(), list(), list()
    # loop for ever over images
    while 1:
        for key, desc_list in descriptions.items():
            # retrieve the photo feature
            photo = photos[key+'.jpg']
            for desc in desc_list:
                # encode the sequence
                seq = [wordtoix[word] for word in desc.split(' ') if word in wordtoix]
                # split one sequence into multiple X, y pairs
                for i in range(1, len(seq)):
                    # split into input and output pair
                    in_seq, out_seq = seq[:i], seq[i]
                    # pad input sequence
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    # encode output sequence
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    # store
            # yield the batch data
            if n==num_photos_per_batch:
                yield [[array(X1), array(X2)], array(y)]
                X1, X2, y = list(), list(), list()
glove_dir = '/content/ml-art-project3/data/'
embeddings_index = {} # empty dictionary
f = open(os.path.join(glove_dir, 'glove.6B.200d.txt'), encoding="utf-8")

for line in f:
    values = line.split()
    word = values[0]
    coefs = np.asarray(values[1:], dtype='float32')
    embeddings_index[word] = coefs
print('Found %s word vectors.' % len(embeddings_index))

embedding_dim = 200

# Get 200-dim dense vector for each of the 10000 words in out vocabulary
embedding_matrix = np.zeros((vocab_size, embedding_dim))

for word, i in wordtoix.items():
    #if i < max_words:
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        # Words not found in the embedding index will be all zeros
        embedding_matrix[i] = embedding_vector
inputs1 = Input(shape=(2048,))
fe1 = Dropout(0.5)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)
inputs2 = Input(shape=(max_length,))
se1 = Embedding(vocab_size, embedding_dim, mask_zero=True)(inputs2)
se2 = Dropout(0.5)(se1)
se3 = LSTM(256)(se2)
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)
model = Model(inputs=[inputs1, inputs2], outputs=outputs)

model.layers[2].trainable = False

model.compile(loss='categorical_crossentropy', optimizer='adam')

Install Tacotron-2 and WaveNet dependencies

import os
from os.path import exists, join, expanduser


wavenet_dir = "wavenet_vocoder"
if not exists(wavenet_dir):
  ! git clone https://github.com/r9y9/$wavenet_dir
taco2_dir = "Tacotron-2"
if not exists(taco2_dir):
  ! git clone https://github.com/r9y9/$taco2_dir
  ! cd $taco2_dir && git checkout -B wavenet3 origin/wavenet3
# Install dependencies
! pip install -q -U "numpy<1.16"
! pip install -q -U "pysptk<=0.1.14"

! pip install -q -U "tensorflow<=1.9.0"

os.chdir(join(expanduser("~"), taco2_dir))
! pip install -q -r requirements.txt

os.chdir(join(expanduser("~"), wavenet_dir))
! pip install -q -e '.[train]'
import torch
import tensorflow
import pysptk
import numpy as np
tensorflow.__version__, pysptk.__version__, np.__version__

Download pretrained models

# Tacotron2 (mel-spectrogram prediction part)
os.chdir(join(expanduser("~"), taco2_dir))
! mkdir -p logs-Tacotron
if not exists("logs-Tacotron/pretrained"):
  ! curl -O -L "https://www.dropbox.com/s/vx7y4qqs732sqgg/pretrained.tar.gz"
  ! tar xzvf pretrained.tar.gz
  ! mv pretrained logs-Tacotron
# WaveNet
os.chdir(join(expanduser("~"), wavenet_dir))
wn_preset = "20180510_mixture_lj_checkpoint_step000320000_ema.json"
wn_checkpoint_path = "20180510_mixture_lj_checkpoint_step000320000_ema.pth"

if not exists(wn_preset):
  !curl -O -L "https://www.dropbox.com/s/0vsd7973w20eskz/20180510_mixture_lj_checkpoint_step000320000_ema.json"
if not exists(wn_checkpoint_path):
  !curl -O -L "https://www.dropbox.com/s/zdbfprugbagfp2w/20180510_mixture_lj_checkpoint_step000320000_ema.pth"

Input texts to be synthesized

def Search(photo):
    in_text = 'startseq'
    for i in range(max_length):
        sequence = [wordtoix[w] for w in in_text.split() if w in wordtoix]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([photo,sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = ixtoword[yhat]
        in_text += ' ' + word
        if word == 'endseq':
    final = in_text.split()
    final = final[1:-1]
    final = ' '.join(final)
    return final
def comment(zlist, print_text=False):
  text_list = []
  for z in zlist:
    pic = list(encoding_test.keys())[z]
    image = encoding_test[pic].reshape((1,2048))
    images = '/content/ml-art-project3/data/Flicker8k_Dataset/'
    comment = Search(image)+'.'
    comment = comment[0].upper() + comment[1:]
  return text_list
with open("/content/ml-art-project3/data/encoded_test_images.pkl", "rb") as encoded_pickle:
    encoding_test = pickle.load(encoded_pickle)
text_list = comment([106,8,101])
text_list.append("Peter Piper picked a peck of pickled peppers. How many pickled peppers did Peter Piper pick?")
text_list.append("A skunk sat on a stump and thunk the stump stunk, but the stump thunk the skunk stunk")
with open('/root/Tacotron-2/text_list.txt', 'w') as f:
    for item in text_list:
        f.write("%s\n" % item)

Mel-spectrogram prediction by Tacoron2

os.chdir(join(expanduser("~"), taco2_dir))
# Remove old files if exist
! rm -rf tacotron_output
! python synthesize.py --model='Tacotron' --mode='eval' --text_list=./text_list.txt

Waveform synthesis by WaveNet

import librosa.display
import IPython
from IPython.display import Audio
import numpy as np
import torch
os.chdir(join(expanduser("~"), wavenet_dir))

# Setup WaveNet vocoder hparams
from hparams import hparams
with open(wn_preset) as f:

# Setup WaveNet vocoder
from train import build_model
from synthesis import wavegen
import torch

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

model = build_model().to(device)

print("Load checkpoint from {}".format(wn_checkpoint_path))
checkpoint = torch.load(wn_checkpoint_path)
from glob import glob
from tqdm import tqdm

with open("../Tacotron-2/tacotron_output/eval/map.txt") as f:
  maps = f.readlines()
maps = list(map(lambda x:x[:-1].split("|"), maps))
# filter out invalid ones
maps = list(filter(lambda x:len(x) == 2, maps))

print("List of texts to be synthesized")
for idx, (text,_) in enumerate(maps):
  print(idx, text)

Waveform generation

Note: This will takes hours to finish depending on the number and lenght of texts. Try short sentences first if you would like to see samples quickly.

waveforms = []

for idx, (text, mel) in enumerate(maps):
  print("\n", idx, text)
  mel_path = join("../Tacotron-2", mel)
  c = np.load(mel_path)
  if c.shape[1] != hparams.num_mels:
    np.swapaxes(c, 0, 1)
  # Range [0, 4] was used for training Tacotron2 but WaveNet vocoder assumes [0, 1]
  c = np.interp(c, (0, 4), (0, 1))
  # Generate
  waveform = wavegen(model, c=c, fast=True, tqdm=tqdm)

  # Audio
  IPython.display.display(Audio(waveform, rate=hparams.sample_rate))

Summary: audio samples

for idx, (text, mel) in enumerate(maps):
  print(idx, text)
  IPython.display.display(Audio(waveforms[idx], rate=hparams.sample_rate))
