Skip to content

Commit

Permalink
mongo: 支持动态增加字段、修改字段类型 #211
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Dec 5, 2023
1 parent 1147d60 commit 02d6bc6
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 55 deletions.
2 changes: 1 addition & 1 deletion lealone-db/src/main/java/org/lealone/db/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,7 @@ synchronized User createAdminUser(String userName, byte[] userPasswordHash) {
}
}

User getSystemUser() {
public User getSystemUser() {
return systemUser;
}

Expand Down
11 changes: 11 additions & 0 deletions lealone-db/src/main/java/org/lealone/db/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,17 @@ public Column getColumn(String columnName) {
return column;
}

// 不会抛异常
public Column findColumn(String columnName) {
Column column = columnMap.get(columnName);
if (column == null) {
if (database.equalsIdentifiers(Column.ROWID, columnName)) {
return getRowIdColumn();
}
}
return column;
}

/**
* Does the column with the given name exist?
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static Column parseColumn(Table table, String columnName) {
// if ("_id".equalsIgnoreCase(columnName)) {
// return table.getRowIdColumn();
// }
return table.getColumn(columnName.toUpperCase());
return table.findColumn(columnName.toUpperCase());
}

public static ExpressionColumn getExpressionColumn(TableFilter tableFilter, String columnName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
import org.bson.BsonValue;
import org.bson.io.ByteBufferBsonInput;
import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.StatementBuilder;
import org.lealone.common.util.Utils;
import org.lealone.db.api.ErrorCode;
import org.lealone.db.session.ServerSession;
import org.lealone.db.table.Column;
import org.lealone.db.table.Table;
import org.lealone.db.value.Value;
import org.lealone.plugins.mongo.server.MongoServerConnection;
import org.lealone.plugins.mongo.server.MongoTask;
import org.lealone.sql.dml.Insert;
import org.lealone.sql.expression.Expression;
import org.lealone.sql.expression.ValueExpression;

public class BCInsert extends BsonCommand {

Expand Down Expand Up @@ -49,17 +52,48 @@ private static void addRows(BsonDocument topDoc, MongoServerConnection conn,
HashSet<Column> set = new HashSet<>();
BsonDocument document = documents.get(i);
for (Entry<String, BsonValue> e : document.entrySet()) {
Column column = parseColumn(table, e.getKey());
String columnName = e.getKey();
BsonValue v = e.getValue();
Column column = parseColumn(table, columnName);
if (column == null) {
// 如果后续写入的字段不存在,动态增加新的
column = addColumn(session, table, columnName, v);
}
if (!set.add(column)) {
throw DbException.get(ErrorCode.DUPLICATE_COLUMN_NAME_1, column.getSQL());
}
try {
Value columnValuue = toValue(v);
columnValuue = column.convert(columnValuue);
values.add(ValueExpression.get(columnValuue));
} catch (Throwable t) {
// 如果后续写入的字段值的类型跟字段的类型不匹配,将字段的类型改成通用的varchar类型
column = alterColumnType(session, table, columnName);
values.add(toValueExpression(v));
}
columns.add(column);
values.add(toValueExpression(e.getValue()));
}
insert.setColumns(columns.toArray(new Column[columns.size()]));
insert.addRow(values.toArray(new Expression[values.size()]));
}
insert.prepare();
createAndSubmitYieldableUpdate(task, insert);
}

private static Column addColumn(ServerSession session, Table table, String columnName, BsonValue v) {
StatementBuilder sql = new StatementBuilder();
sql.append("ALTER TABLE ").append(table.getName()).append(" ADD COLUMN ").append(columnName)
.append(" ");
appendColumnType(sql, v);
session.executeUpdateLocal(sql.toString());
return parseColumn(table, columnName);
}

private static Column alterColumnType(ServerSession session, Table table, String columnName) {
StatementBuilder sql = new StatementBuilder();
sql.append("ALTER TABLE ").append(table.getName()).append(" ALTER COLUMN ").append(columnName)
.append(" varchar");
session.executeUpdateLocal(sql.toString());
return parseColumn(table, columnName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,17 @@ public static Database getDatabase(BsonDocument doc) {
String dbName = doc.getString("$db").getValue();
if (dbName == null)
dbName = MongoServer.DATABASE_NAME;
Database db = LealoneDatabase.getInstance().findDatabase(dbName);
LealoneDatabase ldb = LealoneDatabase.getInstance();
Database db = ldb.findDatabase(dbName);
if (db == null) {
// 需要同步
synchronized (LealoneDatabase.class) {
db = LealoneDatabase.getInstance().findDatabase(dbName);
db = ldb.findDatabase(dbName);
if (db == null) {
String sql = "CREATE DATABASE IF NOT EXISTS " + dbName;
LealoneDatabase.getInstance().getSystemSession().executeUpdateLocal(sql);
try (ServerSession session = ldb.createSession(ldb.getSystemUser())) {
session.executeUpdateLocal(sql);
}
db = LealoneDatabase.getInstance().getDatabase(dbName);
}
}
Expand Down Expand Up @@ -123,63 +126,69 @@ public static void createTable(String tableName, BsonDocument firstDoc, ServerSe
sql.append(", ");
sql.append(columnName).append(" ");
BsonValue v = e.getValue();
switch (v.getBsonType()) {
case INT32:
sql.append("int");
break;
case INT64:
sql.append("long");
break;
case DOUBLE:
sql.append("double");
break;
case STRING:
sql.append("varchar");
break;
case ARRAY:
sql.append("array");
break;
case BINARY:
sql.append("binary");
break;
case OBJECT_ID:
sql.append("binary");
break;
case BOOLEAN:
sql.append("boolean");
break;
case DATE_TIME:
sql.append("datetime");
break;
case TIMESTAMP:
// MongoDB的TIMESTAMP只是一种内部使用的特殊类型并不是直的TIMESTAMP,为了跟DATE_TIME区分,这里直接用time表示
sql.append("time");
break;
case REGULAR_EXPRESSION:
sql.append("varchar");
break;
case DECIMAL128:
sql.append("decimal");
break;
case DOCUMENT:
sql.append("map");
// createTable(tableName + "_" + columnName, v.asDocument(), session);
break;
default:
sql.append("varchar");
}
appendColumnType(sql, v);
}
sql.append(")");
session.executeUpdateLocal(sql.toString());
}

public static void appendColumnType(StatementBuilder sql, BsonValue v) {
switch (v.getBsonType()) {
case INT32:
sql.append("int");
break;
case INT64:
sql.append("long");
break;
case DOUBLE:
sql.append("double");
break;
case STRING:
sql.append("varchar");
break;
case ARRAY:
sql.append("array");
break;
case BINARY:
sql.append("binary");
break;
case OBJECT_ID:
sql.append("binary");
break;
case BOOLEAN:
sql.append("boolean");
break;
case DATE_TIME:
sql.append("datetime");
break;
case TIMESTAMP:
// MongoDB的TIMESTAMP只是一种内部使用的特殊类型并不是直的TIMESTAMP,为了跟DATE_TIME区分,这里直接用time表示
sql.append("time");
break;
case REGULAR_EXPRESSION:
sql.append("varchar");
break;
case DECIMAL128:
sql.append("decimal");
break;
case DOCUMENT:
sql.append("map");
// createTable(tableName + "_" + columnName, v.asDocument(), session);
break;
default:
sql.append("varchar");
}
}

public static ServerSession createSession(Database db) {
return new ServerSession(db, getUser(db), 0);
// return db.createSession(getUser(db));
}

public static ServerSession getSession(Database db, MongoServerConnection conn) {
return conn.getPooledSession(db);
ServerSession s = conn.getPooledSession(db);
conn.getScheduler().addSession(s, db.getId());
return s;
}

public static User getUser(Database db) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static BsonDocument execute(ByteBufferBsonInput input, BsonDocument doc,
BsonString viewOn = doc.getString("viewOn", null);
if (viewOn == null) {
try (ServerSession session = getSession(getDatabase(doc), conn)) {
String sql = "CREATE TABLE IF NOT EXISTS " + name;
String sql = "CREATE TABLE IF NOT EXISTS " + name + " (_id binary primary key)";
session.executeUpdateLocal(sql);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public SessionInfo getSessionInfo(BsonDocument doc) {
}

public PooledSession getPooledSession(Database db) {
return new PooledSession(db, BsonCommand.getUser(db), 0, this);
PooledSession s = new PooledSession(db, BsonCommand.getUser(db), 0, this);
s.setScheduler(scheduler);
return s;
// LinkedList<PooledSession> pooledSessions = pooledSessionsMap.get(db.getName());
// PooledSession ps = null;
// if (pooledSessions == null) {
Expand Down

0 comments on commit 02d6bc6

Please sign in to comment.