S3 processor agent (#1)
This agent will read a single file from an S3 bucket. The name of the
file to read is taken from the record.
cdbartholomew authored Jan 13, 2024
1 parent f887bd6 commit 012e520
Showing 19 changed files with 939 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -34,3 +34,4 @@ dist
.tox
build/
venv/
.vscode
81 changes: 68 additions & 13 deletions dev/s3_upload.sh
@@ -26,16 +26,71 @@ file=$4
s3_key=minioadmin
s3_secret=minioadmin

fileWithoutPath=$(basename $file)
resource="/${bucket}/${fileWithoutPath}"
content_type="application/octet-stream"
date=`date -R`
_signature="PUT\n\n${content_type}\n${date}\n${resource}"
signature=`echo -en ${_signature} | openssl sha1 -hmac ${s3_secret} -binary | base64`
set -x
curl -X PUT -T "${file}" \
-H "Host: ${host}" \
-H "Date: ${date}" \
-H "Content-Type: ${content_type}" \
-H "Authorization: AWS ${s3_key}:${signature}" \
${url}${resource}
# Function to generate a signature
generate_signature() {
    local method=$1
    local content_type=$2
    local date=$3
    local resource=$4

    local string_to_sign="${method}\n\n${content_type}\n${date}\n${resource}"
    echo -en "${string_to_sign}" | openssl sha1 -hmac "${s3_secret}" -binary | base64
}
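The string-to-sign built by `generate_signature` follows the legacy AWS Signature Version 2 layout: method, (empty) content MD5, content type, date, and resource, each on its own line, HMAC-SHA1-signed and base64-encoded. A minimal Python equivalent, using the script's MinIO default secret (the example date and object path are invented for illustration):

```python
import base64
import hmac
from hashlib import sha1

def generate_signature(method: str, content_type: str, date: str,
                       resource: str, secret: str = "minioadmin") -> str:
    # Same layout as the shell function: METHOD\n\nCONTENT-TYPE\nDATE\nRESOURCE
    string_to_sign = f"{method}\n\n{content_type}\n{date}\n{resource}"
    digest = hmac.new(secret.encode(), string_to_sign.encode(), sha1).digest()
    return base64.b64encode(digest).decode()

sig = generate_signature("PUT", "application/octet-stream",
                         "Sat, 13 Jan 2024 00:00:00 +0000",
                         "/documents/simple.pdf")
print(sig)  # a 28-character base64 encoding of the 20-byte HMAC-SHA1 digest
```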

# Check if the bucket exists
check_bucket_exists() {
    local check_resource="/${bucket}/"
    local check_date=$(date -R)
    local check_signature=$(generate_signature "HEAD" "" "${check_date}" "${check_resource}")

    # Use --head rather than -X HEAD: -X HEAD makes curl wait for a body
    # that never arrives, which can hang the request.
    local status_code=$(curl -s -o /dev/null -w "%{http_code}" --head \
        -H "Host: ${host}" \
        -H "Date: ${check_date}" \
        -H "Authorization: AWS ${s3_key}:${check_signature}" \
        ${url}${check_resource})

    if [ "$status_code" == "404" ]; then
        return 1 # Bucket does not exist
    else
        return 0 # Bucket exists
    fi
}

# Create the bucket
create_bucket() {
    local create_resource="/${bucket}/"
    local create_date=$(date -R)
    local create_signature=$(generate_signature "PUT" "" "${create_date}" "${create_resource}")

    curl -X PUT \
        -H "Host: ${host}" \
        -H "Date: ${create_date}" \
        -H "Authorization: AWS ${s3_key}:${create_signature}" \
        ${url}${create_resource}
}

# Upload the file
upload_file() {
    local file_without_path=$(basename "$file")
    local upload_resource="/${bucket}/${file_without_path}"
    local content_type="application/octet-stream"
    local upload_date=$(date -R)
    local upload_signature=$(generate_signature "PUT" "${content_type}" "${upload_date}" "${upload_resource}")

    curl -X PUT -T "${file}" \
        -H "Host: ${host}" \
        -H "Date: ${upload_date}" \
        -H "Content-Type: ${content_type}" \
        -H "Authorization: AWS ${s3_key}:${upload_signature}" \
        ${url}${upload_resource}
}

# Main script
if ! check_bucket_exists; then
    echo "Bucket does not exist. Creating bucket: ${bucket}"
    create_bucket
fi

echo "Uploading file: ${file}"
upload_file
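The script's check-then-create-then-upload flow can be sketched with Python's standard library. This is not part of the commit; it mirrors the shell logic under the same MinIO default credentials, with the helper names (`sign`, `signed_headers`, `upload`) invented for illustration:

```python
import base64
import hmac
from hashlib import sha1
from email.utils import formatdate
from urllib.error import HTTPError
from urllib.request import Request, urlopen

S3_KEY, S3_SECRET = "minioadmin", "minioadmin"  # MinIO defaults, as in the script

def sign(method: str, content_type: str, date: str, resource: str) -> str:
    # AWS Signature Version 2 string-to-sign, as in generate_signature above
    string_to_sign = f"{method}\n\n{content_type}\n{date}\n{resource}"
    digest = hmac.new(S3_SECRET.encode(), string_to_sign.encode(), sha1).digest()
    return base64.b64encode(digest).decode()

def signed_headers(method: str, resource: str, content_type: str = "") -> dict:
    date = formatdate(usegmt=True)  # RFC 1123 date, analogous to `date -R`
    headers = {
        "Date": date,
        "Authorization": f"AWS {S3_KEY}:{sign(method, content_type, date, resource)}",
    }
    if content_type:
        headers["Content-Type"] = content_type
    return headers

def upload(url: str, bucket: str, path: str) -> None:
    bucket_resource = f"/{bucket}/"
    try:
        # HEAD the bucket; a 404 means it does not exist yet.
        urlopen(Request(url + bucket_resource,
                        headers=signed_headers("HEAD", bucket_resource),
                        method="HEAD"))
    except HTTPError as err:
        if err.code != 404:
            raise
        urlopen(Request(url + bucket_resource,
                        headers=signed_headers("PUT", bucket_resource),
                        method="PUT"))
    name = path.rsplit("/", 1)[-1]
    object_resource = f"/{bucket}/{name}"
    with open(path, "rb") as fh:
        urlopen(Request(url + object_resource, data=fh.read(),
                        headers=signed_headers("PUT", object_resource,
                                               "application/octet-stream"),
                        method="PUT"))
```

A call like `upload("http://localhost:9900", "documents", "README.md")` would then correspond to one invocation of the shell script.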

2 changes: 1 addition & 1 deletion examples/applications/openai-text-completions/README.md
@@ -2,7 +2,7 @@

This sample application shows how to use the `gpt-3.5-turbo-instruct` Open AI model.

## Configure you OpenAI
## Configure your OpenAI key


```
1 change: 1 addition & 0 deletions examples/applications/s3-processor/.gitignore
@@ -0,0 +1 @@
java/lib/*
36 changes: 36 additions & 0 deletions examples/applications/s3-processor/README.md
@@ -0,0 +1,36 @@
# Preprocessing Text

This sample application shows how to read a single file from an S3 bucket and process it in a pipeline.

The pipeline will:


- Read a file from an S3 bucket; the file to read is specified in the value of the message sent to the input topic
- Extract text from document files (PDF, Word...)
- Split the text into chunks
- Write the chunks to the output topic

## Prerequisites

Prepare some PDF files and upload them to a bucket in S3.

## Deploy the LangStream application

```
./bin/langstream docker run test -app examples/applications/s3-processor -s examples/secrets/secrets.yaml --docker-args="-p9900:9000"
```

Please note that we are adding `--docker-args="-p9900:9000"` to expose the S3 API on host port 9900.


## Write some documents in the S3 bucket

```
# Upload a document to the S3 bucket
dev/s3_upload.sh localhost http://localhost:9900 documents README.md
dev/s3_upload.sh localhost http://localhost:9900 documents examples/applications/s3-source/simple.pdf
```

## Interact with the pipeline

Now you can use the developer UI to specify which document to read from S3 and process. To process the `simple.pdf` file, send `simple.pdf` as the message.
20 changes: 20 additions & 0 deletions examples/applications/s3-processor/configuration.yaml
@@ -0,0 +1,20 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

configuration:
  resources:
  dependencies:
70 changes: 70 additions & 0 deletions examples/applications/s3-processor/extract-text.yaml
@@ -0,0 +1,70 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: "Extract and manipulate text"
topics:
  - name: "request-topic"
    creation-mode: create-if-not-exists
  - name: "response-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "convert-to-structure"
    type: "document-to-json"
    input: "request-topic"
    configuration:
      text-field: "filename"
  - name: "Process file from S3"
    type: "s3-processor"
    configuration:
      bucketName: "${secrets.s3.bucket-name}"
      endpoint: "${secrets.s3.endpoint}"
      access-key: "${secrets.s3.access-key}"
      secret-key: "${secrets.s3.secret}"
      region: "${secrets.s3.region}"
      objectName: "{{ value.filename }}"
  - name: "Extract text"
    type: "text-extractor"
  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 400
      separators: ["\n\n", "\n", " ", ""]
      keep_separator: false
      chunk_overlap: 100
      length_function: "cl100k_base"
  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
      text-field: text
      copy-properties: true
  - name: "prepare-structure"
    type: "compute"
    output: "response-topic"
    configuration:
      fields:
        - name: "value.filename"
          expression: "properties.name"
          type: STRING
        - name: "value.chunk_id"
          expression: "properties.chunk_id"
          type: STRING
        - name: "value.language"
          expression: "properties.language"
          type: STRING
        - name: "value.chunk_num_tokens"
          expression: "properties.chunk_num_tokens"
          type: STRING
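Concretely, a message on `response-topic` carries the chunk text plus the copied properties as string fields. The field names below come from the compute step; the values are invented for illustration:

```python
import json

# Hypothetical properties attached by the upstream text-extractor and
# text-splitter stages; names match the compute step's expressions.
properties = {
    "name": "simple.pdf",
    "chunk_id": "0",
    "language": "en",
    "chunk_num_tokens": "92",
}

# The compute step copies each property into the record value as a STRING
# field, alongside the chunk text produced by document-to-json.
value = {
    "text": "First 400-character chunk of extracted text...",
    "filename": properties["name"],
    "chunk_id": properties["chunk_id"],
    "language": properties["language"],
    "chunk_num_tokens": properties["chunk_num_tokens"],
}
print(json.dumps(value, indent=2))
```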
38 changes: 38 additions & 0 deletions examples/applications/s3-processor/gateways.yaml
@@ -0,0 +1,38 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

gateways:
  - id: "request"
    type: produce
    topic: "request-topic"
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: langstream-client-session-id
          valueFromParameters: sessionId

  - id: "reply"
    type: consume
    topic: "response-topic"
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: langstream-client-session-id
            valueFromParameters: sessionId
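With these gateways, a client produces the file name on `request` and consumes chunks on `reply`, correlated by `sessionId`. The WebSocket URLs such a client would open can be sketched as follows; the port, tenant, application name, and `/v1/...` path layout are assumptions based on LangStream's local-run defaults, not taken from this commit:

```python
from urllib.parse import urlencode

tenant, application = "default", "test"  # assumed tenant and application names
base = "ws://localhost:8091/v1"          # assumed default gateway address
params = urlencode({"param:sessionId": "session-1"})

# One URL per gateway id defined above: produce to "request", consume "reply".
produce_url = f"{base}/produce/{tenant}/{application}/request?{params}"
consume_url = f"{base}/consume/{tenant}/{application}/reply?{params}"
print(produce_url)
print(consume_url)
```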
Binary file added examples/applications/s3-processor/simple.pdf
7 changes: 7 additions & 0 deletions examples/applications/s3-source/README.md
@@ -16,6 +16,13 @@ The extract-text.yaml file defines a pipeline that will:

Prepare some PDF files and upload them to a bucket in S3.

## Configure your OpenAI key


```
export OPEN_AI_ACCESS_KEY=...
```

## Deploy the LangStream application

```
9 changes: 9 additions & 0 deletions langstream-agents/langstream-agent-s3/pom.xml
@@ -46,6 +46,15 @@
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>langstream-agents-commons</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>com.samskivert</groupId>
    <artifactId>jmustache</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
@@ -18,14 +18,21 @@
import ai.langstream.api.runner.code.AgentCode;
import ai.langstream.api.runner.code.AgentCodeProvider;

public class S3SourceAgentCodeProvider implements AgentCodeProvider {
public class S3AgentCodeProvider implements AgentCodeProvider {
    @Override
    public boolean supports(String agentType) {
        return "s3-source".equals(agentType);
        return switch (agentType) {
            case "s3-source", "s3-processor" -> true;
            default -> false;
        };
    }

    @Override
    public AgentCode createInstance(String agentType) {
        return new S3Source();
        return switch (agentType) {
            case "s3-source" -> new S3Source();
            case "s3-processor" -> new S3Processor();
            default -> throw new IllegalStateException();
        };
    }
}