Shortcuts

Source code for talib.finetune.bi_tuning

"""
@author: Junguang Jiang
@contact: [email protected]
"""
import torch
import torch.nn as nn
from torch.nn.functional import normalize
from common.modules.classifier import Classifier as ClassifierBase


class Classifier(ClassifierBase):
    """Classifier class for Bi-Tuning.

    Extends the base classifier with a linear projector head that is fed by the
    same features as the classification head.

    Args:
        backbone (torch.nn.Module): Any backbone to extract 2-d features from data
        num_classes (int): Number of classes
        projection_dim (int, optional): Dimension of the projector head. Default: 128
        finetune (bool): Whether finetune the classifier or train from scratch. Default: True
        pool_layer (optional): Passed through unchanged to the base classifier. Default: None

    .. note::
        The learning rate of this classifier is set 10 times to that of the feature extractor for better accuracy
        by default. If you have other optimization strategies, please over-ride :meth:`~Classifier.get_parameters`.

    Inputs:
        - x (tensor): input data fed to `backbone`

    Outputs:
        In the training mode,
            - y: classifier's predictions
            - z: L2-normalized projector's predictions
            - hn: features after `bottleneck` layer and before `head` layer, with a constant 1
              appended as an extra column and then L2-normalized
        In the eval mode,
            - y: classifier's predictions

    Shape:
        - Inputs: (minibatch, *) where * means, any number of additional dimensions
        - y: (minibatch, `num_classes`)
        - z: (minibatch, `projection_dim`)
        - hn: (minibatch, `features_dim` + 1)

    """
    def __init__(self, backbone: nn.Module, num_classes: int, projection_dim: int = 128, finetune: bool = True,
                 pool_layer=None):
        # Classification head: weights ~ N(0, 0.01), zero bias.
        head = nn.Linear(backbone.out_features, num_classes)
        head.weight.data.normal_(0, 0.01)
        head.bias.data.fill_(0.0)
        super().__init__(backbone, num_classes=num_classes, head=head, finetune=finetune, pool_layer=pool_layer)
        self.projector = nn.Linear(backbone.out_features, projection_dim)
        self.projection_dim = projection_dim

    def forward(self, x: torch.Tensor):
        """Return ``y`` in eval mode and ``(y, z, hn)`` in training mode."""
        batch_size = x.shape[0]
        h = self.backbone(x)
        h = self.pool_layer(h)
        h = self.bottleneck(h)
        y = self.head(h)
        if not self.training:
            # The projection and the augmented features are only consumed during training.
            return y

        z = normalize(self.projector(h), dim=1)
        # Allocate the ones column with the dtype and device of `h`, so that half/double
        # features are neither promoted nor rejected by `torch.cat`.
        ones = h.new_ones((batch_size, 1))
        hn = normalize(torch.cat([h, ones], dim=1), dim=1)
        return y, z, hn

    def get_parameters(self, base_lr=1.0):
        """A parameter list which decides optimization hyper-parameters,
            such as the relative learning rate of each layer

        The backbone and the projector use ``0.1 * base_lr`` when ``self.finetune`` is set
        and ``base_lr`` otherwise; the bottleneck and the head always use ``base_lr``.
        """
        reduced_lr = 0.1 * base_lr if self.finetune else 1.0 * base_lr
        full_lr = 1.0 * base_lr
        return [
            {"params": self.backbone.parameters(), "lr": reduced_lr},
            {"params": self.bottleneck.parameters(), "lr": full_lr},
            {"params": self.head.parameters(), "lr": full_lr},
            {"params": self.projector.parameters(), "lr": reduced_lr},
        ]


class Bituning(nn.Module):
    """
    Bi-Tuning Module in `Bi-tuning of Pre-trained Representations <https://arxiv.org/abs/2011.06182>`_.

    Args:
        encoder_q (Classifier): Query encoder.
        encoder_k (Classifier): Key encoder. Its parameters are overwritten with those of
            `encoder_q`, frozen, and afterwards only changed by the momentum update.
        num_classes (int): Number of classes
        K (int): Queue size (number of stored keys per class). Default: 40
        m (float): Momentum coefficient. Default: 0.999
        T (float): Temperature. Default: 0.07

    .. note::
        :meth:`forward` unpacks three outputs from each encoder, which :class:`Classifier` only
        returns in the training mode, so this module is meant to be used in the training mode.

    Inputs:
        - im_q (tensor): input data fed to `encoder_q`
        - im_k (tensor): input data fed to `encoder_k`
        - labels (tensor): classification labels of input data

    Outputs: y_q, logits_z, logits_y, labels_c
        - y_q: query classifier's predictions
        - logits_z: log-softmax of the projector's similarities to positive and negative samples
        - logits_y: log-softmax of the classifier's similarities to positive and negative samples
        - labels_c: contrastive labels; the first `K` + 1 columns (current key plus the `K` queued
          keys of the same class) each hold 1 / (`K` + 1), all other columns are 0

    Shape:
        - im_q, im_k: (minibatch, *) where * means, any number of additional dimensions
        - labels: (minibatch, )
        - y_q: (minibatch, `num_classes`)
        - logits_z: (minibatch, 1 + `num_classes` x `K`)
        - logits_y: (minibatch, 1 + `num_classes` x `K`)
        - labels_c: (minibatch, 1 + `num_classes` x `K`)
    """

    def __init__(self, encoder_q: "Classifier", encoder_k: "Classifier", num_classes, K=40, m=0.999, T=0.07):
        super().__init__()
        self.K = K
        self.m = m
        self.T = T
        self.num_classes = num_classes

        # create the encoders
        self.encoder_q = encoder_q
        self.encoder_k = encoder_k

        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data.copy_(param_q.data)  # initialize
            param_k.requires_grad = False  # not update by gradient

        # create the per-class queues; every stored key (column along dim 0) has unit norm
        self.register_buffer(
            "queue_h", normalize(torch.randn(encoder_q.features_dim + 1, num_classes, K), dim=0))
        self.register_buffer(
            "queue_z", normalize(torch.randn(encoder_q.projection_dim, num_classes, K), dim=0))
        # one write position per class
        self.register_buffer("queue_ptr", torch.zeros(num_classes, dtype=torch.long))

    @torch.no_grad()
    def _momentum_update_key_encoder(self):
        """
        Momentum update of the key encoder
        """
        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data = param_k.data * self.m + param_q.data * (1. - self.m)

    @torch.no_grad()
    def _dequeue_and_enqueue(self, h, z, label):
        """Overwrite the oldest slots of class `label` with the keys `h` and `z`.

        Args:
            h (tensor): key features, (n, `features_dim` + 1)
            z (tensor): key projections, (n, `projection_dim`)
            label: class index whose queue is updated
        """
        batch_size = h.shape[0]
        assert self.K % batch_size == 0  # for simplicity

        ptr = int(self.queue_ptr[label])

        # replace the keys at ptr (dequeue and enqueue)
        self.queue_h[:, label, ptr: ptr + batch_size] = h.T
        self.queue_z[:, label, ptr: ptr + batch_size] = z.T

        # move pointer
        self.queue_ptr[label] = (ptr + batch_size) % self.K

    def _projection_logits(self, z_q, z_k, queue_z, class_ids):
        """Contrast every query projection with its own key and with the queued projections."""
        # current positive logits: N x 1
        logits_cur = torch.einsum('nc,nc->n', z_q, z_k).unsqueeze(-1)

        pos_rows = []  # each 1 x K
        neg_rows = []  # each 1 x ((C-1) x K)
        for i, c in enumerate(class_ids):
            query = z_q[i: i + 1]  # 1 x D
            pos_samples = queue_z[:, c, :]  # D x K
            neg_samples = torch.cat(
                [queue_z[:, :c, :], queue_z[:, c + 1:, :]], dim=1).flatten(start_dim=1)  # D x ((C-1) x K)
            pos_rows.append(torch.einsum('nc,ck->nk', query, pos_samples))  # 1 x K
            neg_rows.append(torch.einsum('nc,ck->nk', query, neg_samples))  # 1 x ((C-1) x K)

        # join the per-sample rows once instead of growing a tensor inside the loop
        logits = torch.cat(
            [logits_cur, torch.cat(pos_rows, dim=0), torch.cat(neg_rows, dim=0)], dim=1)  # N x (1 + C*K)
        # apply temperature
        return nn.functional.log_softmax(logits / self.T, dim=1)

    def _classification_logits(self, h_q, class_ids, device):
        """Contrast every query feature with the queued features through the classifier head."""
        head = self.encoder_q.head
        # Fold the bias into the weight matrix; it pairs with the constant column appended to the
        # features. `.data` is used, so no gradient reaches the head through these logits.
        w = torch.cat([head.weight.data, head.bias.data.unsqueeze(-1)], dim=1)
        w = normalize(w, dim=1)  # C x F

        logits_cur = torch.einsum('nk,kc->nc', h_q, w.T)  # N x C
        queue_y = self.queue_h.clone().detach().to(device).flatten(start_dim=1).T  # (C * K) x F
        logits_queue = torch.einsum('nk,kc->nc', queue_y, w.T).reshape(
            self.num_classes, -1, self.num_classes)  # C x K x C

        rows = []
        for i, c in enumerate(class_ids):
            cur_sample = logits_cur[i: i + 1, c]  # 1
            pos_samples = logits_queue[c, :, c]  # K
            neg_samples = torch.cat(
                [logits_queue[:c, :, c], logits_queue[c + 1:, :, c]], dim=0).view(-1)  # (C-1)*K
            rows.append(torch.cat([cur_sample, pos_samples, neg_samples]))  # 1 + C*K

        logits = torch.stack(rows, dim=0)  # N x (1 + C*K)
        return nn.functional.log_softmax(logits / self.T, dim=1)

    def forward(self, im_q, im_k, labels):
        batch_size = im_q.size(0)
        device = im_q.device
        class_ids = [int(labels[i]) for i in range(batch_size)]

        # compute query features
        y_q, z_q, h_q = self.encoder_q(im_q)

        # compute key features
        with torch.no_grad():  # no gradient to keys
            self._momentum_update_key_encoder()  # update the key encoder
            _, z_k, h_k = self.encoder_k(im_k)

        # compute logits for projection z against a snapshot taken before the new keys are stored
        queue_z = self.queue_z.clone().detach().to(device)
        logits_z = self._projection_logits(z_q, z_k, queue_z, class_ids)

        # store the current keys one sample at a time in the queue of their class
        for i, c in enumerate(class_ids):
            self._dequeue_and_enqueue(h_k[i: i + 1], z_k[i: i + 1], c)

        # compute logits for classification y
        # NOTE(review): unlike queue_z above, queue_h is read after the current keys were enqueued.
        logits_y = self._classification_logits(h_q, class_ids, device)

        # contrastive labels
        labels_c = torch.zeros(batch_size, self.K * self.num_classes + 1, device=device)
        labels_c[:, :self.K + 1] = 1.0 / (self.K + 1)
        return y_q, logits_z, logits_y, labels_c

Docs

Access comprehensive documentation for the Transfer Learning Library

View Docs

Tutorials

Get started with the Transfer Learning Library

Get Started