-
Notifications
You must be signed in to change notification settings - Fork 194
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Set consistency level to quorum * Fix test configuration * Create utilities to fix db broken repair * Add fix thread tool * Prepare cluster migrator * Soon it will work * Fix import * Select only the dest columns * Add more fixes * Fix and add emojis * Add forceUpdateAll parameter * WIP * Add counters table ignored * Remove logs forgotten in #1987 * Add special fields handlers * Fixes from server tests * Add a script to copy messages specifically
- Loading branch information
1 parent
06beb5d
commit 468a58e
Showing
32 changed files
with
580 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
node_modules | ||
yarn.lock |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
Edit the file to your needs | ||
|
||
yarn install | ||
node index.js |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
var cassandra = require("cassandra-driver"); | ||
|
||
var fromAuthProvider = new cassandra.auth.PlainTextAuthProvider("", ""); | ||
var fromContactPoints = [""]; | ||
var fromKeyspace = "twake"; | ||
|
||
var toAuthProvider = new cassandra.auth.PlainTextAuthProvider("", ""); | ||
var toContactPoints = [""]; | ||
var toKeyspace = "twake"; | ||
|
||
var forceUpdateAll = false; | ||
var ignoreTables = ["notification"]; | ||
var countersTables = ["statistics", "stats_counter", "channel_counters", "scheduled_queue_counter"]; | ||
|
||
var specialConversions = { | ||
"twake.group_user.date_added": value => { | ||
return new Date(value).getTime(); | ||
}, | ||
}; | ||
|
||
// -- start process | ||
|
||
var fromClient = new cassandra.Client({ | ||
localDataCenter: "datacenter1", | ||
contactPoints: fromContactPoints, | ||
authProvider: fromAuthProvider, | ||
keyspace: fromKeyspace, | ||
}); | ||
|
||
var toClient = new cassandra.Client({ | ||
localDataCenter: "datacenter1", | ||
contactPoints: toContactPoints, | ||
authProvider: toAuthProvider, | ||
keyspace: toKeyspace, | ||
queryOptions: { | ||
consistency: cassandra.types.consistencies.quorum, | ||
}, | ||
}); | ||
|
||
// -- Get all tables and copy schema | ||
|
||
async function client(origin, query, parameters, options) { | ||
return await new Promise((resolve, reject) => { | ||
origin.execute(query, [], {}, function (err, result) { | ||
if (err) { | ||
reject({ err, result }); | ||
} else { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
(async () => { | ||
const result = await client( | ||
fromClient, | ||
"SELECT table_name from system_schema.tables WHERE keyspace_name = '" + fromKeyspace + "'", | ||
[], | ||
{}, | ||
); | ||
for (row of result.rows) { | ||
try { | ||
const fromTable = fromKeyspace + "." + row.table_name; | ||
const toTable = toKeyspace + "." + row.table_name; | ||
|
||
if (ignoreTables.includes(row.table_name)) { | ||
console.log(fromTable.padEnd(50) + " | " + "ignored".padEnd(20) + " | ⏺"); | ||
continue; | ||
} | ||
|
||
let fromCount = 0; | ||
|
||
const destColumns = ( | ||
await client( | ||
toClient, | ||
"SELECT column_name from system_schema.columns where keyspace_name = '" + | ||
toKeyspace + | ||
"' and table_name = '" + | ||
row.table_name + | ||
"'", | ||
[], | ||
{}, | ||
) | ||
).rows.map(r => r.column_name); | ||
|
||
try { | ||
const fromResult = await client( | ||
fromClient, | ||
"SELECT count(*) from " + fromTable + "", | ||
[], | ||
{}, | ||
); | ||
fromCount = parseInt(fromResult.rows[0].count); | ||
} catch (err) { | ||
fromCount = NaN; | ||
} | ||
|
||
if (destColumns.length === 0) { | ||
if (fromCount > 0) | ||
console.log( | ||
fromTable.padEnd(50) + " | " + ("not_in_dest" + "/" + fromCount).padEnd(20) + " | ⏺", | ||
); | ||
continue; | ||
} | ||
|
||
try { | ||
const toResult = await client(toClient, "SELECT count(*) from " + toTable + "", [], {}); | ||
const toCount = parseInt(toResult.rows[0].count); | ||
|
||
console.log( | ||
fromTable.padEnd(50) + | ||
" | " + | ||
(toCount + "/" + fromCount).padEnd(20) + | ||
" | " + | ||
(toCount >= fromCount ? "✅" : "❌"), | ||
); | ||
|
||
if (row.table_name.indexOf("counter") >= 0 || countersTables.includes(row.table_name)) { | ||
console.log( | ||
fromTable.padEnd(50) + " | " + ("counter_table" + "/" + fromCount).padEnd(20) + " | 🧮", | ||
); | ||
if (fromCount > toCount || !fromCount || forceUpdateAll) { | ||
await new Promise(r => { | ||
fromClient.eachRow( | ||
"SELECT JSON * from " + fromTable, | ||
[], | ||
{ prepare: true, fetchSize: 1000 }, | ||
async function (n, row) { | ||
// | ||
//TODO handle counters (it is special !) | ||
}, | ||
async function (err, result) { | ||
if (result && result.nextPage) { | ||
await new Promise(r => setTimeout(r, 1000)); | ||
result.nextPage(); | ||
} else { | ||
r(); | ||
} | ||
}, | ||
); | ||
}); | ||
} | ||
|
||
continue; | ||
} | ||
|
||
if (fromCount > toCount || !fromCount || forceUpdateAll) { | ||
await new Promise(r => { | ||
fromClient.eachRow( | ||
"SELECT JSON * from " + fromTable, | ||
[], | ||
{ prepare: true, fetchSize: 1000 }, | ||
async function (n, row) { | ||
try { | ||
const json = JSON.parse(row["[json]"]); | ||
|
||
//The from table can have additional depreciated fields, we need to remove them | ||
const filteredJson = {}; | ||
for (const col of destColumns) { | ||
if ( | ||
typeof json[col] == "string" && | ||
(json[col] || "").match( | ||
/[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]+/, | ||
) | ||
) { | ||
json[col] = json[col].split(".")[0]; | ||
} | ||
|
||
if (specialConversions[fromTable + "." + col]) { | ||
json[col] = specialConversions[fromTable + "." + col](json[col]); | ||
} | ||
|
||
if (json[col] !== undefined) filteredJson[col] = json[col]; | ||
} | ||
|
||
await client( | ||
toClient, | ||
"INSERT INTO " + | ||
toTable + | ||
" JSON '" + | ||
JSON.stringify(filteredJson).replace(/'/g, "'$&") + | ||
"'", | ||
[], | ||
{}, | ||
); | ||
} catch (err) { | ||
console.log(err); | ||
} | ||
}, | ||
async function (err, result) { | ||
if (result && result.nextPage) { | ||
await new Promise(r => setTimeout(r, 1000)); | ||
result.nextPage(); | ||
} else { | ||
r(); | ||
} | ||
}, | ||
); | ||
}); | ||
} | ||
} catch (err) { | ||
console.log( | ||
fromTable.padEnd(50) + " | " + ("error" + "/" + fromCount).padEnd(20) + " | ❌", | ||
); | ||
} | ||
} catch (err) { | ||
console.log(err); | ||
continue; | ||
} | ||
|
||
//TODO copy content | ||
} | ||
})(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{ | ||
"name": "copy-cluster", | ||
"version": "1.0.0", | ||
"main": "index.js", | ||
"license": "MIT", | ||
"dependencies": { | ||
"cassandra-driver": "^4.6.3" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
node_modules | ||
yarn.lock |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
Edit the file to your needs | ||
|
||
yarn install | ||
node index.js |
Oops, something went wrong.