Skip to content

Commit

Permalink
Merge pull request #799 from taosdata/3.0
Browse files Browse the repository at this point in the history
taos-tools 3.0  merge to main
  • Loading branch information
DuanKuanJun authored Sep 24, 2024
2 parents 39e2255 + 352b7f7 commit 45ee09e
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 46 deletions.
2 changes: 1 addition & 1 deletion inc/bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ int convertStringToDatatype(char *type, int length);
unsigned int taosRandom();
void tmfree(void *buf);
void tmfclose(FILE *fp);
void fetchResult(TAOS_RES *res, threadInfo *pThreadInfo);
int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo);
void prompt(bool NonStopMode);
void ERROR_EXIT(const char *msg);
int getServerVersionRest(int16_t rest_port);
Expand Down
3 changes: 2 additions & 1 deletion src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,8 @@ static void *syncWriteInterlace(void *sarg) {
// timestamp
char time_string[BIGINT_BUFF_LEN];
if(stbInfo->useNow && stbInfo->interlaceRows == 1 && !fillBack) {
snprintf(time_string, BIGINT_BUFF_LEN, "now");
int64_t now = toolsGetTimestamp(database->precision);
snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"", now);
} else {
snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
disorderTs?disorderTs:childTbl->ts);
Expand Down
2 changes: 1 addition & 1 deletion src/benchMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) {
}
#ifdef WEBSOCKET
if (g_arguments->debug_print) {
ws_enable_log();
ws_enable_log("info");
}

if (g_arguments->dsn != NULL) {
Expand Down
8 changes: 5 additions & 3 deletions src/benchQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
threadID, dbName);
ret = -2;
} else {
int64_t rows = 0;
TAOS_RES *res = taos_query(taos, command);
int code = taos_errno(res);
if (res == NULL || code) {
Expand All @@ -55,10 +56,11 @@ int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
}
} else {
//if (strlen(pThreadInfo->filePath) > 0) {
fetchResult(res, pThreadInfo);
rows = fetchResult(res, pThreadInfo);
//}
}
taos_free_result(res);
debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
}
}
return ret;
Expand Down Expand Up @@ -428,9 +430,9 @@ static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
// check invaid
if(nSqlCount == 0 || nConcurrent == 0 ) {
if(nSqlCount == 0)
infoPrint(" query sql count is %" PRIu64 ". must set query sqls. \n", nSqlCount);
warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
if(nConcurrent == 0)
infoPrint(" concurrent is %d , specified_table_query->concurrent must not zero. \n", nConcurrent);
warnPrint("concurrent is %d , specified_table_query->concurrent is zero. \n", nConcurrent);
return 0;
}

Expand Down
30 changes: 26 additions & 4 deletions src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,19 @@ void resetAfterAnsiEscape(void) {
FORCE_INLINE unsigned int taosRandom() { return (unsigned int)rand(); }
#endif

void swapItem(char** names, int32_t i, int32_t j ) {
debugPrint("swap item i=%d (%s) j=%d (%s)\n", i, names[i], j, names[j]);
char * p = names[i];
names[i] = names[j];
names[j] = p;
}

int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
char ** childTblNameOfSuperTbl,
int64_t childTblCountOfSuperTbl) {
char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
"select tbname from %s.`%s` limit %" PRId64 "",
"select distinct tbname from %s.`%s` limit %" PRId64 "",
dbName, stbName, childTblCountOfSuperTbl);
TAOS_RES *res = taos_query(taos, cmd);
int32_t code = taos_errno(res);
Expand Down Expand Up @@ -135,6 +142,17 @@ int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
count++;
}
taos_free_result(res);

// random swap order
if (count < 4) {
return 0;
}

int32_t swapCnt = count/2;
for(int32_t i = 0; i < swapCnt; i++ ) {
int32_t j = swapCnt + RD(swapCnt);
swapItem(childTblNameOfSuperTbl, i, j);
}
return 0;
}

Expand Down Expand Up @@ -273,7 +291,7 @@ SBenchConn* initBenchConnImpl() {
SBenchConn* conn = benchCalloc(1, sizeof(SBenchConn), true);
#ifdef WEBSOCKET
if (g_arguments->websocket) {
conn->taos_ws = ws_connect_with_dsn(g_arguments->dsn);
conn->taos_ws = ws_connect(g_arguments->dsn);
char maskedDsn[256] = "\0";
memcpy(maskedDsn, g_arguments->dsn, 20);
memcpy(maskedDsn+20, "...", 3);
Expand Down Expand Up @@ -796,13 +814,15 @@ int postProceSql(char *sqlstr, char* dbName, int precision, int iface,
}

// fetch result fo file or nothing
void fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
TAOS_ROW row = NULL;
int num_fields = 0;
int64_t totalLen = 0;
TAOS_FIELD *fields = 0;
int64_t rows = 0;
char *databuf = NULL;
bool toFile = strlen(pThreadInfo->filePath) > 0;


if(toFile) {
num_fields = taos_field_count(res);
Expand All @@ -824,10 +844,11 @@ void fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
char temp[HEAD_BUFF_LEN] = {0};
int len = taos_print_row(temp, row, fields, num_fields);
len += snprintf(temp + len, HEAD_BUFF_LEN - len, "\n");
debugPrint("query result:%s\n", temp);
//debugPrint("query result:%s\n", temp);
memcpy(databuf + totalLen, temp, len);
totalLen += len;
}
rows ++;
//if not toFile , only loop call taos_fetch_row
}

Expand All @@ -836,6 +857,7 @@ void fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
appendResultBufToFile(databuf, pThreadInfo->filePath);
free(databuf);
}
return rows;
}

char *convertDatatypeToString(int type) {
Expand Down
72 changes: 36 additions & 36 deletions src/taosdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ TAOS *taosConnect(const char *dbName) {

#ifdef WEBSOCKET
WS_TAOS *wsConnect() {
WS_TAOS *ws_taos = ws_connect_with_dsn(g_args.dsn);
WS_TAOS *ws_taos = ws_connect(g_args.dsn);
if (NULL == ws_taos) {
char maskedDsn[256] = "\0";
memcpy(maskedDsn, g_args.dsn, 20);
Expand Down Expand Up @@ -1483,9 +1483,9 @@ static int getTableRecordInfoImplWS(
while (true) {
int rows = 0;
const void *data = NULL;
code = ws_fetch_block(ws_res, &data, &rows);
code = ws_fetch_raw_block(ws_res, &data, &rows);
if (code) {
errorPrint("%s() LN%d, ws_fetch_block() error. reason: %s!\n",
errorPrint("%s() LN%d, ws_fetch_raw_block() error. reason: %s!\n",
__func__, __LINE__,
ws_errstr(ws_res));
ws_free_result(ws_res);
Expand Down Expand Up @@ -1822,9 +1822,9 @@ static int getDbCountWS(WS_RES *ws_res) {
while (true) {
int rows = 0;
const void *data = NULL;
code = ws_fetch_block(ws_res, &data, &rows);
code = ws_fetch_raw_block(ws_res, &data, &rows);
if (code) {
errorPrint("%s() LN%d, ws_fetch_block() error. reason: %s!\n",
errorPrint("%s() LN%d, ws_fetch_raw_block() error. reason: %s!\n",
__func__, __LINE__,
ws_errstr(ws_res));
return 0;
Expand Down Expand Up @@ -2102,9 +2102,9 @@ static int64_t getNtbCountOfStbWS(char* dbName, const char* stbName) {
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);
if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -2607,10 +2607,10 @@ static int getTableTagValueWSV3(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (ws_code) {
errorPrint("%s() LN%d, ws_fetch_block() error, "
errorPrint("%s() LN%d, ws_fetch_raw_block() error, "
"code: 0x%08x, command: %s, reason: %s\n",
__func__, __LINE__, ws_code, command, ws_errstr(ws_res));
}
Expand Down Expand Up @@ -2695,10 +2695,10 @@ static int getTableTagValueWSV2(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (ws_code) {
errorPrint("%s() LN%d, ws_fetch_block() error, "
errorPrint("%s() LN%d, ws_fetch_raw_block() error, "
"code: 0x%08x, sqlstr: %s, reason: %s\n",
__func__, __LINE__, ws_code, sqlstr, ws_errstr(ws_res));
}
Expand Down Expand Up @@ -2813,9 +2813,9 @@ static int getTableDesWS(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);
if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -4562,9 +4562,9 @@ int64_t queryDbForDumpOutCountWS(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);
if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -5056,18 +5056,18 @@ static int64_t writeResultToAvroWS(
while (true) {
int rows = 0;
const void *data = NULL;
int32_t ws_code = ws_fetch_block(ws_res, &data, &rows);
int32_t ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (ws_code) {
errorPrint("%s() LN%d, ws_fetch_blocK() error, ws_taos: %p, "
errorPrint("%s() LN%d, ws_fetch_raw_block() error, ws_taos: %p, "
"code: 0x%08x, reason: %s\n",
__func__, __LINE__, ws_taos,
ws_errno(ws_res), ws_errstr(ws_res));
break;
}

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -8055,17 +8055,17 @@ static int64_t writeResultDebugWS(
while (true) {
int rows = 0;
const void *data = NULL;
int32_t ws_code = ws_fetch_block(ws_res, &data, &rows);
int32_t ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (ws_code) {
errorPrint("%s() LN%d, ws_fetch_blocK() error!"
errorPrint("%s() LN%d, ws_fetch_raw_block() error!"
" code: 0x%08x, reason: %s\n",
__func__, __LINE__,
ws_code, ws_errstr(ws_res));
break;
}
if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -9375,10 +9375,10 @@ static int64_t fillTbNameArrWS(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -9965,10 +9965,10 @@ int readNextTableDesWS(void* ws_res, TableDes* tbDes, int *idx, int *cnt) {
// get block
if(*idx >= *cnt || *cnt == 0) {
const void *data = NULL;
int ws_code = ws_fetch_block(ws_res, &data, cnt);
int ws_code = ws_fetch_raw_block(ws_res, &data, cnt);
if (ws_code !=0 ) {
// read to end
errorPrint("read next ws_fetch_block failed, err code=%d idx=%d index=%d\n", ws_code, *idx, index);
errorPrint("read next ws_fetch_raw_block failed, err code=%d idx=%d index=%d\n", ws_code, *idx, index);
return -1;
}

Expand Down Expand Up @@ -10498,10 +10498,10 @@ static void dumpExtraInfoVarWS(void *taos, FILE *fp) {
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -11713,10 +11713,10 @@ static int64_t dumpStbAndChildTbOfDbWS(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -11820,10 +11820,10 @@ static int64_t dumpNTablesOfDbWS(WS_TAOS *ws_taos, SDbInfo *dbInfo) {
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -12641,10 +12641,10 @@ static int fillDbExtraInfoV3WS(
while (true) {
int rows = 0;
const void *data = NULL;
ws_code = ws_fetch_block(ws_res, &data, &rows);
ws_code = ws_fetch_raw_block(ws_res, &data, &rows);

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
ws_taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -12714,10 +12714,10 @@ static int fillDbInfoWS(void *taos) {
while (true) {
int rows = 0;
const void *data = NULL;
code = ws_fetch_block(ws_res, &data, &rows);
code = ws_fetch_raw_block(ws_res, &data, &rows);

if (0 == rows) {
debugPrint("%s() LN%d, No more data from ws_fetch_block(), "
debugPrint("%s() LN%d, No more data from ws_fetch_raw_block(), "
"ws_taos: %p, code: 0x%08x, reason:%s\n",
__func__, __LINE__,
taos, ws_errno(ws_res), ws_errstr(ws_res));
Expand Down Expand Up @@ -13253,7 +13253,7 @@ static int dumpEntry() {

#ifdef WEBSOCKET
if (g_args.verbose_print) {
ws_enable_log();
ws_enable_log("info");
}
if (NULL == g_args.dsn) {
g_args.dsn = getenv("TDENGINE_CLOUD_DSN");
Expand Down

0 comments on commit 45ee09e

Please sign in to comment.