diff --git a/hive/indexer/accounts.py b/hive/indexer/accounts.py
index c885298f3..1736e5f58 100644
--- a/hive/indexer/accounts.py
+++ b/hive/indexer/accounts.py
@@ -26,6 +26,9 @@ class Accounts:
     # fifo queue
     _dirty = UniqueFIFO()
 
+    # in-mem id->rank map
+    _ranks = {}
+
     # account core methods
     # --------------------
 
@@ -116,15 +119,11 @@ def flush(cls, steem, trx=False, spread=1):
         return count
 
     @classmethod
-    def update_ranks(cls):
-        """Rebuild `hive_accounts` table rank-by-vote-weight column."""
-        sql = """
-        UPDATE hive_accounts
-        SET rank = r.rnk
-        FROM (SELECT id, ROW_NUMBER() OVER (ORDER BY vote_weight DESC) as rnk FROM hive_accounts) r
-        WHERE hive_accounts.id = r.id AND rank != r.rnk;
-        """
-        DB.query(sql)
+    def fetch_ranks(cls):
+        """Rebuild account ranks and store in memory for next update."""
+        sql = "SELECT id FROM hive_accounts ORDER BY vote_weight DESC"
+        for rank, _id in enumerate(DB.query_col(sql)):
+            cls._ranks[_id] = rank + 1
 
     @classmethod
     def _cache_accounts(cls, accounts, steem, trx=True):
@@ -186,5 +185,10 @@ def _sql(cls, account, cached_at):
             'raw_json': json.dumps(account)}
 
+        # update rank field, if present
+        _id = cls.get_id(account['name'])
+        if _id in cls._ranks:
+            values['rank'] = cls._ranks[_id]
+
         bind = ', '.join([k+" = :"+k for k in list(values.keys())][1:])
         return ("UPDATE hive_accounts SET %s WHERE name = :name" % bind, values)
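Note on the accounts.py change above: the whole-table `UPDATE ... ROW_NUMBER()` is replaced by an in-memory id->rank map, which `_sql()` consults each time an account row is flushed. A minimal standalone sketch of the pattern (`build_ranks` and `ids_by_weight` are hypothetical names for illustration, not part of the patch):

# Sketch only: ids_by_weight stands in for the result of
# "SELECT id FROM hive_accounts ORDER BY vote_weight DESC".
def build_ranks(ids_by_weight):
    """Map account id -> 1-based rank, heaviest vote_weight first."""
    return {_id: rank + 1 for rank, _id in enumerate(ids_by_weight)}

ranks = build_ranks([42, 7, 19])
assert ranks == {42: 1, 7: 2, 19: 3}

The map goes stale between rebuilds; the hourly refresh in sync.py below bounds that staleness.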
+ """ + # get list of ids of posts which are to be inserted + ids = [tup[1] for tup in tups if tup[2] == 'insert'] + if not ids: + return {} + # build a map of id->category for each of those posts + sql = "SELECT id, category FROM hive_posts WHERE id IN :ids" + cats = {r[0]: r[1] for r in DB.query_all(sql, ids=tuple(ids))} + return cats + @classmethod def _bump_last_id(cls, next_id): """Update our last_id based on a recent insert.""" @@ -352,26 +382,22 @@ def _bump_last_id(cls, next_id): if next_id <= last_id: return - if next_id - last_id > 2: + gap = next_id - last_id - 1 + if gap: + log.info("skipped %d ids %d -> %d", gap, last_id, next_id) cls._ensure_safe_gap(last_id, next_id) - if next_id - last_id > 4: - # gap of 2 is common due to deletions. report on larger gaps. - log.warning("skipping post ids %d -> %d", last_id, next_id) cls._last_id = next_id @classmethod def _ensure_safe_gap(cls, last_id, next_id): """Paranoid check of important operating assumption.""" - sql = """ - SELECT COUNT(*) FROM hive_posts - WHERE id BETWEEN :x1 AND :x2 AND is_deleted = '0' - """ + sql = """SELECT COUNT(*) FROM hive_posts + WHERE id BETWEEN :x1 AND :x2 AND is_deleted = '0'""" missing_posts = DB.query_one(sql, x1=(last_id + 1), x2=(next_id - 1)) - if not missing_posts: - return - raise Exception("found large cache gap: %d --> %d (%d)" - % (last_id, next_id, missing_posts)) + if missing_posts: + raise Exception("found cache gap: %d --> %d (%d)" + % (last_id, next_id, missing_posts)) @classmethod def _sql(cls, pid, post, level=None): diff --git a/hive/indexer/custom_op.py b/hive/indexer/custom_op.py index cfd8b2fd2..a18dddc90 100644 --- a/hive/indexer/custom_op.py +++ b/hive/indexer/custom_op.py @@ -17,6 +17,21 @@ log = logging.getLogger(__name__) +def _get_auth(op): + """get account name submitting a custom_json op. + + Hive custom_json op processing requires `required_posting_auths` + is always used and length 1. It may be that some ops will require + `required_active_auths` in the future. For now, these are ignored. + """ + if op['required_auths']: + log.warning("unexpected active auths: %s", op) + return None + if len(op['required_posting_auths']) != 1: + log.warning("unexpected auths: %s", op) + return None + return op['required_posting_auths'][0] + class CustomOp: """Processes custom ops and dispatches updates.""" @@ -27,18 +42,11 @@ def process_ops(cls, ops, block_num, block_date): if op['id'] not in ['follow', 'com.steemit.community']: continue - # we assume `required_posting_auths` is always used and length 1. - # it may be that some ops require `required_active_auths` instead. - # (e.g. 
diff --git a/hive/indexer/custom_op.py b/hive/indexer/custom_op.py
index cfd8b2fd2..a18dddc90 100644
--- a/hive/indexer/custom_op.py
+++ b/hive/indexer/custom_op.py
@@ -17,6 +17,21 @@
 log = logging.getLogger(__name__)
 
+def _get_auth(op):
+    """Get the account name submitting a custom_json op.
+
+    Hive custom_json op processing requires that `required_posting_auths`
+    is always used and has length 1. Some ops may require
+    `required_active_auths` in the future; for now, those are ignored.
+    """
+    if op['required_auths']:
+        log.warning("unexpected active auths: %s", op)
+        return None
+    if len(op['required_posting_auths']) != 1:
+        log.warning("unexpected auths: %s", op)
+        return None
+    return op['required_posting_auths'][0]
+
 class CustomOp:
     """Processes custom ops and dispatches updates."""
 
@@ -27,18 +42,11 @@ def process_ops(cls, ops, block_num, block_date):
             if op['id'] not in ['follow', 'com.steemit.community']:
                 continue
 
-            # we assume `required_posting_auths` is always used and length 1.
-            # it may be that some ops require `required_active_auths` instead.
-            # (e.g. if we use that route for admin action of acct creation)
-            # if op['required_active_auths']:
-            #    log.warning("unexpected active auths: %s" % op)
-            if len(op['required_posting_auths']) != 1:
-                log.warning("unexpected auths: %s", op)
+            account = _get_auth(op)
+            if not account:
                 continue
-            account = op['required_posting_auths'][0]
 
             op_json = load_json_key(op, 'json')
-
             if op['id'] == 'follow':
                 if block_num < 6000000 and not isinstance(op_json, list):
                     op_json = ['follow', op_json] # legacy compat
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index 38d155471..d87a3c93b 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -39,8 +39,9 @@ def run(self):
         # ensure db schema up to date, check app status
         DbState.initialize()
 
-        # prefetch id->name memory map
+        # prefetch id->name and id->rank memory maps
         Accounts.load_ids()
+        Accounts.fetch_ranks()
 
         if DbState.is_initial_sync():
             # resume initial sync
@@ -189,8 +190,8 @@ def listen(self):
                      cnt['insert'], cnt['update'], cnt['payout'], cnt['upvote'],
                      cnt['recount'], accts, follows, ms, ' SLOW' if ms > 1000 else '')
 
-            #if num % 1200 == 0: #1hr
-            #    Accounts.update_ranks() #144
+            if num % 1200 == 0: #1hr
+                Accounts.fetch_ranks()
 
             if num % 100 == 0: #5min
                 Accounts.dirty_oldest(500)
                 Accounts.flush(steemd, trx=True)
diff --git a/hive/steem/block/stream.py b/hive/steem/block/stream.py
index e7c1d0d6b..89be9da1b 100644
--- a/hive/steem/block/stream.py
+++ b/hive/steem/block/stream.py
@@ -82,7 +82,7 @@ def start(self, start_block):
             while self._gap_ok(curr, head):
                 head = schedule.wait_for_block(curr)
-                block = self._client.get_block(curr)
+                block = self._client.get_block(curr, strict=False)
 
                 schedule.check_block(curr, block)
 
                 if not block:
diff --git a/hive/steem/client.py b/hive/steem/client.py
index 4a56daa8a..5671262a7 100644
--- a/hive/steem/client.py
+++ b/hive/steem/client.py
@@ -46,14 +46,19 @@ def get_content_batch(self, tuples):
             assert 'author' in post, "invalid post: %s" % post
         return posts
 
-    def get_block(self, num):
+    def get_block(self, num, strict=True):
         """Fetches a single block.
 
-        If the result does not contain a `block` key, it's assumed
-        this block does not yet exist and None is returned.
+        If the result does not contain a `block` key, the block does
+        not yet exist; raise if `strict`, otherwise return None.
         """
         result = self.__exec('get_block', {'block_num': num})
-        return result['block'] if 'block' in result else None
+        if 'block' in result:
+            return result['block']
+        elif strict:
+            raise Exception('block %d not available' % num)
+        else:
+            return None
 
     def stream_blocks(self, start_from, trail_blocks=0, max_gap=100):
         """Stream blocks. Returns a generator."""
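Note on the `strict` flag above: it splits two callers' expectations. The head-following streamer in stream.py polls for blocks that may not exist yet, so it passes `strict=False` and handles None; every other caller treats a missing block as a hard error. A standalone sketch of the contract (a hypothetical free function over a plain `result` dict; the real method wraps an RPC call):

# Sketch only: result stands in for the raw get_block RPC response.
def get_block(result, num, strict=True):
    if 'block' in result:
        return result['block']
    if strict:
        raise Exception('block %d not available' % num)
    return None

assert get_block({'block': {'num': 5}}, 5) == {'num': 5}
assert get_block({}, 9, strict=False) is None   # streamer: not yet produced
try:
    get_block({}, 9)                            # sync: missing block is fatal
except Exception as err:
    assert 'block 9 not available' in str(err)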