Source code for ptlflow.utils.flow_utils

"""Basic functions to handle optical flow."""

# =============================================================================
# Copyright 2021 Henrique Morimitsu
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

import json
from pathlib import Path
from typing import Any, IO, Optional, Sequence, Union

import cv2 as cv
import numpy as np
import torch

from .external import flowpy, raft, selflow, flow_IO
from . import flowpy_torch


def flow_to_rgb(
    flow: Union[np.ndarray, torch.Tensor],
    flow_max_radius: Optional[Union[float, torch.Tensor]] = None,
    background: str = "bright",
    custom_colorwheel: Optional[torch.Tensor] = None,
) -> Union[np.ndarray, torch.Tensor]:
    """Render optical flow as an RGB image.

    Thin dispatcher: numpy arrays are forwarded to ``flowpy.flow_to_rgb`` and
    every other input is forwarded to ``flowpy_torch.flow_to_rgb``.

    Parameters
    ----------
    flow : np.ndarray or torch.Tensor
        A numpy flow must be 3D in the HWC (Height, Width, Channels) layout,
        i.e. channels last. A torch flow must have at least 3 dimensions in
        the ...CHW (..., Channels, Height, Width) layout, where ... stands
        for any number of leading dimensions. Channel 0 is the
        x-displacement and channel 1 is the y-displacement.
    flow_max_radius : float or torch.Tensor, optional
        Radius mapped to the maximum color intensity, useful to compare
        different flows on the same scale. Default: the normalization is
        based on the input flow maximum radius per batch element.
    background : str, default 'bright'
        Whether zero-valued flow should look 'bright' or 'dark'.
    custom_colorwheel : np.ndarray or torch.Tensor
        Custom colorwheel for specific hue transition lengths. By default,
        the default transition lengths are used.

    Returns
    -------
    np.ndarray or torch.Tensor
        The RGB image representing the flow. It keeps the same dimensions
        and type as the input.

    See Also
    --------
    ptlflow.utils.external.flowpy.flow_to_rgb
    ptlflow.utils.flowpy_torch.flow_to_rgb
    """
    # Choose the implementation from the container type; both are invoked
    # with the same positional arguments.
    backend = flowpy if isinstance(flow, np.ndarray) else flowpy_torch
    return backend.flow_to_rgb(flow, flow_max_radius, background, custom_colorwheel)
def flow_read(
    input_data: Union[Sequence[Any], str, Path, IO],
    format: Optional[str] = None,
) -> np.ndarray:
    """Read optical flow from file.

    This is a convenience dispatcher over the readers available in this
    package: raft (pfm), flow_IO (flo5), numpy (npy), the Kubric and VIPER
    readers from this module, and flowpy for everything else.

    Parameters
    ----------
    input_data: Sequence[Any], str, Path or IO
        Path of the file to read or, for the "kubric_png" format, a sequence
        whose first two items are the file path and the flow direction.
    format: str, optional
        Format in which the flow is stored. Values handled here are "pfm",
        "flo5", "npy", "kubric_png", "viper_npz" and "png128". Any other
        value (e.g. "flo" or "png") is forwarded unchanged to
        ``flowpy.flow_read``. When given, it takes precedence over the file
        extension. If None, the format is guessed from the end of
        ``str(input_data)``; "kubric_png" and "viper_npz" are never guessed
        and must be requested explicitly.

    Returns
    -------
    numpy.ndarray
        3D flow in the HWF (Height, Width, Flow) layout.
        flow[..., 0] is the x-displacement.
        flow[..., 1] is the y-displacement.

    See Also
    --------
    ptlflow.utils.external.flowpy.flow_read
    ptlflow.utils.external.raft.read_pfm
    ptlflow.utils.external.flow_IO.readFlo5Flow
    read_kubric_flow
    read_viper_flow
    """
    # Resolve the effective format once. An explicit `format` always wins;
    # the extension is only a fallback, so a caller-supplied format can no
    # longer be overridden by a misleading file name.
    if format is not None:
        resolved = format
    else:
        target = str(input_data)
        resolved = next(
            (ext for ext in ("pfm", "flo5", "npy", "png128") if target.endswith(ext)),
            None,
        )

    if resolved == "pfm":
        return raft.read_pfm(input_data)
    if resolved == "flo5":
        return flow_IO.readFlo5Flow(input_data)
    if resolved == "npy":
        return np.load(input_data)
    if resolved == "kubric_png":
        # input_data carries (file path, flow direction) for this format.
        return read_kubric_flow(input_data[0], input_data[1])
    if resolved == "viper_npz":
        return read_viper_flow(input_data)
    if resolved == "png128":
        # The original `format` (possibly None) is forwarded on purpose so
        # that flowpy receives exactly what the caller passed.
        return flowpy.flow_read(input_data, format, png_flow_mult=128.0)
    return flowpy.flow_read(input_data, format)
def flow_write(
    output_file: Union[str, Path, IO], flow: np.ndarray, format: Optional[str] = None
) -> None:
    """Write optical flow to file.

    This is a convenience dispatcher over the writers available in this
    package: selflow (pfm), flow_IO (flo5), numpy (npy), the VIPER writer
    from this module, and flowpy for everything else.

    Parameters
    ----------
    output_file: str, Path or IO
        Path of the file to write or file object.
    flow: numpy.ndarray
        3D flow in the HWF (Height, Width, Flow) layout.
        flow[..., 0] should be the x-displacement
        flow[..., 1] should be the y-displacement
    format: str, optional
        Format in which the flow is written. Values handled here are "pfm",
        "flo5", "npy", "viper_npz" and "png128". Any other value (e.g. "png"
        or "flo") is forwarded unchanged to ``flowpy.flow_write``. When
        given, it takes precedence over the file extension. If None, the
        format is guessed from the end of ``str(output_file)``; "viper_npz"
        is never guessed and must be requested explicitly.

    See Also
    --------
    ptlflow.utils.external.flowpy.flow_write
    ptlflow.utils.external.selflow.write_pfm
    ptlflow.utils.external.flow_IO.writeFlo5File
    write_viper_flow
    """
    # Resolve the effective format once. An explicit `format` always wins;
    # the extension is only a fallback, so a caller-supplied format can no
    # longer be overridden by a misleading file name.
    if format is not None:
        resolved = format
    else:
        target = str(output_file)
        resolved = next(
            (ext for ext in ("pfm", "flo5", "npy", "png128") if target.endswith(ext)),
            None,
        )

    if resolved == "pfm":
        selflow.write_pfm(output_file, flow)
    elif resolved == "flo5":
        # Note the swapped argument order expected by flow_IO.
        flow_IO.writeFlo5File(flow, output_file)
    elif resolved == "npy":
        np.save(output_file, flow)
    elif resolved == "viper_npz":
        write_viper_flow(output_file, flow)
    elif resolved == "png128":
        # The original `format` (possibly None) is forwarded on purpose so
        # that flowpy receives exactly what the caller passed.
        flowpy.flow_write(output_file, flow, format, png_flow_mult=128.0)
    else:
        flowpy.flow_write(output_file, flow, format)
def read_kubric_flow(
    input_file: Union[str, Path, IO],
    flow_direction: str,
) -> np.ndarray:
    """Read optical flow in Kubric PNG format from file.

    The PNG stores quantized values; the range needed to de-quantize them is
    read from a ``data_ranges.json`` file located in the same directory as
    ``input_file``.

    Parameters
    ----------
    input_file: str, Path or IO
        Path of the file to read. NOTE(review): the value is passed through
        ``Path(...)`` and ``str(...)``, so a file object does not look
        usable here despite the annotation - confirm with callers.
    flow_direction: str
        Either "backward_flow" or "forward_flow". Used as the key into
        ``data_ranges.json``.

    Returns
    -------
    numpy.ndarray
        3D float32 flow in the HWF (Height, Width, Flow) layout.
        flow[..., 0] is the x-displacement.
        flow[..., 1] is the y-displacement.

    Raises
    ------
    OSError
        If ``data_ranges.json`` cannot be opened, or if OpenCV cannot read
        ``input_file``.
    """
    with open(Path(input_file).parent / "data_ranges.json", "r") as f:
        data_ranges = json.load(f)
    flow_min = data_ranges[flow_direction]["min"]
    flow_max = data_ranges[flow_direction]["max"]

    encoded = cv.imread(str(input_file), cv.IMREAD_UNCHANGED)
    if encoded is None:
        # cv.imread signals a missing or undecodable file by returning None
        # rather than raising; fail here with a message naming the file
        # instead of an opaque "NoneType is not subscriptable" below.
        raise OSError(f"Could not read Kubric flow image: {input_file}")

    # Drop the first channel of the image as loaded by OpenCV and keep the
    # remaining ones as the two flow components.
    flow = encoded[..., 1:].astype(np.float32)
    # Map the stored integers back to [flow_min, flow_max]; 65535 is
    # presumably the uint16 maximum of the PNG encoding - TODO confirm.
    flow = flow / 65535 * (flow_max - flow_min) + flow_min
    return flow
def read_viper_flow(input_file: Union[str, Path, IO]) -> np.ndarray:
    """Read optical flow in VIPER npz format from file.

    The archive is expected to contain two arrays named "u" and "v", which
    become the x- and y-displacement channels respectively.

    Parameters
    ----------
    input_file: str, Path or IO
        Path of the file to read or file object.

    Returns
    -------
    numpy.ndarray
        3D float32 flow in the HWF (Height, Width, Flow) layout.
        flow[..., 0] is the x-displacement.
        flow[..., 1] is the y-displacement.
        Every component whose absolute value exceeds 512 is replaced by NaN.
    """
    # Use the archive as a context manager so the underlying file handle is
    # released deterministically instead of waiting for garbage collection.
    with np.load(input_file) as flow_npz:
        flow = np.stack([flow_npz["u"], flow_npz["v"]], 2).astype(np.float32)
    # Components beyond +/-512 are marked as NaN; this is applied per
    # component, not per pixel.
    flow[np.abs(flow) > 512] = np.nan
    return flow
def fb_check(
    forward_flow: Union[np.ndarray, torch.Tensor],
    backward_flow: Union[np.ndarray, torch.Tensor],
    threshold: float = 1.0,
) -> Union[np.ndarray, torch.Tensor]:
    """Compute a forward-backward consistency mask between two flows.

    The backward flow is sampled at the positions reached by following the
    forward flow from each pixel. A pixel is accepted when the L2 norm of
    ``forward_flow + warped_backward_flow`` is below ``threshold`` and the
    sampler's mask for that position is above 0.5.

    Parameters
    ----------
    forward_flow : np.ndarray or torch.Tensor
        A numpy flow must be 3D and is converted from HWC to a batched
        channels-first tensor. A torch flow must be 4D; the last two
        dimensions are used as height and width and the norm is taken over
        dimension 1.
    backward_flow : np.ndarray or torch.Tensor
        Flow in the opposite direction, in the same container type and
        layout as ``forward_flow``.
    threshold : float, default 1.0
        Maximum forward-backward difference for a pixel to be accepted.

    Returns
    -------
    np.ndarray or torch.Tensor
        Boolean mask that is True for consistent pixels. For numpy inputs
        the first (only) batch element is returned as a numpy array;
        otherwise the batched torch tensor is returned.
    """
    is_np_input = isinstance(forward_flow, np.ndarray)
    if is_np_input:
        assert len(forward_flow.shape) == 3
        # HWC -> 1CHW
        forward_flow = torch.from_numpy(forward_flow).permute(2, 0, 1)[None]
        backward_flow = torch.from_numpy(backward_flow).permute(2, 0, 1)[None]

    assert len(forward_flow.shape) == 4

    # Build the pixel grid on the same device as the flow; otherwise the
    # addition below fails for tensors that do not live on the CPU.
    device = forward_flow.device
    coords = torch.meshgrid(
        torch.arange(forward_flow.shape[-2], dtype=torch.float32, device=device),
        torch.arange(forward_flow.shape[-1], dtype=torch.float32, device=device),
        indexing="ij",
    )
    # meshgrid with "ij" yields (rows, cols); reverse to (x, y) so the
    # channels line up with the flow channels.
    coords = torch.stack(coords[::-1], dim=0)[None]
    coords = coords + forward_flow
    # Channels-last coordinates, as passed to the sampler.
    coords = coords.permute(0, 2, 3, 1)

    warped_backward_flow, in_mask = raft.bilinear_sampler(
        backward_flow, coords, mask=True
    )

    fb_diff = torch.norm(forward_flow + warped_backward_flow, p=2, dim=1)
    # in_mask presumably flags sample positions that fall inside the image,
    # with a trailing singleton dimension - TODO confirm against raft.
    fb_mask = (fb_diff < threshold) & (in_mask[..., 0] > 0.5)

    if is_np_input:
        fb_mask = fb_mask[0].numpy()
    return fb_mask


def write_viper_flow(output_file: Union[str, Path], flow: np.ndarray) -> None:
    """Write optical flow to file in the VIPER npz format.

    The flow is cast to float16 and stored with ``np.savez`` as two arrays:
    "u" holds ``flow[..., 0]`` and "v" holds ``flow[..., 1]``.

    Parameters
    ----------
    output_file : str or Path
        Destination handed to ``np.savez``.
    flow : numpy.ndarray
        Flow whose last dimension holds the x- and y-displacement at
        indices 0 and 1.
    """
    flow = flow.astype(np.float16)
    np.savez(output_file, u=flow[..., 0], v=flow[..., 1])