Merge pull request #6273 from tomachalek/archive_redux

Revert conc archiving with some simplifications.

tomachalek authored Jul 31, 2024
2 parents 6b50847 + 4e7d6f8 commit e291926
Showing 16 changed files with 246 additions and 273 deletions.
18 changes: 0 additions & 18 deletions lib/action/model/concordance/__init__.py
@@ -292,13 +292,6 @@ async def store_unbound_query_chain(self, chain: List[Tuple[str, ConcFormArgs]])
        new_ids, _ = await self._store_conc_params()
        self._active_q_data = await qp.open(new_ids[-1])

-    async def _archive_conc(self, user_id, conc_id):
-        with plugins.runtime.QUERY_PERSISTENCE as qp:
-            try:
-                await qp.archive(user_id, conc_id)
-            except Exception as ex:
-                logging.getLogger(__name__).error('Failed to archive concordance {}: {}'.format(conc_id, ex))
async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
"""
Stores concordance operation if the query_persistence plugin is installed
@@ -317,12 +310,6 @@ async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
            qp_store_id = await qp.store(user_id, curr_data=curr_data, prev_data=self._active_q_data)
            ans = [qp_store_id]

-            # archive the concordance, it may take a bit longer, so we
-            # do this as a non-blocking operation
-            task = application.add_task(self._archive_conc(user_id, qp_store_id))
-            task.add_done_callback(lambda r: logging.getLogger(__name__).debug(
-                f'finished archiving of conc {qp_store_id}'))

history_ts = await self._save_query_to_history(ans[0], curr_data) if use_history else None
lines_groups = prev_data.get('lines_groups', self._lines_groups.serialize())
for q_idx, op in self._auto_generated_conc_ops:
@@ -334,11 +321,6 @@ async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
                corpora=self.get_current_aligned_corpora(), usesubcorp=self.args.usesubcorp,
                lastop_form=op.to_dict(), user_id=self.session_get('user', 'id'))
            qp_store_id = await qp.store(self.session_get('user', 'id'), curr_data=curr, prev_data=prev)
-            # archive the concordance, it may take a bit longer, so we
-            # do this as a non-blocking operation
-            task = application.add_task(self._archive_conc(user_id, qp_store_id))
-            task.add_done_callback(lambda r: logging.getLogger(__name__).debug(
-                f'finished archiving of conc {qp_store_id}'))
            ans.append(qp_store_id)
        return ans, history_ts
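Note: the removed lines above implement the fire-and-forget task pattern. A generic, self-contained sketch of that pattern in plain asyncio (the removed code used KonText's application.add_task wrapper rather than asyncio.create_task; archive_conc here is a stand-in):

import asyncio
import logging

async def archive_conc(user_id: int, conc_id: str) -> None:
    ...  # stand-in for the real SQL archive write

async def handle_request() -> None:
    # schedule the coroutine and return immediately; the request path never
    # awaits the archiving, a done-callback merely logs its completion
    task = asyncio.create_task(archive_conc(1, 'abc123'))
    task.add_done_callback(
        lambda t: logging.getLogger(__name__).debug('finished archiving'))

asyncio.run(handle_request())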

22 changes: 7 additions & 15 deletions lib/plugin_types/query_persistence/__init__.py
@@ -118,34 +118,26 @@ async def store(self, user_id: int, curr_data: Dict, prev_data: Optional[Dict] =
"""

    @abc.abstractmethod
-    async def update(self, data: Dict, arch_enqueue: bool = False):
+    async def update(self, data: Dict):
        """
        Update data stored in key-value database with id `data['id']`. If applicable,
        it should also try to update an archive item to prevent possible inconsistencies.
-        If arch_enqueue is set to True then the method should set the item to be archived
-        (no matter what is actual status of item's archiving).
        """

    @abc.abstractmethod
-    async def archive(self, user_id: int, conc_id: str) -> Dict[str, Any]:
+    async def archive(self, conc_id: str, explicit: bool) -> Dict[str, Any]:
        """
        Make the concordance record persistent.

-        !!! Important requirements:
-        1) It is expected the concordance parameters are available via
-           key-value (Redis) storage already (this should be ensured by KonText)
-        2) The method is run within an asyncio task so in case a sql backend
-           is used (which is very likely), on_aio_task_enter(), on_aio_task_exit()
-           should be called.
-        3) it is up to this method to decide whether the user user_id is permitted
-           to change the concordance identified by conc_id
+        Important requirements:
+        1) It should be OK to call the function multiple times without any
+           side effects.
+        2) It is expected the concordance parameters are available via
+           key-value (Redis) storage already (this should be ensured by KonText)

        arguments:
-        user_id -- user who wants to perform the operation
        conc_id -- an identifier of the concordance

        returns:
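A minimal sketch of an implementation satisfying both requirements, assuming a MySQL backend and the plugin attributes that appear later in this diff (self.db, self._archive, mk_key); the INSERT IGNORE variant is illustrative, not the committed code:

import ujson as json

async def archive(self, conc_id: str, explicit: bool):
    # requirement 2: the record is expected to sit in the key-value store already
    data = await self.db.get(mk_key(conc_id))
    async with self._archive.connection() as conn:
        async with await conn.cursor() as cursor:
            # INSERT IGNORE makes repeated calls side-effect free (requirement 1)
            await cursor.execute(
                'INSERT IGNORE INTO kontext_conc_persistence (id, data, created) '
                'VALUES (%s, %s, NOW())',
                (conc_id, json.dumps(data)))
    return data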
2 changes: 1 addition & 1 deletion lib/plugins/default_query_history/__init__.py
@@ -110,7 +110,7 @@ async def make_persistent(self, user_id, query_id, q_supertype, created, name):
if last_match_idx > -1:
data[last_match_idx]['name'] = name
await self.db.list_set(k, last_match_idx, data[last_match_idx])
-await self._query_persistence.archive(user_id, query_id)
+await self._query_persistence.archive(query_id, True)
else:
ts = self._current_timestamp()
item = dict(created=ts, query_id=query_id, name=name, q_supertype=q_supertype)
2 changes: 1 addition & 1 deletion lib/plugins/mysql_corparch/backend.py
@@ -189,7 +189,7 @@ async def load_corpus_as_source_info(self, cursor: MySQLCursorAbstract, corpus_i
lang = user_lang.split('_')[0]
lang = lang if lang in ('en', 'cs') else 'en'
await cursor.execute(
-f'SELECT IFNULL(com.title_{lang}, com.title_en) AS title, com.authors '
+f'SELECT IFNULL(com.desc_{lang}, com.desc_en) AS title, com.authors '
'FROM vlo_metadata_common AS com '
'JOIN vlo_metadata_corpus AS cor ON com.corpus_metadata_id = cor.id '
'WHERE cor.corpus_name = %s AND com.deleted = FALSE', (corpus_id,))
2 changes: 1 addition & 1 deletion lib/plugins/mysql_query_history/__init__.py
@@ -115,7 +115,7 @@ async def _update_name(self, user_id, query_id, created, new_name) -> bool:

async def make_persistent(self, user_id, query_id, q_supertype, created, name) -> bool:
if await self._update_name(user_id, query_id, created, name):
-await self._query_persistence.archive(user_id, query_id)
+await self._query_persistence.archive(query_id, True)
else:
c = await self.store(user_id, query_id, q_supertype)
await self._update_name(user_id, query_id, c, name)
128 changes: 46 additions & 82 deletions lib/plugins/mysql_query_persistence/__init__.py
@@ -1,6 +1,7 @@
# Copyright (c) 2020 Charles University in Prague, Faculty of Arts,
# Institute of the Czech National Corpus
# Copyright (c) 2020 Martin Zimandl <[email protected]>
+# Copyright (c) 2023 Tomas Machalek <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -27,49 +28,15 @@
The plug-in is able to connect either via its own configuration (see config.rng) or via
an integration_db plugin.
How to create the required data table:
CREATE TABLE kontext_conc_persistence (
id VARCHAR(191) PRIMARY KEY,
data JSON NOT NULL,
created TIMESTAMP NOT NULL,
num_access INT NOT NULL DEFAULT 0,
last_access TIMESTAMP
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
Possible modifications in case the number of records is large:
ALTER TABLE `kontext_conc_persistence`
ENGINE='Aria';
ALTER TABLE `kontext_conc_persistence`
ADD PRIMARY KEY `id_created` (`id`, `created`),
DROP INDEX `PRIMARY`;
ALTER TABLE kontext_conc_persistence
PARTITION BY RANGE (UNIX_TIMESTAMP(created)) (
PARTITION `to_2016` VALUES LESS THAN (UNIX_TIMESTAMP('2016-12-31 23:59:59')),
PARTITION `to_2019` VALUES LESS THAN (UNIX_TIMESTAMP('2019-12-31 23:59:59')),
PARTITION `to_2022` VALUES LESS THAN (UNIX_TIMESTAMP('2022-12-31 23:59:59')),
PARTITION `to_2025` VALUES LESS THAN (UNIX_TIMESTAMP('2025-12-31 23:59:59')),
PARTITION `to_2028` VALUES LESS THAN (UNIX_TIMESTAMP('2028-12-31 23:59:59')),
PARTITION `to_2031` VALUES LESS THAN (UNIX_TIMESTAMP('2031-12-31 23:59:59')),
PARTITION `to_2034` VALUES LESS THAN (UNIX_TIMESTAMP('2034-12-31 23:59:59')),
PARTITION `to_2037` VALUES LESS THAN (UNIX_TIMESTAMP('2037-12-31 23:59:59')),
PARTITION `the_rest` VALUES LESS THAN MAXVALUE
)
"""

import logging
import re
import datetime
import ujson as json
+from mysql.connector.errors import IntegrityError

import plugins
-from action.errors import ForbiddenException, NotFoundException
from plugin_types.auth import AbstractAuth
from plugin_types.general_storage import KeyValueStorage
from plugin_types.query_persistence import AbstractQueryPersistence
@@ -80,7 +47,7 @@
from plugins.common.mysql.adhocdb import AdhocDB
from plugins.common.sqldb import DatabaseAdapter
from plugins.mysql_integration_db import MySqlIntegrationDb
-from . import cleanup
+from plugin_types.query_persistence.error import QueryPersistenceRecNotFound


def mk_key(code):
@@ -92,9 +59,9 @@ class MySqlQueryPersistence(AbstractQueryPersistence):
This class stores user's queries in their internal form (see Kontext.q attribute).
"""

-    DEFAULT_TTL_DAYS = 100
+    DEFAULT_TTL_DAYS = 14

-    DEFAULT_ANONYMOUS_REC_ARCHIVE_MIN_DAYS = 720
+    DEFAULT_ARCH_NON_PERMANENT_MAX_AGE_DAYS = 720

def __init__(
self,
@@ -104,16 +71,13 @@ def __init__(
auth: AbstractAuth):
super().__init__(settings)
        plugin_conf = settings.get('plugins', 'query_persistence')
-        ttl_days = int(plugin_conf.get('ttl_days', MySqlQueryPersistence.DEFAULT_TTL_DAYS))
-        self._ttl_days = ttl_days
+        self._ttl_days = int(plugin_conf.get('ttl_days', self.DEFAULT_TTL_DAYS))
        self.db = db
        self._auth = auth
        self._archive = sql_backend
        self._settings = settings
-        self._cleanup_status_key = plugin_conf.get('cleanup_status_key', 'conc_persistence_cleanup_status')
+        self._archive_non_permanent_max_age_days = int(plugin_conf.get(
+            'archive_non_permanent_max_age_days', self.DEFAULT_ARCH_NON_PERMANENT_MAX_AGE_DAYS))

-    def _get_ttl_for(self, user_id):
-        return self.ttl

def _get_persist_level_for(self, user_id):
if self._auth.is_anonymous(user_id):
@@ -218,52 +182,52 @@ def records_differ(r1, r2):
            curr_data[USER_ID_KEY] = user_id
            data_key = mk_key(data_id)
            await self.db.set(data_key, curr_data)
-            await self.db.set_ttl(data_key, self._get_ttl_for(user_id))
+            await self.db.set_ttl(data_key, self.ttl)
+            await self.archive(data_id, False)
            latest_id = curr_data[ID_KEY]
        else:
            latest_id = prev_data[ID_KEY]

        return latest_id

-    async def archive(self, user_id, conc_id):
-        await self._archive.on_aio_task_enter()
-        try:
-            async with self._archive.connection() as conn:
-                async with await conn.cursor(dictionary=True) as cursor:
-                    await self._archive.begin_tx(cursor)
-                    data = await self.db.get(mk_key(conc_id))
-                    if data is None:
-                        raise NotFoundException(
-                            'Archive store error - concordance {0} not found'.format(conc_id))
-                    stored_user_id = data.get('user_id', None)
-                    if user_id != stored_user_id:
-                        raise ForbiddenException(
-                            'Cannot change status of a concordance belonging to another user')
-                    await cursor.execute(
-                        'INSERT IGNORE INTO kontext_conc_persistence (id, data, created, num_access) '
-                        'VALUES (%s, %s, %s, %s)',
-                        (conc_id, json.dumps(data), datetime.datetime.now().isoformat(), 0))
-                    await conn.commit()
-                    return data
-        finally:
-            await self._archive.on_aio_task_exit()
+    async def archive(self, conc_id, explicit):
+        data_key = conc_id
+        hard_limit = 100  # prevents ending up in an infinite loop over a cyclic 'prev_id' chain
+        async with self._archive.connection() as conn:
+            while data_key is not None and hard_limit > 0:
+                data = await self.db.get(mk_key(data_key))
+                if data:
+                    async with await conn.cursor() as cursor:
+                        try:
+                            await cursor.execute(
+                                "INSERT INTO kontext_conc_persistence (id, data, created) VALUES (%s, %s, NOW())",
+                                (data_key, json.dumps(data)))
+                        except IntegrityError as err:
+                            # the key is already in the archive
+                            logging.getLogger(__name__).warning(
+                                f'failed to archive {data_key}: {err} (ignored)')
+                    data_key = data.get('prev_id', None)
+                else:
+                    async with await conn.cursor() as cursor:
+                        await cursor.execute(
+                            'SELECT id FROM kontext_conc_persistence WHERE id = %s LIMIT 1', (data_key,))
+                        r = await cursor.fetchone()
+                        if r is None:
+                            raise QueryPersistenceRecNotFound(
+                                f'record {data_key} found neither in cache nor in archive')
+                        break  # already archived, and so is the rest of its chain
+                hard_limit -= 1
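Concordance operations chain via prev_id, so archiving a single id walks and archives its whole ancestry. A hypothetical chain (key format assumed) to illustrate the loop above:

# hypothetical key-value store content (keys produced by mk_key()):
#   concordance:abc3 -> {'id': 'abc3', 'q': [...], 'prev_id': 'abc2'}
#   concordance:abc2 -> {'id': 'abc2', 'q': [...], 'prev_id': 'abc1'}
#   concordance:abc1 -> {'id': 'abc1', 'q': [...]}  # no prev_id, the loop ends
await qp.archive('abc3', True)
# inserts abc3, then abc2, then abc1; duplicates raise IntegrityError,
# which is logged and ignored, so repeated calls stay harmless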


+    async def clear_old_archive_records(self):
+        now = datetime.datetime.now()
+        min_age = now - datetime.timedelta(days=self._archive_non_permanent_max_age_days)
+        min_age_sql = min_age.strftime('%Y-%m-%dT%H:%M:%S.%f')
+        async with self._archive.connection() as conn:
+            async with await conn.cursor() as cursor:
+                await cursor.execute(
+                    "DELETE FROM kontext_conc_persistence WHERE permanent = 0 AND created < %s",
+                    (min_age_sql,))

    def export_tasks(self):
        """
        Export tasks for async queue worker(s)
        """
-        async def cleanup_archive(num_proc: int, dry_run: bool):
-            return await cleanup.run(
-                db=self._archive,
-                kvdb=self.db,
-                dry_run=dry_run,
-                num_proc=num_proc,
-                status_key=self._cleanup_status_key,
-                anonymous_user_id=self._auth.anonymous_user_id(),
-                anonymous_rec_max_age_days=self.DEFAULT_ANONYMOUS_REC_ARCHIVE_MIN_DAYS)
-        return cleanup_archive,
+        return (self.clear_old_archive_records,)

-    async def update(self, data, arch_enqueue=False):
+    async def update(self, data):
        """
        Update stored data by data['id'].
        """