Skip to content

Commit

Permalink
Change Mailchimp import method (#22)
Browse files Browse the repository at this point in the history
- Load data into the database more often
- Allow for a longer timeout and retry time
  • Loading branch information
tudoramariei authored Jul 26, 2023
1 parent 6db4ca6 commit 9e6ab58
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
10 changes: 6 additions & 4 deletions api/paul_api/paul_api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@
SENTRY_DSN=(str, ""),
SENTRY_ENVIRONMENT=(str, ""),
SENTRY_TRACES_SAMPLE_RATE=(float, 0.0),
BACKGROUND_WORKERS=(int, 3),
ADMIN_SITE_TITLE=(str, "PAUL Admin"),
ADMIN_SITE_HEADER=(str, "PAUL"),
MAILCHIMP_KEY=(str, ""),
# django-q2 settings
BACKGROUND_WORKERS=(int, 3),
WORKER_TIMEOUT=(int, 20 * 60), # All tasks must finish in less than 20 minutes
)
environ.Env.read_env(f"{root}/.env") # reading .env file

Expand Down Expand Up @@ -196,10 +198,10 @@
"name": "paul",
"workers": env("BACKGROUND_WORKERS"),
"recycle": 100,
"timeout": 300, # All tasks must finish in less than 5 minutes
"retry": 600, # Retry unfinished tasks after 10 minutes
"timeout": env("WORKER_TIMEOUT"),
"retry": env("WORKER_TIMEOUT") + 120, # Retry unfinished tasks after 2 more minutes
"ack_failures": True,
"max_attempts": 3,
"max_attempts": 2,
"compress": True,
"save_limit": 200,
"queue_limit": 4,
Expand Down
74 changes: 46 additions & 28 deletions api/paul_api/plugin_mailchimp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ def create_mailchimp_tables(audiences_name: str="") -> int:
get_or_create_table(mc_settings.audiences_stats_table_name, 'audiences_stats')
get_or_create_table(mc_settings.audience_segments_table_name, 'audience_segments')
get_or_create_table(mc_settings.audience_tags_table_name, 'audience_tags')

contact_table = get_or_create_table(
audiences_name, 'contact_merge_fields', 'audience_members')
contact_table.table_type = Table.TYPE_CONTACTS
contact_table.save()

get_or_create_table(
mc_settings.segment_members_table_name, 'contact_merge_fields', 'segment_members')

return contact_table.id


def get_or_create_table(table_name: str, *table_rulesets: str) -> Table:

if not len(table_rulesets):
raise ValueError(_("No table rulesets provided"))

Expand All @@ -87,7 +87,7 @@ def get_or_create_table(table_name: str, *table_rulesets: str) -> Table:
# Combine all required table definitions into a single one
mappings = [table_fields.TABLE_MAPPING[ruleset] for ruleset in table_rulesets]
table_fields_defs = functools.reduce(operator.ior, mappings, {})

for field_name, field_details in table_fields_defs.items():
column, created = TableColumn.objects.get_or_create(
table=table,
Expand Down Expand Up @@ -214,15 +214,15 @@ def retrieve_lists_data(client: MailChimp):
'audience_id': mlist['id'],
'audience_name': mlist['name']
})

for field in audiences_stats_table_fields_defs:
field_def = audiences_stats_table_fields_defs[field]

try:
field_value = get_field_value(field, field_def, mlist)
except KeyError:
continue

if field_def['type'] == 'date':
audience_stats_entry.data[field] = field_value[:10]
else:
Expand All @@ -232,10 +232,11 @@ def retrieve_lists_data(client: MailChimp):

# Sync list segments
list_segments = client.lists.segments.all(list_id=mlist['id'], get_all=True)
segment_members_creation_queue = []
segment_members_update_queue = []

for segment in list_segments['segments']:
segment_members_creation_queue = []
segment_members_update_queue = []

audience_segments_exists = Entry.objects.filter(
table=audience_segments_table, data__audience_id=segment['list_id'])
if audience_segments_exists:
Expand Down Expand Up @@ -286,15 +287,15 @@ def retrieve_lists_data(client: MailChimp):
})
for field in segment_members_and_contact_fields_defs:
field_def = segment_members_and_contact_fields_defs[field]

try:
field_value = get_field_value(field, field_def, member)
except KeyError:
continue

if field_def['type'] == 'enum':
table_column = TableColumn.objects.get(table=segment_members_table, name=field)

if not table_column.choices:
table_column.choices = []
if is_list_field(field_def):
Expand All @@ -318,16 +319,25 @@ def retrieve_lists_data(client: MailChimp):
segment_members_update_queue.append(segment_members_entry)
else:
segment_members_creation_queue.append(segment_members_entry)

Entry.objects.bulk_create(segment_members_creation_queue, batch_size=50)
Entry.objects.bulk_update(segment_members_update_queue, ["data"], batch_size=50)

segment_threshold = 1000
if len(segment_members_creation_queue) > segment_threshold:
Entry.objects.bulk_create(segment_members_creation_queue, batch_size=50)
segment_members_creation_queue = []
if len(segment_members_update_queue) > segment_threshold:
Entry.objects.bulk_update(segment_members_update_queue, ["data"], batch_size=50)
segment_members_update_queue = []

Entry.objects.bulk_create(segment_members_creation_queue, batch_size=50)
Entry.objects.bulk_update(segment_members_update_queue, ["data"], batch_size=50)

# Sync list members
list_members = client.lists.members.all(list_id=mlist['id'], get_all=True)
list_members_creation_queue = []
list_members_update_queue = []

for member in list_members['members']:
list_members_creation_queue = []
list_members_update_queue = []

member['audience_name'] = mlist['name']
audience_members_exists = Entry.objects.filter(
table=audience_members_table, data__id=member['id'], data__audience_id=mlist['id'])
Expand All @@ -354,7 +364,7 @@ def retrieve_lists_data(client: MailChimp):
except Exception as e:
print(e)
raise e

if field_def['type'] == 'enum':
table_column = TableColumn.objects.get(table=audience_members_table, name=field)
if not table_column.choices:
Expand Down Expand Up @@ -382,14 +392,22 @@ def retrieve_lists_data(client: MailChimp):
audience_members_entry.data[field] = field_value[:10]
else:
audience_members_entry.data[field] = field_value

if audience_members_exists:
list_members_update_queue.append(audience_members_entry)
else:
list_members_creation_queue.append(audience_members_entry)

Entry.objects.bulk_create(list_members_creation_queue, batch_size=50)
Entry.objects.bulk_update(list_members_update_queue, ["data"], batch_size=50)

members_threshold = 1000
if len(list_members_creation_queue) > members_threshold:
Entry.objects.bulk_create(list_members_creation_queue, batch_size=50)
list_members_creation_queue = []
if len(list_members_update_queue) > members_threshold:
Entry.objects.bulk_update(list_members_update_queue, ["data"], batch_size=50)
list_members_update_queue = []

Entry.objects.bulk_create(list_members_creation_queue, batch_size=50)
Entry.objects.bulk_update(list_members_update_queue, ["data"], batch_size=50)

return success, stats

Expand Down Expand Up @@ -441,7 +459,7 @@ def get_emails_from_filtered_view(token, filtered_view):
while continue_request: # TODO: get rid of web request
url = 'http://{}/api/filters/{}/entries/?page={}'.format(
settings.ALLOWED_HOSTS[0],
filtered_view.pk,
filtered_view.pk,
page
)
r = requests.get(url, headers=headers).json()
Expand Down Expand Up @@ -484,7 +502,7 @@ def update_table_fields(table: Table, column_model: TableColumn, field_defs: dic
"""
Update table fields with the new field defs
"""

total_updates = 0
for field_name, field_details in field_defs.items():
# First, check if a column with the current name already exists
Expand All @@ -503,14 +521,14 @@ def update_table_fields(table: Table, column_model: TableColumn, field_defs: dic
# If the column does not exist with either the current name or the old name, create it
if not column:
column = column_model(table=table)

# Update the column details
column.name = field_name
column.display_name = field_details['display_name']
column.field_type = field_details['type']
column.save()
total_updates += 1

return total_updates


Expand All @@ -519,17 +537,17 @@ def update_entry_data_keys(table: Table, entry_model: Entry, field_defs: dict) -
Update table entry data keys from the old key value to the new key value,
if the new key doesn't already exist
"""

total_updates = 0
entry_update_queue = []
entries = entry_model.objects.filter(table=table)

key_mapping = {}
for field_name, field_details in field_defs.items():
old_key = field_details.get('old_key')
if old_key:
key_mapping[old_key] = field_name

# Rename old keys for each table entry
for entry in entries:
if not entry.data:
Expand All @@ -547,4 +565,4 @@ def update_entry_data_keys(table: Table, entry_model: Entry, field_defs: dict) -
total_updates += 1

entry_model.objects.bulk_update(entry_update_queue, ["data"], batch_size=50)
return total_updates
return total_updates

0 comments on commit 9e6ab58

Please sign in to comment.