diff --git a/broker/Dockerfile_multiarch b/broker/Dockerfile_multiarch new file mode 100644 index 00000000..d685c6fa --- /dev/null +++ b/broker/Dockerfile_multiarch @@ -0,0 +1,5 @@ +FROM alpine +ARG TARGETOS +ARG TARGETARCH +ADD /${TARGETOS}/${TARGETARCH}/broker / +CMD ["/broker"] diff --git a/broker/build_k8s b/broker/build_k8s new file mode 100644 index 00000000..935045fa --- /dev/null +++ b/broker/build_k8s @@ -0,0 +1 @@ +docker buildx build --platform linux/arm,linux/arm64,linux/amd64 --push -f ./Dockerfile_multiarch -t "fogflow/broker:k8s" . diff --git a/designer/build_k8s b/designer/build_k8s new file mode 100644 index 00000000..15522942 --- /dev/null +++ b/designer/build_k8s @@ -0,0 +1 @@ +docker image tag fogflow/designer:latest fogflow/designer:k8s diff --git a/designer/config.json b/designer/config.json index 5d0fb92b..192d6ff3 100644 --- a/designer/config.json +++ b/designer/config.json @@ -28,7 +28,8 @@ }, "designer": { "webSrvPort": 8080, - "agentPort": 1030 + "agentPort": 1030, + "notRecreateSubscriptions": false }, "rabbitmq": { "port": 5672, diff --git a/designer/main.js b/designer/main.js index 081bc1dc..0e3eea52 100644 --- a/designer/main.js +++ b/designer/main.js @@ -68,6 +68,8 @@ if (!('host_ip' in globalConfigFile.broker)) { var cloudBrokerURL = "http://" + globalConfigFile.broker.host_ip + ":" + globalConfigFile.broker.http_port var ngsi10client = new NGSIClient.NGSI10Client(cloudBrokerURL + "/ngsi10"); +config.notRecreateSubscriptions = globalConfigFile.designer.notRecreateSubscriptions; + var recheck_interval = 2000; var timerID = setTimeout(function entityrestore(){ var url = cloudBrokerURL + "/version"; @@ -81,29 +83,58 @@ var timerID = setTimeout(function entityrestore(){ }); }); - // create the persistent subscriptions at the FogFlow Cloud Broker - Object.values(db.data.subscriptions).forEach(subscription => { - var headers = {}; - - if (subscription.destination_broker == 'NGSI-LD') { - headers["Content-Type"] = "application/json"; - headers["Destination"] = "NGSI-LD"; - headers["NGSILD-Tenant"] = subscription.tenant; - } else if (subscription.destination_broker == 'NGSIv2') { - headers["Content-Type"] = "application/json"; - headers["Destination"] = "NGSIv2"; - } - - var subscribeCtxReq = {}; - subscribeCtxReq.entities = [{ type: subscription['entity_type'], isPattern: true }]; - subscribeCtxReq.reference = subscription['reference_url']; - ngsi10client.subscribeContextWithHeaders(subscribeCtxReq, headers).then(function (subscriptionId) { - console.log("new subscription id = ", subscriptionId); - console.log(subscription) - }).catch(function (error) { - console.log('failed to subscribe context, ', error); - }); - }); + + if (!config.notRecreateSubscriptions){ + + fetch(cloudBrokerURL + "/ngsi10/subscription"). + then((response) => response.json()). + then(brokerSubscriptions => { + + Object.keys(db.data.subscriptions). + forEach(key => { + console.log("checking if the following subscription is already in the broker: ",key) + if(brokerSubscriptions.hasOwnProperty(key)) { + //delete designerSubscriptions[key]; + console.log("subscription already in the broker:", key) + } else { + + var subscription = db.data.subscriptions[key] + + var headers = {}; + + if (subscription.destination_broker == 'NGSI-LD') { + headers["Content-Type"] = "application/json"; + headers["Destination"] = "NGSI-LD"; + headers["NGSILD-Tenant"] = subscription.tenant; + } else if (subscription.destination_broker == 'NGSIv2') { + headers["Content-Type"] = "application/json"; + headers["Destination"] = "NGSIv2"; + } + + var subscribeCtxReq = {}; + subscribeCtxReq.entities = [{ type: subscription['entity_type'], isPattern: true }]; + subscribeCtxReq.reference = subscription['reference_url']; + // it is necessary to send them out not at the same time for having different subID + // since the subID is based on timestamp + var delayBeforeSend = Math.floor(Math.random() * 1000) + setTimeout(function(){ + //console.log("waited", delayBeforeSend); + ngsi10client.subscribeContextWithHeaders(subscribeCtxReq, headers).then(function (subscriptionId) { + console.log("new subscription id = ", subscriptionId); + console.log(subscription) + delete db.data.subscriptions[key]; + db.data.subscriptions[subscriptionId] = subscription + db.write(); + }).catch(function (error) { + console.log('failed to subscribe context, ', error); + }); + }, delayBeforeSend); + } + }); + }) + } else { + console.log("not recreating subscriptions ", config.notRecreateSubscriptions); + } }).catch(error=>{ console.log("try it again due to the error: ", error.code); @@ -420,6 +451,39 @@ app.post('/operator', jsonParser, async function (req, res) { res.sendStatus(200) }); +app.get('/dockerimage', async function (req, res) { + var operators = db.data.operators; + var dockers = {} + for (const [key, value] of Object.entries(operators)) { + if (value.hasOwnProperty('dockerimages') && value.dockerimages.length > 0){ + dockers.key = value + } + } + res.json(dockers); + +}); + +app.post('/dockerimage', jsonParser, async function (req, res) { + var dockerimages = req.body; + for (var i = 0; i < dockerimages.length; i++) { + var dockerimage = dockerimages[i]; + var operatorName = dockerimage.operatorName + + if (operatorName in db.data.operators) { + if (db.data.operators[operatorName].hasOwnProperty('dockerimages')){ + db.data.operators[operatorName].dockerimages.push(dockerimage) + } else { + db.data.operators[operatorName].dockerimages = [] + db.data.operators[operatorName].dockerimages.push(dockerimage) + } + } + } + + await db.write(); + + res.sendStatus(200) +}); + app.get('/dockerimage/:operator', async function (req, res) { var operatorName = req.params.operator; var operator = db.data.operators[operatorName]; @@ -430,8 +494,16 @@ app.post('/dockerimage/:operator', jsonParser, async function (req, res) { var operatorName = req.params.operator; var dockerimage = req.body; + console.log(operatorName in db.data.operators) + console.log(db.data.operators[operatorName]) + if (operatorName in db.data.operators) { - db.data.operators[operatorName].dockerimages.push(dockerimage) + if (db.data.operators[operatorName].hasOwnProperty('dockerimages')){ + db.data.operators[operatorName].dockerimages.push(dockerimage) + } else { + db.data.operators[operatorName].dockerimages = [] + db.data.operators[operatorName].dockerimages.push(dockerimage) + } } await db.write(); diff --git a/designer/public/js/function.js b/designer/public/js/function.js index 735e6b20..ea465fc8 100644 --- a/designer/public/js/function.js +++ b/designer/public/js/function.js @@ -31,7 +31,7 @@ $(function() { addMenuItem('FogFunction', 'Fog Function', showFogFunctions); //addMenuItem('TaskInstance', 'Task Instance', showTaskInstances); - //initFogFunctionExamples(); + initFogFunctionExamples(); showFogFunctions(); queryOperatorList(); diff --git a/designer/public/js/initialization.js b/designer/public/js/initialization.js index d058f01e..a4d1ea22 100644 --- a/designer/public/js/initialization.js +++ b/designer/public/js/initialization.js @@ -2,47 +2,58 @@ function defaultOperatorList(){ var operatorList = [{ name: "nodejs", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "python", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "iotagent", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "counter", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "anomaly", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "facefinder", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "connectedcar", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "recommender", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "privatesite", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "publicsite", description: "", - parameters: [] + parameters: [], + dockerimages: [] }, { name: "dummy", description: "", - parameters: [] + parameters: [], + dockerimages: [] }]; return operatorList; diff --git a/designer/public/js/operator.js b/designer/public/js/operator.js index 3ca3259d..1e95b8c1 100644 --- a/designer/public/js/operator.js +++ b/designer/public/js/operator.js @@ -4,11 +4,11 @@ $(function() { var handlers = {}; addMenuItem('Operator', 'Operator', showOperator); + + initOperatorList(); + initDockerImageList(); showOperator(); - - //initOperatorList(); - //initDockerImageList(); $(window).on('hashchange', function() { var hash = window.location.hash; @@ -51,6 +51,7 @@ $(function() { } function initOperatorList(){ + console.log("let's initialize the operator list") fetch('/operator').then(res => res.json()).then(opList => { if (Object.keys(opList).length === 0) { var operators = defaultOperatorList(); @@ -73,6 +74,27 @@ $(function() { }) } + function initDockerImageList(){ + console.log("Initialize docker images list") + fetch('/dockerimage').then(res => res.json()).then(imageList => { + if (Object.keys(imageList).length === 0) { + var images = defaultDockerImageList(); + fetch("/dockerimage", { + method: "POST", + headers: { + Accept: "application/json", + "Content-Type": "application/json" + }, + body: JSON.stringify(images) + }) + .then(response => { + console.log("send the initial list of docker images: ", response.status) + }) + .catch(err => console.log(err)); + } + }) + } + function queryOperatorList() { fetch('/operator').then(res => res.json()).then(operators => { Object.values(operators).forEach(operator => { @@ -109,7 +131,11 @@ $(function() { html += '