Skip to content

Commit

Permalink
Added issue processing for mysql output.
Browse files Browse the repository at this point in the history
Added T_USE_MYSQL_CONTAINER switch to support local mysql docker container as well as external service.
  • Loading branch information
chrisekelley committed Oct 3, 2022
1 parent 32f4ab3 commit 2ffd181
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 38 deletions.
7 changes: 5 additions & 2 deletions config.defaults.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ T_UPLOAD_TOKEN="password"
# The database username and password. Please make this extrememly secure.
T_COUCHDB_USER_ADMIN_NAME="admin"
T_COUCHDB_USER_ADMIN_PASS="password"
# Mysql
# Mysql - Enter container name - usually "mysql" or the ip address or server name for an external service.
T_MYSQL_CONTAINER_NAME="mysql"
T_MYSQL_USER="admin"
T_MYSQL_PASSWORD="password"
Expand Down Expand Up @@ -59,9 +59,12 @@ T_PAID_ALLOWANCE="unlimited"
# Reporting delay determines how quickly uploads will get processed and show up in reporting outputs such as CSV. Time is in milliseconds and default is 5 minutes.
T_REPORTING_DELAY="300000"

# Limit the rebuild-mysql-db script to certain group dbs. Omit to rebuild all.
# Limit mysql processing to certain group dbs.
T_REBUILD_MYSQL_DBS=""

# Enter "true" if using a mysql container instead of an external database service such as AWS RDS. This will launch a mysql container.
T_USE_MYSQL_CONTAINER=""

# When CSV is generated, this determines how many form responses are held in memory during a batch. The higher the number the more memory this process will take but the faster it will complete.
T_CSV_BATCH_SIZE=50

Expand Down
21 changes: 16 additions & 5 deletions develop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ else
fi

if echo "$T_MODULES" | grep mysql; then
./mysql-start.sh
echo "Waiting 60 seconds for myql to start..."
sleep 60
./mysql-setup.sh
./mysql-create-dirs.sh
fi

if echo "$T_USE_MYSQL_CONTAINER" | grep "true"; then
./mysql-start-container.sh
echo "Waiting 60 seconds for mysql container to start..."
sleep 60
./mysql-setup.sh
fi

if echo "$T_MYSQL_PHPMYADMIN" | grep "TRUE"; then
Expand Down Expand Up @@ -223,9 +227,16 @@ OPTIONS="--link $T_COUCHDB_CONTAINER_NAME:couchdb \
tangerine/tangerine:local
"

if echo "$T_USE_MYSQL_CONTAINER" | grep "true"; then
echo "Linking mysql container ..."
OPTIONS="
--link $T_MYSQL_CONTAINER_NAME:mysql \
$OPTIONS
"
fi

if echo "$T_MODULES" | grep mysql; then
OPTIONS="
--link $T_MYSQL_CONTAINER_NAME:mysql \
--env \"T_MYSQL_CONTAINER_NAME=$T_MYSQL_CONTAINER_NAME\" \
--env \"T_MYSQL_USER=$T_MYSQL_USER\" \
--env \"T_MYSQL_PASSWORD=$T_MYSQL_PASSWORD\" \
Expand Down
26 changes: 26 additions & 0 deletions mysql-create-dirs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

#
# Load config.
#

source ./config.defaults.sh
if [ -f "./config.sh" ]; then
source ./config.sh
else
echo "You have no config.sh. Copy config.defaults.sh to config.sh, change the passwords and try again." && exit 1;
fi


if [ ! -d data ]; then
mkdir data
fi
if [ ! -d data/mysql ]; then
mkdir data/mysql
fi
if [ ! -d data/mysql/databases ]; then
mkdir data/mysql/databases
fi
if [ ! -d data/mysql/state ]; then
mkdir data/mysql/state
fi

2 changes: 1 addition & 1 deletion mysql-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ else
eval ${CMD}
CMD="docker exec mysql mysql -u root -p'$T_MYSQL_PASSWORD' -e \"GRANT ALL PRIVILEGES ON \"*.*\" TO '$T_MYSQL_USER'@'%' WITH GRANT OPTION;\""
eval ${CMD}
fi
fi
14 changes: 14 additions & 0 deletions mysql-start-container.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

source ./config.defaults.sh
if [ -f "./config.sh" ]; then
source ./config.sh
else
echo "You have no config.sh. Copy config.defaults.sh to config.sh, change the passwords and try again." && exit 1;
fi

[ "$(docker ps | grep $T_MYSQL_CONTAINER_NAME)" ] && docker stop $T_MYSQL_CONTAINER_NAME
[ "$(docker ps -a | grep $T_MYSQL_CONTAINER_NAME)" ] && docker rm $T_MYSQL_CONTAINER_NAME

docker run -v $(pwd)/server/src/modules/mysql/conf.d:/etc/mysql/conf.d --name "$T_MYSQL_CONTAINER_NAME" -v "$(pwd)/data/mysql/databases:/var/lib/mysql" -p 3306:3306 -e "MYSQL_ROOT_PASSWORD=$T_MYSQL_PASSWORD" -d mysql:latest

4 changes: 2 additions & 2 deletions mysql-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ if [ ! -d data/mysql/state ]; then
fi


[ "$(docker ps | grep $T_MYSQL_CONTAINER_NAME)" ] && docker stop $T_MYSQL_CONTAINER_NAME
[ "$(docker ps -a | grep $T_MYSQL_CONTAINER_NAME)" ] && docker rm $T_MYSQL_CONTAINER_NAME
# [ "$(docker ps | grep $T_MYSQL_CONTAINER_NAME)" ] && docker stop $T_MYSQL_CONTAINER_NAME
# [ "$(docker ps -a | grep $T_MYSQL_CONTAINER_NAME)" ] && docker rm $T_MYSQL_CONTAINER_NAME


#docker run -v $(pwd)/server/src/modules/mysql/conf.d:/etc/mysql/conf.d --name "$T_MYSQL_CONTAINER_NAME" -v "$(pwd)/data/mysql/databases:/var/lib/mysql" -p 3306:3306 -e "MYSQL_ROOT_PASSWORD=$T_MYSQL_PASSWORD" -d mysql:latest
Expand Down
88 changes: 87 additions & 1 deletion server/src/modules/mysql-js/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,23 @@ module.exports = {
const result = await saveToMysql(knex, uploadedDoc, tablenameSuffix, tableName, docType, primaryKey, createFunction)
log.info('Processed: ' + JSON.stringify(result))
}
} else if (doc.type === 'issue') {
const uploadedDoc = await saveFlatResponse(doc, locationList, targetDb, sanitized);
// issue doc
tableName = 'issue' + tablenameSuffix
docType = 'issue'
primaryKey = 'ID'
createFunction = function (t) {
t.engine('InnoDB')
t.string(primaryKey, 36).notNullable().primary();
t.string('caseId', 36) // .index('response_caseId_IDX');
t.string('participantID', 36) //.index('case_instances_ParticipantID_IDX');
t.string('caseEventId', 36) // .index('eventform_caseEventId_IDX');
t.tinyint('complete');
t.string('archived', 36); //
}
const result = await saveToMysql(knex, uploadedDoc, tablenameSuffix, tableName, docType, primaryKey, createFunction)
log.info('Processed: ' + JSON.stringify(result))
} else {
const uploadedDoc = await saveFlatResponse(doc, locationList, targetDb, sanitized);
tableName = null;
Expand Down Expand Up @@ -748,6 +765,68 @@ async function convert_response(knex, doc, groupId, tableName) {
return cleanData
}

async function convert_issue(knex, doc, groupId, tableName) {
let data = doc.data
if (!data) {
data = {}
}

const id= doc._id
const startDatetime = doc.startDatetime
const geoIp = doc.geoip
const caseId = doc.caseId
const eventId = doc.eventId
const eventFormId= doc.eventFormId
const participantId = doc.participantId
const caseEventId = doc.caseEventId
const formID = 'issue'

// append geoIP to the data object
if (geoIp) {
for (const key in geoIp) {
if (geoIp.hasOwnProperty(key)) {
const value = geoIp[key]
data[`geoip_${key}`] = value
}
}
}

if (!data['_id']) {
data['_id'] = id
}
if (!data['caseid']) {
data['caseId'] = caseId
}
if (!data['participantid']) {
data['participantid'] = participantId
}
if (!data['eventid']) {
data['eventid'] = eventId
}
if (!data['eventformid']) {
data['eventformid'] = eventFormId
}
if (!data['caseeventid']) {
data['caseeventid'] = caseEventId
}
if (!data['startdatetime']) {
data['startdatetime'] = startDatetime
}
// adding formID to the data object
if (!data['formID_sanitized']) {
data['formID_sanitized'] = formID
}

doc.ID = doc._id
doc.dbRevision = doc._rev

// # Delete the following keys;
const valuesToRemove = ['_id', '_rev','buildChannel','buildId','caseEventId','deviceId','eventFormId','eventId','groupId','participantId','startDatetime', 'startUnixtime']
valuesToRemove.forEach(e => delete doc[e]);
const cleanData = populateDataFromDocument(doc, data);
return cleanData
}

async function saveToMysql(knex, doc, tablenameSuffix, tableName, docType, primaryKey, createFunction) {
let data;
let result = {id: doc._id, tableName, docType}
Expand All @@ -759,7 +838,11 @@ async function saveToMysql(knex, doc, tablenameSuffix, tableName, docType, prima
}
// Docs of type response must be flattened first to get the table name.
if (doc.type.toLowerCase() !== 'response') {
await createTable(knex, groupId, tableName, docType, createFunction, primaryKey)
try {
await createTable(knex, groupId, tableName, docType, createFunction, primaryKey)
} catch (e) {
log.error("Error creating table: " + e)
}
}
switch (doc.type.toLowerCase()) {
case 'case':
Expand All @@ -774,6 +857,9 @@ async function saveToMysql(knex, doc, tablenameSuffix, tableName, docType, prima
case 'event-form':
data = await convert_event_form(knex, doc, groupId, tableName)
break;
case 'issue':
data = await convert_issue(knex, doc, groupId, tableName)
break;
case 'response':
data = await convert_response(knex, doc, groupId, tableName)
// Check if table exists and create if needed:
Expand Down
1 change: 1 addition & 0 deletions server/src/reporting/clear-reporting-cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async function clearReportingCache() {
const newState = Object.assign({}, state, {
databases: state.databases.map(({name, sequence}) => { return {name, sequence: 0}})
})
console.log("newState: " + JSON.stringify(newState))
await writeFile(REPORTING_WORKER_STATE, JSON.stringify(newState), 'utf-8')
await unlink(REPORTING_WORKER_PAUSE)
console.log('Done!')
Expand Down
65 changes: 42 additions & 23 deletions server/src/reporting/reporting-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,53 @@ async function batch() {
const DB = PouchDB.defaults(workerState.pouchDbDefaults)
const startTime = new Date().toISOString()
let processed = 0
let onlyProcessTheseGroups = []
if (process.env.T_REBUILD_MYSQL_DBS && process.env.T_REBUILD_MYSQL_DBS !== '') {
onlyProcessTheseGroups = process.env.T_REBUILD_MYSQL_DBS
? JSON.parse(process.env.T_REBUILD_MYSQL_DBS.replace(/\'/g, `"`))
: []
// log.info('onlyProcessTheseGroups from T_REBUILD_MYSQL_DBS: ' + onlyProcessTheseGroups)
}
// Process batch.
for (let database of workerState.databases) {
const db = new DB(database.name)
const changes = await db.changes({ since: database.sequence, limit: workerState.batchSizePerDatabase, include_docs: false })
if (changes.results.length > 0) {
for (let change of changes.results) {
try {
await changeProcessor(change, db)
processed++
} catch (error) {
let errorMessage = JSON.stringify(error)
let errorMessageText = error.message

// Sometimes JSON.stringify wipes out the error.
console.log("typeof error message: " + typeof error.message + " errorMessage: " + errorMessage + " errorMessageText: " + errorMessageText)
if (typeof error.message === 'object') {
errorMessageText = JSON.stringify(error.message)
}
if (errorMessage === '{}') {
errorMessage = "Error : " + " message: " + errorMessageText
} else {
errorMessage = "Error : " + " message: " + errorMessageText + " errorMessage: " + errorMessage
let processGroup = false
if (onlyProcessTheseGroups.length === 0 || onlyProcessTheseGroups.includes(database.name)) {
processGroup = true
} else {
// log.debug("Excluding group: " + database.name + " from mysql processing.")
}
if (processGroup) {
const db = new DB(database.name)
const changes = await db.changes({
since: database.sequence,
limit: workerState.batchSizePerDatabase,
include_docs: false
})
if (changes.results.length > 0) {
for (let change of changes.results) {
try {
await changeProcessor(change, db)
processed++
} catch (error) {
let errorMessage = JSON.stringify(error)
let errorMessageText = error.message

// Sometimes JSON.stringify wipes out the error.
console.log("typeof error message: " + typeof error.message + " errorMessage: " + errorMessage + " errorMessageText: " + errorMessageText)
if (typeof error.message === 'object') {
errorMessageText = JSON.stringify(error.message)
}
if (errorMessage === '{}') {
errorMessage = "Error : " + " message: " + errorMessageText
} else {
errorMessage = "Error : " + " message: " + errorMessageText + " errorMessage: " + errorMessage
}
log.error(`Error on change sequence ${change.seq} with id ${change.id} - Error: ${errorMessage} ::::: `)
}
log.error(`Error on change sequence ${change.seq} with id ${change.id} - Error: ${errorMessage} ::::: `)
}
// Even if an error was thrown, continue on with the next sequences.
database.sequence = changes.results[changes.results.length - 1].seq
}
// Even if an error was thrown, continue on with the next sequences.
database.sequence = changes.results[changes.results.length-1].seq
}
}
// Persist state to disk.
Expand Down
2 changes: 2 additions & 0 deletions server/src/scripts/reporting-worker-pause.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

if [ "$2" = "--help" ]; then
echo "Usage:"
echo " reporting-worker-pause"
Expand Down
20 changes: 16 additions & 4 deletions start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ else
fi

if echo "$T_MODULES" | grep mysql; then
./mysql-start.sh
echo "Waiting 60 seconds for myql to start..."
sleep 60
./mysql-setup.sh
./mysql-create-dirs.sh
fi

if echo "$T_USE_MYSQL_CONTAINER" | grep "true"; then
./mysql-start-container.sh
echo "Waiting 60 seconds for mysql container to start..."
sleep 60
./mysql-setup.sh
fi

if echo "$T_MYSQL_PHPMYADMIN" | grep "TRUE"; then
Expand Down Expand Up @@ -227,6 +231,14 @@ else
"
fi

if echo "$T_USE_MYSQL_CONTAINER" | grep "true"; then
echo "Linking mysql container ..."
OPTIONS="
--link $T_MYSQL_CONTAINER_NAME:mysql \
$OPTIONS
"
fi

if echo "$T_MODULES" | grep mysql; then
RUN_OPTIONS="
--link $T_MYSQL_CONTAINER_NAME:mysql \
Expand Down

0 comments on commit 2ffd181

Please sign in to comment.