Commit b1d5f4be authored by Anna Fomina (parent 61a3ace9)

learning notebooks
# CoAtNet: convolutional stem and MBConv stages followed by self-attention stages.
from torch import nn
import torch
import sys
from math import sqrt

sys.path.append('.')
from Sources.MBConv import MBConvBlock
from Sources.SelfAttention import ScaledDotProductAttention


class CoAtNet(nn.Module):
    def __init__(self, in_ch, image_size, out_chs=[64, 96, 192, 384, 768], num_classes=3):
        super().__init__()
        self.out_chs = out_chs
        self.maxpool2d = nn.MaxPool2d(kernel_size=2, stride=2)
        self.maxpool1d = nn.MaxPool1d(kernel_size=2, stride=2)

        # Stage 0: plain convolutional stem.
        self.s0 = nn.Sequential(
            nn.Conv2d(in_ch, in_ch, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_ch, in_ch, kernel_size=3, padding=1)
        )
        self.mlp0 = nn.Sequential(
            nn.Conv2d(in_ch, out_chs[0], kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(out_chs[0], out_chs[0], kernel_size=1)
        )

        # Stages 1-2: MBConv (convolutional) blocks.
        self.s1 = MBConvBlock(ksize=3, input_filters=out_chs[0], output_filters=out_chs[0],
                              image_size=image_size // 2)
        self.mlp1 = nn.Sequential(
            nn.Conv2d(out_chs[0], out_chs[1], kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(out_chs[1], out_chs[1], kernel_size=1)
        )
        self.s2 = MBConvBlock(ksize=3, input_filters=out_chs[1], output_filters=out_chs[1],
                              image_size=image_size // 4)
        self.mlp2 = nn.Sequential(
            nn.Conv2d(out_chs[1], out_chs[2], kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(out_chs[2], out_chs[2], kernel_size=1)
        )

        # Stages 3-4: multi-head self-attention blocks over flattened tokens.
        self.s3 = ScaledDotProductAttention(out_chs[2], out_chs[2] // 8, out_chs[2] // 8, 8)
        self.mlp3 = nn.Sequential(
            nn.Linear(out_chs[2], out_chs[3]),
            nn.ReLU(),
            nn.Linear(out_chs[3], out_chs[3])
        )
        self.s4 = ScaledDotProductAttention(out_chs[3], out_chs[3] // 8, out_chs[3] // 8, 8)
        self.mlp4 = nn.Sequential(
            nn.Linear(out_chs[3], out_chs[4]),
            nn.ReLU(),
            nn.Linear(out_chs[4], out_chs[4])
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(out_chs[4], num_classes)  # 768 with the default out_chs

    def forward(self, x):
        B, C, H, W = x.shape
        # stage 0
        y = self.mlp0(self.s0(x))
        y = self.maxpool2d(y)
        # stage 1
        y = self.mlp1(self.s1(y))
        y = self.maxpool2d(y)
        # stage 2
        y = self.mlp2(self.s2(y))
        y = self.maxpool2d(y)
        # stage 3: flatten spatial dims to a token sequence (B, N, C)
        y = y.reshape(B, self.out_chs[2], -1).permute(0, 2, 1)
        y = self.mlp3(self.s3(y, y, y))
        y = self.maxpool1d(y.permute(0, 2, 1)).permute(0, 2, 1)
        y1 = y.detach().clone()
        # stage 4
        y = self.mlp4(self.s4(y, y, y))
        y = self.maxpool1d(y.permute(0, 2, 1))
        N = y.shape[-1]
        y = y.reshape(B, self.out_chs[4], int(sqrt(N)), int(sqrt(N)))
        y2 = y.detach().clone()
        # global pooling
        y = self.avgpool(y)
        # classifier head
        y = torch.flatten(y, 1)
        y = self.fc(y)
        return y, y1, y2


if __name__ == '__main__':
    x = torch.randn(1, 3, 224, 224)
    coatnet = CoAtNet(3, 224)
    # forward() returns the logits plus two detached intermediate feature tensors
    y, y1, y2 = coatnet(x)
    print(y.shape)
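# --- Usage sketch (not part of the commit) ---
# A minimal training step for CoAtNet's triple output, assuming the class above
# is importable. The first element is the logits used for the loss; the second
# and third are the detached stage-3 tokens and stage-4 feature map (no gradient
# flows through them). The optimizer, loss, and synthetic batch are assumptions.
import torch
from torch import nn

model = CoAtNet(3, 224)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

images = torch.randn(2, 3, 224, 224)   # synthetic batch
targets = torch.randint(0, 3, (2,))    # synthetic labels for 3 classes

logits, feats3, feats4 = model(images)
loss = criterion(logits, targets)      # only `logits` carries gradients;
loss.backward()                        # feats3/feats4 were detached in forward()
optimizer.step()
print(loss.item(), feats3.shape, feats4.shape)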
# Sources/MBConv.py: MBConv block with squeeze-and-excitation (EfficientNet-style utilities).
import math
from functools import partial

import torch
from torch import nn
from torch.nn import functional as F


class SwishImplementation(torch.autograd.Function):
    @staticmethod
    def forward(ctx, i):
        result = i * torch.sigmoid(i)
        ctx.save_for_backward(i)
        return result

    @staticmethod
    def backward(ctx, grad_output):
        i = ctx.saved_tensors[0]  # ctx.saved_variables is deprecated
        sigmoid_i = torch.sigmoid(i)
        return grad_output * (sigmoid_i * (1 + i * (1 - sigmoid_i)))


class MemoryEfficientSwish(nn.Module):
    def forward(self, x):
        return SwishImplementation.apply(x)


def drop_connect(inputs, p, training):
    """Drop connect: randomly zero whole samples with probability p, rescaling the rest."""
    if not training:
        return inputs
    batch_size = inputs.shape[0]
    keep_prob = 1 - p
    random_tensor = keep_prob
    random_tensor += torch.rand([batch_size, 1, 1, 1], dtype=inputs.dtype, device=inputs.device)
    binary_tensor = torch.floor(random_tensor)
    output = inputs / keep_prob * binary_tensor
    return output


def get_same_padding_conv2d(image_size=None):
    return partial(Conv2dStaticSamePadding, image_size=image_size)


def get_width_and_height_from_size(x):
    """Obtains width and height from an int or a tuple/list."""
    if isinstance(x, int):
        return x, x
    if isinstance(x, (list, tuple)):
        return x
    raise TypeError()


def calculate_output_image_size(input_image_size, stride):
    """Calculates the output image size of Conv2dSamePadding with a given stride."""
    if input_image_size is None:
        return None
    image_height, image_width = get_width_and_height_from_size(input_image_size)
    stride = stride if isinstance(stride, int) else stride[0]
    image_height = int(math.ceil(image_height / stride))
    image_width = int(math.ceil(image_width / stride))
    return [image_height, image_width]


class Conv2dStaticSamePadding(nn.Conv2d):
    """2D convolution with TensorFlow-style 'same' padding, for a fixed image size."""
    def __init__(self, in_channels, out_channels, kernel_size, image_size=None, **kwargs):
        super().__init__(in_channels, out_channels, kernel_size, **kwargs)
        self.stride = self.stride if len(self.stride) == 2 else [self.stride[0]] * 2
        # Calculate padding based on image size and save it
        assert image_size is not None
        ih, iw = (image_size, image_size) if isinstance(image_size, int) else image_size
        kh, kw = self.weight.size()[-2:]
        sh, sw = self.stride
        oh, ow = math.ceil(ih / sh), math.ceil(iw / sw)
        pad_h = max((oh - 1) * self.stride[0] + (kh - 1) * self.dilation[0] + 1 - ih, 0)
        pad_w = max((ow - 1) * self.stride[1] + (kw - 1) * self.dilation[1] + 1 - iw, 0)
        if pad_h > 0 or pad_w > 0:
            self.static_padding = nn.ZeroPad2d((pad_w // 2, pad_w - pad_w // 2,
                                                pad_h // 2, pad_h - pad_h // 2))
        else:
            self.static_padding = Identity()

    def forward(self, x):
        x = self.static_padding(x)
        x = F.conv2d(x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups)
        return x


class Identity(nn.Module):
    def __init__(self):
        super(Identity, self).__init__()

    def forward(self, input):
        return input


# MBConvBlock
class MBConvBlock(nn.Module):
    """MBConv block. Example: ksize 3*3, 32 input filters, 16 output filters, conv1, stride 1."""
    def __init__(self, ksize, input_filters, output_filters, expand_ratio=1, stride=1, image_size=224):
        super().__init__()
        self._bn_mom = 0.1
        self._bn_eps = 0.01
        self._se_ratio = 0.25
        self._input_filters = input_filters
        self._output_filters = output_filters
        self._expand_ratio = expand_ratio
        self._kernel_size = ksize
        self._stride = stride

        inp = self._input_filters
        oup = self._input_filters * self._expand_ratio
        if self._expand_ratio != 1:
            Conv2d = get_same_padding_conv2d(image_size=image_size)
            self._expand_conv = Conv2d(in_channels=inp, out_channels=oup, kernel_size=1, bias=False)
            self._bn0 = nn.BatchNorm2d(num_features=oup, momentum=self._bn_mom, eps=self._bn_eps)

        # Depthwise convolution
        k = self._kernel_size
        s = self._stride
        Conv2d = get_same_padding_conv2d(image_size=image_size)
        self._depthwise_conv = Conv2d(
            in_channels=oup, out_channels=oup, groups=oup,
            kernel_size=k, stride=s, bias=False)
        self._bn1 = nn.BatchNorm2d(num_features=oup, momentum=self._bn_mom, eps=self._bn_eps)
        image_size = calculate_output_image_size(image_size, s)

        # Squeeze and Excitation layer, if desired
        Conv2d = get_same_padding_conv2d(image_size=(1, 1))
        num_squeezed_channels = max(1, int(self._input_filters * self._se_ratio))
        self._se_reduce = Conv2d(in_channels=oup, out_channels=num_squeezed_channels, kernel_size=1)
        self._se_expand = Conv2d(in_channels=num_squeezed_channels, out_channels=oup, kernel_size=1)

        # Output phase
        final_oup = self._output_filters
        Conv2d = get_same_padding_conv2d(image_size=image_size)
        self._project_conv = Conv2d(in_channels=oup, out_channels=final_oup, kernel_size=1, bias=False)
        self._bn2 = nn.BatchNorm2d(num_features=final_oup, momentum=self._bn_mom, eps=self._bn_eps)
        self._swish = MemoryEfficientSwish()

    def forward(self, inputs, drop_connect_rate=None):
        """
        :param inputs: input tensor
        :param drop_connect_rate: drop connect rate (float, between 0 and 1)
        :return: output of block
        """
        # Expansion and depthwise convolution
        x = inputs
        if self._expand_ratio != 1:
            expand = self._expand_conv(inputs)
            bn0 = self._bn0(expand)
            x = self._swish(bn0)
        depthwise = self._depthwise_conv(x)
        bn1 = self._bn1(depthwise)
        x = self._swish(bn1)

        # Squeeze and excitation
        x_squeezed = F.adaptive_avg_pool2d(x, 1)
        x_squeezed = self._se_reduce(x_squeezed)
        x_squeezed = self._swish(x_squeezed)
        x_squeezed = self._se_expand(x_squeezed)
        x = torch.sigmoid(x_squeezed) * x

        x = self._bn2(self._project_conv(x))

        # Skip connection and drop connect
        input_filters, output_filters = self._input_filters, self._output_filters
        if self._stride == 1 and input_filters == output_filters:
            if drop_connect_rate:
                x = drop_connect(x, p=drop_connect_rate, training=self.training)
            x = x + inputs  # skip connection
        return x


if __name__ == '__main__':
    input = torch.randn(1, 3, 112, 112)
    mbconv = MBConvBlock(ksize=3, input_filters=3, output_filters=3, image_size=112)
    out = mbconv(input)
    print(out.shape)
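# --- Usage sketch (not part of the commit) ---
# The __main__ demo above uses the default expand_ratio=1, which skips the
# expansion convolution. This sketch exercises the expansion and drop-connect
# paths; expand_ratio=4 and drop_connect_rate=0.2 are illustrative values,
# not defaults from this repository.
x = torch.randn(4, 16, 56, 56)
block = MBConvBlock(ksize=3, input_filters=16, output_filters=16,
                    expand_ratio=4, stride=1, image_size=56)
block.train()                          # drop_connect is active only in training mode
out = block(x, drop_connect_rate=0.2)
print(out.shape)                       # torch.Size([4, 16, 56, 56]); skip connection applies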
# Sources/SelfAttention.py: multi-head scaled dot-product attention.
import numpy as np
import torch
from torch import nn
from torch.nn import init


class ScaledDotProductAttention(nn.Module):
    '''
    Scaled dot-product attention
    '''
    def __init__(self, d_model, d_k, d_v, h, dropout=.1):
        '''
        :param d_model: Output dimensionality of the model
        :param d_k: Dimensionality of queries and keys
        :param d_v: Dimensionality of values
        :param h: Number of heads
        '''
        super(ScaledDotProductAttention, self).__init__()
        self.fc_q = nn.Linear(d_model, h * d_k)
        self.fc_k = nn.Linear(d_model, h * d_k)
        self.fc_v = nn.Linear(d_model, h * d_v)
        self.fc_o = nn.Linear(h * d_v, d_model)
        self.dropout = nn.Dropout(dropout)

        self.d_model = d_model
        self.d_k = d_k
        self.d_v = d_v
        self.h = h

        self.init_weights()

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                init.constant_(m.weight, 1)
                init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                init.normal_(m.weight, std=0.001)
                if m.bias is not None:
                    init.constant_(m.bias, 0)

    def forward(self, queries, keys, values, attention_mask=None, attention_weights=None):
        '''
        Computes scaled dot-product attention.
        :param queries: Queries (b_s, nq, d_model)
        :param keys: Keys (b_s, nk, d_model)
        :param values: Values (b_s, nk, d_model)
        :param attention_mask: Mask over attention values (b_s, h, nq, nk). True indicates masking.
        :param attention_weights: Multiplicative weights for attention values (b_s, h, nq, nk).
        :return: Output tensor (b_s, nq, d_model)
        '''
        b_s, nq = queries.shape[:2]
        nk = keys.shape[1]
        q = self.fc_q(queries).view(b_s, nq, self.h, self.d_k).permute(0, 2, 1, 3)  # (b_s, h, nq, d_k)
        k = self.fc_k(keys).view(b_s, nk, self.h, self.d_k).permute(0, 2, 3, 1)     # (b_s, h, d_k, nk)
        v = self.fc_v(values).view(b_s, nk, self.h, self.d_v).permute(0, 2, 1, 3)   # (b_s, h, nk, d_v)

        att = torch.matmul(q, k) / np.sqrt(self.d_k)  # (b_s, h, nq, nk)
        if attention_weights is not None:
            att = att * attention_weights
        if attention_mask is not None:
            att = att.masked_fill(attention_mask, -np.inf)
        att = torch.softmax(att, -1)
        att = self.dropout(att)

        out = torch.matmul(att, v).permute(0, 2, 1, 3).contiguous().view(b_s, nq, self.h * self.d_v)  # (b_s, nq, h*d_v)
        out = self.fc_o(out)  # (b_s, nq, d_model)
        return out


if __name__ == '__main__':
    input = torch.randn(50, 49, 512)
    sa = ScaledDotProductAttention(d_model=512, d_k=512, d_v=512, h=8)
    output = sa(input, input, input)
    print(output.shape)
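# --- Usage sketch (not part of the commit) ---
# The demo above runs self-attention without a mask. Per the docstring,
# attention_mask is boolean with shape (b_s, h, nq, nk), and True marks
# positions to suppress: they are filled with -inf before the softmax, so they
# receive zero attention weight. Shapes below are illustrative.
q = torch.randn(2, 10, 512)
sa = ScaledDotProductAttention(d_model=512, d_k=64, d_v=64, h=8)
mask = torch.zeros(2, 8, 10, 10, dtype=torch.bool)
mask[..., 5:] = True                   # hide keys 5..9 from every query
out = sa(q, q, q, attention_mask=mask)
print(out.shape)                       # torch.Size([2, 10, 512])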
# TensorBoard logging helpers and a manual training callback.
import io
import itertools

import matplotlib.pyplot as plt
import numpy as np
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import (accuracy_score, confusion_matrix, precision_score,
                             recall_score, roc_auc_score)
from pytorch_lightning.callbacks import Callback
from PIL import Image

# The log_* helpers below write to this module-level writer; the original
# file used `writer` without defining it, so it is instantiated here.
writer = SummaryWriter()


def plot_confusion_matrix(cm, class_names):
    """
    Returns a matplotlib figure containing the plotted confusion matrix.

    Args:
        cm (array, shape = [n, n]): a confusion matrix of integer classes
        class_names (array, shape = [n]): String names of the integer classes
    """
    # Normalize the confusion matrix.
    cm = np.around(cm.astype('float') / cm.sum(axis=1)[:, np.newaxis], decimals=2)

    figure = plt.figure(figsize=(8, 8))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title("Confusion matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Use white text if squares are dark; otherwise black.
    threshold = cm.max() / 3 * 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        color = "white" if cm[i, j] > threshold else "black"
        plt.text(j, i, cm[i, j], horizontalalignment="center", color=color)

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    return figure


def plot_to_image(figure):
    """
    Converts the matplotlib plot specified by 'figure' to a PNG image tensor and
    returns it. The supplied figure is closed and inaccessible after this call.
    """
    buf = io.BytesIO()
    # Use plt.savefig to save the plot to a PNG in memory.
    plt.savefig(buf, format='png')
    # Closing the figure prevents it from being displayed directly inside
    # the notebook.
    plt.close(figure)
    buf.seek(0)
    image = Image.open(buf)
    image = image.convert("RGB")
    transform = transforms.ToTensor()
    image = transform(image)
    return image


def get_true_classes(input_classes):
    """Flattens a list of per-batch label tensors into an int64 numpy array."""
    true_classes = []
    for i in range(len(input_classes)):
        input_classes[i] = input_classes[i].to('cpu')
        for item in input_classes[i]:
            true_classes.append(item.item())
    out = np.array(true_classes, dtype=np.int64)
    return out


def get_predicted_classes(input_classes):
    """Flattens per-batch model outputs and returns the argmax class per sample."""
    pred_classes = []
    for i in range(len(input_classes)):
        input_classes[i] = input_classes[i].to('cpu')
        for item in input_classes[i]:
            pred_classes.append(item.detach().numpy())
    pred_classes = np.array(pred_classes, dtype=object).astype(float)
    out = np.argmax(pred_classes, axis=1)
    return out


def get_classes_probs(input_classes):
    """Flattens per-batch model outputs into a (n_samples, n_classes) float array."""
    pred_classes = []
    for i in range(len(input_classes)):
        input_classes[i] = input_classes[i].to('cpu')
        for item in input_classes[i]:
            pred_classes.append(item.detach().numpy())
    pred_classes = np.array(pred_classes, dtype=object).astype(float)
    return pred_classes


def log_confusion_matrix(epoch, true, pred, class_names):
    # Calculate the confusion matrix using sklearn.metrics
    cm = confusion_matrix(true, pred)
    figure = plot_confusion_matrix(cm, class_names=class_names)
    cm_image = plot_to_image(figure)
    # Log the confusion matrix as an image summary.
    writer.add_image('Confusion matrix', cm_image, epoch)


def log_running_loss(running_loss, epoch):
    # 250 appears to be the assumed number of batches per epoch.
    writer.add_scalar('Training loss', running_loss / 250, epoch)


def log_accuracy(true, pred, epoch):
    writer.add_scalar('Accuracy', accuracy_score(true, pred), epoch)


def log_rocauc(true, pred, epoch):
    writer.add_scalar('ROC_AUC', roc_auc_score(true, pred,
                      multi_class='ovr', average='weighted'), epoch)


def log_precision(true, pred, epoch):
    writer.add_scalar('Precision', precision_score(true, pred, average='weighted'),
                      epoch)


def log_recall(true, pred, epoch):
    writer.add_scalar('Recall', recall_score(true, pred, average='weighted'),
                      epoch)


class callback(Callback):
    def __init__(self):
        pass

    def on_epoch_begin(self, epoch):
        self.epoch = epoch

    def on_epoch_end(self, t, p, class_names, running_loss):
        true = get_true_classes(t)
        pred = get_predicted_classes(p)
        probs = get_classes_probs(p)
        log_confusion_matrix(self.epoch, true, pred, class_names)
        log_running_loss(running_loss, self.epoch)
        log_accuracy(true, pred, self.epoch)
        log_rocauc(true, probs, self.epoch)
        log_precision(true, pred, self.epoch)
        log_recall(true, pred, self.epoch)
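# --- Usage sketch (not part of the commit) ---
# The callback appears intended to be driven by hand from a training loop
# rather than by a pytorch_lightning Trainer (its hook signatures are custom).
# A minimal, self-contained sketch with synthetic per-batch tensors standing in
# for accumulated targets and model outputs; class names follow the dataset's
# labels_map below.
import torch

targets = [torch.tensor([0, 1, 2, 1]), torch.tensor([2, 0, 1, 0])]
outputs = [torch.softmax(torch.randn(4, 3), dim=1) for _ in range(2)]

cb = callback()
cb.on_epoch_begin(epoch=0)
cb.on_epoch_end(targets, outputs, ["Benign", "InSitu", "Invasive"], running_loss=1.0)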
# Dataset for three-class image classification (Benign / InSitu / Invasive).
import torch
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import os
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import DataLoader
import csv
import random
import numpy as np
import cv2
import torchvision
from torchvision import transforms
from torch import nn
import torch.nn.functional as F
import io
from PIL import Image

labels_map = {
    "Benign": 0,
    "InSitu": 1,
    "Invasive": 2,
}


class ImageDataset(Dataset):
    def __init__(self, annotations_file, paths_file, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        with open(paths_file, 'r') as f:
            # Drop empty entries, e.g. a trailing newline in the paths file.
            self.img_paths = [p for p in f.read().split('\n') if p]
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        image = read_image(img_path)
        # The fourth path component is a 1-based folder number that indexes the
        # annotations CSV; the fifth column holds the class name.
        img_folder = int(img_path.split("/")[3])
        label_name = self.img_labels.iloc[img_folder - 1, 4]
        label = torch.tensor(labels_map[label_name])
        if self.transform:
            image = image.float()
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        image = image.float()
        return image, label
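# --- Usage sketch (not part of the commit) ---
# Wiring ImageDataset into a DataLoader. The annotations/paths file names and
# the resize size are hypothetical placeholders, not paths from this repository.
dataset = ImageDataset("annotations.csv", "train_paths.txt",
                       transform=transforms.Resize((224, 224)))
loader = DataLoader(dataset, batch_size=8, shuffle=True)
images, labels = next(iter(loader))
print(images.shape, labels.shape)      # e.g. torch.Size([8, 3, 224, 224]), torch.Size([8])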