🥚 | Document • Executor • Flow • Wrap-up |
🐣 | Feed Data • Fetch Result • Add Logic • Inter & Intra Parallelism • Decentralize • Asynchronous |
🐤 | CRUD Functions |
🐥 | Customize Encoder • Test Encoder • Parallelism & Batching • Add Data Indexer • Compose Flow from YAML • Search • Evaluation • Flow Optimization • REST Interface |
These code snippets provide a short introduction to Jina's functionality and design framework. To run a snippet, just click the "run" button next to the snippet.
Document, Executor, and Flow are the three fundamental concepts in Jina.
- Document is the basic data type in Jina;
- Executor is how Jina processes Documents;
- Flow is how Jina streamlines and scales Executors.
Document is Jina's primitive data type. It can contain text, image, array, embedding, URI, and be accompanied by rich meta information. To construct a Document, you can use:
import numpy
from jina import Document
doc1 = Document(content=text_from_file, mime_type='text/x-python')  # a text Document containing Python code
doc2 = Document(embedding=numpy.random.random([10, 10]))  # an ndarray Document
A Document can be recursed both vertically and horizontally to have nested Documents and matched Documents. To better see the Document's recursive structure, you can use the .plot() function. If you are using JupyterLab/Notebook, all Document objects will be auto-rendered.
Click here to see more about MultimodalDocument
A MultimodalDocument is a document composed of multiple Documents from different modalities (e.g. text, image, audio).
Jina provides multiple ways to build a multimodal Document. For example, you can provide the modality names and the content in a dict:
import PIL.Image
from jina import MultimodalDocument
document = MultimodalDocument(modality_content_map={
    'title': 'my holiday picture',
    'description': 'the family having fun on the beach',
    'image': PIL.Image.open('path/to/image.jpg')
})
One can also compose a MultimodalDocument from multiple Documents directly:
import PIL.Image
from jina.types import Document, MultimodalDocument
doc_title = Document(content='my holiday picture', modality='title')
doc_desc = Document(content='the family having fun on the beach', modality='description')
doc_img = Document(content=PIL.Image.open('path/to/image.jpg'), modality='image')
doc_img.tags['date'] = '10/08/2019'
document = MultimodalDocument(chunks=[doc_title, doc_desc, doc_img])
To extract fusion embeddings from different modalities, Jina provides the BaseMultiModalEncoder abstract class, which has a unique encode interface.
def encode(self, *data: 'numpy.ndarray', **kwargs) -> 'numpy.ndarray':
    ...
MultiModalDriver provides the data arguments from the MultimodalDocument to the encoder in the expected order. In the example below, the image embedding is passed to the encoder as the first argument, and the text embedding as the second.
jtype: MyMultimodalEncoder
with:
  positional_modality: ['image', 'text']
requests:
  on:
    [IndexRequest, SearchRequest]:
      - jtype: MultiModalDriver {}
Interested readers can refer to jina-ai/example: how to build a multimodal search engine for image retrieval using TIRG (Composing Text and Image for Image Retrieval), for the usage of MultiModalDriver and BaseMultiModalEncoder in practice.
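The role of positional_modality can be pictured with a small plain-Python sketch. It is not Jina's driver code: the order_by_modality helper and the dict-based chunks are hypothetical stand-ins, used only to show how embeddings might be reordered before they reach encode().

```python
def order_by_modality(chunks, positional_modality):
    """Return chunk embeddings in the order the encoder expects (hypothetical helper)."""
    by_modality = {chunk['modality']: chunk['embedding'] for chunk in chunks}
    return [by_modality[name] for name in positional_modality]


# Chunks may arrive in any order; plain dicts stand in for Documents here.
chunks = [
    {'modality': 'text', 'embedding': [0.1, 0.2]},
    {'modality': 'image', 'embedding': [0.9, 0.8, 0.7]},
]
ordered = order_by_modality(chunks, positional_modality=['image', 'text'])
print(ordered)  # image embedding first, text embedding second
```

An encoder could then be called as encode(*ordered), so its first positional argument is always the image data regardless of chunk order.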
Executor is the algorithmic component in Jina: it defines how Jina processes a Document.
Here is a simple Executor that returns [1, 2] or [3, 4] depending on the text attribute of the Document.
import numpy as np
from jina import Executor, requests
class MyExecutor(Executor):
    @requests
    def foo(self, text):
        # np.array builds the vector; np.ndarray([1, 2]) would allocate an uninitialized 1x2 array
        return [{'embedding': np.array([1, 2]) if t == 'hello' else
                              np.array([3, 4])} for t in text]
text and embedding are Document attributes. Using text as the argument automatically tells Jina to fetch the attribute of the same name from the Document objects. Using embedding in the return value tells Jina to write the results into Document.embedding. You can look up all available attributes with Document.get_all_attributes(). Of course, you can define multiple arguments in foo().
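The name-based binding described above can be pictured with a small plain-Python sketch. It is not Jina's implementation: the Doc class and the run_on_docs helper are hypothetical stand-ins, and plain lists replace numpy arrays so the snippet has no dependencies.

```python
import inspect


class Doc:
    """Hypothetical stand-in for a Document with two attributes."""

    def __init__(self, text):
        self.text = text
        self.embedding = None


def run_on_docs(func, docs):
    """Call func with one list per argument name, then write the results back."""
    # Collect each requested attribute by name, e.g. 'text' -> [d.text, ...]
    names = list(inspect.signature(func).parameters)
    kwargs = {name: [getattr(d, name) for d in docs] for name in names}
    results = func(**kwargs)
    # Each result dict maps attribute names to new values for one Doc
    for doc, update in zip(docs, results):
        for attr, value in update.items():
            setattr(doc, attr, value)
    return docs


def foo(text):
    return [{'embedding': [1, 2] if t == 'hello' else [3, 4]} for t in text]


docs = run_on_docs(foo, [Doc('hello'), Doc('world')])
print([d.embedding for d in docs])
```

The point of the sketch is only that argument names select what is read and returned keys select what is written.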
Flow connects Executors from different machines and scales them up. To create a new Flow:
from jina import Flow
f = Flow().add(uses=MyExecutor)
This creates a simple Flow with the MyExecutor defined above. You can chain multiple .add() calls in a Flow.
To visualize the Flow, simply chain it with .plot('my-flow.svg'). If you are using a Jupyter notebook, the Flow object will be displayed inline without calling plot.
Now let's combine the three concepts together.
import numpy as np
from jina import Document, Executor, Flow, requests
class MyExecutor(Executor):
    @requests
    def foo(self, text):
        # np.array builds the vector; np.ndarray([1, 2]) would allocate an uninitialized 1x2 array
        return [{'embedding': np.array([1, 2]) if t == 'hello' else np.array([3, 4])} for t in text]
with Flow().add(uses=MyExecutor) as f:
    f.index([Document(text='hello'), Document(text='world')], on_done=print)
Get the vibe? Now we're talking! Let's learn more about the basic concepts and features of Jina:
To use a Flow, open it via the with context manager, like you would open a file in Python. Now let's create some empty Documents and index them:
from jina import Document, Flow
with Flow().add() as f:
    f.index((Document() for _ in range(10)))
Flow supports CRUD operations: index, search, update, delete. In addition, it also provides sugary syntax for ndarray, csv, ndjson and arbitrary files.
Input | Example of index/search | Explanation |
numpy.ndarray |
with f:
    f.index_ndarray(numpy.random.random([4, 2]))  # search_ndarray() |
Input four Documents, one per row of the array. |
CSV |
with f, open('index.csv') as fp:
    f.index_csv(fp, field_resolver={'pic_url': 'uri'})  # search_csv() |
Each line in index.csv becomes a Document; the CSV field pic_url is mapped to uri. |
JSON Lines/ndjson/LDJSON |
with f, open('index.ndjson') as fp:
    f.index_ndjson(fp, field_resolver={'question_id': 'id'})  # search_ndjson() |
Each line in index.ndjson becomes a Document; the JSON field question_id is mapped to id. |
Files with wildcards |
with f:
    f.index_files(['/tmp/*.mp4', '/tmp/*.pdf'])  # search_files() |
Each file captured is constructed as a Document. |
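To make the field_resolver idea concrete, here is a minimal sketch using only Python's csv module. It is not how Jina reads CSV files internally: the docs_from_csv helper is hypothetical and plain dicts stand in for Documents.

```python
import csv
import io


def docs_from_csv(fp, field_resolver=None):
    """Yield one dict per CSV row, renaming columns via field_resolver (hypothetical helper)."""
    field_resolver = field_resolver or {}
    for row in csv.DictReader(fp):
        # Columns named in field_resolver are renamed; all others keep their name
        yield {field_resolver.get(key, key): value for key, value in row.items()}


sample = io.StringIO(
    'pic_url,caption\n'
    'http://example.com/a.jpg,beach\n'
    'http://example.com/b.jpg,forest\n'
)
docs = list(docs_from_csv(sample, field_resolver={'pic_url': 'uri'}))
print(docs[0])
```

Each row becomes one record, and the pic_url column ends up under the uri key, which mirrors the mapping passed to index_csv above.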
Once a request is done, callback functions are fired. Jina Flow implements a Promise-like interface: you can add the callback functions on_done, on_error, and on_always to hook different events. In the example below, our Flow passes the message, then prints the result when successful. If something goes wrong, it beeps. Finally, the result is written to output.txt.
import numpy
from jina import Flow
def beep(*args):
    # make a beep sound
    import os
    os.system('echo -n "\a";')
with Flow().add() as f, open('output.txt', 'w') as fp:
    f.index(numpy.random.random([4, 5, 2]),
            on_done=print, on_error=beep, on_always=lambda x: fp.write(x.json()))
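The order in which the three callbacks fire can be sketched in plain Python. This is an illustration of the Promise-like pattern only, not Jina's request handling: send_request, label, and the events list are hypothetical names.

```python
events = []


def send_request(handler, payload, on_done=None, on_error=None, on_always=None):
    """Run handler(payload) and fire the matching callbacks (hypothetical helper)."""
    try:
        result = handler(payload)
    except Exception as exc:
        result = exc
        if on_error:
            on_error(exc)
    else:
        if on_done:
            on_done(result)
    # on_always fires after either branch
    if on_always:
        on_always(result)


def label(value):
    """Record exceptions by class name so the log stays easy to compare."""
    return type(value).__name__ if isinstance(value, Exception) else value


callbacks = dict(
    on_done=lambda r: events.append(('done', label(r))),
    on_error=lambda r: events.append(('error', label(r))),
    on_always=lambda r: events.append(('always', label(r))),
)
send_request(str.upper, 'hello', **callbacks)  # succeeds
send_request(str.upper, 42, **callbacks)       # raises TypeError inside the handler
print(events)
```

A successful request triggers on_done and then on_always; a failing one triggers on_error and then on_always.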
To add logic to the Flow, use the uses parameter to add an Executor. uses accepts multiple value types, including a class name, a Docker image, (inline) YAML, or a built-in shortcut.
f = (Flow().add(uses=MyBertEncoder)  # the class of a Jina Executor
     .add(uses='docker://jinahub/pod.encoder.dummy_mwu_encoder:0.0.6-0.9.3')  # the image name
     .add(uses='myencoder.yml')  # YAML serialization of a Jina Executor
     .add(uses='!WaveletTransformer | {freq: 20}')  # inline YAML config
     .add(uses='_pass')  # built-in shortcut executor
     .add(uses={'__cls': 'MyBertEncoder', 'with': {'param': 1.23}}))  # dict config object with __cls keyword
The power of Jina lies in its decentralized architecture: each add creates a new Executor, and these Executors can be run as a local thread/process, a remote process, inside a Docker container, or even inside a remote Docker container.
Chaining .add() calls creates a sequential Flow. For parallelism, use the needs parameter:
f = (Flow().add(name='p1', needs='gateway')
     .add(name='p2', needs='gateway')
     .add(name='p3', needs='gateway')
     .needs(['p1', 'p2', 'p3'], name='r1').plot())
p1, p2, and p3 now subscribe to Gateway and conduct their work in parallel. The last .needs() blocks until all of these Executors have finished their work. Note: parallelism can also be performed inside an Executor using parallel:
f = (Flow().add(name='p1', needs='gateway')
     .add(name='p2', needs='gateway')
     .add(name='p3', parallel=3)
     .needs(['p1', 'p3'], name='r1').plot())
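The fan-out and join pattern that needs expresses can be sketched with Python's standard thread pool. This is an analogy under simplifying assumptions, not Jina's scheduler: make_branch and the short sleep are hypothetical stand-ins for real Executor work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def make_branch(name, delay):
    """Build a fake branch that sleeps, then tags the message with its name."""
    def branch(message):
        time.sleep(delay)
        return f'{name}({message})'
    return branch


branches = [make_branch(name, 0.2) for name in ('p1', 'p2', 'p3')]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(branches)) as pool:
    # Fan out: every branch receives the same message from the "gateway"
    futures = [pool.submit(branch, 'req') for branch in branches]
    # Join: wait for all branches, like the final .needs([...])
    joined = [future.result() for future in futures]
elapsed = time.perf_counter() - start

print(joined)
print(f'{elapsed:.2f}s elapsed')
```

Because the branches run side by side, the elapsed time is typically close to one branch's delay rather than the sum of all three.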
A Flow does not have to be local-only: you can put any Executor on a remote machine. In the example below, the host keyword places the gpu_exec Executor on a remote machine for parallelization, whereas the other Executors stay local. Extra file dependencies that need to be uploaded are specified via the upload_files keyword.
123.456.78.9 |
# have docker installed
docker run --name=jinad --network=host -v /var/run/docker.sock:/var/run/docker.sock jinaai/jina:latest-daemon --port-expose 8000
# to stop it
docker rm -f jinad |
Local |
import numpy as np
from jina import Flow
f = (Flow()
.add()
.add(name='gpu_exec',
uses='mwu_encoder.yml',
host='123.456.78.9:8000',
parallel=2,
upload_files=['mwu_encoder.py'])
.add())
with f:
    f.index_ndarray(np.random.random([10, 100]), on_done=print) |
We provide a demo server on cloud.jina.ai:8000. Give the following snippet a try!
from jina import Flow
with Flow().add().add(host='cloud.jina.ai:8000') as f:
    f.index(['hello', 'world'])
While synchronous from the outside, Jina runs asynchronously under the hood: it manages the event loop(s) for scheduling the jobs. If you want more control over the event loop, you can use AsyncFlow.
Unlike Flow, the CRUD of AsyncFlow accepts input and output functions as async generators. This is useful when your data sources involve other asynchronous libraries (e.g. motor for MongoDB):
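import asyncio
from jina import AsyncFlow, Document
async def input_function():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)
# top-level `async for` works in an async-aware shell such as Jupyter; elsewhere, wrap it in a coroutine
with AsyncFlow().add() as f:
    async for resp in f.index(input_function):
        print(resp)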
AsyncFlow is particularly useful when Jina and another heavy-lifting job are running concurrently:
import asyncio
import numpy
from jina import AsyncFlow
async def run_async_flow_5s():  # WaitDriver pauses 5s, so the total roundtrip is ~5s
    with AsyncFlow().add(uses='- !WaitDriver {}') as f:
        async for resp in f.index_ndarray(numpy.random.random([5, 4])):
            print(resp)
async def heavylifting():  # total roundtrip takes ~5s
    print('heavylifting other io-bound jobs, e.g. download, upload, file io')
    await asyncio.sleep(5)
    print('heavylifting done after 5s')
async def concurrent_main():  # about 5s in total; dispatch overhead adds a little, usually < 7s
    await asyncio.gather(run_async_flow_5s(), heavylifting())
if __name__ == '__main__':
    asyncio.run(concurrent_main())
AsyncFlow is also very useful when using Jina inside a Jupyter Notebook, where it can run out of the box.
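The timing argument behind asyncio.gather can be checked without Jina at all. In this sketch, fake_flow is a hypothetical coroutine standing in for an AsyncFlow request, and the delays are shortened to 0.2s so it finishes quickly.

```python
import asyncio
import time


async def fake_flow(delay):
    """Hypothetical stand-in for an AsyncFlow request that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return 'flow done'


async def heavylifting(delay):
    """Another io-bound job of the same duration."""
    await asyncio.sleep(delay)
    return 'heavylifting done'


async def concurrent_main(delay=0.2):
    start = time.perf_counter()
    # Both coroutines wait at the same time on one event loop
    results = await asyncio.gather(fake_flow(delay), heavylifting(delay))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(concurrent_main())
print(results, f'{elapsed:.2f}s')
```

gather returns the results in the order the coroutines were passed, and the elapsed time is typically close to a single delay rather than double.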
In Jina, CRUD corresponds to four functions: index (create), search (read), update, and delete. See below for an example:
import numpy as np
from jina import Document
docs = [Document(id='🐲', embedding=np.array([0, 0]), tags={'guardian': 'Azure Dragon', 'position': 'East'}),
        Document(id='🐦', embedding=np.array([1, 0]), tags={'guardian': 'Vermilion Bird', 'position': 'South'}),
        Document(id='🐢', embedding=np.array([0, 1]), tags={'guardian': 'Black Tortoise', 'position': 'North'}),
        Document(id='🐯', embedding=np.array([1, 1]), tags={'guardian': 'White Tiger', 'position': 'West'})]
Let's build a Flow with a simple indexer:
from jina import Flow
f = Flow().add(uses='_index')
Document and Flow are basic concepts in Jina, which were introduced above. _index is a built-in embedding + structured storage that you can use out of the box.
Index |
# save four docs (both embedding and structured info) into storage
with f:
    f.index(docs, on_done=print) |
Search |
# retrieve the top-3 neighbours of 🐲; this prints 🐲🐦🐢 with scores 0, 1, 1 respectively
with f:
    f.search(docs[0], top_k=3, on_done=lambda x: print(x.docs[0].matches))
{"id": "🐲", "tags": {"guardian": "Azure Dragon", "position": "East"}, "embedding": {"dense": {"buffer": "AAAAAAAAAAAAAAAAAAAAAA==", "shape": [2], "dtype": "<i8"}}, "score": {"opName": "NumpyIndexer", "refId": "🐲"}, "adjacency": 1}
{"id": "🐦", "tags": {"position": "South", "guardian": "Vermilion Bird"}, "embedding": {"dense": {"buffer": "AQAAAAAAAAAAAAAAAAAAAA==", "shape": [2], "dtype": "<i8"}}, "score": {"value": 1.0, "opName": "NumpyIndexer", "refId": "🐲"}, "adjacency": 1}
{"id": "🐢", "tags": {"guardian": "Black Tortoise", "position": "North"}, "embedding": {"dense": {"buffer": "AAAAAAAAAAABAAAAAAAAAA==", "shape": [2], "dtype": "<i8"}}, "score": {"value": 1.0, "opName": "NumpyIndexer", "refId": "🐲"}, "adjacency": 1} |
Update |
# update the 🐲 embedding in the storage
docs[0].embedding = np.array([1, 1])
with f:
    f.update(docs[0]) |
Delete |
# remove the 🐦 and 🐲 Documents from the storage
with f:
    f.delete(['🐦', '🐲']) |
For further details about CRUD functionality, check out docs.jina.ai
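To make the four operations concrete, here is a dependency-free sketch of what an index with these semantics might do. It is not the built-in _index: TinyIndex is a hypothetical in-memory class, plain dicts stand in for Documents, ASCII ids replace the emoji ids, and Euclidean distance is assumed as the score.

```python
import math


class TinyIndex:
    """Hypothetical in-memory store: a dict keyed by Document id."""

    def __init__(self):
        self._store = {}

    def index(self, docs):
        for doc in docs:
            self._store[doc['id']] = dict(doc)

    def search(self, embedding, top_k):
        # Score every stored Document by Euclidean distance; ties break on id
        scored = sorted((math.dist(embedding, d['embedding']), d['id'])
                        for d in self._store.values())
        return scored[:top_k]

    def update(self, doc):
        self._store[doc['id']] = dict(doc)

    def delete(self, ids):
        for doc_id in ids:
            self._store.pop(doc_id, None)


store = TinyIndex()
store.index([
    {'id': 'dragon', 'embedding': [0, 0]},
    {'id': 'bird', 'embedding': [1, 0]},
    {'id': 'tortoise', 'embedding': [0, 1]},
    {'id': 'tiger', 'embedding': [1, 1]},
])
top3 = store.search([0, 0], top_k=3)      # scores 0, 1, 1 as in the example above
store.update({'id': 'dragon', 'embedding': [1, 1]})
store.delete(['bird', 'dragon'])
remaining = store.search([1, 1], top_k=3)  # only two Documents are left
print(top3)
print(remaining)
```

With the same four embeddings as the example, the top-3 query reproduces the scores 0, 1, 1, and after the delete only two entries remain searchable.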
That's all you need to know to understand the magic behind hello-world. Now let's dive deeper into it!
Let's first build a naive image encoder that embeds images into vectors using an orthogonal projection. To do this, we simply inherit from BaseImageEncoder, a base class from the jina.executors.encoders module. We then override its __init__() and encode() methods.
import numpy as np
from jina import Encoder
class MyEncoder(Encoder):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        np.random.seed(1337)
        H = np.random.rand(784, 64)
        u, s, vh = np.linalg.svd(H, full_matrices=False)
        self.oth_mat = u @ vh
    def encode(self, content: 'np.ndarray', *args, **kwargs):
        return (content.reshape([-1, 784]) / 255) @ self.oth_mat
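A short derivation shows why u @ vh yields a matrix with orthonormal columns. It assumes the thin SVD that np.linalg.svd(H, full_matrices=False) returns for a 784x64 input; the symbols U, V, and W (for oth_mat) are introduced here only for illustration.

```latex
\begin{aligned}
H &= U \Sigma V^{\top}, \qquad U \in \mathbb{R}^{784 \times 64},\; U^{\top} U = I_{64}, \qquad V \in \mathbb{R}^{64 \times 64},\; V^{\top} V = V V^{\top} = I_{64} \\
W &= U V^{\top} \\
W^{\top} W &= V U^{\top} U V^{\top} = V V^{\top} = I_{64}
\end{aligned}
```

So the 64 columns of oth_mat are orthonormal, and encode() maps each flattened 784-pixel image (scaled by 1/255) to its 64 coordinates along those columns.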
Jina provides a family of Executor classes, which summarize frequently-used algorithmic components in neural search. This family consists of encoders, indexers, crafters, evaluators, and classifiers, each with a well-designed interface. You can find the list of all built-in executors here. If they don't meet your needs, inheriting from one of them is the easiest way to bootstrap your own Executor. Simply use our Jina Hub CLI:
pip install jina[hub] && jina hub new
Let's test our encoder in the Flow with some synthetic data:
import numpy
from jina import Flow
from jina.types.ndarray.generic import NdArray
def validate(req):
    assert len(req.docs) == 100
    assert NdArray(req.docs[0].embedding).value.shape == (64,)
f = Flow().add(uses='MyEncoder')
with f:
    f.index_ndarray(numpy.random.random([100, 28, 28]), on_done=validate)
All good! Now our validate function confirms that all 100 synthetic 28x28 images have been embedded into 100 vectors of 64 dimensions each.
By setting a larger input, you can play with request_size and parallel:
f = Flow().add(uses='MyEncoder', parallel=10)
with f:
    f.index_ndarray(numpy.random.random([60000, 28, 28]), request_size=1024)
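To get a feel for what request_size means for the numbers above, the arithmetic can be sketched in plain Python. This assumes that inputs are simply cut into consecutive batches of at most request_size items; batch_indices is a hypothetical helper, not a Jina function.

```python
def batch_indices(total, request_size):
    """Yield (start, stop) index pairs, one per request (hypothetical helper)."""
    for start in range(0, total, request_size):
        yield start, min(start + request_size, total)


requests = list(batch_indices(60000, 1024))
last_start, last_stop = requests[-1]
print(len(requests))           # number of requests needed for 60000 inputs
print(last_stop - last_start)  # size of the final, partially filled request
```

Under that assumption, 60000 inputs with request_size=1024 are split into 59 requests, the last one holding the remaining 608 inputs.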
Now we need to add an indexer to store all the embeddings and the images for later retrieval. Jina provides a simple numpy-powered vector indexer, NumpyIndexer, and a key-value indexer, BinaryPbIndexer. We can combine them in a single YAML file:
jtype: CompoundIndexer
components:
  - jtype: NumpyIndexer
    with:
      index_filename: vec.gz
  - jtype: BinaryPbIndexer
    with:
      index_filename: chunk.gz
metas:
  workspace: ./
- jtype: defines the class name of the structure;
- with: defines arguments for initializing this class object.
💡 Config your IDE to enable autocompletion on YAML
Essentially, the above YAML config is equivalent to the following Python code:
from jina.executors.indexers.vector import NumpyIndexer
from jina.executors.indexers.keyvalue import BinaryPbIndexer
from jina.executors.indexers import CompoundIndexer
a = NumpyIndexer(index_filename='vec.gz')
b = BinaryPbIndexer(index_filename='chunk.gz')  # matches index_filename in the YAML above
c = CompoundIndexer()
c.components = lambda: [a, b]
Now let's add our indexer YAML file to the Flow with .add(uses=). Let's also add two shards to the indexer to improve its scalability:
f = Flow().add(uses='MyEncoder', parallel=2).add(uses='myindexer.yml', shards=2).plot()
When you have many arguments, constructing a Flow in Python can get cumbersome. In that case, you can simply move all arguments into one flow.yml:
jtype: Flow
version: '1.0'
pods:
  - name: encode
    uses: MyEncoder
    parallel: 2
  - name: index
    uses: myindexer.yml
    shards: 2
And then load it in Python:
f = Flow.load_config('flow.yml')
Querying a Flow is similar to what we did with indexing. Simply load the query Flow and switch from f.index to f.search. Say you want to retrieve the top 50 documents that are similar to your query and then plot them in HTML:
f = Flow.load_config('flows/query.yml')
with f:
    f.search_ndarray(numpy.random.random([10, 28, 28]), shuffle=True, on_done=plot_in_html, top_k=50)
To compute precision and recall on the retrieved results, you can add _eval_pr, a built-in evaluator for computing precision and recall.
f = (Flow().add(...)
.add(uses='_eval_pr'))
You can construct an iterator of query and ground-truth pairs and feed it to the Flow f via:
from jina import Document
def query_generator():
    for _ in range(10):
        q = Document()
        # now construct the expected matches as ground truth
        gt = Document(q, copy=True)  # make sure 'gt' is identical to 'q'
        gt.matches.append(...)
        yield q, gt
f.search(query_generator, ...)
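For readers who want to see what precision and recall measure on a single query, here is a minimal plain-Python sketch. It uses the standard set-based definitions and is not the code behind _eval_pr: precision_recall is a hypothetical helper, and the ids are made up for illustration.

```python
def precision_recall(matched_ids, groundtruth_ids):
    """Set-based precision and recall for one query (hypothetical helper)."""
    matched, expected = set(matched_ids), set(groundtruth_ids)
    hits = len(matched & expected)
    precision = hits / len(matched) if matched else 0.0
    recall = hits / len(expected) if expected else 0.0
    return precision, recall


# Four retrieved matches, three expected ones, two in common
precision, recall = precision_recall(['a', 'b', 'c', 'd'], ['a', 'c', 'e'])
print(precision, recall)
```

Precision is the share of retrieved matches that appear in the ground truth (2 of 4 here), and recall is the share of ground-truth matches that were retrieved (2 of 3).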
Flow Optimization gets the most out of your data. It allows hyperparameter optimization on a complete search Flow, including indexing and querying. For example, choosing a middle layer of a model often results in richer semantic embeddings. Let's test all layers of a model.
Before starting, we need the optimizer requirements installed:
pip install jina[optimizer]
First, let's get all needed imports and the Flow definition:
import numpy as np
from jina import Document
from jina.executors.encoders import BaseEncoder
from jina.optimizers import FlowOptimizer, MeanEvaluationCallback
from jina.optimizers.flow_runner import SingleFlowRunner
flow = '''jtype: Flow
version: '1'
pods:
  - uses:
      jtype: SimpleEncoder
      with:
        layer: ${{JINA_ENCODER_LAYER}}
  - uses: EuclideanEvaluator
'''
JINA_ENCODER_LAYER allows the optimizer to change the Encoder configuration with each iteration.
The EuclideanEvaluator scores the Documents against a given ground truth.
Note that the Pod definition uses Jina's inline syntax.
Now we will fake a model with three layers. For simplicity each layer only consists of a single integer which is taken as the embedding.
class SimpleEncoder(BaseEncoder):
    ENCODE_LOOKUP = {
        '🐲': [1, 3, 5],
        '🐦': [2, 4, 7],
        '🐢': [0, 2, 5],
    }
    def __init__(self, layer=0, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._layer = layer
    def encode(self, data, *args, **kwargs) -> 'np.ndarray':
        return np.array([[self.ENCODE_LOOKUP[data[0]][self._layer]]])
Furthermore, we define the optimization parameters in parameter.yml.
- !IntegerParameter
  jaml_variable: JINA_ENCODER_LAYER
  high: 2
  low: 0
  step_size: 1
For optimization, we need to run slight variations of the same Flow repeatedly, with the same data.
This is realized with a SingleFlowRunner.
documents = [
    (Document(content='🐲'), Document(embedding=np.array([2]))),
    (Document(content='🐦'), Document(embedding=np.array([3]))),
    (Document(content='🐢'), Document(embedding=np.array([3])))
]
runner = SingleFlowRunner(
    flow, documents, 1, 'search', overwrite_workspace=True
)
The same Documents are used for each Flow Optimization step.
documents consists of (document, groundtruth) pairs.
The given embedding represents the perfect semantic embedding.
Now we are ready to start the optimization:
optimizer = FlowOptimizer(
    flow_runner=runner,
    parameter_yaml='parameter.yml',
    evaluation_callback=MeanEvaluationCallback(),
    n_trials=3,
    direction='minimize',
    seed=1
)
optimizer.optimize_flow()
The MeanEvaluationCallback gathers the evaluations from all three Documents sent per run.
After each run, it returns the mean of the individual evaluations.
Finally...
...
JINA@15892[I] Trial 2 finished with value: 1.6666666666666667
and parameters: {'JINA_ENCODER_LAYER': 0}.
Best is trial 0 with value: 1.0.
JINA@15892[I]:Number of finished trials: 3
JINA@15892[I]:Best trial: {'JINA_ENCODER_LAYER': 1}
JINA@15892[I]:Time to finish: 0:00:02.081710
Tada! Layer 1 is the best one.
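The trial values in the log can be reproduced by hand. The sketch below recomputes the mean distance per layer in plain Python, assuming the evaluator scores each one-dimensional embedding by its absolute difference from the ground truth; the ASCII keys and the mean_distance helper are illustrative substitutes for the emoji keys and Jina's evaluator.

```python
# Same numbers as SimpleEncoder.ENCODE_LOOKUP and the ground-truth embeddings
ENCODE_LOOKUP = {'dragon': [1, 3, 5], 'bird': [2, 4, 7], 'tortoise': [0, 2, 5]}
GROUNDTRUTH = {'dragon': 2, 'bird': 3, 'tortoise': 3}


def mean_distance(layer):
    """Mean absolute difference between one layer's outputs and the ground truth."""
    distances = [abs(ENCODE_LOOKUP[key][layer] - GROUNDTRUTH[key]) for key in ENCODE_LOOKUP]
    return sum(distances) / len(distances)


scores = {layer: mean_distance(layer) for layer in range(3)}
best_layer = min(scores, key=scores.get)
print(scores)
print(best_layer)
```

Layer 0 gives a mean of about 1.667 and layer 1 gives 1.0, matching the values reported in the log, while layer 2 scores 3.0.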
For a more detailed guide please read our docs.
In practice, the query Flow and the client (i.e. data sender) are often physically separated. Moreover, the client may prefer to use a REST API rather than gRPC when querying. You can set port_expose to a public port and turn on REST support with restful=True:
f = Flow(port_expose=45678, restful=True)
with f:
    f.block()
That is the essence behind jina hello fashion. It is merely a taste of what Jina can do. We're really excited to see what you do with Jina! You can easily create a Jina project from templates with one terminal command:
pip install jina[hub] && jina hub new --type app
This creates a Python entrypoint, YAML configs and a Dockerfile. You can start from there.