14  Autoencoder

A simple autoencoder for the iris dataset:

library(cito)

df = iris
df[,1:4] = scale(df[,1:4])   # scale the four features to mean 0 and sd 1

# Autoencoder: the four features serve as both inputs and outputs; the middle
# hidden layer of size 2 is the bottleneck.
autoencoder = dnn(cbind(Sepal.Length, Sepal.Width, Petal.Length, Petal.Width) ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width,
                  hidden = c(10L, 5L, 2L, 5L, 10L), data = df, lr = 0.1, verbose = FALSE)

autoencoder$net
An `nn_module` containing 236 parameters.

── Modules ─────────────────────────────────────────────────────────────────────
• 0: <nn_linear> #50 parameters
• 1: <nn_selu> #0 parameters
• 2: <nn_linear> #55 parameters
• 3: <nn_selu> #0 parameters
• 4: <nn_linear> #12 parameters
• 5: <nn_selu> #0 parameters
• 6: <nn_linear> #15 parameters
• 7: <nn_selu> #0 parameters
• 8: <nn_linear> #60 parameters
• 9: <nn_selu> #0 parameters
• 10: <nn_linear> #44 parameters
library(torch)     # for torch_tensor()
library(magrittr)  # for the %>% pipe

# Pass the scaled data through the first five layers (the encoder half of the
# network) to obtain the two-dimensional bottleneck representation:
predictions = 
  as.matrix(df[,1:4]) %>% 
  torch_tensor() %>% 
  autoencoder$net$`0`() %>%   # linear 4 -> 10
  autoencoder$net$`1`() %>%   # selu
  autoencoder$net$`2`() %>%   # linear 10 -> 5
  autoencoder$net$`3`() %>%   # selu
  autoencoder$net$`4`() %>%   # linear 5 -> 2 (bottleneck)
  as.matrix()


# Plot the 2-dimensional bottleneck representation, colored by species
plot(predictions, col = iris$Species)
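We can also run the complete network (encoder and decoder) and compare the reconstructions with the original, scaled features. A minimal sketch, assuming cito's predict() method for fitted dnn objects returns the reconstructed feature matrix:

# Reconstruct the inputs with the full autoencoder (all layers)
reconstructed = predict(autoencoder, newdata = df)

# Mean squared reconstruction error per feature (on the scaled data)
colMeans((as.matrix(df[,1:4]) - reconstructed)^2)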

14.1 Autoencoder - Convolutional Neural Network for the Flower Dataset

library(torch)
library(coro)

# Flatten a (batch, channels, height, width) tensor into (batch, features):
Flatten = 
  nn_module(
    forward = function(input) return(input$view(list(input$size(1L), -1)))
  )

# Reshape a flat (batch, features) tensor back into (batch, features, 1, 1) so
# that the transposed convolutions of the decoder can be applied:
UnFlatten = 
  nn_module(
    forward = function(input, size = 1152) return(input$view(list(input$size(1L), size, 1L, 1L)))
  )

AE = nn_module(
  # image_channels: number of input channels (RGB), h_dim: length of the
  # flattened encoder output, z_dim: size of the bottleneck
  initialize = function(image_channels = 3L, h_dim = 1152, z_dim = 2L) {
    # Encoder: four strided convolutions, then flatten to a vector of length h_dim
    self$encoder = nn_sequential(
      nn_conv2d(image_channels, 16, kernel_size=4, stride=2),
      nn_relu(),
      nn_conv2d(16, 32, kernel_size=4, stride=2),
      nn_relu(),
      nn_conv2d(32, 64, kernel_size=4, stride=2),
      nn_relu(),
      nn_conv2d(64, 128, kernel_size=4, stride=2),
      nn_relu(),
      Flatten()
    )
    self$fc1 = nn_linear(h_dim, z_dim)   # compress to the bottleneck
    self$fc2 = nn_linear(z_dim, h_dim)   # expand the bottleneck back to h_dim
    
    # Decoder: transposed convolutions that upsample back to 80 x 80 x image_channels
    self$decoder = nn_sequential(
      UnFlatten(),
      nn_conv_transpose2d(h_dim, 128, kernel_size=6, stride=2),
      nn_relu(),
      nn_conv_transpose2d(128, 64, kernel_size=6, stride=2),
      nn_relu(),
      nn_conv_transpose2d(64, 32, kernel_size=7, stride=2),
      nn_relu(),
      nn_conv_transpose2d(32, image_channels, kernel_size=8, stride=2),
      nn_sigmoid()
    )
  },
  
  
  # Encode an image batch to its bottleneck representation
  encode = function(x) {
    h = self$encoder(x)
    return(self$fc1(h))
  },
  
  # Decode a bottleneck vector back to an image
  decode = function(z) {
    z = self$fc2(z)
    z = self$decoder(z)
    return(z)
  },
  
  forward = function(input) {
    results = self$encode(input)
    z = self$decode(results)
    return(z)
  }
)
ae = AE()
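Before moving on, it helps to see where h_dim = 1152 comes from: for 80 x 80 x 3 flower images (the size assumed throughout this section), the four strided convolutions reduce the input to 128 feature maps of size 3 x 3, i.e. 128 * 3 * 3 = 1152 values after flattening. A quick sanity check with a dummy batch:

# One random RGB "image" of size 80 x 80 (batch dimension first)
dummy = torch_randn(c(1L, 3L, 80L, 80L))

ae$encoder(dummy)$shape   # expected: 1 x 1152 (= 128 * 3 * 3)
ae(dummy)$shape           # full forward pass, expected: 1 x 3 x 80 x 80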

# Use a GPU if one is available, otherwise fall back to the CPU
device = if (torch::cuda_is_available()) "cuda:0" else "cpu"

data = EcoData::dataset_flower()
train = data$train/255   # scale pixel values to [0, 1]
labels = data$labels


# Moving the top-level module to the device also moves all of its submodules
ae$to(device = device)



# torch expects channels-first (NCHW) input, the flower data comes as NHWC
train = aperm(train, perm = c(1, 4, 2, 3))
dataset = torch::tensor_dataset(torch_tensor(train))
dataLoader = torch::dataloader(dataset, batch_size = 50L, shuffle = TRUE, pin_memory = TRUE)
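To confirm that the dataloader delivers batches in the channels-first format the network expects, we can pull a single batch (a minimal sketch; the exact height and width depend on the flower data, assumed here to be 80 x 80):

# Fetch one batch from the dataloader and inspect its shape
it = torch::dataloader_make_iter(dataLoader)
b = torch::dataloader_next(it)
b[[1]]$shape   # expected: 50 x 3 x 80 x 80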

optimizer = optim_adam(ae$parameters, lr=0.01) 


for(e in 1:200) {
  batch_losses = NULL
  counter = 1
  coro::loop(for (b in dataLoader) {
    optimizer$zero_grad()
    batch = b[[1]]$to(device = device)
    pred = ae(batch)
    # Pixel-wise binary cross-entropy between the reconstruction and the input
    loss = nnf_binary_cross_entropy(pred, batch)
    loss$backward()
    optimizer$step()
    batch_losses[counter] <- loss$item()
    counter = counter + 1
  })
  cat("Epoch: ", e, " loss: ", mean(batch_losses), "\n")
}
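After training, we can compare an original flower with its reconstruction, analogous to the plotting code used for the VAE below (a sketch using base R graphics):

# Reconstruct the first five training images with the trained autoencoder
pred = as_array(ae(torch_tensor(train[1:5,,,])$to(device = device))$cpu())
pred = aperm(pred, c(1, 3, 4, 2))   # back to (batch, height, width, channels)

# Original (left) and reconstruction (right) of the first image
oldpar = par(mfrow = c(1, 2))
plot(as.raster(aperm(train, c(1, 3, 4, 2))[1,,,]))
plot(as.raster(pred[1,,,]))
par(oldpar)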

14.2 Variational Autoencoder (VAE)

The difference between a variational and a standard autoencoder is that a variational autoencoder assumes a distribution for the latent variables (latent variables cannot be observed directly; they are inferred from the observed variables) and learns the parameters of this distribution. New objects can then be generated by feeding valid (!) "seeds" (valid with regard to the assumed distribution) to the decoder. To ensure that more or less randomly chosen points in the low-dimensional latent space are meaningful and decode to suitable results, the latent space / training process must be regularized: the input to the VAE is encoded to a distribution in the latent space rather than to a single point.
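Formally, the VAE is trained by minimizing the sum of a reconstruction term and a Kullback-Leibler (KL) term that pulls the encoded distribution $q(z \mid x) = N(\mu, \sigma^2)$ towards a standard normal prior (this is the regularization mentioned above):

$$\mathcal{L}(x) = \underbrace{-\,\mathbb{E}_{q(z \mid x)}\big[\log p(x \mid z)\big]}_{\text{reconstruction loss}} \;+\; \underbrace{D_{KL}\big(q(z \mid x) \,\|\, N(0, I)\big)}_{\text{regularization}}$$

For a normal encoder distribution the KL term has the closed form $-\tfrac{1}{2}\sum_j \big(1 + \log\sigma_j^2 - \mu_j^2 - \sigma_j^2\big)$, which (up to averaging over the latent dimensions) is what the loss function below computes; the reconstruction term is the binary cross-entropy between the input image and its reconstruction.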

Helper functions:

data = EcoData::dataset_flower()
train = data$train/255
labels = data$labels
train = aperm(train, perm = c(1, 4, 2, 3))
library(torch)
library(coro)

Flatten = 
  nn_module(
    forward = function(input) return(input$view(list(input$size(1L), -1)))
  )

UnFlatten = 
  nn_module(
    forward = function(input, size=1152) return( input$view(list(input$size(1L), size, 1L, 1L)))
  )

VAE = nn_module(
  initialize = function(image_channels = 3L, h_dim=1152, z_dim=30L) {
    self$encoder = nn_sequential(
      nn_conv2d(image_channels, 16, kernel_size=4, stride=2),
      nn_relu(),
      nn_conv2d(16, 32, kernel_size=4, stride=2),
      nn_relu(),
      nn_conv2d(32, 64, kernel_size=4, stride=2),
      nn_relu(),
      nn_conv2d(64, 128, kernel_size=4, stride=2),
      nn_relu(),
      Flatten()
    )
    self$fc1 = nn_linear(h_dim, z_dim)
    self$fc2 = nn_linear(h_dim, z_dim)
    self$fc3 = nn_linear(z_dim, h_dim)
    
    self$decoder = nn_sequential(
      UnFlatten(),
      nn_conv_transpose2d(h_dim, 128, kernel_size=6, stride=2),
      nn_relu(),
      nn_conv_transpose2d(128, 64, kernel_size=6, stride=2),
      nn_relu(),
      nn_conv_transpose2d(64, 32, kernel_size=7, stride=2),
      nn_relu(),
      nn_conv_transpose2d(32, image_channels, kernel_size=8, stride=2),
      nn_sigmoid()
    )
  },
  
  # Reparameterization trick: draw z = mu + sigma * eps with eps ~ N(0, 1)
  sample_from_normal = function(mu, logvar) {
    std = (logvar$mul(0.5)$exp_())$to(device = mu$device)
    eps = torch_randn(mu$size())$to(device = mu$device)
    z = mu + std * eps
    return(z)  
  },
  
  # fc1 predicts the mean, fc2 the log-variance of the latent distribution
  get_mu_and_sample = function(h) {
    mu = self$fc1(h)
    logvar = self$fc2(h)
    samples = self$sample_from_normal(mu, logvar)
    return(list(samples, mu, logvar))
  },
  
  encode = function(x) {
    h = self$encoder(x)
    results = self$get_mu_and_sample(h)
    return(results) #z, mu, logvar
  },
  
  decode = function(z) {
    z = self$fc3(z)
    z = self$decoder(z)
    return(z)
  },
  
  forward = function(input) {
    results = self$encode(input)
    z = results[[1]]
    mu = results[[2]]
    logvar = results[[3]]
    z = self$decode(z)
    return(list(z, mu, logvar))
  }
)
vae = VAE()
device = "cpu"
# Moving the top-level module to the device also moves all of its submodules
vae$to(device = device)

Try the untrained VAE:

predictions = as_array(vae(torch_tensor(train[1:5,,,]))[[1]])
predictions = aperm(predictions, c(1, 3, 4, 2))   # back to (batch, height, width, channels)
predictions[1,,,]  %>%
  keras3::image_to_array() %>% 
  as.raster() %>%
  plot()

Train VAE:

# Loss function:
loss_function = function(reconstructed, x, mu, logvar){
  loss_bce = nnf_binary_cross_entropy(reconstructed, x, reduction = "sum")
  # Kullback–Leibler divergence / Normal prior on our latent dimensions!
  KLD = -0.5 * torch_mean(1 + logvar - mu$pow(2) - logvar$exp())
  return(loss_bce+KLD)
}

dataset = torch::tensor_dataset(torch_tensor(train))
dataLoader = torch::dataloader(dataset, batch_size = 50L, shuffle = TRUE, pin_memory = TRUE)

optimizer = optim_adam(vae$parameters, lr=0.001) 

for(e in 1:200) {
  batch_losses = NULL
  counter = 1
  coro::loop(for (b in dataLoader) {
    optimizer$zero_grad()
    batch = b[[1]]$to(device = device)
    pred = vae(batch)
    # Reconstruction loss plus KL divergence between the latent posterior and the prior
    loss = loss_function(pred[[1]], batch, pred[[2]], pred[[3]])
    loss$backward()
    optimizer$step()
    batch_losses[counter] <- loss$item()
    counter = counter + 1
  })
  cat("Epoch: ", e, " loss: ", mean(batch_losses), "\n")
}
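Because the latent space is regularized, points between two encoded images also decode to plausible flowers. The following sketch interpolates between the latent means of two training images and decodes each intermediate point:

# Encode two training images; the second list element holds the latent means (mu)
enc = vae$encode(torch_tensor(train[1:2,,,])$to(device = device))
mu = enc[[2]]
z1 = mu[1, ]$unsqueeze(1L)   # shape 1 x 30
z2 = mu[2, ]$unsqueeze(1L)

# Decode a few points on the line between the two latent vectors
alphas = seq(0, 1, length.out = 5)
oldpar = par(mfrow = c(1, length(alphas)), mar = c(0.5, 0.5, 0.5, 0.5))
for (a in alphas) {
  img = as_array(vae$decode((1 - a) * z1 + a * z2)$cpu())
  img = aperm(img, c(1, 3, 4, 2))
  plot(as.raster(img[1,,,]))
}
par(oldpar)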

Sample from our Decoder:

# Draw 20 random latent vectors (z_dim = 30) and decode them into images
images = vae$decode(torch_randn(c(20L, 30L))$to(device = device))
images = as_array(images$cpu())
images = aperm(images, c(1, 3, 4, 2))   # back to (batch, height, width, channels)
images[1,,,]  %>%
  as.raster() %>%
  plot()

Example images (from the above trained VAE):