indexer adjustments #221

Merged: 8 commits, Aug 7, 2019
22 changes: 13 additions & 9 deletions hive/indexer/accounts.py
@@ -26,6 +26,9 @@ class Accounts:
# fifo queue
_dirty = UniqueFIFO()

# in-mem id->rank map
_ranks = {}

# account core methods
# --------------------

@@ -116,15 +119,11 @@ def flush(cls, steem, trx=False, spread=1):
return count

@classmethod
def update_ranks(cls):
"""Rebuild `hive_accounts` table rank-by-vote-weight column."""
sql = """
UPDATE hive_accounts
SET rank = r.rnk
FROM (SELECT id, ROW_NUMBER() OVER (ORDER BY vote_weight DESC) as rnk FROM hive_accounts) r
WHERE hive_accounts.id = r.id AND rank != r.rnk;
"""
DB.query(sql)
def fetch_ranks(cls):
"""Rebuild account ranks and store in memory for next update."""
sql = "SELECT id FROM hive_accounts ORDER BY vote_weight DESC"
for rank, _id in enumerate(DB.query_col(sql)):
cls._ranks[_id] = rank + 1

@classmethod
def _cache_accounts(cls, accounts, steem, trx=True):
@@ -186,5 +185,10 @@ def _sql(cls, account, cached_at):

'raw_json': json.dumps(account)}

# update rank field, if present
_id = cls.get_id(account['name'])
if _id in cls._ranks:
values['rank'] = cls._ranks[_id]

bind = ', '.join([k+" = :"+k for k in list(values.keys())][1:])
return ("UPDATE hive_accounts SET %s WHERE name = :name" % bind, values)
52 changes: 39 additions & 13 deletions hive/indexer/cached_post.py
@@ -12,6 +12,8 @@
from hive.utils.timer import Timer
from hive.indexer.accounts import Accounts

# pylint: disable=too-many-lines

log = logging.getLogger(__name__)

DB = Db.instance()
@@ -304,6 +306,7 @@ def _update_batch(cls, steem, tuples, trx=True, full_total=None):
(i.e. `not post['author']`), it's important to advance _last_id,
because this cursor is used to deduce any missing cache entries.
"""
# pylint: disable=too-many-locals

timer = Timer(total=len(tuples), entity='post',
laps=['rps', 'wps'], full_total=full_total)
@@ -317,16 +320,28 @@
posts = steem.get_content_batch(post_args)
post_ids = [tup[1] for tup in tups]
post_levels = [tup[2] for tup in tups]

catmap = cls._get_cat_map_for_insert(tups)
for pid, post, level in zip(post_ids, posts, post_levels):
if post['author']:
if pid in catmap: post['category'] = catmap[pid]
buffer.extend(cls._sql(pid, post, level=level))
else:
# When a post has been deleted (or otherwise DNE),
# steemd simply returns a blank post object w/ all
# fields blank. While it's best to not try to cache
# already-deleted posts, it can happen during missed
# post sweep and while using `trail_blocks` > 0.
pass

# monitor: post not found which should definitely exist; see #173
sql = """SELECT id, author, permlink, is_deleted
FROM hive_posts WHERE id = :id"""
row = DB.query_row(sql, id=pid)
if level == 'insert' and not row['is_deleted']:
log.warning("couldnt load post for %s: %s", level, row)
else:
log.info("couldnt load post for %s: %s", level, row)

cls._bump_last_id(pid)

timer.batch_lap()
@@ -345,33 +360,44 @@ def last_id(cls):
cls._last_id = DB.query_one(sql)
return cls._last_id

@classmethod
def _get_cat_map_for_insert(cls, tups):
"""Cached posts must use validated `category` from hive_posts.

`category` returned from steemd is subject to change.
"""
# get list of ids of posts which are to be inserted
ids = [tup[1] for tup in tups if tup[2] == 'insert']
if not ids:
return {}
# build a map of id->category for each of those posts
sql = "SELECT id, category FROM hive_posts WHERE id IN :ids"
cats = {r[0]: r[1] for r in DB.query_all(sql, ids=tuple(ids))}
return cats

@classmethod
def _bump_last_id(cls, next_id):
"""Update our last_id based on a recent insert."""
last_id = cls.last_id()
if next_id <= last_id:
return

if next_id - last_id > 2:
gap = next_id - last_id - 1
if gap:
log.info("skipped %d ids %d -> %d", gap, last_id, next_id)
cls._ensure_safe_gap(last_id, next_id)
if next_id - last_id > 4:
# gap of 2 is common due to deletions. report on larger gaps.
log.warning("skipping post ids %d -> %d", last_id, next_id)

cls._last_id = next_id

@classmethod
def _ensure_safe_gap(cls, last_id, next_id):
"""Paranoid check of important operating assumption."""
sql = """
SELECT COUNT(*) FROM hive_posts
WHERE id BETWEEN :x1 AND :x2 AND is_deleted = '0'
"""
sql = """SELECT COUNT(*) FROM hive_posts
WHERE id BETWEEN :x1 AND :x2 AND is_deleted = '0'"""
missing_posts = DB.query_one(sql, x1=(last_id + 1), x2=(next_id - 1))
if not missing_posts:
return
raise Exception("found large cache gap: %d --> %d (%d)"
% (last_id, next_id, missing_posts))
if missing_posts:
raise Exception("found cache gap: %d --> %d (%d)"
% (last_id, next_id, missing_posts))

@classmethod
def _sql(cls, pid, post, level=None):
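The `_bump_last_id`/`_ensure_safe_gap` pair above protects the `_last_id` cursor: any id skipped between `last_id` and `next_id` must belong to a deleted post, otherwise cache entries would silently go missing. A toy, self-contained illustration of that invariant with a stubbed deleted-post lookup (the real check runs the SQL shown above):

def ensure_safe_gap(last_id, next_id, is_deleted):
    """Every id strictly between last_id and next_id must be deleted."""
    missing = [i for i in range(last_id + 1, next_id) if not is_deleted(i)]
    if missing:
        raise Exception("found cache gap: %d --> %d (%d)"
                        % (last_id, next_id, len(missing)))

# ids 101 and 102 were deleted posts, so jumping 100 -> 103 is safe:
ensure_safe_gap(100, 103, is_deleted={101, 102}.__contains__)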
26 changes: 17 additions & 9 deletions hive/indexer/custom_op.py
@@ -17,6 +17,21 @@

log = logging.getLogger(__name__)

def _get_auth(op):
"""get account name submitting a custom_json op.

Hive custom_json op processing requires `required_posting_auths`
is always used and length 1. It may be that some ops will require
`required_active_auths` in the future. For now, these are ignored.
"""
if op['required_auths']:
log.warning("unexpected active auths: %s", op)
return None
if len(op['required_posting_auths']) != 1:
log.warning("unexpected auths: %s", op)
return None
return op['required_posting_auths'][0]

class CustomOp:
"""Processes custom ops and dispatches updates."""

@@ -27,18 +42,11 @@ def process_ops(cls, ops, block_num, block_date):
if op['id'] not in ['follow', 'com.steemit.community']:
continue

# we assume `required_posting_auths` is always used and length 1.
# it may be that some ops require `required_active_auths` instead.
# (e.g. if we use that route for admin action of acct creation)
# if op['required_active_auths']:
# log.warning("unexpected active auths: %s" % op)
if len(op['required_posting_auths']) != 1:
log.warning("unexpected auths: %s", op)
account = _get_auth(op)
if not account:
continue

account = op['required_posting_auths'][0]
op_json = load_json_key(op, 'json')

if op['id'] == 'follow':
if block_num < 6000000 and not isinstance(op_json, list):
op_json = ['follow', op_json] # legacy compat
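To make the extracted helper's contract concrete, here are a few hypothetical ops and the results `_get_auth` would return (sample data only, not from the PR):

# hypothetical sample ops illustrating _get_auth's contract
op_ok     = {'required_auths': [], 'required_posting_auths': ['alice']}
op_active = {'required_auths': ['bob'], 'required_posting_auths': []}
op_multi  = {'required_auths': [], 'required_posting_auths': ['a', 'b']}

assert _get_auth(op_ok) == 'alice'
assert _get_auth(op_active) is None  # active auths logged and ignored
assert _get_auth(op_multi) is None   # exactly one posting auth required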
7 changes: 4 additions & 3 deletions hive/indexer/sync.py
@@ -39,8 +39,9 @@ def run(self):
# ensure db schema up to date, check app status
DbState.initialize()

# prefetch id->name memory map
# prefetch id->name and id->rank memory maps
Accounts.load_ids()
Accounts.fetch_ranks()

if DbState.is_initial_sync():
# resume initial sync
@@ -189,8 +190,8 @@ def listen(self):
cnt['insert'], cnt['update'], cnt['payout'], cnt['upvote'],
cnt['recount'], accts, follows, ms, ' SLOW' if ms > 1000 else '')

#if num % 1200 == 0: #1hr
# Accounts.update_ranks() #144
if num % 1200 == 0: #1hr
Accounts.fetch_ranks()
if num % 100 == 0: #5min
Accounts.dirty_oldest(500)
Accounts.flush(steemd, trx=True)
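For context on the modulo constants: Steem produces one block every 3 seconds, so the cadences in `listen()` work out as follows (a quick sanity check, not part of the PR):

BLOCK_INTERVAL = 3                      # seconds per Steem block
assert 1200 * BLOCK_INTERVAL == 3600    # fetch_ranks: hourly
assert 100 * BLOCK_INTERVAL == 300      # dirty_oldest/flush: every 5 min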
2 changes: 1 addition & 1 deletion hive/steem/block/stream.py
@@ -82,7 +82,7 @@ def start(self, start_block):

while self._gap_ok(curr, head):
head = schedule.wait_for_block(curr)
block = self._client.get_block(curr)
block = self._client.get_block(curr, strict=False)
schedule.check_block(curr, block)

if not block:
9 changes: 7 additions & 2 deletions hive/steem/client.py
@@ -46,14 +46,19 @@ def get_content_batch(self, tuples):
assert 'author' in post, "invalid post: %s" % post
return posts

def get_block(self, num):
def get_block(self, num, strict=True):
"""Fetches a single block.

If the result does not contain a `block` key, the block does
not yet exist: raise in strict mode, otherwise return None.
"""
result = self.__exec('get_block', {'block_num': num})
return result['block'] if 'block' in result else None
if 'block' in result:
return result['block']
elif strict:
raise Exception('block %d not available' % num)
else:
return None

def stream_blocks(self, start_from, trail_blocks=0, max_gap=100):
"""Stream blocks. Returns a generator."""
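The new `strict` flag separates the two call sites' expectations: backfill paths treat a missing block as a hard error, while the live streamer (the stream.py change above) polls at the chain head, where a block may simply not have been produced yet. A minimal sketch of a polling loop built on that contract, simplified from the real scheduler:

import time

def wait_for_head_block(client, num, poll_interval=3):
    # sketch only: the real streamer also handles gaps and trail_blocks
    while True:
        block = client.get_block(num, strict=False)  # None until produced
        if block:
            return block
        time.sleep(poll_interval)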