|
|
import warnings |
|
|
# Suppress all Python warnings for the whole run (keeps console output clean).
warnings.filterwarnings("ignore")
|
|
import os |
|
|
import glob |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import matplotlib.pyplot as plt |
|
|
import torch |
|
|
from torchvision import models, transforms |
|
|
from thop import profile |
|
|
# Global switch: when True, per-layer FLOPs/params are computed with thop.profile.
is_flop_cal = False
|
|
|
|
|
|
|
|
def get_activation(model, layer, input_img_data):
    """Run one forward pass and capture *layer*'s input and output via a hook.

    Returns a pair of lists ``(activations, inputs)`` — the output tensor(s)
    and the first positional input tensor(s) recorded for each forward call
    of *layer* during the pass.
    """
    model.eval()
    captured_outputs = []
    captured_inputs = []

    def _capture(module, module_in, module_out):
        # module_in is a tuple of positional inputs; keep only the first.
        captured_outputs.append(module_out)
        captured_inputs.append(module_in[0])

    handle = layer.register_forward_hook(_capture)
    with torch.no_grad():
        model(input_img_data)
    handle.remove()
    return captured_outputs, captured_inputs
|
|
|
|
|
def _resolve_layer(model, layer_name):
    """Resolve a layer path such as 'resnet50.conv1' or 'resnet50.layer1[0]'.

    Replaces the previous eval()-based lookup: the leading token (the variable
    name the caller used, e.g. 'resnet50') is skipped, and the remaining
    attribute / integer-index tokens are applied to *model* directly, so
    arbitrary code embedded in *layer_name* can no longer execute.
    """
    import re
    tokens = re.findall(r"([A-Za-z_]\w*)|\[(\d+)\]", layer_name)
    obj = model
    for attr, idx in tokens[1:]:  # tokens[0] is the model variable name
        obj = getattr(obj, attr) if attr else obj[int(idx)]
    return obj


def get_activation_map(frame, layer_name, resnet50, device):
    """Return (activation tensor, numpy copy, flops, params) for one layer.

    frame      -- input image; Normalize requires a tensor, so this is assumed
                  to be a (C, H, W) or (N, C, H, W) torch tensor.
                  NOTE(review): there is no loading / ToTensor step here, so a
                  file-path argument would fail — confirm upstream callers.
    layer_name -- dotted/indexed layer path, e.g. 'resnet50.layer1[0]'.
    resnet50   -- the model to probe.
    device     -- torch device the model lives on.

    flops/params are None unless the module-level `is_flop_cal` flag is True.
    """
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    frame_tensor = transform(frame)

    # Ensure a batch dimension for the forward pass.
    if frame_tensor.dim() == 3:
        frame_tensor = frame_tensor.unsqueeze(0)

    # Bug fix: the model is moved to `device` by the caller, but the input was
    # left on the CPU, raising a device-mismatch error whenever CUDA is used.
    frame_tensor = frame_tensor.to(device)

    layer_obj = _resolve_layer(resnet50, layer_name)
    activations, inputs = get_activation(resnet50, layer_obj, frame_tensor)
    activated_img = activations[0][0]  # first (only) sample in the batch
    activation_array = activated_img.cpu().numpy()

    if is_flop_cal:
        flops, params = profile(layer_obj, inputs=(inputs[0],), verbose=False)
        # thop reports 0 params for some layers; recompute conv params by hand.
        if params == 0 and isinstance(layer_obj, torch.nn.Conv2d):
            params = (layer_obj.in_channels * layer_obj.out_channels
                      * layer_obj.kernel_size[0] * layer_obj.kernel_size[1])
            if layer_obj.bias is not None:
                params += layer_obj.out_channels
    else:
        flops, params = None, None
    return activated_img, activation_array, flops, params
|
|
|
|
|
def process_video_frame(video_name, frame, frame_number, all_layers, resnet50, device):
    """Collect activations for every layer in *all_layers* for one frame.

    Returns (activations_dict, frame_npy_path, total_flops, total_params).
    activations_dict maps layer-path string -> activation tensor; the .npy
    path is only computed here, nothing is written to disk.
    """
    activations_dict = {}
    total_flops = 0
    total_params = 0
    # Loop-invariant file-name stem, hoisted out of the layer loop.
    combined_name = "resnet50_feature_map"

    for layer_name in all_layers:
        activated_img, activation_array, flops, params = get_activation_map(
            frame, layer_name, resnet50, device)
        # flops/params are None unless FLOP counting is globally enabled.
        if is_flop_cal:
            total_flops += flops
            total_params += params
        activations_dict[layer_name] = activated_img

    frame_npy_path = f'../features/resnet50/{video_name}/frame_{frame_number}_{combined_name}.npy'
    return activations_dict, frame_npy_path, total_flops, total_params
|
|
|
|
|
def get_activation_png(png_path, fig_name, activated_img, n=8):
    """Save up to n*n channel activation maps as one PNG grid.

    png_path      -- directory prefix (expected to end with a separator).
    fig_name      -- output file name without extension.
    activated_img -- tensor of shape (channels, H, W).
    n             -- grid side length.
    """
    fig = plt.figure(figsize=(10, 10))

    # One bounded loop replaces the original nested i/j loops, whose inner
    # `break` only exited the inner loop and left the outer loop spinning.
    for idx in range(min(n * n, activated_img.shape[0])):
        ax = fig.add_subplot(n, n, idx + 1)
        ax.imshow(activated_img[idx].cpu().numpy(), cmap='viridis')
        ax.axis('off')

    fig_path = f'{png_path}{fig_name}.png'
    print(fig_path)
    print("----------------" + '\n')
    plt.savefig(fig_path)
    plt.close()
|
|
|
|
|
def get_activation_npy(npy_path, fig_name, activation_array):
    """Persist one activation array to '<npy_path><fig_name>.npy'."""
    target = f'{npy_path}{fig_name}.npy'
    np.save(target, activation_array)
|
|
|
|
|
if __name__ == '__main__':
    # Device selection: "gpu" falls back to CPU when CUDA is unavailable.
    device_name = "gpu"
    if device_name == "gpu":
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    else:
        device = torch.device("cpu")
    print(f"Running on {'GPU' if device.type == 'cuda' else 'CPU'}")

    resnet50 = models.resnet50(pretrained=True).to(device)

    # Layer paths are resolved by name inside get_activation_map; the leading
    # 'resnet50.' segment refers to the model object passed along with them.
    all_layers = ['resnet50.conv1',
                  'resnet50.layer1[0]', 'resnet50.layer1[1]', 'resnet50.layer1[2]',
                  'resnet50.layer2[0]', 'resnet50.layer2[1]', 'resnet50.layer2[2]', 'resnet50.layer2[3]',
                  'resnet50.layer3[0]', 'resnet50.layer3[1]', 'resnet50.layer3[2]', 'resnet50.layer3[3]',
                  'resnet50.layer4[0]', 'resnet50.layer4[1]', 'resnet50.layer4[2]']

    # Pick the metadata CSV for the requested dataset flavour.
    video_type = 'test'
    if video_type == 'test':
        metadata_path = "../../metadata/test_videos.csv"
    elif video_type == 'resolution_ugc':
        resolution = '360P'
        metadata_path = f"../../metadata/YOUTUBE_UGC_{resolution}_metadata.csv"
    else:
        metadata_path = f'../../metadata/{video_type.upper()}_metadata.csv'

    ugcdata = pd.read_csv(metadata_path)
    # Iterate the 'vid' column directly instead of indexing by row number.
    for video_name in ugcdata['vid']:
        sampled_frame_path = os.path.join('../..', 'video_sampled_frame', 'sampled_frame', f'{video_name}')

        print(f"Processing video: {video_name}")
        image_paths = glob.glob(os.path.join(sampled_frame_path, f'{video_name}_*.png'))
        # enumerate replaces the manual frame_number counter (1-based as before).
        for frame_number, image in enumerate(image_paths, start=1):
            print(f"{image}")
            # NOTE(review): `image` is a file-path string, but downstream the
            # torchvision transforms appear to expect a tensor/PIL image — a
            # loading step may be missing here; confirm before running.
            process_video_frame(video_name, image, frame_number, all_layers, resnet50, device)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|