Skip to content

Commit

Permalink
Merge pull request #493 from stepan-anokhin/459-display-embeddings
Browse files Browse the repository at this point in the history
Display embeddings (#459)
  • Loading branch information
johnhbenetech authored Apr 21, 2022
2 parents f6fbffe + 483357a commit c7765fb
Show file tree
Hide file tree
Showing 80 changed files with 3,274 additions and 1,872 deletions.
33 changes: 12 additions & 21 deletions cli/cli/handlers/finder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Optional

from cli.handlers.errors import handle_errors
from winnow.utils.logging import configure_logging_cli


class FinderCli:
Expand All @@ -13,30 +12,22 @@ def __init__(self, pipeline):
@handle_errors
def local_matches(self):
"""Find matches between local videos."""
from winnow.pipeline.generate_local_matches import generate_local_matches
from winnow.utils.files import scan_videos
import luigi

config = self._pipeline.config
configure_logging_cli(config.logging)
from winnow.pipeline.luigi.matches import MatchesReportTask

videos = scan_videos(config.sources.root, "**", extensions=config.sources.extensions)
generate_local_matches(files=videos, pipeline=self._pipeline)
luigi.build([MatchesReportTask(config=self._pipeline.config)], local_scheduler=True, workers=1)

def remote_matches(self, repo: Optional[str] = None, contributor: Optional[str] = None):
def remote_matches(self, repo: Optional[str] = None):
"""Find matches between local files and remote fingerprints."""
from winnow.pipeline.generate_remote_matches import generate_remote_matches
import logging.config
import luigi

config = self._pipeline.config
configure_logging_cli(config.logging)
from winnow.pipeline.luigi.matches import RemoteMatchesTask

if repo is not None:
repo = str(repo)

if contributor is not None:
contributor = str(contributor)

generate_remote_matches(
pipeline=self._pipeline,
repository_name=repo,
contributor_name=contributor,
logging.config.fileConfig("./logging.conf")
luigi.build(
[RemoteMatchesTask(config=self._pipeline.config, repository_name=repo)],
local_scheduler=True,
workers=1,
)
35 changes: 18 additions & 17 deletions cli/cli/handlers/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import os


class PipelineCli:
"""Process video files."""

Expand All @@ -9,20 +6,24 @@ def __init__(self, config):

def all(self):
"""Process all video files."""
from winnow.utils.logging import configure_logging_cli
from winnow.pipeline.detect_scenes import detect_scenes
from winnow.pipeline.generate_local_matches import generate_local_matches
from winnow.utils.files import scan_videos
from winnow.pipeline.extract_exif import extract_exif
from winnow.pipeline.pipeline_context import PipelineContext

configure_logging_cli(self._config.logging)
import luigi

# Resolve list of video files from the directory
absolute_root = os.path.abspath(self._config.sources.root)
videos = scan_videos(absolute_root, "**", extensions=self._config.sources.extensions)
from winnow.pipeline.luigi.exif import ExifTask
from winnow.pipeline.luigi.signatures import (
SignaturesTask,
DBSignaturesTask,
)
from winnow.pipeline.luigi.matches import MatchesReportTask, DBMatchesTask

pipeline_context = PipelineContext(self._config)
generate_local_matches(files=videos, pipeline=pipeline_context)
detect_scenes(files=videos, pipeline=pipeline_context)
extract_exif(videos, pipeline=pipeline_context)
luigi.build(
[
ExifTask(config=self._config),
SignaturesTask(config=self._config),
DBSignaturesTask(config=self._config),
MatchesReportTask(config=self._config),
DBMatchesTask(config=self._config),
],
local_scheduler=True,
workers=1,
)
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ services:
SECURITY_STORAGE_PATH: "/project/data/representations"
RPC_SERVER_HOST: "rpc"
RPC_SERVER_PORT: 50051
EMBEDDINGS_FOLDER: "/project/data/representations/embeddings"
volumes:
# Set the BENETECH_DATA_LOCATION environment variable to the path
# on your host machine where you placed your video files
Expand Down
1 change: 1 addition & 0 deletions environment-gpu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies:
- trimap
- pacmap
- torch
- typing_extensions==4.1.1
- grpcio==1.43.0
- grpcio-tools==1.43.0

1 change: 1 addition & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies:
- trimap
- pacmap
- torch
- typing_extensions==4.1.1
- grpcio==1.43.0
- grpcio-tools==1.43.0

Expand Down
3 changes: 0 additions & 3 deletions process_video_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import luigi

from winnow.pipeline.luigi.download import DownloadFilesTask
from winnow.pipeline.pipeline_context import PipelineContext
from winnow.pipeline.process_urls import process_urls
from winnow.utils.config import resolve_config
from winnow.utils.logging import configure_logging_cli


@click.command()
Expand Down
110 changes: 110 additions & 0 deletions rpc/embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from threading import Lock
from typing import List, Dict, Optional, Tuple

import numpy as np
from annoy import AnnoyIndex

import rpc.rpc_pb2 as proto
from winnow.pipeline.luigi.condense import CondensedFingerprints
from winnow.pipeline.luigi.embeddings import (
EmbeddingsTask,
UmapEmbeddingsTask,
TSNEEmbeddingsTask,
TriMapEmbeddingsTask,
PaCMAPEmbeddingsTask,
)
from winnow.pipeline.luigi.embeddings_annoy_index import (
EmbeddingsAnnoyIndexTask,
PaCMAPAnnoyIndexTask,
TriMAPAnnoyIndexTask,
UMAPAnnoyIndexTask,
TSNEAnnoyIndexTask,
)
from winnow.pipeline.luigi.utils import FileKeyDF
from winnow.pipeline.pipeline_context import PipelineContext
from winnow.storage.file_key import FileKey


class EmbeddingsIndex:
def __init__(self, annoy_index: AnnoyIndex, files: List[FileKey], positions: Dict[FileKey, np.ndarray]):
self._annoy_index: AnnoyIndex = annoy_index
self._files: List[FileKey] = files
self._positions: Dict[FileKey, np.ndarray] = positions

def query(
self,
x: float,
y: float,
max_count: int = 10,
max_distance: Optional[float] = None,
) -> List[proto.FoundNeighbor]:
if max_distance <= 0:
max_distance = None
indices, distances = self._annoy_index.get_nns_by_vector([x, y], max_count, include_distances=True)
files = [self._files[i] for i in indices]
results: List[proto.FoundNeighbor] = []
for file, distance in zip(files, distances):
if max_distance is not None and distance > max_distance:
break
x, y = self._positions[file]
results.append(
proto.FoundNeighbor(
file_path=file.path,
file_hash=file.hash,
distance=distance,
x=x,
y=y,
)
)
return results


class EmbeddingLoader:
def __init__(self, pipeline: PipelineContext):
self._pipeline: PipelineContext = pipeline
self._cache: Dict[str, EmbeddingsIndex] = {}
self._lock = Lock()

def load(self, algorithm: str) -> Optional[EmbeddingsIndex]:
with self._lock:
if algorithm not in self._cache:
index = self._do_load(algorithm)
if index is not None:
self._cache[algorithm] = index
return self._cache.get(algorithm)

def _do_load(self, algorithm: str) -> Optional[EmbeddingsIndex]:
"""Do load embeddings index."""
embeddings_task, annoy_task = self._task(algorithm)
if embeddings_task is None or annoy_task is None:
return None
embeddings: CondensedFingerprints = embeddings_task.output().read()
if embeddings is None:
return None

annoy_output = annoy_task.output()
annoy_paths, _ = annoy_output.latest_result
if annoy_paths is None:
return None

annoy_index_path, annoy_files_path = annoy_paths
annoy_index = AnnoyIndex(2, "euclidean")
annoy_index.load(annoy_index_path)
annoy_files_df = FileKeyDF.read_csv(annoy_files_path)
positions: Dict[FileKey, np.ndarray] = {}
for i, file_key in enumerate(embeddings.to_file_keys()):
positions[file_key] = embeddings.fingerprints[i]
return EmbeddingsIndex(annoy_index, FileKeyDF.to_file_keys(annoy_files_df), positions)

def _task(self, algorithm: str) -> Tuple[Optional[EmbeddingsTask], Optional[EmbeddingsAnnoyIndexTask]]:
config = self._pipeline.config
if algorithm == "pacmap":
return PaCMAPEmbeddingsTask(config=config), PaCMAPAnnoyIndexTask(config=config)
elif algorithm == "trimap":
return TriMapEmbeddingsTask(config=config), TriMAPAnnoyIndexTask(config=config)
elif algorithm == "umap":
return UmapEmbeddingsTask(config=config), UMAPAnnoyIndexTask(config=config)
elif algorithm == "t-sne":
return TSNEEmbeddingsTask(config=config), TSNEAnnoyIndexTask(config=config)
else:
return None, None
31 changes: 31 additions & 0 deletions rpc/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,35 @@ message StatusRequest {
// Service status response
message StatusResponse {
bool status = 1;
}

// Embeddings service
service Embeddings {
// Get nearest neighbors
rpc query_nearest_neighbors (NearestNeighborsRequest) returns (NearestNeighborsResults) {}
rpc get_status (EmbeddingsStatusRequest) returns (StatusResponse) {}
}

message NearestNeighborsRequest {
string algorithm = 1;
float x = 2;
float y = 3;
float max_distance = 4;
int32 max_count = 5;
}

message NearestNeighborsResults {
repeated FoundNeighbor neighbors = 1;
}

message FoundNeighbor {
string file_path = 1;
string file_hash = 2;
float x = 3;
float y = 4;
float distance = 5;
}

message EmbeddingsStatusRequest {
string algorithm = 1;
}
Loading

0 comments on commit c7765fb

Please sign in to comment.