Skip to content

Commit

Permalink
[Storage] Garbage collector CLI
Browse files Browse the repository at this point in the history
Added a new command to the CCN operator CLI to run a garbage
collector on local storage. The new `gc run` command lists
all the files that are not linked to any message or permanent
pin and deletes them.

Using the --verbose option, the command will print more details
on the files it will preserve and delete. The --dry-run option
makes it possible to run the GC without actually deleting any files.
  • Loading branch information
odesenfans committed Jun 14, 2022
1 parent fdf844f commit 15b9a1e
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 3 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ testing =
pytest-aiohttp
pytest-asyncio
pytest-mock
types-pytz
types-pyyaml
types-requests
types-setuptools
Expand Down
159 changes: 159 additions & 0 deletions src/aleph/ccn_cli/commands/garbage_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""
This migration checks all the files stored in local storage (=GridFS) and compares them to the list
of messages already on the node. The files that are not linked to any message are scheduled for
deletion.
"""
import asyncio
import datetime as dt
from typing import Any, Dict, FrozenSet, List
from typing import cast

import pytz
import typer
from aleph_message.models import MessageType
from configmanager import Config

import aleph.model
from aleph.ccn_cli.cli_config import CliConfig
from aleph.config import get_defaults
from aleph.model import init_db_globals
from aleph.model.filepin import PermanentPin
from aleph.model.hashes import delete_value as delete_gridfs_file
from aleph.model.messages import Message

gc_ns = typer.Typer()


# Messages can reference data held in local storage in three forms:
# * AGGREGATEs with item_type=="storage"
# * POSTs with item_type=="storage"
# * STOREs with content.item_type=="storage"
async def get_hashes(
    msg_type: MessageType, item_type_field: str, item_hash_field: str
) -> FrozenSet[str]:
    """Return the hashes referenced by all messages of `msg_type` stored locally.

    `item_type_field` selects messages whose payload lives in local storage;
    `item_hash_field` is a dot-separated path to the hash inside the document.
    """

    def extract_field(document: Any, dotted_path: str) -> Any:
        # Walk the document along the dot-separated path, e.g. "content.item_hash".
        value = document
        for key in dotted_path.split("."):
            value = value[key]
        return value

    query_filter = {"type": msg_type, item_type_field: "storage"}
    projection = {item_hash_field: 1}

    hashes = set()
    async for message in Message.collection.find(
        query_filter, projection, batch_size=1000
    ):
        hashes.add(extract_field(message, item_hash_field))
    return frozenset(hashes)


def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
    """Print a per-category count of the files the garbage collector will keep."""
    typer.echo("The following files will be preserved:")
    for category, file_hashes in files_to_preserve.items():
        typer.echo(f"* {len(file_hashes)} {category}")


async def list_files_to_preserve(
    gridfs_files_dict: Dict[str, Dict],
    temporary_files_ttl: int,
) -> Dict[str, FrozenSet[str]]:
    """Determine which local-storage files must NOT be garbage collected.

    :param gridfs_files_dict: Mapping of filename -> GridFS metadata
        (must include "filename" and "uploadDate").
    :param temporary_files_ttl: Grace period in seconds; files uploaded more
        recently than this are always preserved.
    :return: Mapping of a human-readable category name to the set of
        filenames/hashes to keep.
    """
    # Preserve recently uploaded files: they may belong to a message that has
    # not been processed yet.
    now = pytz.utc.localize(dt.datetime.utcnow())
    cutoff = now - dt.timedelta(seconds=temporary_files_ttl)
    recent_files = frozenset(
        metadata["filename"]
        for metadata in gridfs_files_dict.values()
        if metadata["uploadDate"] > cutoff
    )

    # Messages that potentially store data in local storage:
    # * AGGREGATEs with item_type=="storage"
    # * POSTs with item_type=="storage"
    # * STOREs with content.item_type=="storage"
    aggregate_hashes = await get_hashes(MessageType.aggregate, "item_type", "item_hash")
    post_hashes = await get_hashes(MessageType.post, "item_type", "item_hash")
    store_hashes = await get_hashes(
        MessageType.store, "content.item_type", "content.item_hash"
    )

    # Permanent pins are kept as well, even if the content is also on IPFS.
    pinned_hashes = frozenset(
        [
            pin["multihash"]
            async for pin in PermanentPin.collection.find({}, {"multihash": 1})
        ]
    )

    return {
        "temporary files": recent_files,
        "aggregates": aggregate_hashes,
        "posts": post_hashes,
        "stores": store_hashes,
        "file pins": pinned_hashes,
    }


async def run(ctx: typer.Context, dry_run: bool):
    """Garbage collect GridFS files not referenced by any message or permanent pin.

    Loads the node configuration, lists all files in local storage, computes
    the set of files to preserve, then deletes the rest (unless `dry_run`).

    :param ctx: Typer context; ``ctx.obj`` holds the CliConfig set by the main callback.
    :param dry_run: If True, only report what would be deleted.
    :raises ValueError: If the DB globals were not initialized.
    """
    config = Config(schema=get_defaults())
    cli_config = cast(CliConfig, ctx.obj)
    config.yaml.load(str(cli_config.config_file_path))

    init_db_globals(config=config)
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    # Map filename -> metadata for every file currently stored in GridFS.
    gridfs_files_dict = {
        file["filename"]: file
        async for file in aleph.model.db["fs.files"].find(
            projection={"_id": 0, "filename": 1, "length": 1, "uploadDate": 1},
            batch_size=1000,
        )
    }
    gridfs_files = frozenset(gridfs_files_dict.keys())

    typer.echo(f"Found {len(gridfs_files_dict)} files in local storage.")

    files_to_preserve_dict = await list_files_to_preserve(
        gridfs_files_dict=gridfs_files_dict,
        temporary_files_ttl=config.storage.temporary_files_ttl.value,
    )
    files_to_preserve = frozenset().union(*files_to_preserve_dict.values())
    files_to_delete = gridfs_files - files_to_preserve

    if cli_config.verbose:
        print_files_to_preserve(files_to_preserve_dict)

    # Disk space that deleting the unreferenced files will reclaim.
    reclaimed_disk_space = sum(
        gridfs_files_dict[filename]["length"] for filename in files_to_delete
    )
    typer.echo(
        f"{len(files_to_delete)} files will be deleted, totaling {reclaimed_disk_space} bytes."
    )

    if dry_run:
        if cli_config.verbose:
            typer.echo("The following files will be deleted:")
            for file_to_delete in files_to_delete:
                typer.echo(f"* {file_to_delete}")

    else:
        for file_to_delete in files_to_delete:
            typer.echo(f"Deleting {file_to_delete}...")
            await delete_gridfs_file(file_to_delete)

    typer.echo("Done.")


@gc_ns.command(name="run")
def run_gc(
    ctx: typer.Context,
    dry_run: bool = typer.Option(
        False, help="If set, display files to delete without deleting them."
    ),
):
    """Synchronous entry point for `gc run`: drives the async garbage collector."""
    gc_coroutine = run(ctx, dry_run)
    asyncio.run(gc_coroutine)
3 changes: 3 additions & 0 deletions src/aleph/ccn_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import typer

from .cli_config import CliConfig
from .commands.garbage_collector import gc_ns
from .commands.keys import keys_ns
from .commands.migrations import migrations_ns

Expand All @@ -17,6 +18,7 @@ def validate_config_file_path(config: Optional[Path]) -> Optional[Path]:

return config


def validate_key_dir(key_dir: Optional[Path]) -> Optional[Path]:
if key_dir is not None:
if key_dir.exists and not key_dir.is_dir():
Expand Down Expand Up @@ -63,6 +65,7 @@ def main(
ctx.obj = cli_config


app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.")
app.add_typer(keys_ns, name="keys", help="Operations on private keys.")
app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.")

Expand Down
11 changes: 8 additions & 3 deletions src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def get_defaults():
return {
"logging": {
"level": logging.WARNING,
"max_log_file_size": 1_000_000_000 # 1GB,
"max_log_file_size": 1_000_000_000, # 1GB,
},
"aleph": {
"queue_topic": "ALEPH-QUEUE",
Expand Down Expand Up @@ -39,7 +39,12 @@ def get_defaults():
"/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U",
],
},
"storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"},
"storage": {
"folder": "./data/",
"store_files": False,
"engine": "mongodb",
"temporary_files_ttl": 3600,
},
"nuls": {
"chain_id": 8964,
"enabled": False,
Expand Down Expand Up @@ -89,7 +94,7 @@ def get_defaults():
"peers": [
"/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
"/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2",
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF"
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF",
],
},
"sentry": {
Expand Down

0 comments on commit 15b9a1e

Please sign in to comment.