Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Export for Job Framework #1382

Merged
merged 15 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ jobs:
AWS_ACCESS_KEY_ID: localstack
AWS_SECRET_ACCESS_KEY: localstack
AWS_DEFAULT_REGION: us-east-1
- name: Run Setup of JobService
working-directory: ./xyz-jobs/xyz-job-service/src/main/bash
run: docker run --rm --entrypoint '' -v ./localSetup.sh:/aws/localSetup.sh --add-host host.docker.internal=host-gateway amazon/aws-cli ./localSetup.sh true
- name: Run tests
working-directory: ./
run: mvn verify -DskipTests=false
20 changes: 20 additions & 0 deletions Dockerfile-job-service
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM openjdk:17-slim

MAINTAINER Benjamin Rögner "[email protected]"
MAINTAINER Lucas Ceni "[email protected]"
MAINTAINER Dimitar Goshev "[email protected]"
MAINTAINER Minikon Nah "[email protected]"
MAINTAINER Maximilian Chrzan "[email protected]"

ENV LOG_CONFIG log4j2-console-plain.json
ENV LOG_PATH /var/log/xyz

#Override the following environment variables to let the service connect to different host names
ENV LOCALSTACK_ENDPOINT http://aws-localstack:4566
ENV HUB_ENDPOINT http://xyz-hub:8080/hub

COPY xyz-jobs/xyz-job-service/target/xyz-job-service.jar .
ADD Dockerfile-job-service /

EXPOSE 7070
CMD java -jar xyz-job-service.jar
18 changes: 18 additions & 0 deletions docker-compose-dynamodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ services:
- "postgres"
- "aws-localstack"
command: java -cp xyz-hub-service.jar com.here.xyz.httpconnector.CService
xyz-job-service:
image: "xyz-job"
build:
context: "./"
dockerfile: "Dockerfile-job-service"
container_name: xyz-job-service
ports:
- "7070:7070"
environment:
- HUB_ENDPOINT=http://xyz-hub:8080/hub
- LOCALSTACK_ENDPOINT=http://localstack:4566
- JOBS_DYNAMODB_TABLE_ARN=arn:aws:dynamodb:dynamodb:000000008000:table/xyz-jobs-local
depends_on:
- "xyz-hub"
- "dynamodb"
- "postgres"
- "aws-localstack"
command: java -cp xyz-job-service.jar com.here.xyz.jobs.service.JobService
postgres:
image: "xyz-postgres"
build:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2023 HERE Europe B.V.
* Copyright (C) 2017-2024 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@

package com.here.xyz.connectors;

import static com.here.xyz.responses.XyzError.EXCEPTION;
import static com.here.xyz.responses.XyzError.FORBIDDEN;

import com.amazonaws.services.lambda.AWSLambda;
Expand All @@ -34,8 +35,8 @@
import com.here.xyz.XyzSerializable;
import com.here.xyz.connectors.decryptors.EventDecryptor;
import com.here.xyz.connectors.decryptors.EventDecryptor.Decryptors;
import com.here.xyz.connectors.runtime.ConnectorRuntime;
import com.here.xyz.connectors.runtime.LambdaConnectorRuntime;
import com.here.xyz.util.runtime.FunctionRuntime;
import com.here.xyz.util.runtime.LambdaFunctionRuntime;
import com.here.xyz.events.Event;
import com.here.xyz.events.EventNotification;
import com.here.xyz.events.HealthCheckEvent;
Expand Down Expand Up @@ -63,10 +64,6 @@
* A default implementation of a request handler that can be reused. It supports out of the box caching via e-tag.
*/
public abstract class AbstractConnectorHandler implements RequestStreamHandler {

/**
* Logger
*/
private static final Logger logger = LogManager.getLogger();

/**
Expand Down Expand Up @@ -216,7 +213,7 @@ public void handleRequest(InputStream input, OutputStream output, Context contex

String connectorId = null;
this.streamId = streamId != null ? streamId : event.getStreamId();
new LambdaConnectorRuntime(context, this.streamId);
new LambdaFunctionRuntime(context, this.streamId);

if (event.getConnectorParams() != null && event.getConnectorParams().get("connectorId") != null)
connectorId = (String) event.getConnectorParams().get("connectorId");
Expand All @@ -242,7 +239,7 @@ public void handleRequest(InputStream input, OutputStream output, Context contex
logger.error("{} Unexpected exception occurred:", traceItem, e);
dataOut = new ErrorResponse()
.withStreamId(this.streamId)
.withError(XyzError.EXCEPTION)
.withError(EXCEPTION)
.withErrorMessage("Unexpected exception occurred.");
}
catch (OutOfMemoryError e) {
Expand Down Expand Up @@ -311,7 +308,7 @@ private void writeDataOut(OutputStream output, Typed dataOut, String ifNoneMatch
.toByteArray();
}

final boolean runningLocally = ConnectorRuntime.getInstance().isRunningLocally();
final boolean runningLocally = FunctionRuntime.getInstance().isRunningLocally();
if (dataOut instanceof BinaryResponse) {
//NOTE: BinaryResponses contain an ETag automatically, nothing to calculate here
String etag = ((BinaryResponse) dataOut).getEtag();
Expand Down Expand Up @@ -372,8 +369,8 @@ private static void checkEventTypeAllowed(Event event) throws ErrorResponseExcep
* These type of events are sent in regular intervals to the lambda handler and could be used to keep the handler's container active and
* the connection to the database open.
*/
protected XyzResponse processHealthCheckEvent(HealthCheckEvent event) {
if (event.getWarmupCount() > 0 && !ConnectorRuntime.getInstance().isRunningLocally()) {
protected HealthStatus processHealthCheckEvent(HealthCheckEvent event) throws Exception {
if (event.getWarmupCount() > 0 && !FunctionRuntime.getInstance().isRunningLocally()) {
int warmupCount = event.getWarmupCount();
event.setWarmupCount(0);
byte[] newEvent = event.toByteArray();
Expand All @@ -383,7 +380,7 @@ protected XyzResponse processHealthCheckEvent(HealthCheckEvent event) {
if (lambdaClient == null)
lambdaClient = AWSLambdaClientBuilder.defaultClient();
threads.add(new Thread(() -> lambdaClient.invoke(new InvokeRequest()
.withFunctionName(((LambdaConnectorRuntime) ConnectorRuntime.getInstance()).getInvokedFunctionArn())
.withFunctionName(((LambdaFunctionRuntime) FunctionRuntime.getInstance()).getInvokedFunctionArn())
.withPayload(ByteBuffer.wrap(newEvent)))));
}
threads.forEach(t -> t.start());
Expand All @@ -395,10 +392,7 @@ protected XyzResponse processHealthCheckEvent(HealthCheckEvent event) {
Thread.sleep((event.getMinResponseTime() + start) - System.currentTimeMillis());
}
catch (InterruptedException e) {
return new ErrorResponse()
.withErrorMessage(e.getMessage())
.withStreamId(streamId)
.withError(XyzError.EXCEPTION);
throw new ErrorResponseException(EXCEPTION, e.getMessage());
}
}
return new HealthStatus();
Expand Down
Loading
Loading