Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script to generate test data #63

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
20 changes: 12 additions & 8 deletions Importer/jctdata/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@
@click.option("-o", "--only", "full_pipeline", flag_value=False)
@click.option("-a", "--all", "full_pipeline", flag_value=True, default=True)
@click.option("-f", "--force-resolve", is_flag=True)
def entry_point(mode, targets, stage=None, full_pipeline=True, force_resolve=False):
run(mode, targets, stage, full_pipeline, force_resolve)
@click.option("-t", "--test-database", is_flag=True)
def entry_point(mode, targets, stage=None, full_pipeline=True, force_resolve=False, test_database=False):
    """CLI entry point: forward the parsed click options to run().

    A test-database run never executes the full pipeline, so the flag
    forces single-stage mode.
    """
    effective_pipeline = full_pipeline and not test_database
    run(mode, targets, stage, effective_pipeline, force_resolve, test_database)


def run(mode:str, targets:tuple, stage:str=None, full_pipeline:bool=True, force_resolve:bool=False):
def run(mode:str, targets:tuple, stage:str=None, full_pipeline:bool=True, force_resolve:bool=False,
        test_database:bool=False):
    """Dispatch *mode* to its registered processor.

    Looks the mode up in MODE_MAP and invokes the matching processor with
    the remaining arguments; an unknown mode is silently a no-op.
    """
    handler = MODE_MAP.get(mode)
    if not handler:
        # Unrecognised mode: nothing to do.
        return
    handler(targets, stage, full_pipeline, force_resolve, test_database)


def resolve(targets, stage=None, full_pipeline=True, force_resolve=False):
def resolve(targets, stage=None, full_pipeline=True, force_resolve=False, test_database=False):
if targets[0] == "_all":
targets = resolver.SOURCES.keys()

Expand All @@ -43,7 +47,7 @@ def resolve(targets, stage=None, full_pipeline=True, force_resolve=False):
getattr(datasource, stage)()


def index(targets, stage=None, full_pipeline=True, force_resolve=False):
def index(targets, stage=None, full_pipeline=True, force_resolve=False, test_database=False):
if targets[0] == "_all":
indexers = factory.get_all_indexers()
else:
Expand All @@ -62,7 +66,7 @@ def index(targets, stage=None, full_pipeline=True, force_resolve=False):
getattr(indexer, stage)()


def load(targets, stage=None, full_pipeline=True, force_resolve=False):
def load(targets, stage=None, full_pipeline=True, force_resolve=False, test_database=False):
if targets[0] == "_all":
targets = factory.get_all_index_names()

Expand All @@ -72,7 +76,7 @@ def load(targets, stage=None, full_pipeline=True, force_resolve=False):
for t in targets:
load_type = settings.INDEX_LOADERS[t]
if load_type == "es":
loader.index_latest_with_alias(t, settings.ES_INDEX_SUFFIX)
loader.index_latest_with_alias(t, settings.ES_INDEX_SUFFIX, test_database)
elif load_type == "file":
loader.load_to_file(t)
elif load_type == "helpdesk":
Expand Down
53 changes: 47 additions & 6 deletions Importer/jctdata/lib/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ def index(infile, bulkfile, conn, index_type, mapping, alias):
with open(infile, "r") as f, open(bulkfile, "w") as o:
line = f.readline()
while line:
d = json.loads(line)
if "id" not in d:
d["id"] = uuid.uuid4().hex
bulklines = esprit.raw.to_bulk_single_rec(d)
o.write(bulklines)
if line:
try:
d = json.loads(line)
if "id" not in d:
d["id"] = uuid.uuid4().hex
bulklines = esprit.raw.to_bulk_single_rec(d)
o.write(bulklines)
except json.JSONDecodeError:
print(f"skipped line {line}")
line = f.readline()

if not esprit.raw.type_exists(conn, index_type, es_version="1.7.5"):
Expand Down Expand Up @@ -63,7 +67,38 @@ def index(infile, bulkfile, conn, index_type, mapping, alias):
esprit.raw.delete(conn)


def index_latest_with_alias(target, index_suffix):
def update_with_test_data(input_file, output_file):
    """
    Append test-database records to a temp-directory copy of the output file.

    Looks up *input_file* inside ``settings.TEST_DATABASE``.  When both that
    file and *output_file* exist, copies *output_file* into
    ``settings.TEMP_DIR`` (clearing any files left over from a previous run)
    and appends every non-blank line of the test-data file to the copy, so
    the original pipeline output is left untouched.

    Args:
        input_file: Name of the test-data file, relative to
            ``settings.TEST_DATABASE``.
        output_file: Path to the data file produced by the pipeline.

    Returns:
        Path of the merged copy inside the temp directory, or the original
        *output_file* unchanged when there is no test data to merge.
    """
    test_data_file = os.path.join(settings.TEST_DATABASE, input_file)
    # Only merge when both the test data and the pipeline output exist.
    if os.path.exists(test_data_file) and os.path.exists(output_file):
        os.makedirs(settings.TEMP_DIR, exist_ok=True)

        # Clear existing files in temp directory (if any).  Guard with
        # isfile() so a stray sub-directory does not make os.remove() raise.
        for filename in os.listdir(settings.TEMP_DIR):
            file_path = os.path.join(settings.TEMP_DIR, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)

        # Copy existing output file to temporary directory
        output_file = shutil.copy2(output_file, settings.TEMP_DIR)

        # Ensure the copy ends with a newline; otherwise the first appended
        # test record would be glued onto the last existing record, which
        # corrupts a line-per-record (JSON lines) file.
        with open(output_file, "rb+") as out:
            out.seek(0, os.SEEK_END)
            if out.tell() > 0:
                out.seek(-1, os.SEEK_END)
                if out.read(1) != b"\n":
                    out.write(b"\n")

        # Append the test records one per line, skipping blank lines so we
        # do not introduce empty records.
        with open(test_data_file, 'r') as in_file, open(output_file, 'a') as out_file:
            for line in in_file:
                record = line.strip()
                if record:
                    out_file.write(record + "\n")

    return output_file


def index_latest_with_alias(target, index_suffix, test_database=False):
target_dir = settings.INDEX_PATH[target]
os.makedirs(target_dir, exist_ok=True)

Expand Down Expand Up @@ -101,6 +136,12 @@ def index_latest_with_alias(target, index_suffix):
print("LOADER: ALIAS: {x}".format(x=ALIAS))
print("LOADER: BULK: {x}".format(x=BULK_FILE))

if test_database:
print("Appending test data")
# add test database records to the file
IN = update_with_test_data(target + ".json", IN)
print("LOADER: IN: {x}".format(x=IN))

index(IN, BULK_FILE, CONN, INDEX_TYPE, MAPPING, ALIAS)


Expand Down
2 changes: 2 additions & 0 deletions Importer/jctdata/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ def rel2abs(file, *args):

# Base data directories, resolved relative to this settings module.
# NOTE(review): DATABASES/RESOURCES consumers are not visible here — confirm usage.
DATABASES = rel2abs(__file__, "..", "databases")
RESOURCES = rel2abs(__file__, "..", "resources")
# Test records merged into the index input when running with --test-database.
TEST_DATABASE = rel2abs(__file__, "..", "test_database")
# Scratch directory the loader clears and copies into when merging test data.
TEMP_DIR = rel2abs(__file__, "..", "temp")

DIR_DATE_FORMAT = "%Y-%m-%d_%H%M"

Expand Down
Loading