Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix STRDS removal and unregistration of rasters in STRDS #354

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/actinia_core/core/grass_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def set(self):
try:
value = self.env[key]
origValue = os.getenv(key)
if origValue:
if origValue and "PATH" in key:
value += ":" + origValue
os.putenv(key, value)
os.environ[key] = value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,42 +324,59 @@ def _update_views_in_tgis(self, tgis_db_path):
con.close()
del cur

def _merge_tgis_dbs(self, tgis_db_path_1, tgis_db_path_2):
    """Merge two tgis sqlite.db files

    Tables that only exist in the second database are copied over, the
    ``tgis_metadata`` table is replaced by the one of the second database
    and the rows of all other tables are added with ``INSERT OR IGNORE``
    (rows that violate a constraint of the first database are skipped).

    Args:
        tgis_db_path_1(str): path of a tgis sqlite.db file in which the
                             other should be merged
        tgis_db_path_2(str): path of a tgis sqlite.db file which should be
                             merged in tgis_db_path_1

    Raises:
        sqlite3.Error: if one of the SQL statements fails; nothing is
                       committed in that case.
    """
    def quote(identifier):
        # Identifiers cannot be bound as parameters, so quote them.
        return '"' + identifier.replace('"', '""') + '"'

    con = sqlite3.connect(tgis_db_path_1)
    try:
        # Bind the path as a parameter so that quotes in it cannot break
        # the statement. ATTACH has to run outside of a transaction.
        con.execute("ATTACH DATABASE ? AS dba", (tgis_db_path_2,))
        con.execute("BEGIN")

        own_tables = {row[0] for row in con.execute(
            "SELECT name FROM main.sqlite_master WHERE type='table'")}
        other_tables = [row[0] for row in con.execute(
            "SELECT name FROM dba.sqlite_master WHERE type='table'")]

        # merge databases
        for name in other_tables:
            table = quote(name)
            if name == "tgis_metadata":
                # Qualify with "main": an unqualified DROP would resolve
                # to the attached database if the table is missing here.
                con.execute(f"DROP TABLE IF EXISTS main.{table}")
                con.execute(f"CREATE TABLE main.{table} AS "
                            f"SELECT * FROM dba.{table}")
            elif name not in own_tables:
                # for example raster_register_xxx tables are not in both
                # dbs
                con.execute(f"CREATE TABLE main.{table} AS "
                            f"SELECT * FROM dba.{table}")
            else:
                con.execute(f"INSERT OR IGNORE INTO main.{table} "
                            f"SELECT * FROM dba.{table}")
        con.commit()
        # DETACH is only allowed once the transaction is finished.
        con.execute("DETACH DATABASE dba")
    finally:
        # Closing without commit discards a half-done merge.
        con.close()
def _change_mapsetname_in_tgistable(
        self, cur, table_name,
        source_mapset, target_mapset, skip_columns=None):
    """Replace a mapset name in the values of one tgis table

    Every occurrence of ``source_mapset`` in the text values of the table
    is replaced by ``target_mapset``. If renaming a primary key would
    collide with an already existing key, the existing row is deleted so
    that the renamed row takes its place.

    Args:
        cur(sqlite3.Cursor): cursor of the tgis database; the caller is
                             responsible for committing
        table_name(str): name of the table to change
        source_mapset(str): mapset name which should be replaced
        target_mapset(str): mapset name which should be inserted
        skip_columns(list): names of columns which must not be changed
    """
    skipped = set(skip_columns or ())
    # An empty pattern is a no-op for SQL REPLACE but not for str.replace;
    # treat it (and an identical name) as "nothing to do".
    if not source_mapset or source_mapset == target_mapset:
        return

    def quote(identifier):
        # Identifiers cannot be bound as parameters, so quote them.
        return '"' + identifier.replace('"', '""') + '"'

    table = quote(table_name)
    columns = [entry[0] for entry in cur.execute(
        f"SELECT * FROM {table}").description]

    # find PRIMARY KEY: the last field of a table_info row is the pk
    # flag, 1 marks the first primary key column
    pk_columns = [info[1] for info in cur.execute(
        f"PRAGMA table_info({table})") if info[-1] == 1]
    primary_key = pk_columns[0] if pk_columns else ""

    # NOTE(review): the replacement is a plain substring match, so a map
    # that is named like the source mapset (e.g. elevation@elevation) also
    # gets its map name rewritten. Matching on '@{source_mapset}' was
    # suggested in review; it works for the column "id" but not for the
    # column "mapset", so the substring match is kept.
    for column in columns:
        if column in skipped:
            continue
        col = quote(column)

        if column != primary_key:
            # Only text values can contain the mapset name; leaving other
            # values alone avoids converting them to text.
            cur.execute(
                f"UPDATE {table} SET {col} = REPLACE({col}, ?, ?) "
                f"WHERE typeof({col}) = 'text'",
                (source_mapset, target_mapset))
            continue

        # Primary keys are renamed row by row so that a collision with an
        # existing key can be resolved before the UPDATE hits it.
        keys = [row[0] for row in cur.execute(
            f"SELECT {col} FROM {table}").fetchall()]
        present = set(keys)
        removed = set()
        for key in keys:
            if key in removed or not isinstance(key, str):
                continue
            new_key = key.replace(source_mapset, target_mapset)
            if new_key == key:
                continue
            if new_key in present:
                cur.execute(
                    f"DELETE FROM {table} WHERE {col} = ?", (new_key,))
                removed.add(new_key)
            cur.execute(
                f"UPDATE {table} SET {col} = ? WHERE {col} = ?",
                (new_key, key))
            present.discard(key)
            present.add(new_key)

def _change_mapsetname_in_tgis(self, tgis_path, source_mapset,
target_mapset, target_tgis_db):
Expand All @@ -381,20 +398,13 @@ def _change_mapsetname_in_tgis(self, tgis_path, source_mapset,
table_names = [row[1] for row in cur.execute(
"SELECT * FROM sqlite_master where type='table'")]
for table_name in table_names:
columns = [row[0] for row in cur.execute(
f"SELECT * FROM {table_name}").description]
for col in columns:
cur.execute(f"UPDATE {table_name} SET {col} = REPLACE({col}, "
f"'{source_mapset}', '{target_mapset}')")
self._change_mapsetname_in_tgistable(
cur, table_name, source_mapset, target_mapset)
con.commit()
if con:
con.close()
del cur

# if there already exists a sqlite.db file then merge it
if target_tgis_db is not None:
self._merge_tgis_dbs(tgis_db_path, target_tgis_db)

# update views
self._update_views_in_tgis(tgis_db_path)

Expand Down Expand Up @@ -433,19 +443,31 @@ def _merge_mapset_into_target(self, source_mapset, target_mapset):

if os.path.exists(source_path) is True:
# Hardlink the sources into the target
stdout = subprocess.PIPE
stderr = subprocess.PIPE

p = subprocess.Popen(["/bin/cp", "-flr",
"%s" % source_path,
"%s/." % target_path],
stdout=stdout,
stderr=stderr)
(stdout_buff, stderr_buff) = p.communicate()
if p.returncode != 0:
raise AsyncProcessError(
"Unable to merge mapsets. Error in linking:"
" stdout: %s stderr: %s" % (stdout_buff, stderr_buff))
self._copy_folder(
source_path, target_path,
msg="merge mapsets. Error in linking")

def _copy_folder(
        self, source_path, target_path,
        msg="copy temporary mapset to original location"):
    """Copy a folder recursively with ``/bin/cp -fr``

    Args:
        source_path(str): path of the folder which should be copied
        target_path(str): path to which the folder should be copied
        msg(str): description of the action; it is inserted after
                  "Unable to" in the error messages

    Raises:
        AsyncProcessError: if cp can not be started or if it exits with
                           a return code other than 0
    """
    # NOTE(review): shutil.copytree() was suggested in review instead of
    # calling cp. It would need dirs_exist_ok=True (Python >= 3.8) to copy
    # into an already existing target, so /bin/cp is kept here.
    try:
        proc = subprocess.Popen(["/bin/cp", "-fr",
                                 "%s" % source_path,
                                 "%s" % target_path],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        (stdout_buff, stderr_buff) = proc.communicate()
    except Exception as e:
        raise AsyncProcessError(
            "Unable to %s. Exception %s" % (msg, str(e))) from e

    # Checked outside of the try block so that this error is not caught
    # and wrapped a second time by the handler above.
    if proc.returncode != 0:
        raise AsyncProcessError(
            "Unable to %s. Copy error "
            "stdout: %s stderr: %s returncode: %i" % (msg,
                                                      stdout_buff,
                                                      stderr_buff,
                                                      proc.returncode))

def _copy_merge_tmp_mapset_to_target_mapset(self):
"""Copy the temporary mapset into the original location
Expand Down Expand Up @@ -491,25 +513,7 @@ def _copy_merge_tmp_mapset_to_target_mapset(self):

self._send_resource_update(message)

try:
stdout = subprocess.PIPE
stderr = subprocess.PIPE
p = subprocess.Popen(["/bin/cp", "-fr",
"%s" % source_path,
"%s" % target_path],
stdout=stdout,
stderr=stderr)
(stdout_buff, stderr_buff) = p.communicate()
if p.returncode != 0:
raise AsyncProcessError(
"Unable to copy temporary mapset to "
"original location. Copy error "
"stdout: %s stderr: %s returncode: %i" % (stdout_buff,
stderr_buff,
p.returncode))
except Exception as e:
raise AsyncProcessError("Unable to copy temporary mapset to "
"original location. Exception %s" % str(e))
self._copy_folder(source_path, target_path)

# Merge the temp mapset into the target mapset in case the target already exists
if self.target_mapset_exists is True:
Expand Down Expand Up @@ -561,6 +565,116 @@ def _execute_process_list(self, process_list):
elif process.exec_type == "python":
eval(process.executable)

def _tgis_set_mapset_to_temp_mapset(self):
    """Rename mapset of STRDS in tgis sqlite.db file

    Opens ``tgis/sqlite.db`` of the temporary mapset and replaces the
    target mapset name by the temporary mapset name in the space time
    dataset tables. In the ``*_stds_register`` tables the column "id" is
    left unchanged. The tables of the single maps and ``tgis_metadata``
    are not changed.

    Raises:
        sqlite3.Error: if the database can not be read or changed;
                       nothing is committed in that case.
    """
    tgis_db_path = os.path.join(self.temp_mapset_path, "tgis", "sqlite.db")

    tables_not_to_change = {
        "raster_base",
        "raster_relative_time",
        "raster_absolute_time",
        "raster_spatial_extent",
        "raster_metadata",
        "vector_base",
        "vector_relative_time",
        "vector_absolute_time",
        "vector_spatial_extent",
        "vector_metadata",
        "raster3d_base",
        "raster3d_relative_time",
        "raster3d_absolute_time",
        "raster3d_spatial_extent",
        "raster3d_metadata",
        "tgis_metadata",
    }
    # NOTE(review): tables that are in none of the three groups
    # (presumably the raster_map_register_XXX tables) are left untouched
    # as well - confirm that this is intended.
    tables_change_all_mapsets = {
        "strds_base",
        "strds_relative_time",
        "strds_absolute_time",
        "strds_spatial_extent",
        "strds_metadata",
        "stvds_base",
        "stvds_relative_time",
        "stvds_absolute_time",
        "stvds_spatial_extent",
        "stvds_metadata",
        "str3ds_base",
        "str3ds_relative_time",
        "str3ds_absolute_time",
        "str3ds_spatial_extent",
        "str3ds_metadata",
    }
    tables_change_only_few_mapsets = {
        "raster_stds_register",
        "vector_stds_register",
        "raster3d_stds_register",
    }

    con = sqlite3.connect(tgis_db_path)
    try:
        cur = con.cursor()
        table_names = [row[0] for row in cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table'")]

        for table_name in table_names:
            if table_name in tables_not_to_change:
                continue
            if table_name in tables_change_all_mapsets:
                self._change_mapsetname_in_tgistable(
                    cur, table_name,
                    self.target_mapset_name, self.temp_mapset_name)
            elif table_name in tables_change_only_few_mapsets:
                # "id" holds the id of the registered map, which keeps
                # its mapset (the map tables are not changed either).
                self._change_mapsetname_in_tgistable(
                    cur, table_name,
                    self.target_mapset_name, self.temp_mapset_name,
                    ["id"])
        con.commit()
    finally:
        # Close the connection also if a rename fails.
        con.close()

def _create_temporary_grass_environment(self, source_mapset_name=None,
                                        interim_result_mapset=None,
                                        interim_result_file_path=None):
    """Create a temporary GRASS GIS environment

    This method will:
        1. create the temporary database
        2. sets-up the GRASS environment
        3. Create temporary mapset
        4. Copies tgis db

    This method will link the required mapsets that are
    defined in *self.required_mapsets* into the location.
    The mapsets may be from the global and/or user database.

    The first three steps are delegated to the parent class. The tgis
    folder of the target mapset is only copied if it exists; afterwards
    the mapset names in the copied database are set to the temporary
    mapset.

    Args:
        source_mapset_name (str): The name of the source mapset to copy the
                                  WIND file from
        interim_result_mapset (str): The path to the mapset which is saved
                                     as interim result and should be used
                                     as start mapset for the job resumption
        interim_result_file_path (str): The path of the interim result
                                        temporary file path
    Raises:
        This method will raise an AsyncProcessError
    """
    super(PersistentProcessing, self)._create_temporary_grass_environment(
        source_mapset_name,
        interim_result_mapset,
        interim_result_file_path
    )

    target_tgis_dir = os.path.join(
        self.user_location_path, self.target_mapset_name, "tgis")
    # Nothing to copy if the target mapset has no tgis folder
    if not os.path.isdir(target_tgis_dir):
        return

    temp_tgis_dir = os.path.join(self.temp_mapset_path, "tgis")
    self._copy_folder(target_tgis_dir, temp_tgis_dir)
    self._tgis_set_mapset_to_temp_mapset()

def _execute(self, skip_permission_check=False):
"""Overwrite this function in subclasses

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1399,10 +1399,12 @@ def _create_temporary_grass_environment(self, source_mapset_name=None,
mapset_name="PERMANENT")

# Create the temporary mapset and switch into it
self._create_temporary_mapset(temp_mapset_name=self.temp_mapset_name,
source_mapset_name=source_mapset_name,
interim_result_mapset=interim_result_mapset,
interim_result_file_path=interim_result_file_path)
self._create_temporary_mapset(
temp_mapset_name=self.temp_mapset_name,
source_mapset_name=source_mapset_name,
interim_result_mapset=interim_result_mapset,
interim_result_file_path=interim_result_file_path
)

def _execute(self, skip_permission_check=False):
"""Overwrite this function in subclasses.
Expand Down
Loading