Retina Blood Vessel Segmentation using UNET

Saif Gazali
8 min read · Aug 14, 2021

Image segmentation is the process of dividing an image into segments so that only the important segments are processed instead of the entire image. An image is made of pixels, and image segmentation groups together pixels with similar attributes, which gives us a far more granular understanding of the image's content. Each group is assigned a label, and these labels define the boundaries that separate the objects of interest from the rest of the image.

In this article we will segment blood vessels from retina images using the DRIVE dataset, which can be downloaded from here. The training data contains the retina images, the masks, and the annotated (ground-truth) images. The training set and the test set each contain 20 images. The retina image and its annotation are plotted below.

Retina Image (Left) and Segmented Image (Right)
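A quick sketch of how such a plot can be produced from one training pair (assumed; the original post only shows the resulting figure):

from glob import glob
import cv2
import imageio
import matplotlib.pyplot as plt

# read the first retina image (.tif) and its annotation (.gif)
img_path = sorted(glob('/content/drive/MyDrive/DRIVE/training/images/*.tif'))[0]
mask_path = sorted(glob('/content/drive/MyDrive/DRIVE/training/1st_manual/*.gif'))[0]
img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
mask = imageio.mimread(mask_path)[0]

fig, ax = plt.subplots(1, 2, figsize=(10, 5))
ax[0].imshow(img)
ax[0].set_title('Retina Image')
ax[0].axis('off')
ax[1].imshow(mask, cmap='gray')
ax[1].set_title('Segmented Image')
ax[1].axis('off')
plt.show()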

Loading Dataset
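The original post does not show its import block, so the following set of imports is an assumption covering the libraries used in the snippets below (OpenCV, imageio, NumPy, TensorFlow/Keras and Albumentations):

import os
import numpy as np
import cv2
import imageio
from glob import glob

import tensorflow as tf
from tensorflow.keras.layers import (Input, Conv2D, Conv2DTranspose, BatchNormalization,
                                     Activation, Concatenate, Dropout, LeakyReLU)
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger

# Albumentations provides the augmentation transforms used later
from albumentations import HorizontalFlip, VerticalFlip, ElasticTransform, GridDistortion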

Loading our training and testing datasets. The images folder contains the retina images, whereas the 1st_manual folder contains the annotated images.

def load_data(path):
    train_x = sorted(glob(os.path.join(path, "training", "images", "*.tif")))
    train_y = sorted(glob(os.path.join(path, "training", "1st_manual", "*.gif")))
    test_x = sorted(glob(os.path.join(path, "test", "images", "*.tif")))
    test_y = sorted(glob(os.path.join(path, "test", "1st_manual", "*.gif")))
    return (train_x, train_y), (test_x, test_y)

Loading the data.

# load the data
(train_x, train_y), (test_x, test_y) = load_data('/content/drive/MyDrive/DRIVE')

Data Augmentation

Data augmentation is a technique for artificially increasing the size of the training dataset by creating new images from the existing ones through certain modifications. Since we have only 20 images in our training set, and deep neural networks generally perform better when trained on larger amounts of data, data augmentation is used to improve the performance of our model.

We will apply four transformations to each image in our training dataset:

  1. Horizontal Flip.
  2. Vertical Flip.
  3. Elastic Transform.
  4. Grid Distortion.
# data augmentation
from tqdm import tqdm

def augment_data(images, masks, save_path, augment=True):
    H = 256
    W = 256
    for idx, (x, y) in tqdm(enumerate(zip(images, masks)), total=len(images)):
        # extract the name of the image
        name = x.split("/")[-1].split(".")[0]
        # read the image and mask
        x = cv2.imread(x, cv2.IMREAD_COLOR)
        y = imageio.mimread(y)[0]
        # data augmentation
        if augment:
            # horizontal flip applied to the image and mask
            aug = HorizontalFlip(p=1.0)
            augmented = aug(image=x, mask=y)
            x1 = augmented['image']
            y1 = augmented['mask']
            # vertical flip
            aug = VerticalFlip(p=1.0)
            augmented = aug(image=x, mask=y)
            x2 = augmented['image']
            y2 = augmented['mask']
            # elastic transform
            aug = ElasticTransform(p=1.0, alpha=120, sigma=120 * 0.05)
            augmented = aug(image=x, mask=y)
            x3 = augmented['image']
            y3 = augmented['mask']
            # grid distortion
            aug = GridDistortion(p=1.0)
            augmented = aug(image=x, mask=y)
            x4 = augmented['image']
            y4 = augmented['mask']
            X = [x, x1, x2, x3, x4]
            Y = [y, y1, y2, y3, y4]
        else:
            # no augmentation: keep only the original image and mask
            X = [x]
            Y = [y]
        index = 0
        for i, m in zip(X, Y):
            i = cv2.resize(i, (W, H))
            m = cv2.resize(m, (W, H))
            if len(X) == 1:
                tmp_image_name = f"{name}.jpg"
                tmp_mask_name = f"{name}.jpg"
            else:
                tmp_image_name = f"{name}_{index}.jpg"
                tmp_mask_name = f"{name}_{index}.jpg"
            image_path = os.path.join(save_path, "images", tmp_image_name)
            mask_path = os.path.join(save_path, "mask", tmp_mask_name)
            cv2.imwrite(image_path, i)
            cv2.imwrite(mask_path, m)
            index += 1

Creating new directories in order to save the generated augmented images.

# create new directories
create_dir('/content/drive/MyDrive/new_drive_data/train/images')
create_dir('/content/drive/MyDrive/new_drive_data/train/mask')
create_dir('/content/drive/MyDrive/new_drive_data/test/images')
create_dir('/content/drive/MyDrive/new_drive_data/test/mask')
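Note that create_dir is not defined in the article; a minimal helper, assuming it simply wraps os.makedirs, might look like this:

import os

# hypothetical helper (not shown in the original post): create a directory
# if it does not already exist
def create_dir(path):
    os.makedirs(path, exist_ok=True)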

Augmenting the data.

augment_data(train_x,train_y,'/content/drive/MyDrive/new_drive_data/train',True)

Plotting the augmented images.
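A small sketch (assumed, not from the original post) that reads a few of the saved augmented images and masks and displays them side by side:

from glob import glob
import cv2
import matplotlib.pyplot as plt

aug_images = sorted(glob('/content/drive/MyDrive/new_drive_data/train/images/*.jpg'))[:4]
aug_masks = sorted(glob('/content/drive/MyDrive/new_drive_data/train/mask/*.jpg'))[:4]

fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for col, (img_path, mask_path) in enumerate(zip(aug_images, aug_masks)):
    img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    axes[0, col].imshow(img)
    axes[0, col].axis('off')
    axes[1, col].imshow(mask, cmap='gray')
    axes[1, col].axis('off')
plt.show()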

UNET Architecture

UNET Architecture

The UNET model uses Convolution-BatchNormalization-ReLU blocks to build a deep convolutional neural network; the configuration of each layer can be seen in the appendix of the paper. The U-Net architecture is used for segmentation instead of a traditional encoder-decoder model, which takes an image as input and down-samples it over a number of layers before up-sampling it back to produce the final image. U-Net also down-samples the image and up-samples it again, but it adds skip connections between encoder and decoder layers of the same size, which allows information to be shared between input and output.

Defining the encoder block, which is part of the UNET model and is used for down-sampling an image.

# define downsampling block for UNET
def encoder_block(n_nodes, con_layer, batchNorm=True):
    # weights initialized from a Gaussian distribution
    init = RandomNormal(stddev=0.002)
    g = Conv2D(n_nodes, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(con_layer)
    if batchNorm:
        g = BatchNormalization()(g)
    g = LeakyReLU(0.2)(g)
    return g

Defining the decoder block, which is part of the UNET model and is used for up-sampling an image.

# define upsampling block for UNET
def decoder_block(n_nodes, con_layer, skip_layer, dropout=True):
    # weights initialized from a Gaussian distribution
    init = RandomNormal(stddev=0.002)
    g = Conv2DTranspose(n_nodes, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(con_layer)
    g = BatchNormalization()(g)
    if dropout:
        g = Dropout(0.2)(g)
    g = Concatenate()([g, skip_layer])
    g = Activation('relu')(g)
    return g

Defining our UNET model.

# define the UNET model
def define_unet(img_shape=(256, 256, 3)):
    # weights initialized from a Gaussian distribution
    init = RandomNormal(stddev=0.02)
    inp = Input(shape=img_shape)
    # downsampling layers
    e1 = encoder_block(64, inp, False)
    e2 = encoder_block(128, e1)
    e3 = encoder_block(256, e2)
    e4 = encoder_block(512, e3)
    e5 = encoder_block(512, e4)
    e6 = encoder_block(512, e5)
    e7 = encoder_block(512, e6)
    # bottleneck layer
    b1 = Conv2D(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(e7)
    b1 = Activation('relu')(b1)
    # upsampling layers
    d1 = decoder_block(512, b1, e7)
    d2 = decoder_block(512, d1, e6)
    d3 = decoder_block(512, d2, e5)
    d4 = decoder_block(512, d3, e4, False)
    d5 = decoder_block(256, d4, e3, False)
    d6 = decoder_block(128, d5, e2, False)
    d7 = decoder_block(64, d6, e1, False)
    # output layer
    g = Conv2DTranspose(1, (1, 1), strides=(2, 2), padding='same', activation='sigmoid')(d7)
    # define the model
    model = Model(inp, g)
    return model

Plotting our UNET model.
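One way to do this (assumed; it requires pydot and graphviz to be installed) is Keras' plot_model utility:

from tensorflow.keras.utils import plot_model

# build the model and visualize its layers and output shapes
unet = define_unet((256, 256, 3))
unet.summary()
plot_model(unet, to_file='unet_model.png', show_shapes=True)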

UNET Model Architecture

Defining our Hyperparameters.

# directory to save files
create_dir("/content/drive/MyDrive/files")

# hyperparameters
batch_size = 2
lr = 1e-4
num_epochs = 100
model_path = os.path.join("/content/drive/MyDrive/files", "model.h5")
csv_path = os.path.join("/content/drive/MyDrive/files", "data.csv")

Creating a method to load images from a given path.

def load_path(path):
    X = sorted(glob(os.path.join(path, "images", "*.jpg")))
    Y = sorted(glob(os.path.join(path, "mask", "*.jpg")))
    return X, Y

Creating a method to shuffle the images.

from sklearn.utils import shuffle

def shuffling(x, y):
    x, y = shuffle(x, y)
    return x, y

Defining the training and validation dataset path.

# dataset paths
dataset_path = "/content/drive/MyDrive/new_drive_data"
train_path = os.path.join(dataset_path,"train")
valid_path = os.path.join(dataset_path,"test")

Loading and shuffling the training dataset, while the validation (test) dataset is only loaded.

#Load and shuffle the data
train_x,train_y = load_path(train_path)
train_x,train_y = shuffling(train_x,train_y)
valid_x, valid_y = load_path(valid_path)

Data Pipeline

Creating a method to read an image: it takes a path, reads the image, and normalizes it to the range [0, 1].

def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = x / 255.0
    x = x.astype(np.float32)
    return x

Creating a method to read the annotated image (mask): it takes a path, reads the mask in grayscale, normalizes it, and expands its dimensions to make it 3D.

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = x / 255.0
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)  # (256, 256, 1)
    return x

Creating a method to parse an image and its mask, wrapping the readers above with tf.numpy_function so they can be used inside a TensorFlow data pipeline.

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y
    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([256, 256, 3])
    y.set_shape([256, 256, 1])
    return x, y

Creating a method tf_dataset() which takes lists of image and mask paths along with a batch size. It uses the TensorFlow Dataset API and returns a batched dataset after processing the paths through the methods defined above.

def tf_dataset(X, Y, batch_size=2):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(4)
    return dataset

Getting the dataset.

train_dataset = tf_dataset(train_x,train_y)
valid_dataset = tf_dataset(valid_x,valid_y)

Printing the number of training and validation images.

valid_x = np.array(valid_x)
valid_y = np.array(valid_y)
train_x = np.array(train_x)
train_y = np.array(train_y)
print(train_x.shape) #(75,)
print(train_y.shape) #(75,)
print(valid_x.shape) #(5,)
print(valid_y.shape) #(5,)

Calculating the number of steps to train our model.

# number of steps per epoch
train_steps = len(train_x) // batch_size
test_steps = len(valid_x) // batch_size
if len(train_x) % batch_size != 0:
    train_steps += 1
if len(valid_x) % batch_size != 0:
    test_steps += 1
print(train_steps) #38
print(test_steps) #3

Evaluation Metrics

Intersection-over-Union (IOU) is a common evaluation metric for semantic image segmentation. It is defined as IOU = true_positive / (true_positive + false_positive + false_negative), which corresponds to the area of overlap between the predicted and ground-truth masks divided by the area of their union. The function below computes it directly from the prediction and ground-truth tensors, adding a small constant (1e-15) to avoid division by zero.

def iou(y_true, y_pred):
    def f(y_true, y_pred):
        intersection = (y_true * y_pred).sum()
        union = y_true.sum() + y_pred.sum() - intersection
        x = (intersection + 1e-15) / (union + 1e-15)
        x = x.astype(np.float32)
        return x
    return tf.numpy_function(f, [y_true, y_pred], tf.float32)

The Dice coefficient is essentially a measure of overlap between two samples. It ranges from 0 to 1, where a value of 1 denotes perfect and complete overlap. The coefficient was originally developed for binary data, so it can be used in our case, and the Dice loss is simply 1 minus the Dice coefficient.

smooth = 1e-15

def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

Compiling our UNET model with the Adam optimizer, using the Dice coefficient and IOU defined above (along with recall and precision) as metrics.

model = define_unet()
model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=[dice_coef, iou, Recall(), Precision()])

Model Training

Callbacks are pieces of code that are executed during model training. ModelCheckpoint is used to save the best model seen so far, and CSVLogger is used to log all the losses and metrics to a CSV file.

# callbacks that run during training
callbacks = [
    ModelCheckpoint(model_path, verbose=1, save_best_only=True),
    CSVLogger(csv_path)
]

Training our model for 500 epochs.

model.fit(
    train_dataset,
    epochs=500,
    validation_data=valid_dataset,
    steps_per_epoch=train_steps,
    validation_steps=test_steps,
    callbacks=callbacks)

Saving our model

model.save('/content/drive/MyDrive/500epochs_model_vsl_loss.h5')
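To reload this model later for inference, the custom loss and metric functions have to be passed through custom_objects; a sketch under that assumption:

from tensorflow.keras.models import load_model

# load the saved model; Keras needs the custom loss/metric definitions
model = load_model('/content/drive/MyDrive/500epochs_model_vsl_loss.h5',
                   custom_objects={'dice_loss': dice_loss, 'dice_coef': dice_coef, 'iou': iou})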

Creating a method save_results which saves the retina image, its annotated image, and our predicted image side by side in a single file.

def save_results(ori_x, ori_y, y_pred, save_image_path):
    # white separator line between the images
    line = np.ones((256, 10, 3)) * 255
    ori_y = np.expand_dims(ori_y, axis=-1)
    ori_y = np.concatenate([ori_y, ori_y, ori_y], axis=-1)
    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1) * 255
    cat_images = np.concatenate([ori_x, line, ori_y, line, y_pred], axis=1)
    cv2.imwrite(save_image_path, cat_images)

Testing our model

import matplotlib.pyplot as plt

# make predictions on the test images and save the results
def make_Predictions(mask_index, model):
    for index, (x, y) in tqdm(enumerate(zip(test_x, test_y)), total=len(test_x)):
        # extract the name of the image
        name = x.split("/")[-1].split(".")[0]
        # read the image and mask and resize them to the training size
        ori_x = cv2.resize(cv2.imread(x, cv2.IMREAD_COLOR), (256, 256))
        ori_y = cv2.resize(imageio.mimread(y)[0], (256, 256))
        x = (ori_x / 255.0).astype(np.float32)
        # prediction
        y_pred = model.predict(np.expand_dims(x, axis=0))[0]  # (256, 256, 1)
        y_pred = y_pred > 0.5
        y_pred = y_pred.astype(np.int32)
        y_pred = np.squeeze(y_pred, axis=-1)
        # save the image, mask and prediction side by side
        save_image_path = f"/content/drive/MyDrive/results/{name}_{index + mask_index}.png"
        save_results(ori_x, ori_y, y_pred, save_image_path)

Plotting some of the generated images.
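A short sketch (assumed, not from the original post) that displays a few of the saved result strips (retina image | ground truth | prediction):

from glob import glob
import cv2
import matplotlib.pyplot as plt

for path in sorted(glob('/content/drive/MyDrive/results/*.png'))[:3]:
    result = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)
    plt.figure(figsize=(15, 5))
    plt.imshow(result)
    plt.axis('off')
    plt.show()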

The model does not seem to generate plausible segmented images due to the limited amount of data available. We will improve the model using transfer learning here.
