Skip to content

Commit

Permalink
Merge pull request #5 from awslabs/development
Browse files Browse the repository at this point in the history
Release 2.0.0 to support vending multiple tokens and dashboard
  • Loading branch information
avipinku authored Feb 4, 2022
2 parents 3ce8612 + 30f1023 commit 37d869c
Show file tree
Hide file tree
Showing 12 changed files with 539 additions and 214 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Testing CLI (Runs both unit and integration tests)
run: |
coverage run --source=src -m pytest -v -s . && coverage report --show-missing --fail-under=63
coverage run --source=src -m pytest -v -s . && coverage report --show-missing --fail-under=75
263 changes: 163 additions & 100 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion gdk-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"component" :{
"aws.greengrass.labs.database.InfluxDB": {
"author": "AWS IoT Greengrass",
"version": "NEXT_PATCH",
"version": "2.0.0",
"build": {
"build_system": "zip"
},
Expand Down
6 changes: 3 additions & 3 deletions recipe.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
RecipeFormatVersion: '2020-01-25'
ComponentName: aws.greengrass.labs.database.InfluxDB
ComponentVersion: '1.0.0'
ComponentVersion: '2.0.0'
ComponentDescription: 'A component that provisions and manages an InfluxDB instance.'
ComponentPublisher: Amazon
ComponentDependencies:
Expand All @@ -15,7 +15,7 @@ ComponentConfiguration:
DefaultConfiguration:
AutoProvision: 'true'
InfluxDBMountPath: '/home/ggc_user/dashboard'
SecretArn: 'arn:aws:secretsmanager:<region>:<account>:secret:<name>'
SecretArn: 'arn:aws:secretsmanager:region:account:secret:name'
InfluxDBContainerName: greengrass_InfluxDB
InfluxDBOrg: 'greengrass'
InfluxDBBucket: 'greengrass-telemetry'
Expand Down Expand Up @@ -48,7 +48,7 @@ ComponentConfiguration:
operations:
- aws.greengrass#GetSecretValue
resources:
- 'arn:aws:secretsmanager:<region>:<account>:secret:<name>'
- 'arn:aws:secretsmanager:region:account:secret:name'
Manifests:
- Platform:
os: /darwin|linux/
Expand Down
47 changes: 24 additions & 23 deletions src/influxDBTokenPublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

logging.basicConfig(level=logging.INFO)
TIMEOUT = 10
# Influx commands need to be given the port of InfluxDB inside the container, which is always 8086 unless
# overridden inside the InfluxDB config
INFLUX_CONTAINER_PORT = 8086


def parse_arguments() -> Namespace:
Expand Down Expand Up @@ -47,7 +50,7 @@ def parse_arguments() -> Namespace:
return parser.parse_args()


def retrieve_influxDB_token(args) -> str:
def retrieve_influxDB_token_json(args) -> str:
"""
Retrieve the created RW token from InfluxDB.
Expand All @@ -61,11 +64,10 @@ def retrieve_influxDB_token(args) -> str:
"""

token_json = ""
dockerExecProcess = ""
authListCommand = ['docker', 'exec', '-t', args.influxdb_container_name, 'influx', 'auth', 'list', '--json']
if args.server_protocol == "https":
authListCommand.append('--host')
authListCommand.append('https://{}:{}'.format(args.influxdb_container_name, args.influxdb_port))
authListCommand.append('https://{}:{}'.format(args.influxdb_container_name, INFLUX_CONTAINER_PORT))

if bool(strtobool(args.skip_tls_verify)):
authListCommand.append('--skip-verify')
Expand All @@ -78,48 +80,47 @@ def retrieve_influxDB_token(args) -> str:
if dockerExecProcess.stderr:
logging.error(dockerExecProcess.stderr)
if(len(token_json) == 0):
logging.error('Failed to retrieve InfluxDB RW token data from Docker! Retrieved token was: {}'.format(token_json))
logging.error('Failed to retrieve InfluxDB RW token data from Docker! Retrieved data was: {}'.format(token_json))
exit(1)
influxdb_rw_token = next(d for d in json.loads(token_json) if d['description'] == 'greengrass_readwrite')['token']
if(len(influxdb_rw_token) == 0):
logging.error('Failed to parse InfluxDB RW token! Retrieved token was: {}'.format(influxdb_rw_token))
influxdb_token = json.loads(token_json)[0]['token']
if(len(influxdb_token) == 0):
logging.error('Retrieved InfluxDB tokens was empty!')
exit(1)

return influxdb_rw_token
return token_json


def listen_to_token_requests(args, influxdb_rw_token) -> None:
def listen_to_token_requests(args, influxdb_token_json) -> None:
"""
Setup a new IPC subscription over local pub/sub to listen to token requests and vend tokens.
Parameters
----------
args(Namespace): Parsed arguments
influxdb_rw_token(str): InfluxDB RW token
influxdb_token_json(str): InfluxDB token JSON string
Returns
-------
None
"""

try:
influxDB_data = {}
influxDB_data['InfluxDBContainerName'] = args.influxdb_container_name
influxDB_data['InfluxDBOrg'] = args.influxdb_org
influxDB_data['InfluxDBBucket'] = args.influxdb_bucket
influxDB_data['InfluxDBPort'] = args.influxdb_port
influxDB_data['InfluxDBInterface'] = args.influxdb_interface
influxDB_data['InfluxDBRWToken'] = influxdb_rw_token
influxDB_data['InfluxDBServerProtocol'] = args.server_protocol
influxDB_data['InfluxDBSkipTLSVerify'] = args.skip_tls_verify
influxDB_json = json.dumps(influxDB_data)
influxdb_metadata = {}
influxdb_metadata['InfluxDBContainerName'] = args.influxdb_container_name
influxdb_metadata['InfluxDBOrg'] = args.influxdb_org
influxdb_metadata['InfluxDBBucket'] = args.influxdb_bucket
influxdb_metadata['InfluxDBPort'] = args.influxdb_port
influxdb_metadata['InfluxDBInterface'] = args.influxdb_interface
influxdb_metadata['InfluxDBServerProtocol'] = args.server_protocol
influxdb_metadata['InfluxDBSkipTLSVerify'] = args.skip_tls_verify
influxdb_metadata_json = json.dumps(influxdb_metadata)

logging.info('Successfully retrieved InfluxDB parameters!')

ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = args.subscribe_topic
handler = InfluxDBTokenStreamHandler(influxDB_json, args.publish_topic)
handler = InfluxDBTokenStreamHandler(influxdb_metadata_json, influxdb_token_json, args.publish_topic)
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
logging.info('Successfully subscribed to topic: {}'.format(args.subscribe_topic))
Expand All @@ -138,8 +139,8 @@ def listen_to_token_requests(args, influxdb_rw_token) -> None:
if __name__ == "__main__":
try:
args = parse_arguments()
influxdb_rw_token = retrieve_influxDB_token(args)
listen_to_token_requests(args, influxdb_rw_token)
influxdb_token_json = retrieve_influxDB_token_json(args)
listen_to_token_requests(args, influxdb_token_json)
# Keep the main thread alive, or the process will exit.
while True:
time.sleep(10)
Expand Down
67 changes: 53 additions & 14 deletions src/influxDBTokenStreamHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,28 @@

import concurrent.futures
import logging

import json
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
JsonMessage,
SubscriptionResponseMessage,
UnauthorizedError
)

TIMEOUT = 10
# Admin token description is in the format "USERNAME's Token"
ADMIN_TOKEN_IDENTIFIER = "'s Token"


class InfluxDBTokenStreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self, influxDB_json, publish_topic):
def __init__(self, influxdb_metadata_json, influxdb_token_json, publish_topic):
super().__init__()
# We need a separate IPC client for publishing
self.influxDB_json = influxDB_json
self.influxDB_metadata_json = influxdb_metadata_json
self.influxDB_token_json = influxdb_token_json
self.publish_topic = publish_topic
self.publish_client = awsiot.greengrasscoreipc.connect()
logging.info("Initialized InfluxDBTokenStreamHandler")
Expand All @@ -39,12 +42,12 @@ def handle_stream_event(self, event: SubscriptionResponseMessage) -> None:
None
"""
try:
message = str(event.binary_message.message, "utf-8")
if message == 'GetInfluxDBData':
logging.info('Sending InfluxDB RW Token on the response topic')
self.publish_response()
else:
logging.warning('Unknown request type received over pub/sub')
message = event.json_message.message
publish_json = self.get_publish_json(message)
if not publish_json:
logging.error("Failed to construct requested response for access")
return
self.publish_response(publish_json)
except Exception:
logging.error('Received an error', exc_info=True)

Expand Down Expand Up @@ -80,13 +83,49 @@ def on_stream_closed(self) -> None:
"""
logging.info('Subscribe to topic stream closed.')

def publish_response(self) -> None:
def get_publish_json(self, message):
"""
Parse the correct token based on the IPC message received, and construct the final JSON to publish.
:param message: the received IPC messsage
:return: the complete JSON, including token, to publish
"""

loaded_token_json = json.loads(self.influxDB_token_json)
publish_json = json.loads(self.influxDB_metadata_json)

if not message['action'] == 'RetrieveToken':
logging.warning('Unknown request type received over pub/sub')
return None

token = ''
if message['accessLevel'] == 'RW':
token = next(d for d in loaded_token_json if d['description'] == 'greengrass_readwrite')['token']
elif message['accessLevel'] == 'RO':
token = next(d for d in loaded_token_json if d['description'] == 'greengrass_read')['token']
elif message['accessLevel'] == 'Admin':
if not ADMIN_TOKEN_IDENTIFIER in loaded_token_json[0]['description']:
logging.warning("InfluxDB admin token is missing or in an incorrect format")
return None
token = loaded_token_json[0]['token']
else:
logging.warning('Unknown token request type specified over pub/sub')
return None

if len(token) == 0:
raise ValueError('Failed to parse InfluxDB {} token!'.format(message['accessLevel']))
publish_json['InfluxDBTokenAccessType'] = message['accessLevel']
publish_json['InfluxDBToken'] = token
logging.info('Sending InfluxDB {} Token on the response topic'.format(message['accessLevel']))
return publish_json

def publish_response(self, publishMessage) -> None:
"""
Publish the InfluxDB token on the token response topic.
Parameters
----------
None
publishMessage(str): the message to send including InfluxDB metadata and token
Returns
-------
Expand All @@ -96,8 +135,8 @@ def publish_response(self) -> None:
request = PublishToTopicRequest()
request.topic = self.publish_topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(self.influxDB_json, "utf-8")
publish_message.json_message = JsonMessage()
publish_message.json_message.message = publishMessage
request.publish_message = publish_message
operation = self.publish_client.new_publish_to_topic()
operation.activate(request)
Expand Down
Loading

0 comments on commit 37d869c

Please sign in to comment.