"""Validate optical flow estimation performance on standard datasets."""
# =============================================================================
# Copyright 2021 Henrique Morimitsu
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
import logging
import sys
from argparse import ArgumentParser, Namespace
from pathlib import Path
from typing import Any, Dict, List, Optional
import cv2 as cv
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
import ptlflow
from ptlflow import get_model, get_model_reference
from ptlflow.models.base_model.base_model import BaseModel
from ptlflow.utils import flow_utils
from ptlflow.utils.io_adapter import IOAdapter
from ptlflow.utils.utils import (
add_datasets_to_parser,
config_logging,
get_list_of_available_models_list,
tensor_dict_to_numpy,
)
config_logging()
def _init_parser() -> ArgumentParser:
parser = ArgumentParser()
parser.add_argument(
"model",
type=str,
choices=["all", "select"] + get_list_of_available_models_list(),
help="Name of the model to use.",
)
parser.add_argument(
"--selection",
type=str,
nargs="+",
default=None,
help=(
"Used in combination with model=select. The select mode can be used to run the validation on multiple models "
"at once. Put a list of model names here separated by spaces."
),
)
parser.add_argument(
"--exclude",
type=str,
nargs="+",
default=None,
help=(
"Used in combination with model=all. A list of model names that will not be validated."
),
)
parser.add_argument(
"--output_path",
type=str,
default=str(Path("outputs/validate")),
help="Path to the directory where the validation results will be saved.",
)
parser.add_argument(
"--write_outputs",
action="store_true",
help="If set, the estimated flow is saved to disk.",
)
parser.add_argument(
"--show",
action="store_true",
help="If set, the results are shown on the screen.",
)
parser.add_argument(
"--flow_format",
type=str,
default="original",
choices=["flo", "png", "original"],
help=(
"The format to use when saving the estimated optical flow. If 'original', then the format will be the same "
+ "one the dataset uses for the groundtruth."
),
)
parser.add_argument(
"--max_forward_side",
type=int,
default=None,
help=(
"If max(height, width) of the input image is larger than this value, then the image is downscaled "
"before the forward and the outputs are bilinearly upscaled to the original resolution."
),
)
parser.add_argument(
"--scale_factor",
type=float,
default=None,
help=("Multiply the input image by this scale factor before forwarding."),
)
parser.add_argument(
"--max_show_side",
type=int,
default=1000,
help=(
"If max(height, width) of the output image is larger than this value, then the image is downscaled "
"before showing it on the screen."
),
)
parser.add_argument(
"--max_samples",
type=int,
default=None,
help=(
"Maximum number of samples per dataset will be used for calculating the metrics."
),
)
parser.add_argument(
"--reversed",
action="store_true",
help="To be combined with model all or select. Iterates over the list of models in reversed order",
)
parser.add_argument(
"--warm_start",
action="store_true",
help="If set, stores the previous estimation to be used a starting point for prediction.",
)
parser.add_argument(
"--fp16", action="store_true", help="If set, use half floating point precision."
)
parser.add_argument(
"--seq_val_mode",
type=str,
default="all",
choices=("all", "first", "middle", "last"),
help=(
"Used only when the model predicts outputs for more than one frame. Select which predictions will be used for evaluation."
),
)
parser.add_argument(
"--write_individual_metrics",
action="store_true",
help="If set, save a table of metrics for every image.",
)
return parser
[docs]
def generate_outputs(
args: Namespace,
inputs: Dict[str, torch.Tensor],
preds: Dict[str, torch.Tensor],
dataloader_name: str,
batch_idx: int,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Display on screen and/or save outputs to disk, if required.
Parameters
----------
args : Namespace
The arguments with the required values to manage the outputs.
inputs : Dict[str, torch.Tensor]
The inputs loaded from the dataset (images, groundtruth).
preds : Dict[str, torch.Tensor]
The model predictions (optical flow and others).
dataloader_name : str
A string to identify from which dataloader these inputs came from.
batch_idx : int
Indicates in which position of the loader this input is.
metadata : Dict[str, Any], optional
Metadata about this input, if available.
"""
inputs = tensor_dict_to_numpy(inputs)
inputs["flows_viz"] = flow_utils.flow_to_rgb(inputs["flows"])[:, :, ::-1]
if inputs.get("flows_b") is not None:
inputs["flows_b_viz"] = flow_utils.flow_to_rgb(inputs["flows_b"])[:, :, ::-1]
preds = tensor_dict_to_numpy(preds)
preds["flows_viz"] = flow_utils.flow_to_rgb(preds["flows"])[:, :, ::-1]
if preds.get("flows_b") is not None:
preds["flows_b_viz"] = flow_utils.flow_to_rgb(preds["flows_b"])[:, :, ::-1]
if args.show:
_show(inputs, preds, args.max_show_side)
if args.write_outputs:
_write_to_file(args, preds, dataloader_name, batch_idx, metadata)
[docs]
def validate(args: Namespace, model: BaseModel) -> pd.DataFrame:
"""Perform the validation.
Parameters
----------
args : Namespace
Arguments to configure the model and the validation.
model : BaseModel
The model to be used for validation.
Returns
-------
pd.DataFrame
A DataFrame with the metric results.
See Also
--------
ptlflow.models.base_model.base_model.BaseModel : The parent class of the available models.
"""
model.eval()
if torch.cuda.is_available():
model = model.cuda()
if args.fp16:
model = model.half()
dataloaders = model.val_dataloader()
dataloaders = {
model.val_dataloader_names[i]: dataloaders[i] for i in range(len(dataloaders))
}
metrics_df = pd.DataFrame()
metrics_df["model"] = [args.model]
metrics_df["checkpoint"] = [args.pretrained_ckpt]
for dataset_name, dl in dataloaders.items():
metrics_mean = validate_one_dataloader(args, model, dl, dataset_name)
metrics_df[[f"{dataset_name}-{k}" for k in metrics_mean.keys()]] = list(
metrics_mean.values()
)
args.output_path.mkdir(parents=True, exist_ok=True)
metrics_df.T.to_csv(args.output_path / "metrics.csv", header=False)
metrics_df = metrics_df.round(3)
return metrics_df
[docs]
def validate_list_of_models(args: Namespace) -> None:
"""Perform the validation.
Parameters
----------
args : Namespace
Arguments to configure the list of models and the validation.
"""
metrics_df = pd.DataFrame()
model_names = _get_model_names(args)
if args.reversed:
model_names = reversed(model_names)
exclude = args.exclude
if exclude is None:
exclude = []
for mname in model_names:
if mname in exclude:
continue
logging.info(mname)
model_ref = ptlflow.get_model_reference(mname)
if hasattr(model_ref, "pretrained_checkpoints"):
ckpt_names = model_ref.pretrained_checkpoints.keys()
for cname in ckpt_names:
try:
logging.info(cname)
parser_tmp = model_ref.add_model_specific_args(parser)
args = parser_tmp.parse_args()
args.model = mname
args.pretrained_ckpt = cname
model_id = args.model
if args.pretrained_ckpt is not None:
model_id += f"_{args.pretrained_ckpt}"
args.output_path = Path(args.output_path) / model_id
model = get_model(mname, cname, args)
instance_metrics_df = validate(args, model)
metrics_df = pd.concat([metrics_df, instance_metrics_df])
args.output_path.parent.mkdir(parents=True, exist_ok=True)
if args.reversed:
metrics_df.to_csv(
args.output_path.parent / "metrics_all_rev.csv", index=False
)
else:
metrics_df.to_csv(
args.output_path.parent / "metrics_all.csv", index=False
)
except Exception as e: # noqa: B902
logging.warning("Skipping model %s due to exception %s", mname, e)
break
[docs]
@torch.no_grad()
def validate_one_dataloader(
args: Namespace,
model: BaseModel,
dataloader: DataLoader,
dataloader_name: str,
) -> Dict[str, float]:
"""Perform validation for all examples of one dataloader.
Parameters
----------
args : Namespace
Arguments to configure the model and the validation.
model : BaseModel
The model to be used for validation.
dataloader : DataLoader
The dataloader for the validation.
dataloader_name : str
A string to identify this dataloader.
Returns
-------
Dict[str, float]
The average metric values for this dataloader.
"""
metrics_sum = {}
metrics_individual = None
if args.write_individual_metrics:
metrics_individual = {"filename": [], "epe": [], "outlier": []}
with tqdm(dataloader) as tdl:
prev_preds = None
for i, inputs in enumerate(tdl):
if args.scale_factor is not None:
scale_factor = args.scale_factor
else:
scale_factor = (
None
if args.max_forward_side is None
else float(args.max_forward_side) / min(inputs["images"].shape[-2:])
)
io_adapter = IOAdapter(
model,
inputs["images"].shape[-2:],
target_scale_factor=scale_factor,
cuda=torch.cuda.is_available(),
fp16=args.fp16,
)
inputs = io_adapter.prepare_inputs(inputs=inputs, image_only=True)
inputs["prev_preds"] = prev_preds
preds = model(inputs)
if args.warm_start:
if (
"is_seq_start" in inputs["meta"]
and inputs["meta"]["is_seq_start"][0]
):
prev_preds = None
else:
prev_preds = preds
for k, v in prev_preds.items():
if isinstance(v, torch.Tensor):
prev_preds[k] = v.detach()
inputs = io_adapter.unscale(inputs, image_only=True)
preds = io_adapter.unscale(preds)
if inputs["flows"].shape[1] > 1 and args.seq_val_mode != "all":
if args.seq_val_mode == "first":
k = 0
elif args.seq_val_mode == "middle":
k = inputs["images"].shape[1] // 2
elif args.seq_val_mode == "last":
k = inputs["flows"].shape[1] - 1
for key, val in inputs.items():
if key == "meta":
inputs["meta"]["image_paths"] = inputs["meta"]["image_paths"][
k : k + 1
]
elif key == "images":
inputs[key] = val[:, k : k + 2]
elif isinstance(val, torch.Tensor) and len(val.shape) == 5:
inputs[key] = val[:, k : k + 1]
metrics = model.val_metrics(preds, inputs)
for k in metrics.keys():
if metrics_sum.get(k) is None:
metrics_sum[k] = 0.0
metrics_sum[k] += metrics[k].item()
tdl.set_postfix(
epe=metrics_sum["val/epe"] / (i + 1),
outlier=metrics_sum["val/outlier"] / (i + 1),
)
filename = ""
if "sintel" in inputs["meta"]["dataset_name"][0].lower():
filename = f'{Path(inputs["meta"]["image_paths"][0][0]).parent.name}/'
filename += Path(inputs["meta"]["image_paths"][0][0]).stem
if metrics_individual is not None:
metrics_individual["filename"].append(filename)
metrics_individual["epe"].append(metrics["val/epe"].item())
metrics_individual["outlier"].append(metrics["val/outlier"].item())
generate_outputs(
args, inputs, preds, dataloader_name, i, inputs.get("meta")
)
if args.max_samples is not None and i >= (args.max_samples - 1):
break
if args.write_individual_metrics:
ind_df = pd.DataFrame(metrics_individual)
args.output_path.mkdir(parents=True, exist_ok=True)
ind_df.to_csv(
Path(args.output_path) / f"{dataloader_name}_epe_outlier.csv", index=None
)
metrics_mean = {}
for k, v in metrics_sum.items():
metrics_mean[k] = v / len(dataloader)
return metrics_mean
def _get_model_names(args: Namespace) -> List[str]:
if args.model == "all":
model_names = ptlflow.models_dict.keys()
elif args.model == "select":
if args.selection is None:
raise ValueError(
"When select is chosen, model names must be provided to --selection."
)
model_names = args.selection
return model_names
def _show(
inputs: Dict[str, torch.Tensor], preds: Dict[str, torch.Tensor], max_show_side: int
) -> None:
for k, v in inputs.items():
if isinstance(v, np.ndarray) and (
len(v.shape) == 2 or v.shape[2] == 1 or v.shape[2] == 3
):
if max(v.shape[:2]) > max_show_side:
scale_factor = float(max_show_side) / max(v.shape[:2])
v = cv.resize(
v, (int(scale_factor * v.shape[1]), int(scale_factor * v.shape[0]))
)
cv.imshow(k, v)
for k, v in preds.items():
if isinstance(v, np.ndarray) and (
len(v.shape) == 2 or v.shape[2] == 1 or v.shape[2] == 3
):
if max(v.shape[:2]) > max_show_side:
scale_factor = float(max_show_side) / max(v.shape[:2])
v = cv.resize(
v, (int(scale_factor * v.shape[1]), int(scale_factor * v.shape[0]))
)
cv.imshow("pred_" + k, v)
cv.waitKey(1)
def _write_to_file(
args: Namespace,
preds: Dict[str, torch.Tensor],
dataloader_name: str,
batch_idx: int,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
out_root_dir = Path(args.output_path) / dataloader_name
extra_dirs = ""
if metadata is not None:
img_path = Path(metadata["image_paths"][0][0])
image_name = img_path.stem
if "sintel" in dataloader_name:
seq_name = img_path.parts[-2]
extra_dirs = seq_name
else:
image_name = f"{batch_idx:08d}"
if args.flow_format != "original":
flow_ext = args.flow_format
else:
if "kitti" in dataloader_name or "hd1k" in dataloader_name:
flow_ext = "png"
else:
flow_ext = "flo"
for k, v in preds.items():
if isinstance(v, np.ndarray):
out_dir = out_root_dir / k / extra_dirs
out_dir.mkdir(parents=True, exist_ok=True)
if k == "flows" or k == "flows_b":
flow_utils.flow_write(out_dir / f"{image_name}.{flow_ext}", v)
elif len(v.shape) == 2 or (
len(v.shape) == 3 and (v.shape[2] == 1 or v.shape[2] == 3)
):
if v.max() <= 1:
v = v * 255
cv.imwrite(str(out_dir / f"{image_name}.png"), v.astype(np.uint8))
if __name__ == "__main__":
parser = _init_parser()
# TODO: It is ugly that the model has to be gotten from the argv rather than the argparser.
# However, I do not see another way, since the argparser requires the model to load some of the args.
FlowModel = None
if len(sys.argv) > 1 and sys.argv[1] not in ["-h", "--help", "all", "select"]:
FlowModel = get_model_reference(sys.argv[1])
parser = FlowModel.add_model_specific_args(parser)
add_datasets_to_parser(parser, "datasets.yml")
args = parser.parse_args()
if args.model not in ["all", "select"]:
model_id = args.model
if args.pretrained_ckpt is not None:
model_id += f"_{Path(args.pretrained_ckpt).stem}"
if args.max_forward_side is not None:
model_id += f"_maxside{args.max_forward_side}"
if args.scale_factor is not None:
model_id += f"_scale{args.scale_factor}"
args.output_path = Path(args.output_path) / model_id
model = get_model(sys.argv[1], args.pretrained_ckpt, args)
args.output_path.mkdir(parents=True, exist_ok=True)
validate(args, model)
else:
validate_list_of_models(args)