Skip to content

Commit

Permalink
Merge pull request #962 from roboflow/feature/add_indexing_for_video_processing
Browse files Browse the repository at this point in the history

Add changes to enable indexing video processing results in workflows CLI
  • Loading branch information
grzegorz-roboflow authored Jan 22, 2025
2 parents b175b5a + 1ef88a8 commit 9d835b4
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 13 deletions.
2 changes: 1 addition & 1 deletion inference/core/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.34.0"
__version__ = "0.35.0rc1"


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion inference_cli/lib/workflows/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def run_video_processing_with_workflows(

from inference_cli.lib.workflows.video_adapter import process_video_with_workflow

process_video_with_workflow(
_ = process_video_with_workflow(
input_video_path=input_video_path,
output_directory=output_directory,
output_file_type=output_file_type,
Expand Down
6 changes: 6 additions & 0 deletions inference_cli/lib/workflows/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ class ImagesDirectoryProcessingDetails:
]
aggregated_results_path: Optional[str] = field(default=None)
failures_report_path: Optional[str] = field(default=None)


@dataclass(frozen=True)
class VideoProcessingDetails:
    """Immutable summary of the artifacts produced by one video-processing run."""

    # Path of the structured-results file written by the structured data sink
    # (first stream's flush() result); None when no results were produced.
    structured_results_file: Optional[str]
    # Maps workflow output name -> path of the saved output video for the
    # first stream; None when video outputs were disabled or none were written.
    video_outputs: Optional[Dict[str, str]]
40 changes: 29 additions & 11 deletions inference_cli/lib/workflows/video_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from inference.core.utils.image_utils import load_image_bgr
from inference_cli.lib.utils import dump_jsonl
from inference_cli.lib.workflows.common import deduct_images, dump_objects_to_json
from inference_cli.lib.workflows.entities import OutputFileType
from inference_cli.lib.workflows.entities import OutputFileType, VideoProcessingDetails


def process_video_with_workflow(
Expand All @@ -31,10 +31,11 @@ def process_video_with_workflow(
max_fps: Optional[float] = None,
save_image_outputs_as_video: bool = True,
api_key: Optional[str] = None,
) -> None:
) -> VideoProcessingDetails:
structured_sink = WorkflowsStructuredDataSink(
output_directory=output_directory,
output_file_type=output_file_type,
numbers_of_streams=1,
)
progress_sink = ProgressSink.init(input_video_path=input_video_path)
sinks = [structured_sink.on_prediction, progress_sink.on_prediction]
Expand All @@ -61,9 +62,14 @@ def process_video_with_workflow(
pipeline.start(use_main_thread=True)
pipeline.join()
progress_sink.stop()
structured_sink.flush()
structured_results_file = structured_sink.flush()[0]
video_outputs = None
if video_sink is not None:
video_sink.release()
video_outputs = video_sink.release()
return VideoProcessingDetails(
structured_results_file=structured_results_file,
video_outputs=video_outputs,
)


class WorkflowsStructuredDataSink:
def __init__(
    self,
    output_directory: str,
    output_file_type: OutputFileType,
    numbers_of_streams: int = 1,
):
    """Buffering sink that accumulates per-stream predictions until flushed.

    Args:
        output_directory: Directory where per-stream result files are written.
        output_file_type: Format of the dumped results (JSONL vs JSON).
        numbers_of_streams: How many streams flush() should report on; defaults
            to 1 for single-video processing.
    """
    self._output_directory = output_directory
    # Maps stream index -> list of buffered prediction payloads.
    self._structured_results_buffer = defaultdict(list)
    self._output_file_type = output_file_type
    # flush() returns one entry per stream in range(numbers_of_streams).
    self._numbers_of_streams = numbers_of_streams

def on_prediction(
self,
Expand All @@ -94,11 +102,17 @@ def on_prediction(
}
self._structured_results_buffer[stream_idx].append(prediction)

def flush(self) -> List[Optional[str]]:
    """Write every stream's buffered results to disk.

    Returns:
        A list indexed by stream id (0 .. numbers_of_streams - 1); each entry
        is the path of the file the stream's results were dumped to, or None
        when that stream produced no results.
    """
    stream_idx2file_path = {}
    # list(...) guards against mutation while iterating; only keys are needed.
    for stream_idx in list(self._structured_results_buffer):
        file_path = self._flush_stream_buffer(stream_idx=stream_idx)
        stream_idx2file_path[stream_idx] = file_path
    # Streams that never buffered anything are reported as None.
    return [
        stream_idx2file_path.get(stream_idx)
        for stream_idx in range(self._numbers_of_streams)
    ]

def _flush_stream_buffer(self, stream_idx: int) -> Optional[str]:
content = self._structured_results_buffer[stream_idx]
if len(content) == 0:
return None
Expand All @@ -114,6 +128,7 @@ def _flush_stream_buffer(self, stream_idx: int) -> None:
else:
dump_jsonl(path=file_path, content=content)
self._structured_results_buffer[stream_idx] = []
return file_path

def __del__(self):
self.flush()
Expand Down Expand Up @@ -182,11 +197,14 @@ def on_prediction(
image = load_image_bgr(value)
stream_sinks[key].write_frame(frame=image)

def release(self) -> Optional[Dict[str, str]]:
    """Release all underlying video writers and collect their output paths.

    Returns:
        Mapping of workflow output name -> saved video path for stream 0, or
        None when stream 0 produced no video outputs.
        NOTE(review): only stream 0's paths are returned; multi-stream callers
        lose the other streams' outputs — confirm this is intentional.
    """
    stream_idx2keys_videos: Dict[int, Dict[str, str]] = defaultdict(dict)
    for stream_idx, stream_sinks in self._video_sinks.items():
        for key, sink in stream_sinks.items():
            # Finalize the writer before recording where it wrote to.
            sink.release()
            stream_idx2keys_videos[stream_idx][key] = sink.target_path
    # Reset so a repeated release() (e.g. via __del__) is a harmless no-op.
    self._video_sinks = defaultdict(dict)
    return stream_idx2keys_videos.get(0)

def __del__(self):
self.release()
Expand Down

0 comments on commit 9d835b4

Please sign in to comment.