From 012e520e8872713be7a7765073abf5ca18833e96 Mon Sep 17 00:00:00 2001 From: Chris Bartholomew Date: Sat, 13 Jan 2024 11:38:36 -0500 Subject: [PATCH] S3 processor agent (#1) This agent will read a single file from an S3 bucket. The name of the file to read is taken from the record. --- .gitignore | 1 + dev/s3_upload.sh | 81 +++++-- .../openai-text-completions/README.md | 2 +- examples/applications/s3-processor/.gitignore | 1 + examples/applications/s3-processor/README.md | 36 +++ .../s3-processor/configuration.yaml | 20 ++ .../s3-processor/extract-text.yaml | 70 ++++++ .../applications/s3-processor/gateways.yaml | 38 ++++ examples/applications/s3-processor/simple.pdf | Bin 0 -> 8017 bytes examples/applications/s3-source/README.md | 7 + langstream-agents/langstream-agent-s3/pom.xml | 9 + ...Provider.java => S3AgentCodeProvider.java} | 13 +- .../ai/langstream/agents/s3/S3Processor.java | 193 ++++++++++++++++ .../META-INF/ai.langstream.agents.index | 3 +- ...ngstream.api.runner.code.AgentCodeProvider | 2 +- .../ai/langstream/agents/s3/S3SourceTest.java | 214 ++++++++++++++++++ .../ObjectStorageProcessorAgentProvider.java | 114 ++++++++++ ...i.langstream.api.runtime.AgentNodeProvider | 1 + .../agents/S3ProcessorAgentProviderTest.java | 153 +++++++++++++ 19 files changed, 939 insertions(+), 19 deletions(-) create mode 100644 examples/applications/s3-processor/.gitignore create mode 100644 examples/applications/s3-processor/README.md create mode 100644 examples/applications/s3-processor/configuration.yaml create mode 100644 examples/applications/s3-processor/extract-text.yaml create mode 100644 examples/applications/s3-processor/gateways.yaml create mode 100644 examples/applications/s3-processor/simple.pdf rename langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/{S3SourceAgentCodeProvider.java => S3AgentCodeProvider.java} (68%) create mode 100644 langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageProcessorAgentProvider.java create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3ProcessorAgentProviderTest.java diff --git a/.gitignore b/.gitignore index ede07bb1b..b4725f914 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ dist .tox build/ venv/ +.vscode diff --git a/dev/s3_upload.sh b/dev/s3_upload.sh index 65412fc54..1f4295707 100755 --- a/dev/s3_upload.sh +++ b/dev/s3_upload.sh @@ -26,16 +26,71 @@ file=$4 s3_key=minioadmin s3_secret=minioadmin -fileWithoutPath=$(basename $file) -resource="/${bucket}/${fileWithoutPath}" -content_type="application/octet-stream" -date=`date -R` -_signature="PUT\n\n${content_type}\n${date}\n${resource}" -signature=`echo -en ${_signature} | openssl sha1 -hmac ${s3_secret} -binary | base64` -set -x -curl -X PUT -T "${file}" \ - -H "Host: ${host}" \ - -H "Date: ${date}" \ - -H "Content-Type: ${content_type}" \ - -H "Authorization: AWS ${s3_key}:${signature}" \ - ${url}${resource} +# Function to generate a signature +generate_signature() { + local method=$1 + local content_type=$2 + local date=$3 + local resource=$4 + + local string_to_sign="${method}\n\n${content_type}\n${date}\n${resource}" + echo -en ${string_to_sign} | openssl sha1 -hmac ${s3_secret} -binary | base64 +} + +# Check if the bucket exists +check_bucket_exists() { + local check_resource="/${bucket}/" + local check_date=`date -R` + local check_signature=$(generate_signature "HEAD" "" "${check_date}" "${check_resource}") + + local status_code=$(curl -s -o /dev/null -w "%{http_code}" -X HEAD \ + -H "Host: ${host}" \ + -H "Date: ${check_date}" \ + -H "Authorization: AWS ${s3_key}:${check_signature}" \ + ${url}${check_resource}) + + if [ "$status_code" == "404" ]; then + return 1 # Bucket does not exist + else + return 0 # Bucket exists + fi +} + +# Create the bucket +create_bucket() { + local create_resource="/${bucket}/" + local create_date=`date -R` + local create_signature=$(generate_signature "PUT" "" "${create_date}" "${create_resource}") + + curl -X PUT \ + -H "Host: ${host}" \ + -H "Date: ${create_date}" \ + -H "Authorization: AWS ${s3_key}:${create_signature}" \ + ${url}${create_resource} +} + +# Upload the file +upload_file() { + local file_without_path=$(basename $file) + local upload_resource="/${bucket}/${file_without_path}" + local content_type="application/octet-stream" + local upload_date=`date -R` + local upload_signature=$(generate_signature "PUT" "${content_type}" "${upload_date}" "${upload_resource}") + + curl -X PUT -T "${file}" \ + -H "Host: ${host}" \ + -H "Date: ${upload_date}" \ + -H "Content-Type: ${content_type}" \ + -H "Authorization: AWS ${s3_key}:${upload_signature}" \ + ${url}${upload_resource} +} + +# Main script +if ! check_bucket_exists; then + echo "Bucket does not exist. Creating bucket: ${bucket}" + create_bucket +fi + +echo "Uploading file: ${file}" +upload_file + diff --git a/examples/applications/openai-text-completions/README.md b/examples/applications/openai-text-completions/README.md index 45bc33fe0..da2de732b 100644 --- a/examples/applications/openai-text-completions/README.md +++ b/examples/applications/openai-text-completions/README.md @@ -2,7 +2,7 @@ This sample application shows how to use the `gpt-3.5-turbo-instruct` Open AI model. -## Configure you OpenAI +## Configure your OpenAI key ``` diff --git a/examples/applications/s3-processor/.gitignore b/examples/applications/s3-processor/.gitignore new file mode 100644 index 000000000..55dea2dd3 --- /dev/null +++ b/examples/applications/s3-processor/.gitignore @@ -0,0 +1 @@ +java/lib/* \ No newline at end of file diff --git a/examples/applications/s3-processor/README.md b/examples/applications/s3-processor/README.md new file mode 100644 index 000000000..7717595e3 --- /dev/null +++ b/examples/applications/s3-processor/README.md @@ -0,0 +1,36 @@ +# Preprocessing Text + +This sample application shows how to read a single file from an S3 bucket and process in a pipeline. + +The pipeline will: + + +- Read a file from and S3 bucket which is specified in the value of the message sent to the input topic +- Extract text from document files (PDF, Word...) +- Split the text into chunks +- Write the chunks to the output topic + +## Prerequisites + +Prepare some PDF files and upload them to a bucket in S3. + +## Deploy the LangStream application + +``` +./bin/langstream docker run test -app examples/applications/s3-processor -s examples/secrets/secrets.yaml --docker-args="-p9900:9000" +``` + +Please note that here we are adding --docker-args="-p9900:9000" to expose the S3 API on port 9900. + + +## Write some documents in the S3 bucket + +``` +# Upload a document to the S3 bucket +dev/s3_upload.sh localhost http://localhost:9900 documents README.md +dev/s3_upload.sh localhost http://localhost:9900 documents examples/applications/s3-source/simple.pdf +``` + +## Interact with the pipeline + +Now you can use the developer UI to specify which document to read from S3 and process. To process the simple.pdf file, simply send `simple.pdf` as the message. \ No newline at end of file diff --git a/examples/applications/s3-processor/configuration.yaml b/examples/applications/s3-processor/configuration.yaml new file mode 100644 index 000000000..f01781a45 --- /dev/null +++ b/examples/applications/s3-processor/configuration.yaml @@ -0,0 +1,20 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +configuration: + resources: + dependencies: \ No newline at end of file diff --git a/examples/applications/s3-processor/extract-text.yaml b/examples/applications/s3-processor/extract-text.yaml new file mode 100644 index 000000000..ed2bc1788 --- /dev/null +++ b/examples/applications/s3-processor/extract-text.yaml @@ -0,0 +1,70 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Extract and manipulate text" +topics: + - name: "request-topic" + creation-mode: create-if-not-exists + - name: "response-topic" + creation-mode: create-if-not-exists +pipeline: + - name: "convert-to-structure" + type: "document-to-json" + input: "request-topic" + configuration: + text-field: "filename" + - name: "Process file from S3" + type: "s3-processor" + configuration: + bucketName: "${secrets.s3.bucket-name}" + endpoint: "${secrets.s3.endpoint}" + access-key: "${secrets.s3.access-key}" + secret-key: "${secrets.s3.secret}" + region: "${secrets.s3.region}" + objectName: "{{ value.filename }}" + - name: "Extract text" + type: "text-extractor" + - name: "Split into chunks" + type: "text-splitter" + configuration: + splitter_type: "RecursiveCharacterTextSplitter" + chunk_size: 400 + separators: ["\n\n", "\n", " ", ""] + keep_separator: false + chunk_overlap: 100 + length_function: "cl100k_base" + - name: "Convert to structured data" + type: "document-to-json" + configuration: + text-field: text + copy-properties: true + - name: "prepare-structure" + type: "compute" + output: "response-topic" + configuration: + fields: + - name: "value.filename" + expression: "properties.name" + type: STRING + - name: "value.chunk_id" + expression: "properties.chunk_id" + type: STRING + - name: "value.language" + expression: "properties.language" + type: STRING + - name: "value.chunk_num_tokens" + expression: "properties.chunk_num_tokens" + type: STRING diff --git a/examples/applications/s3-processor/gateways.yaml b/examples/applications/s3-processor/gateways.yaml new file mode 100644 index 000000000..8ceacaa40 --- /dev/null +++ b/examples/applications/s3-processor/gateways.yaml @@ -0,0 +1,38 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: "request" + type: produce + topic: "request-topic" + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: "reply" + type: consume + topic: "response-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId diff --git a/examples/applications/s3-processor/simple.pdf b/examples/applications/s3-processor/simple.pdf new file mode 100644 index 0000000000000000000000000000000000000000..2fc52524e5c1875fdbff37eb62bc3470c1b25803 GIT binary patch literal 8017 zcmbt(by!qg*FKF52q;JiGSWH85F%ZIbV&*W3^{}_gd!jl=epLtH@CK;vM?Ni1aY^_ew$s&+sydX z(gs3;VPID)N6_WVV5qVK7Kd>M-dHpaqlmF~wZVX)>KGS0oIMx;LxQEHK^{1F4B8pw zLuei?7uh6B6}onGZmq)C?UIAtC5pS`rlC@&K*VW``OnHDJ!J<2>hqu9eJJ~Fl6wzf z8BA!{94ST`%*M=MW`f-auF(uJCmQ2e>Gc-dx}+_oD0UBPb=vsMHgzhC#TP=~FHY)t zZ?Ap7mbGMy?0CHIHJ;Q~8q;ts_n|yU+*Z#7bc#FKGT6pdma{aJ=ya7Mc2R;gNNm}t z(YJ$h!DSa(+Vzd8Jz)9*`-YqPr^B{awTB#R2Oa}fV?r`#w2Ty6O<&M|quC>t4+LRc zY<|rO_=krA{s$!bH{AdXw9$4aPj`$94vYZ0b-_?wjEAeIyEVoGEb>#J;Oc?{`aQsS zNC5>6jEw_Y-qi(D$yx~XxPJp(%tF;~m2Q~#$#z=F<^Y26g=Qy)QJ;mX&_sG-xIOEJ{&BH7X#Mux%%Zm{0|43 zKyXkwe3o5+6oa9P7%vBFjIN6OUwIaVBmVPB_%r*diQf8pLk!_vwPLlO&T@dcw^(jb zfImxKIY*)sp?XJ~C@h4>OOaHW{tUmNjeY~6N;rqL5iRViui*YA&jYwKlwf1-l_f%FC zNv7E&?n%3>+vJ#9ebp3#*&V)~p4}(v1Tgw!i;j7oOl$-~iJo~i;u{gcsrVL&8h7f* zJqi0X0-s#FFZW0yRIPH)3_Nm}U3hD|zfq>QX9X9GZ`PQ)>!aSaRZjH)#(1aLHBpI> z+t@kTK6a&$b!El4f6jiderV!?zuuA5+gk?)8$8QhX>w6%#uM1(7tmS;f^9C7$82SF z2?hk^&SbK)Q!ljEzy#k&3ChQxY+eY%UVqp!O~p$Yxx9?^AWV0H2rh^nqzAdauMd;h zN&c{UU?`c(C=dru1jzfcM73RUvAu2KXFoWxu%GPj8Q zmP~2kptp53_Jy&<9?^(f$MgH_lxFu_rqty)^yapyqHWx{p9(Q&O@v?@iO$B5<1#=Z zfe;vxkmYICJ4iR;aWFy1oh!7&iNugAL=LhfFXf~M2|#l2K|-%!M>m4rz+0b4G=oIl z$jnHtHJo;%q`IRb3kqwz3L_j3bT}2XPJ8aDTtN&&Hchg8Kka#wQ#Tdlw8iO(TNNT= zXdlY%ed2k>BoJJXsh};j#ff97I%OWbpF#c!EJAif=ofP45m68N^dQ0OJ3}Rstfy3( zK1?e5oO?pf*f27sR>2@m{ica=>ijx^P^d*y+$q_&r--|ijZU54P?V1`;vzhA(J7WA znM@?B7qWi7XV119Q9~r7M&WYO6A_Mn= za;*AFbB%ak$aqnRo_Tyq=dfpfc6B4H&nv02GE-aTmEz7C- zQ%50ZnnZL0pK0MZ!f0={m|5|cJ{Xj%QDx=PjH6c#<6_k1;!iFQ)K_)nFz1otsl6EY z?vjm)JCm{Ob*&<trb6zbOFnbwTx5~&$+`KtqFkf= z(lBNi#-<{ceb5uP*t*5OP`gEe;l`NG^3Af&vdosf*ClI$S?(frnRBAzqN<~!H$~q@l{`SIm2{A)>7#I1^b$M4NwrM||oq_EVnFtZr2B#E@4m{VL*-lfc@%!{;_ z+nZ3nA~BgYnSXU1mXN&Kp>NAw7AcWgShSKS+%26w_FSsa`dOKgU6y89C!P62h^d&| z8?!fh-tt;kp+-=PVx6^2X`S@K^z3?-deeaS4WGcnmXUIg8=qy~%dAJIp|hr}ISXQ+ zcMD{hWJ$J(Tfe?>Z$o;exGc4K~HQX}W`uO<7JY!a zGfN9o6`dPVGvvI!yrn&=c@wF|o&24rJF_UM7}ptlg_+#DZ%kFNy2nCnAW;@RZkfFX zBMCDmOZy!f1NY~>TAW+P2Bn6MDh>o`uF}M+M7GE0#m%#HQC`2BRD3P=TJf8QJ63Cs zYkAU_q$8y7OVibP)jjuv?xK%~j=c8AHV4+{_XiHNNW@8AP%MD5Nt8(p$)za~6lY02 zNtzo88>@rZp`R_d%z7D!fk3@%Qm25w=oNRA{Nb z68-k`Tb0QB3|ASvmEWnfD)p&oDETTSDP2u65;89Pq8w!s1|&^6`HsuyipRC>>n48&E3 zA3U6I6P&-lVZMtwj3r45c^T5JnABn(VXa!EdQNqQNK5Y3q~}YEZ-HO(<748p_>uYD z`5zrZ%6H3qU3h>!+@lYdCXD^A?TvM8hpg1*Z08(!PkNp59vxe(+B8bG zGQK|asc6!?aJJ*y({FF8uPxUZqD^mD(l|uhM3*C;y{uI$c2w!h`{0uJ)_% zo0Zhz_K|`53Fm+w!u!OnjKrW>4OfvgN`y*awi#qtK`%Wz0=}BelWD}No{Nwgk)OK=8*?YUI zU9p|#o;;RF_xArL-xj(N8NnES=^9SDs{G66)S{B2@ehGX#n(3~-SbCk4-O5Ip4$~k zm`nPl`qi$49o_H0P@R@&vFz`#_-f6$cG0cuC~>CnK^>{z{J!}CbbPzYZ}!2D)bqrV zWc#a-I^Q4HN2M11#*ezm^J%Wjbleg@vOlt3o_Wd((h^83kSRRs+a0jYuiaYC7_jUr zyK5I6R-pj>k+4^+)lAxz-zBJAuKZP{P^BwsFKX;o=~nCdhp42Unb3OvbtlqV(J9RH5%czY%tCQI8-*%q2S@_c9?XZ@ccW?Z8oy>ONQRad5R(tH?vevFv zLi&9f@7olI-+XwSZRaL-LN*WnU1sAeSbX6N6uYVl3i4*3rzt#SJ97>ur~D-I033YIt-^H;r#Z~vz;N?43D7^(*py{^`t&iG0mstrEz z02J$2ftnkRFZ==T;6IAz6WM>Yrj0lAGX|)(9r&jWz#jZhGkDJMcPn^G;ql)|0>19| z1j2*^;W^lN;3NDM;9rvkiuHe5RzTy>SXaBBti|IuUC}_hJE^+Zx}Govd;>rmfD!}D z6#(ULpn`sZ@|PE}Urv5U0EQYFn*g$(YzFxAKN-XGkH1g!gq}e4JgsmiR7xN4`7@LF zjQ{ME!(Z}(S1?#F49>wCt%(8RI^i{n7!PZA2REFnI|%-ZRw=mvywt%3n4bpPTG!PX z?edpI*}>fdr(lnE2O|LzqmKU7fFl41uQ}M@?C~HX#Kb}Pb@l&$i@`(yODHish5P3% zDuQ3j`1?Qi|LQ~Hp*_(;ik(D-LWzPA2z*vyCvR~Se%^Tgh<_uINH7ZaYs4RK^FNm? zUh&^K_zTLP&}yQc0g(R}=&@)!fXCvAAV4nV0TK%`6_!B3!NMZqa4=j#TnzYd#KZt* zstT+%2WvSOJ1hq93YGJ)#(!86C}1ewA-*Au08$54K)YSRIM~_aKw@wa{EEXk8v=TG z-iba*CqRy&Mn6@+D7e^9l`CLzyj3r|lk9;-L;$%4+UI8rfC&XUi4fShz#zbYUHtd^ z*Rnkk{6^yEJ_bOXVE}Ibv#TKi7yZB4ZlRGM*7VxB`WrFd_+?KAl-1VWRr@2F>z6(1hU(iMNY1g`CY;IRCery!PhEbT?)+kV!>lH&X*D?YE`dm%iqb4TrDSDM! zbg<@y)o#uc&ISjYqCYt0Uak_>klJU>FCFWudQm)ifcfB*J)5ZF0P~iyXs2qdIXX0G zkbDmBgCSlzR@T9aMw5>ztN<-QK)Tn%k=>p@pJ4N##!E0 zbc3w|LWaU+SWf5}jju73XY0AFlzWrI_Z6YynEOmE%#ncOm6tK6JOh~<@LwZ4hME5`A3>y&(Sm4 zo#t0v1N8$xY6g|hY{xjJ6D&ZQT1UXQhh%rZIn#QL1(h7co!MRwNm zYkk-DPjo@{^v1t?vuAs^<0F)z_7%rYchCFbt^P1?$+D#yg#gL#J3@~!p069P9$qXJ z5X~;vPq-jDq%H$HlB?sAp~(rFWtw-rH_-^8G|heTKqOwSlhio%?bQLfd=!f1`o!EpC@`VTibE^G3{$ezeJog|?PBSKWd zelvqw*Ety7{A5_Rdzwn<+}T%Jg3RwOg@8k-tj>Pkx7`i&LH2v|ODKKparKYrazX8K zZ?xG4vK#Gwkf+Ow`{bll>Bq9D&`!Z!8WBe>C>|Jxo#Z0sy-Ci?EdH)Rl}r=i_AoGx z>WK|r`ll$+|NPMCy&RJi@mCGDRZPs=OQJM4rCdCMns=YP$xwLh@8hz%-fxjKF6}+l zBvJSNBeK;hV$-)YnXlewe~YwI=u__JMr|*(zFe~E%Z};QFhK)F+@ZFbVDy&JsPhBVd|G$yq6;^ZCvW=<+f%7T z^|mKc%@MtK`_ZqBhKY}h(x^Qq7AA2PDo#E&mP<03kUdY+$2qNRkv>(DFQf~LiB*V{ zdy-?^cuc0I4hC-Yg$%fnjfyv1paQ2^wVj?G;AgtAA?Ne~tRQZr&pz2Osd-O_YL%ob zGt5U#T_>ePZs_A+g%;A1*^rXtuDOpqL3V@1=ziRK+omH|`Z}A`upxwt*H=>fN7Xvj zghl7Twvv+1x8rwT1t#d;@9f`>XwvdfFi(z1>rnG7_ul?8IJh`8E?@}Zus;s;S+og% z$7fA$6VIj;81=F_Lo=qrrQE;#m})vgSCV{)wjIKHk!885g`gb4D#rP8@*}G}Gx~*O zFGMIs_eI5YR>blZCDlatx@FmNg9U#6g8Mm#oWZ6OGb(YjnMH=tU*|zEXtU7!S$)&3 z!w`YbAy*p^bV3{p9q|S>es@YwYYst+J@wAs%+x)@d(DtRY@(f=q-0$owJv&=V)!xI zuTnk??Qr>$;Y0E|t(O+#m-8Bj#kN(k6cK^YpqsYg5z8>oR5&6(y-hmUt*0Hb0*htK zEkZuiKEI^id^ztz<=O3lNWX0>U%xNh=4u8)Id-JGjntq?kDA3uacSK}Pc+L$tZh+` zR8Wv2grBLj5BFf`q4Ft{^4q1;+MA`TkKW`UB+qI36ect@=yP0K#C6i7O$dh6>`AA4 zQE@4#i8Bx0O4f?*@(PLbce7YzyDzZ##nSb=OOtv@@(LVI&~Yf#dflZ&gla49Xg!@> zxjX-9;%)mTvd@z%ZxLLZ@^d-&18JDP^WNE~8_bs>EFsJ|BYQ-VaZI8eK=e$8us5CH ztQi3X1nB-L-r844LD}~&Q0a}~F0QOR8KV_AyK(W7g#InQ!Dm)taXQz%M$UvPCvq&? z3N(!;byl=FT90Lx4*N_!Qdx)F;e--W)Ny?!15pS%E!+u3;Nu0Yagk9xz zhnD_m{*c+8k?qd>+3ue?YgR&RvD1i~Y1~Imo73 zaoJDmx=!pZY^*#oXp5=_RYc(<1c2+J0ZM_hH{q{$X;lFRECcWPhU9e<8oG{xW%^MtMIcW zP-4|RC%rP35$*fV&)P)wtc=c(WiQ#Z+vxUaRUDG1 zbvr}ZT|ID4N9GS=Y747h={a?eUpjW)ST1$H9sL#dZu2GgWVP1aLY9EoX|sUW_03^a ztDLT+eiVt{r`AHyhE1f_55_*OqA8b8J)iW~x2CZqv10?@>VQ31$+D{o@Y);Ane8jf z)9irG1WXS8Ajsw%eemzy0r>Cj0sMiNz5@>67knNL&Td!?Kfr9=U2QzAful-3XSB7J z9@r4$?&09-0)`_Y2nZZ(#wYLTfVBbmlcK1&89!LZ6YY+>1qP1PPR{6jaQq;s0&tEA z&`L1h2@wZuo(KRViui*uKE+E27y=0sL!jW|BBCM)6Bvva_yKwvt~UQ&<&U#HA9su` z!0OIBmfSGzrZfPWk?Z#K>b4ogW*}>-}PW3uz!z50F>q* zdPq@m0M5UUMT&|5MDg!37zze(sK3jQ$p0V%VEMNmKn?#n7A5j884&Wn<{^Sa0&My3 ze#MX?f9T=d(E!JDKRKq>bGU^8Fa?fa@jE~#J0QRg&dKJ+$*RFO6ak`t8D@jQAS4hH zHfS4+6~YE#Wi5h0h`>Z)a0CVc7lBEG{%;jvI|v|@9ynkHon%uCg_Hnsb1P{ngZ>|_ C`jJ-v literal 0 HcmV?d00001 diff --git a/examples/applications/s3-source/README.md b/examples/applications/s3-source/README.md index a891c2e49..fd78362ec 100644 --- a/examples/applications/s3-source/README.md +++ b/examples/applications/s3-source/README.md @@ -16,6 +16,13 @@ The extract-text.yaml file defines a pipeline that will: Prepare some PDF files and upload them to a bucket in S3. +## Configure your OpenAI key + + +``` +export OPEN_AI_ACCESS_KEY=... +``` + ## Deploy the LangStream application ``` diff --git a/langstream-agents/langstream-agent-s3/pom.xml b/langstream-agents/langstream-agent-s3/pom.xml index 209c44a21..a96ac35ee 100644 --- a/langstream-agents/langstream-agent-s3/pom.xml +++ b/langstream-agents/langstream-agent-s3/pom.xml @@ -46,6 +46,15 @@ lombok provided + + ${project.groupId} + langstream-agents-commons + ${project.version} + + + com.samskivert + jmustache + org.slf4j slf4j-api diff --git a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3SourceAgentCodeProvider.java b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3AgentCodeProvider.java similarity index 68% rename from langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3SourceAgentCodeProvider.java rename to langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3AgentCodeProvider.java index 27066ce93..e843f621a 100644 --- a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3SourceAgentCodeProvider.java +++ b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3AgentCodeProvider.java @@ -18,14 +18,21 @@ import ai.langstream.api.runner.code.AgentCode; import ai.langstream.api.runner.code.AgentCodeProvider; -public class S3SourceAgentCodeProvider implements AgentCodeProvider { +public class S3AgentCodeProvider implements AgentCodeProvider { @Override public boolean supports(String agentType) { - return "s3-source".equals(agentType); + return switch (agentType) { + case "s3-source", "s3-processor" -> true; + default -> false; + }; } @Override public AgentCode createInstance(String agentType) { - return new S3Source(); + return switch (agentType) { + case "s3-source" -> new S3Source(); + case "s3-processor" -> new S3Processor(); + default -> throw new IllegalStateException(); + }; } } diff --git a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java new file mode 100644 index 000000000..861677bea --- /dev/null +++ b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Processor.java @@ -0,0 +1,193 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.s3; + +import ai.langstream.ai.agents.commons.JsonRecord; +import ai.langstream.ai.agents.commons.MutableRecord; +import ai.langstream.api.runner.code.AbstractAgentCode; +import ai.langstream.api.runner.code.AgentProcessor; +import ai.langstream.api.runner.code.Header; +import ai.langstream.api.runner.code.Record; +import ai.langstream.api.runner.code.RecordSink; +import ai.langstream.api.runtime.ComponentType; +import com.samskivert.mustache.Mustache; +import com.samskivert.mustache.Template; +import io.minio.GetObjectArgs; +import io.minio.GetObjectResponse; +import io.minio.MinioClient; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class S3Processor extends AbstractAgentCode implements AgentProcessor { + private String bucketName; + private MinioClient minioClient; + private Template objectTemplate; + + @Override + public void init(Map configuration) throws Exception { + bucketName = configuration.getOrDefault("bucketName", "langstream-source").toString(); + String endpoint = + configuration + .getOrDefault("endpoint", "http://minio-endpoint.-not-set:9090") + .toString(); + String username = configuration.getOrDefault("access-key", "minioadmin").toString(); + String password = configuration.getOrDefault("secret-key", "minioadmin").toString(); + String region = configuration.getOrDefault("region", "").toString(); + String objectName = configuration.getOrDefault("objectName", "").toString(); + + // Object name is a mustache template because it is passed in the record + objectTemplate = Mustache.compiler().compile(objectName); + + log.info( + "Connecting to S3 Bucket at {} in region {} with user {}", + endpoint, + region, + username); + + MinioClient.Builder builder = + MinioClient.builder().endpoint(endpoint).credentials(username, password); + if (!region.isBlank()) { + builder.region(region); + } + minioClient = builder.build(); + } + + @Override + public void process(List records, RecordSink recordSink) { + for (Record record : records) { + processRecord(record, recordSink); + } + } + + @Override + public ComponentType componentType() { + return ComponentType.PROCESSOR; + } + + private void processRecord(Record record, RecordSink recordSink) { + log.debug("Processing record {}", record.toString()); + MutableRecord context = MutableRecord.recordToMutableRecord(record, true); + final JsonRecord jsonRecord = context.toJsonRecord(); + + String fileName = objectTemplate.execute(jsonRecord); + + log.info("Processing file {}", fileName); + + // Retrieve headers from the original record + Collection
originalHeaders = record.headers(); + + log.debug("Original headers: {}", originalHeaders); + + try { + GetObjectArgs getObjectArgs = + GetObjectArgs.builder().bucket(bucketName).object(fileName).build(); + + GetObjectResponse getObjectResponse = minioClient.getObject(getObjectArgs); + + // Read the file content from the response + byte[] fileContent = getObjectResponse.readAllBytes(); + log.debug("File content: {}", new String(fileContent)); + + // Create a new record with the file content + Record processedRecord = new S3SourceRecord(fileContent, fileName, originalHeaders); + log.debug("Processed record key: {}", processedRecord.key()); + log.debug("Processed record value: {}", processedRecord.value()); + // Send the processed record to the record sink + recordSink.emit(new SourceRecordAndResult(record, List.of(processedRecord), null)); + } catch (Exception e) { + log.error("Error processing record: {}", e.getMessage()); + } + } + + @Override + protected Map buildAdditionalInfo() { + return Map.of("bucketName", bucketName); + } + + private static class S3SourceRecord implements Record { + private final byte[] read; + private final String name; + private final Collection
headers; + + public S3SourceRecord(byte[] read, String name, Collection
originalHeaders) { + this.read = read; + this.name = name; + this.headers = new ArrayList<>(originalHeaders); + log.debug("Headers: {}", headers); + this.headers.add(new S3RecordHeader("name", name)); + } + + /** + * the key is used for routing, so it is better to set it to something meaningful. In case + * of retransmission the message will be sent to the same partition. + * + * @return the key + */ + @Override + public Object key() { + return name; + } + + @Override + public Object value() { + return read; + } + + @Override + public String origin() { + return null; + } + + @Override + public Long timestamp() { + return System.currentTimeMillis(); + } + + @Override + public Collection
headers() { + return headers; + } + + @AllArgsConstructor + @ToString + private static class S3RecordHeader implements Header { + + final String key; + final String value; + + @Override + public String key() { + return key; + } + + @Override + public String value() { + return value; + } + + @Override + public String valueAsString() { + return value; + } + } + } +} diff --git a/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/ai.langstream.agents.index index 064ae56f3..11532289b 100644 --- a/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/ai.langstream.agents.index +++ b/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/ai.langstream.agents.index @@ -1 +1,2 @@ -s3-source \ No newline at end of file +s3-source +s3-processor \ No newline at end of file diff --git a/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider b/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider index 2e7cf6f7a..160fc35ff 100644 --- a/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider +++ b/langstream-agents/langstream-agent-s3/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider @@ -1 +1 @@ -ai.langstream.agents.s3.S3SourceAgentCodeProvider \ No newline at end of file +ai.langstream.agents.s3.S3AgentCodeProvider \ No newline at end of file diff --git a/langstream-agents/langstream-agent-s3/src/test/java/ai/langstream/agents/s3/S3SourceTest.java b/langstream-agents/langstream-agent-s3/src/test/java/ai/langstream/agents/s3/S3SourceTest.java index a4dcf0066..089a3bf5d 100644 --- a/langstream-agents/langstream-agent-s3/src/test/java/ai/langstream/agents/s3/S3SourceTest.java +++ b/langstream-agents/langstream-agent-s3/src/test/java/ai/langstream/agents/s3/S3SourceTest.java @@ -15,6 +15,7 @@ */ package ai.langstream.agents.s3; +import static org.junit.Assert.assertSame; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -25,10 +26,14 @@ import ai.langstream.api.runner.code.AgentCodeRegistry; import ai.langstream.api.runner.code.AgentContext; +import ai.langstream.api.runner.code.AgentProcessor; import ai.langstream.api.runner.code.AgentSource; +import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.code.MetricsReporter; import ai.langstream.api.runner.code.Record; +import ai.langstream.api.runner.code.SimpleRecord; import io.minio.ListObjectsArgs; +import io.minio.MakeBucketArgs; import io.minio.MinioClient; import io.minio.PutObjectArgs; import io.minio.RemoveObjectArgs; @@ -37,10 +42,12 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import org.junit.jupiter.api.BeforeAll; @@ -71,6 +78,197 @@ static void setup() { .build(); } + @Test + void testProcess() throws Exception { + // Add some objects to the bucket + String bucket = "langstream-test-" + UUID.randomUUID(); + minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucket).build()); + AgentProcessor agentProcessor = buildAgentProcessor(bucket); + String content = "test-content-"; + for (int i = 0; i < 2; i++) { + String s = content + i; + minioClient.putObject( + PutObjectArgs.builder().bucket(bucket).object("test-" + i + ".txt").stream( + new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8)), + s.length(), + -1) + .build()); + } + + // Create a input record that specifies the first file + String objectName = "test-0.txt"; + SimpleRecord someRecord = + SimpleRecord.builder() + .value("{\"objectName\": \"" + objectName + "\"}") + .headers( + List.of( + new SimpleRecord.SimpleHeader( + "original", "Some session id"))) + .build(); + + // Process the record + List resultsForRecord = new ArrayList<>(); + agentProcessor.process(List.of(someRecord), resultsForRecord::add); + + // Should be a record for the file + assertEquals(1, resultsForRecord.size()); + + // the processor must pass downstream the original record + Record emittedToDownstream = resultsForRecord.get(0).sourceRecord(); + assertSame(emittedToDownstream, someRecord); + + // The resulting record should have the file content as the value + assertArrayEquals( + "test-content-0".getBytes(StandardCharsets.UTF_8), + (byte[]) resultsForRecord.get(0).resultRecords().get(0).value()); + // The resulting record should have the file name as the key + assertEquals(objectName, resultsForRecord.get(0).resultRecords().get(0).key()); + + // Check headers + Collection
headers = resultsForRecord.get(0).resultRecords().get(0).headers(); + + // Make sure the name header matches the object name + Optional
foundNameHeader = + headers.stream() + .filter( + header -> + "name".equals(header.key()) + && objectName.equals(header.value())) + .findFirst(); + + assertTrue( + foundNameHeader.isPresent()); // Check that the object name is passed in the record + + // Make sure the original header matches the passed in header + Optional
foundOrigHeader = + headers.stream() + .filter( + header -> + "original".equals(header.key()) + && "Some session id".equals(header.value())) + .findFirst(); + + assertTrue( + foundOrigHeader.isPresent()); // Check that the object name is passed in the record + + // Get the next file + String secondObjectName = "test-1.txt"; + someRecord = + SimpleRecord.builder() + .value("{\"objectName\": \"" + secondObjectName + "\"}") + .headers( + List.of( + new SimpleRecord.SimpleHeader( + "original", "Some session id"))) + .build(); + + resultsForRecord = new ArrayList<>(); + agentProcessor.process(List.of(someRecord), resultsForRecord::add); + + // Make sure the second file is processed + assertEquals(1, resultsForRecord.size()); // assertEquals( + } + + @Test + void testProcessFromDirectory() throws Exception { + // Add some objects to the bucket in a directory + String bucket = "langstream-test-" + UUID.randomUUID(); + String directory = "test-dir/"; + minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucket).build()); + AgentProcessor agentProcessor = buildAgentProcessor(bucket); + String content = "test-content-"; + for (int i = 0; i < 2; i++) { + String s = content + i; + minioClient.putObject( + PutObjectArgs.builder() + .bucket(bucket) + .object(directory + "test-" + i + ".txt") + .stream( + new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8)), + s.length(), + -1) + .build()); + } + + // Process the first file in the directory + String firstObjectName = directory + "test-0.txt"; + SimpleRecord firstRecord = + SimpleRecord.builder() + .value("{\"objectName\": \"" + firstObjectName + "\"}") + .headers( + List.of( + new SimpleRecord.SimpleHeader( + "original", "Some session id"))) + .build(); + + List resultsForFirstRecord = new ArrayList<>(); + agentProcessor.process(List.of(firstRecord), resultsForFirstRecord::add); + // Make sure the first file is processed and that the original record is passed downstream + assertEquals(1, resultsForFirstRecord.size()); + assertSame(firstRecord, resultsForFirstRecord.get(0).sourceRecord()); + // Check that the content of the first record is the content of the first file + assertArrayEquals( + "test-content-0".getBytes(StandardCharsets.UTF_8), + (byte[]) resultsForFirstRecord.get(0).resultRecords().get(0).value()); + // Check that the key of the first record is the name of the first file + assertEquals(firstObjectName, resultsForFirstRecord.get(0).resultRecords().get(0).key()); + + // Check headers for first record + // The name header contains the file name + Collection
firstRecordHeaders = + resultsForFirstRecord.get(0).resultRecords().get(0).headers(); + assertTrue( + firstRecordHeaders.stream() + .anyMatch( + header -> + "name".equals(header.key()) + && firstObjectName.equals(header.value()))); + // The original header contains the original header from the record + assertTrue( + firstRecordHeaders.stream() + .anyMatch( + header -> + "original".equals(header.key()) + && "Some session id".equals(header.value()))); + + // Process the second file in the directory + String secondObjectName = directory + "test-1.txt"; + SimpleRecord secondRecord = + SimpleRecord.builder() + .value("{\"objectName\": \"" + secondObjectName + "\"}") + .headers( + List.of( + new SimpleRecord.SimpleHeader( + "original", "Some session id"))) + .build(); + + List resultsForSecondRecord = new ArrayList<>(); + agentProcessor.process(List.of(secondRecord), resultsForSecondRecord::add); + + assertEquals(1, resultsForSecondRecord.size()); + assertSame(secondRecord, resultsForSecondRecord.get(0).sourceRecord()); + + assertArrayEquals( + "test-content-1".getBytes(StandardCharsets.UTF_8), + (byte[]) resultsForSecondRecord.get(0).resultRecords().get(0).value()); + + // Check headers for second record + Collection
secondRecordHeaders = + resultsForSecondRecord.get(0).resultRecords().get(0).headers(); + assertTrue( + secondRecordHeaders.stream() + .anyMatch( + header -> + "name".equals(header.key()) + && secondObjectName.equals(header.value()))); + assertTrue( + secondRecordHeaders.stream() + .anyMatch( + header -> + "original".equals(header.key()) + && "Some session id".equals(header.value()))); + } + @Test void testRead() throws Exception { String bucket = "langstream-test-" + UUID.randomUUID(); @@ -186,6 +384,22 @@ private AgentSource buildAgentSource(String bucket) throws Exception { return agentSource; } + private AgentProcessor buildAgentProcessor(String bucket) throws Exception { + AgentProcessor agent = + (AgentProcessor) AGENT_CODE_REGISTRY.getAgentCode("s3-processor").agentCode(); + Map configs = new HashMap<>(); + String endpoint = localstack.getEndpointOverride(S3).toString(); + configs.put("endpoint", endpoint); + configs.put("bucketName", bucket); + configs.put("objectName", "{{ value.objectName }}"); + agent.init(configs); + AgentContext context = mock(AgentContext.class); + when(context.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED); + agent.setContext(context); + agent.start(); + return agent; + } + @Test void testIsExtensionAllowed() { assertTrue(S3Source.isExtensionAllowed("aaa", Set.of("*"))); diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageProcessorAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageProcessorAgentProvider.java new file mode 100644 index 000000000..9fd410478 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageProcessorAgentProvider.java @@ -0,0 +1,114 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.api.runtime.ComponentType; +import ai.langstream.impl.agents.AbstractComposableAgentProvider; +import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Set; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +/** Implements support S3 Processor Agents. */ +@Slf4j +public class ObjectStorageProcessorAgentProvider extends AbstractComposableAgentProvider { + + protected static final String S3_PROCESSOR = "s3-processor"; + + public ObjectStorageProcessorAgentProvider() { + super(Set.of(S3_PROCESSOR), List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); + } + + @Override + protected final ComponentType getComponentType(AgentConfiguration agentConfiguration) { + return ComponentType.PROCESSOR; + } + + @Override + protected Class getAgentConfigModelClass(String type) { + switch (type) { + case S3_PROCESSOR: + return S3ProcessorConfiguration.class; + default: + throw new IllegalArgumentException("Unknown agent type: " + type); + } + } + + @AgentConfig(name = "S3 Processor", description = "Processes a file from an S3 bucket") + @Data + public static class S3ProcessorConfiguration { + + protected static final String DEFAULT_BUCKET_NAME = "langstream-source"; + protected static final String DEFAULT_ENDPOINT = "http://minio-endpoint.-not-set:9090"; + protected static final String DEFAULT_ACCESSKEY = "minioadmin"; + protected static final String DEFAULT_SECRETKEY = "minioadmin"; + + @ConfigProperty( + description = + """ + The name of the bucket that contains the file. + """, + defaultValue = DEFAULT_BUCKET_NAME) + private String bucketName = DEFAULT_BUCKET_NAME; + + @ConfigProperty( + description = + """ + The endpoint of the S3 server. + """, + defaultValue = DEFAULT_ENDPOINT) + private String endpoint = DEFAULT_ENDPOINT; + + @ConfigProperty( + description = + """ + Access key for the S3 server. + """, + defaultValue = DEFAULT_ACCESSKEY) + @JsonProperty("access-key") + private String accessKey = DEFAULT_ACCESSKEY; + + @ConfigProperty( + description = + """ + Secret key for the S3 server. + """, + defaultValue = DEFAULT_SECRETKEY) + @JsonProperty("secret-key") + private String secretKey = DEFAULT_SECRETKEY; + + @ConfigProperty( + required = false, + description = + """ + Region for the S3 server. + """) + private String region = ""; + + @ConfigProperty( + required = true, + description = + """ + The object name to read from the S3 server. + """) + private String objectName = ""; + } +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider index 2363609ca..2a7ec77cf 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider @@ -1,6 +1,7 @@ ai.langstream.runtime.impl.k8s.agents.PythonCodeAgentProvider ai.langstream.runtime.impl.k8s.agents.KubernetesGenAIToolKitFunctionAgentProvider ai.langstream.runtime.impl.k8s.agents.ObjectStorageSourceAgentProvider +ai.langstream.runtime.impl.k8s.agents.ObjectStorageProcessorAgentProvider ai.langstream.runtime.impl.k8s.agents.WebCrawlerSourceAgentProvider ai.langstream.runtime.impl.k8s.agents.TextProcessingAgentsProvider ai.langstream.runtime.impl.k8s.agents.KafkaConnectAgentsProvider diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3ProcessorAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3ProcessorAgentProviderTest.java new file mode 100644 index 000000000..b677168df --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3ProcessorAgentProviderTest.java @@ -0,0 +1,153 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import static org.junit.jupiter.api.Assertions.*; + +import ai.langstream.api.doc.AgentConfigurationModel; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.deployer.k8s.util.SerializationUtil; +import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class S3ProcessorAgentProviderTest { + @Test + @SneakyThrows + public void testValidation() { + validate( + """ + topics: [] + pipeline: + - name: "s3-processor" + type: "s3-processor" + configuration: + a-field: "val" + objectName: "simple.pdf" + """, + "Found error on agent configuration (agent: 's3-processor', type: 's3-processor'). Property 'a-field' is unknown"); + validate( + """ + topics: [] + pipeline: + - name: "s3-processor" + type: "s3-processor" + configuration: + objectName: "simple.pdf" + """, + null); + validate( + """ + topics: [] + pipeline: + - name: "s3-processor" + type: "s3-processor" + configuration: + bucketName: "my-bucket" + access-key: KK + secret-key: SS + endpoint: "http://localhost:9000" + region: "us-east-1" + objectName: "simple.pdf" + """, + null); + validate( + """ + topics: [] + pipeline: + - name: "s3-processor" + type: "s3-processor" + configuration: + bucketName: 12 + objectName: "simple.pdf" + """, + null); + validate( + """ + topics: [] + pipeline: + - name: "s3-processor" + type: "s3-processor" + configuration: + bucketName: {object: true} + objectName: "simple.pdf" + """, + "Found error on agent configuration (agent: 's3-processor', type: 's3-processor'). Property 'bucketName' has a wrong data type. Expected type: java.lang.String"); + } + + private void validate(String pipeline, String expectErrMessage) throws Exception { + AgentValidationTestUtil.validate(pipeline, expectErrMessage); + } + + @Test + @SneakyThrows + public void testDocumentation() { + final Map model = + new PluginsRegistry() + .lookupAgentImplementation( + "s3-processor", + new NoOpComputeClusterRuntimeProvider.NoOpClusterRuntime()) + .generateSupportedTypesDocumentation(); + + Assertions.assertEquals( + """ + { + "s3-processor" : { + "name" : "S3 Processor", + "description" : "Processes a file from an S3 bucket", + "properties" : { + "access-key" : { + "description" : "Access key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + }, + "bucketName" : { + "description" : "The name of the bucket that contains the file.", + "required" : false, + "type" : "string", + "defaultValue" : "langstream-source" + }, + "endpoint" : { + "description" : "The endpoint of the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "http://minio-endpoint.-not-set:9090" + }, + "objectName" : { + "description" : "The object name to read from the S3 server.", + "required" : true, + "type" : "string" + }, + "region" : { + "description" : "Region for the S3 server.", + "required" : false, + "type" : "string" + }, + "secret-key" : { + "description" : "Secret key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + } + } + } + }""", + SerializationUtil.prettyPrintJson(model)); + } +}