diff --git a/lealone-db/src/main/java/org/lealone/db/Database.java b/lealone-db/src/main/java/org/lealone/db/Database.java index 3b175d5e7..e0ab666e6 100644 --- a/lealone-db/src/main/java/org/lealone/db/Database.java +++ b/lealone-db/src/main/java/org/lealone/db/Database.java @@ -1981,7 +1981,7 @@ synchronized User createAdminUser(String userName, byte[] userPasswordHash) { } } - User getSystemUser() { + public User getSystemUser() { return systemUser; } diff --git a/lealone-db/src/main/java/org/lealone/db/table/Table.java b/lealone-db/src/main/java/org/lealone/db/table/Table.java index 23fbb4ec2..08863f631 100644 --- a/lealone-db/src/main/java/org/lealone/db/table/Table.java +++ b/lealone-db/src/main/java/org/lealone/db/table/Table.java @@ -616,6 +616,17 @@ public Column getColumn(String columnName) { return column; } + // 不会抛异常 + public Column findColumn(String columnName) { + Column column = columnMap.get(columnName); + if (column == null) { + if (database.equalsIdentifiers(Column.ROWID, columnName)) { + return getRowIdColumn(); + } + } + return column; + } + /** * Does the column with the given name exist? * diff --git a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/BsonBase.java b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/BsonBase.java index 9163ebc8c..109cb22fa 100644 --- a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/BsonBase.java +++ b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/BsonBase.java @@ -178,7 +178,7 @@ public static Column parseColumn(Table table, String columnName) { // if ("_id".equalsIgnoreCase(columnName)) { // return table.getRowIdColumn(); // } - return table.getColumn(columnName.toUpperCase()); + return table.findColumn(columnName.toUpperCase()); } public static ExpressionColumn getExpressionColumn(TableFilter tableFilter, String columnName) { diff --git a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BCInsert.java b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BCInsert.java index f8e4f4659..d7b1756c0 100644 --- a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BCInsert.java +++ b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BCInsert.java @@ -13,15 +13,18 @@ import org.bson.BsonValue; import org.bson.io.ByteBufferBsonInput; import org.lealone.common.exceptions.DbException; +import org.lealone.common.util.StatementBuilder; import org.lealone.common.util.Utils; import org.lealone.db.api.ErrorCode; import org.lealone.db.session.ServerSession; import org.lealone.db.table.Column; import org.lealone.db.table.Table; +import org.lealone.db.value.Value; import org.lealone.plugins.mongo.server.MongoServerConnection; import org.lealone.plugins.mongo.server.MongoTask; import org.lealone.sql.dml.Insert; import org.lealone.sql.expression.Expression; +import org.lealone.sql.expression.ValueExpression; public class BCInsert extends BsonCommand { @@ -49,12 +52,26 @@ private static void addRows(BsonDocument topDoc, MongoServerConnection conn, HashSet set = new HashSet<>(); BsonDocument document = documents.get(i); for (Entry e : document.entrySet()) { - Column column = parseColumn(table, e.getKey()); + String columnName = e.getKey(); + BsonValue v = e.getValue(); + Column column = parseColumn(table, columnName); + if (column == null) { + // 如果后续写入的字段不存在,动态增加新的 + column = addColumn(session, table, columnName, v); + } if (!set.add(column)) { throw DbException.get(ErrorCode.DUPLICATE_COLUMN_NAME_1, column.getSQL()); } + try { + Value columnValuue = toValue(v); + columnValuue = column.convert(columnValuue); + values.add(ValueExpression.get(columnValuue)); + } catch (Throwable t) { + // 如果后续写入的字段值的类型跟字段的类型不匹配,将字段的类型改成通用的varchar类型 + column = alterColumnType(session, table, columnName); + values.add(toValueExpression(v)); + } columns.add(column); - values.add(toValueExpression(e.getValue())); } insert.setColumns(columns.toArray(new Column[columns.size()])); insert.addRow(values.toArray(new Expression[values.size()])); @@ -62,4 +79,21 @@ private static void addRows(BsonDocument topDoc, MongoServerConnection conn, insert.prepare(); createAndSubmitYieldableUpdate(task, insert); } + + private static Column addColumn(ServerSession session, Table table, String columnName, BsonValue v) { + StatementBuilder sql = new StatementBuilder(); + sql.append("ALTER TABLE ").append(table.getName()).append(" ADD COLUMN ").append(columnName) + .append(" "); + appendColumnType(sql, v); + session.executeUpdateLocal(sql.toString()); + return parseColumn(table, columnName); + } + + private static Column alterColumnType(ServerSession session, Table table, String columnName) { + StatementBuilder sql = new StatementBuilder(); + sql.append("ALTER TABLE ").append(table.getName()).append(" ALTER COLUMN ").append(columnName) + .append(" varchar"); + session.executeUpdateLocal(sql.toString()); + return parseColumn(table, columnName); + } } diff --git a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BsonCommand.java b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BsonCommand.java index 69670e120..df6463899 100644 --- a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BsonCommand.java +++ b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/BsonCommand.java @@ -70,14 +70,17 @@ public static Database getDatabase(BsonDocument doc) { String dbName = doc.getString("$db").getValue(); if (dbName == null) dbName = MongoServer.DATABASE_NAME; - Database db = LealoneDatabase.getInstance().findDatabase(dbName); + LealoneDatabase ldb = LealoneDatabase.getInstance(); + Database db = ldb.findDatabase(dbName); if (db == null) { // 需要同步 synchronized (LealoneDatabase.class) { - db = LealoneDatabase.getInstance().findDatabase(dbName); + db = ldb.findDatabase(dbName); if (db == null) { String sql = "CREATE DATABASE IF NOT EXISTS " + dbName; - LealoneDatabase.getInstance().getSystemSession().executeUpdateLocal(sql); + try (ServerSession session = ldb.createSession(ldb.getSystemUser())) { + session.executeUpdateLocal(sql); + } db = LealoneDatabase.getInstance().getDatabase(dbName); } } @@ -123,63 +126,69 @@ public static void createTable(String tableName, BsonDocument firstDoc, ServerSe sql.append(", "); sql.append(columnName).append(" "); BsonValue v = e.getValue(); - switch (v.getBsonType()) { - case INT32: - sql.append("int"); - break; - case INT64: - sql.append("long"); - break; - case DOUBLE: - sql.append("double"); - break; - case STRING: - sql.append("varchar"); - break; - case ARRAY: - sql.append("array"); - break; - case BINARY: - sql.append("binary"); - break; - case OBJECT_ID: - sql.append("binary"); - break; - case BOOLEAN: - sql.append("boolean"); - break; - case DATE_TIME: - sql.append("datetime"); - break; - case TIMESTAMP: - // MongoDB的TIMESTAMP只是一种内部使用的特殊类型并不是直的TIMESTAMP,为了跟DATE_TIME区分,这里直接用time表示 - sql.append("time"); - break; - case REGULAR_EXPRESSION: - sql.append("varchar"); - break; - case DECIMAL128: - sql.append("decimal"); - break; - case DOCUMENT: - sql.append("map"); - // createTable(tableName + "_" + columnName, v.asDocument(), session); - break; - default: - sql.append("varchar"); - } + appendColumnType(sql, v); } sql.append(")"); session.executeUpdateLocal(sql.toString()); } + public static void appendColumnType(StatementBuilder sql, BsonValue v) { + switch (v.getBsonType()) { + case INT32: + sql.append("int"); + break; + case INT64: + sql.append("long"); + break; + case DOUBLE: + sql.append("double"); + break; + case STRING: + sql.append("varchar"); + break; + case ARRAY: + sql.append("array"); + break; + case BINARY: + sql.append("binary"); + break; + case OBJECT_ID: + sql.append("binary"); + break; + case BOOLEAN: + sql.append("boolean"); + break; + case DATE_TIME: + sql.append("datetime"); + break; + case TIMESTAMP: + // MongoDB的TIMESTAMP只是一种内部使用的特殊类型并不是直的TIMESTAMP,为了跟DATE_TIME区分,这里直接用time表示 + sql.append("time"); + break; + case REGULAR_EXPRESSION: + sql.append("varchar"); + break; + case DECIMAL128: + sql.append("decimal"); + break; + case DOCUMENT: + sql.append("map"); + // createTable(tableName + "_" + columnName, v.asDocument(), session); + break; + default: + sql.append("varchar"); + } + } + public static ServerSession createSession(Database db) { return new ServerSession(db, getUser(db), 0); // return db.createSession(getUser(db)); } public static ServerSession getSession(Database db, MongoServerConnection conn) { - return conn.getPooledSession(db); + ServerSession s = conn.getPooledSession(db); + conn.getScheduler().addSession(s, db.getId()); + return s; } public static User getUser(Database db) { diff --git a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/admin/ACCreate.java b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/admin/ACCreate.java index a2662e318..f8cf0527c 100644 --- a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/admin/ACCreate.java +++ b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/bson/command/admin/ACCreate.java @@ -25,7 +25,7 @@ public static BsonDocument execute(ByteBufferBsonInput input, BsonDocument doc, BsonString viewOn = doc.getString("viewOn", null); if (viewOn == null) { try (ServerSession session = getSession(getDatabase(doc), conn)) { - String sql = "CREATE TABLE IF NOT EXISTS " + name; + String sql = "CREATE TABLE IF NOT EXISTS " + name + " (_id binary primary key)"; session.executeUpdateLocal(sql); } } else { diff --git a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/server/MongoServerConnection.java b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/server/MongoServerConnection.java index f89fb6a36..39a4ab878 100644 --- a/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/server/MongoServerConnection.java +++ b/lealone-plugins/mongo/src/main/java/org/lealone/plugins/mongo/server/MongoServerConnection.java @@ -110,7 +110,9 @@ public SessionInfo getSessionInfo(BsonDocument doc) { } public PooledSession getPooledSession(Database db) { - return new PooledSession(db, BsonCommand.getUser(db), 0, this); + PooledSession s = new PooledSession(db, BsonCommand.getUser(db), 0, this); + s.setScheduler(scheduler); + return s; // LinkedList pooledSessions = pooledSessionsMap.get(db.getName()); // PooledSession ps = null; // if (pooledSessions == null) {