Basic GAN Modelling
Generative adversarial networks (GANs), introduced in 2014 by Ian Goodfellow and co-authors, have become very popular across machine learning. GANs are an approach to generative modelling using deep learning methods. Generative modelling is an unsupervised machine learning task that involves automatically discovering and learning the regularities or patterns in input data, in such a way that the model can be used to generate new examples that plausibly could have been drawn from the original dataset.
A GAN contains a generator and a discriminator: the generator produces fake data, and the discriminator tries to distinguish the fake data from the original data. The discriminator loss is backpropagated to the generator, which learns from it. The two networks play an adversarial game in which the generator tries to fool the discriminator and the discriminator tries not to be fooled. Trained together, they learn to model complex data.
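Formally, this adversarial game corresponds to the minimax objective from the original 2014 GAN paper, where $G$ is the generator, $D$ the discriminator, and $z$ the noise input:

$$\min_G \max_D V(D, G) = \mathbb{E}_{x \sim p_{\mathrm{data}}(x)}[\log D(x)] + \mathbb{E}_{z \sim p_z(z)}[\log(1 - D(G(z)))]$$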
GANs are used to generate synthetic tabular data. Among the various types of GANs, CTGAN and WGAN are the most popular for synthetic data generation.
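As an aside, CTGAN is available as an off-the-shelf implementation. The lines below are only a minimal usage sketch, assuming the open-source ctgan package, a preprocessed dataframe df, and a list categorical_features of its categorical column names (these variable names match the ones defined later in this article):
# Optional: minimal CTGAN sketch (assumes `pip install ctgan`, ctgan >= 0.7).
# The rest of this article instead builds a vanilla GAN from scratch.
from ctgan import CTGAN
ctgan = CTGAN(epochs=10)  # small number of epochs, just for a quick test
ctgan.fit(df, discrete_columns=categorical_features)
synthetic_df = ctgan.sample(1000)  # generate 1000 synthetic rows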
In this article, we are going to build a basic GAN model for generating tabular data.
Here we use a publicly available dataset of patient records.
Data Preprocessing:
First, download and unzip the publicly available Synthea sample dataset.
!wget https://synthetichealth.github.io/synthea-sample-data/downloads/synthea_sample_data_csv_apr2020.zip
!unzip synthea_sample_data_csv_apr2020.zip
Remove unnecessary columns and separate categorical and continuous features.
file_name = "csv/patients.csv"
columns_to_drop = ['Id', 'BIRTHDATE', 'DEATHDATE', 'SSN', 'DRIVERS', 'PASSPORT', 'PREFIX', 'FIRST', 'ADDRESS', 'LAST', 'SUFFIX', 'MAIDEN','LAT', 'LON']
categorical_features = ['MARITAL', 'RACE', 'ETHNICITY', 'GENDER', 'BIRTHPLACE', 'CITY', 'STATE', 'COUNTY', 'ZIP']
continuous_features = ['HEALTHCARE_EXPENSES', 'HEALTHCARE_COVERAGE']
Import the pandas library and load the CSV file into a dataframe.
import pandas as pd
df = pd.read_csv(file_name)
print(df.columns)
Drop unnecessary columns
df.drop(columns_to_drop, axis=1, inplace=True)
print(df.columns)
Import the numpy library, then find the minimum and maximum values of HEALTHCARE_EXPENSES and HEALTHCARE_COVERAGE and bin the continuous values.
import numpy as np
for column in continuous_features:
    # bin each continuous column into equal-width intervals and replace it with the bin index
    col_min = df[column].min()
    col_max = df[column].max()
    feature_bins = pd.cut(df[column], bins=np.linspace(col_min, col_max), labels=False)
    df.drop([column], axis=1, inplace=True)
    df = pd.concat([df, feature_bins], axis=1)
Print first 5 rows of dataframe
df.head()
Convert categorical features to numeric codes
for column in categorical_features:
    df[column] = df[column].astype('category').cat.codes
df.head()
Filling NaN values
First, let us find the number of unique values in each column
df.nunique()
Next, count the NaN values in each column
df.isnull().sum()
Replace the NaN values with the column mean
df.fillna(df.mean(), inplace=True)
After replacing the NaN values, check whether any NaN values remain
df.isnull().sum()
Transform the data
df = df[['MARITAL', 'RACE', 'ETHNICITY', 'GENDER', 'BIRTHPLACE', 'CITY', 'STATE','COUNTY', 'ZIP']]
We apply a power transformer (Yeo-Johnson) to make the feature distributions more Gaussian-like.
from sklearn.preprocessing import PowerTransformer
pw = PowerTransformer(method='yeo-johnson', standardize=True, copy=True)
df[df.columns] = pw.fit_transform(df[df.columns])
df.head()
df.info()
Training the model:
The generator is built using build_model, which takes three parameters. After the input layer, the hidden layers are dense layers: the first with 'dim' units and ReLU activation, the second with 'dim*2' units and ReLU activation, and the third with 'dim*4' units and ReLU activation. The final layer has data_dim units.
The discriminator is built using build_model, which takes two parameters. After the input layer, the hidden layers alternate dense and dropout layers. The first hidden layer is a dense layer with 'dim*4' units and ReLU activation. The second is a dropout layer that sets a fraction of its inputs to 0 at each update during training, with a dropout rate of 0.1; this helps prevent overfitting. The third hidden layer is like the first but with 'dim*2' units, and the fourth is another dropout layer. A final dense hidden layer with 'dim' units feeds the output layer, which has a single unit with sigmoid activation and produces a probability score for the likelihood of the input being real (1) or fake (0).
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras import Model
from tensorflow.keras.optimizers.legacy import Adam  # on older TF versions without the legacy module, use tensorflow.keras.optimizers.Adam instead
class Generator():
    def __init__(self, batch_size):
        self.batch_size = batch_size

    def build_model(self, input_shape, dim, data_dim):
        input = Input(shape=input_shape, batch_size=self.batch_size)
        x = Dense(dim, activation='relu')(input)
        x = Dense(dim * 2, activation='relu')(x)
        x = Dense(dim * 4, activation='relu')(x)
        x = Dense(data_dim)(x)
        return Model(inputs=input, outputs=x)
class Discriminator():
    def __init__(self, batch_size):
        self.batch_size = batch_size

    def build_model(self, input_shape, dim):
        input = Input(shape=input_shape, batch_size=self.batch_size)
        x = Dense(dim * 4, activation='relu')(input)
        x = Dropout(0.1)(x)
        x = Dense(dim * 2, activation='relu')(x)
        x = Dropout(0.1)(x)
        x = Dense(dim, activation='relu')(x)
        x = Dense(1, activation='sigmoid')(x)
        return Model(inputs=input, outputs=x)
class GAN():
    def __init__(self, gan_args):
        [self.batch_size, lr, self.noise_dim,
         self.data_dim, layers_dim] = gan_args
        self.generator = Generator(self.batch_size).\
            build_model(input_shape=(self.noise_dim,), dim=layers_dim, data_dim=self.data_dim)
        self.discriminator = Discriminator(self.batch_size).\
            build_model(input_shape=(self.data_dim,), dim=layers_dim)

        optimizer = Adam(lr, 0.5)

        # Build and compile the discriminator
        self.discriminator.compile(loss='binary_crossentropy',
                                   optimizer=optimizer,
                                   metrics=['accuracy'])

        # The generator takes noise as input and generates records
        z = Input(shape=(self.noise_dim,))
        record = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated records as input and determines validity
        validity = self.discriminator(record)

        # The combined model (stacked generator and discriminator)
        # trains the generator to fool the discriminator
        self.combined = Model(z, validity)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

    def get_data_batch(self, train, batch_size, seed=0):
        # random sampling would be easy to implement, but some samples would be drawn far too often or too rarely:
        # np.random.seed(seed)
        # x = train.loc[np.random.choice(train.index, batch_size)].values
        # instead, iterate through shuffled indices, so every sample gets covered evenly
        start_i = (batch_size * seed) % len(train)
        stop_i = start_i + batch_size
        shuffle_seed = (batch_size * seed) // len(train)
        np.random.seed(shuffle_seed)
        train_ix = np.random.choice(list(train.index), replace=False, size=len(train))  # wasteful to shuffle every time
        train_ix = list(train_ix) + list(train_ix)  # duplicate to cover ranges past the end of the set
        x = train.loc[train_ix[start_i: stop_i]].values
        return np.reshape(x, (batch_size, -1))
    def train(self, data, train_arguments):
        [cache_prefix, epochs, sample_interval] = train_arguments
        data_cols = data.columns

        # Adversarial ground truths
        valid = np.ones((self.batch_size, 1))
        fake = np.zeros((self.batch_size, 1))

        for epoch in range(epochs):
            # ---------------------
            #  Train Discriminator
            # ---------------------
            batch_data = self.get_data_batch(data, self.batch_size)
            noise = tf.random.normal((self.batch_size, self.noise_dim))

            # Generate a batch of new records
            gen_data = self.generator.predict(noise)

            # Train the discriminator on real and generated batches
            d_loss_real = self.discriminator.train_on_batch(batch_data, valid)
            d_loss_fake = self.discriminator.train_on_batch(gen_data, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------
            noise = tf.random.normal((self.batch_size, self.noise_dim))

            # Train the generator (to have the discriminator label samples as valid)
            g_loss = self.combined.train_on_batch(noise, valid)

            # Print the progress
            print("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100 * d_loss[1], g_loss))

            # If at the save interval => save model checkpoints and generate a test sample
            if epoch % sample_interval == 0:
                model_checkpoint_base_name = 'model/' + cache_prefix + '_{}_model_weights_step_{}.h5'
                self.generator.save_weights(model_checkpoint_base_name.format('generator', epoch))
                self.discriminator.save_weights(model_checkpoint_base_name.format('discriminator', epoch))

                # Generate a sample of synthetic data
                z = tf.random.normal((432, self.noise_dim))
                gen_data = self.generator(z)
                print('generated_data')
    def save(self, path, name):
        assert os.path.isdir(path) == True, \
            "Please provide a valid path. Path must be a directory."
        model_path = os.path.join(path, name)
        self.generator.save_weights(model_path)  # save the generator weights
        return

    def load(self, path):
        # Load previously saved generator weights into the existing generator model
        assert os.path.isfile(path) == True, \
            "Please provide a valid path to a saved weights file."
        self.generator.load_weights(path)
        return self.generator
Training configuration:
noise_dim = 32
dim = 128
batch_size = 32
log_step = 100
epochs = 500+1
learning_rate = 5e-4
models_dir = 'model'
print(df.shape[1])
gan_args = [batch_size, learning_rate, noise_dim, df.shape[1], dim]
train_args = ['', epochs, log_step]
Creating folders:
import os
os.mkdir('/content/model')
os.mkdir('/content/model/gan')
os.mkdir('/content/model/gan/saved')
Training our GAN
synthesizer = GAN(gan_args)
Train the chosen GAN model (here, a vanilla GAN).
synthesizer.train(df, train_args)
Saving model to directory
synthesizer.save('model/gan/saved', 'generator_patients')
To inspect the layers and parameter counts of the generator and discriminator, we use .summary()
synthesizer.generator.summary()
synthesizer.discriminator.summary()
models = {'GAN': ['GAN', False, synthesizer.generator]}
Evaluating model
import matplotlib.pyplot as plt
Set up the visualization parameters
seed = 17
test_size = 492 # number of samples to generate for visualization
noise_dim = 32
np.random.seed(seed)
z = np.random.normal(size=(test_size, noise_dim))
real_samples = df #pd.DataFrame(real, columns=data_cols)
model_names = ['GAN']
colors = ['deepskyblue','blue']
markers = ['o','^']
col1, col2 = 'CITY', 'ETHNICITY'
base_dir = 'model/'
Actual patient data visualization
model_steps = [ 0, 100, 200, 300, 400, 500]
rows = len(model_steps)
columns = 5
data_cols = df.columns
axarr = [[]]*len(model_steps)
fig = plt.figure(figsize=(14,rows*3))
for model_step_ix, model_step in enumerate(model_steps):
    # Left column: scatter plot of the actual patient data
    axarr[model_step_ix] = plt.subplot(rows, columns, model_step_ix*columns + 1)

    for group, color, marker in zip(real_samples.groupby('RACE'), colors, markers):
        plt.scatter(group[1][[col1]], group[1][[col2]], marker=marker, edgecolors=color, facecolors='none')

    plt.title('Actual Patients Data')
    plt.ylabel(col2)  # Only add y label to left plot
    plt.xlabel(col1)
    xlims, ylims = axarr[model_step_ix].get_xlim(), axarr[model_step_ix].get_ylim()

    if model_step_ix == 0:
        legend = plt.legend()
        legend.get_frame().set_facecolor('white')

    # Next column: samples from the generator checkpoint saved at this training step
    i = 0
    [model_name, with_class, generator_model] = models['GAN']
    generator_model.load_weights(base_dir + '_generator_model_weights_step_' + str(model_step) + '.h5')

    ax = plt.subplot(rows, columns, model_step_ix*columns + 1 + (i+1))
    g_z = generator_model.predict(z)
    gen_samples = pd.DataFrame(g_z, columns=data_cols)
    gen_samples.to_csv('Generated_sample.csv')
    plt.scatter(gen_samples[[col1]], gen_samples[[col2]], marker=markers[0], edgecolors=colors[0], facecolors='none')
    plt.title("Generated Data")
    plt.xlabel(data_cols[0])
    ax.set_xlim(xlims), ax.set_ylim(ylims)

plt.suptitle('Comparison of GAN outputs', size=16, fontweight='bold')
plt.tight_layout(rect=[0.075, 0, 1, 0.95])
Adding text labels for training steps
vpositions = np.array([i._position.bounds[1] for i in axarr])
vpositions += (vpositions[0] - vpositions[1]) * 0.35

for model_step_ix, model_step in enumerate(model_steps):
    fig.text(0.05, vpositions[model_step_ix], 'training\nstep\n' + str(model_step), ha='center', va='center', size=12)

plt.savefig('Comparison_of_GAN_outputs.png')
Generating synthetic data
[model_name, with_class, generator_model] = models['GAN']
generator_model.load_weights(base_dir + '_generator_model_weights_step_' + str(500) + '.h5')
g_z = generator_model.predict(z)
g_z.shape
gen_samples
Saving generated data
g_z = pw.inverse_transform(g_z)
gen_samples = pd.DataFrame(g_z, columns=data_cols)
gen_samples.to_csv('/content/credit.csv')
gen_samples.head()
Evaluating the data with table_evaluator
!pip install table_evaluator
from table_evaluator import TableEvaluator
print(len(df), len(gen_samples))
table_evaluator = TableEvaluator(df, gen_samples)
table_evaluator.visual_evaluation()
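Besides the visual comparison, the table_evaluator package also provides an evaluate method that reports similarity metrics with respect to a chosen target column; a minimal sketch (the choice of 'GENDER' as the target column is just an illustrative assumption) looks like this:
# Optional numeric evaluation; 'GENDER' is an arbitrary example target column.
table_evaluator.evaluate(target_col='GENDER')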
Do Check Out
To explore AI in more depth, visit TestAing.
References:
https://www.maskaravivek.com/post/gan-synthetic-data-generation/
What Are GANs? | Generative Adversarial Networks Tutorial | Deep Learning Tutorial | Simplilearn
-Grandhi Priya