Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove sleep loop from moth/setup routines. #1244

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f05d7eb
remove sleep loop from moth/setup routines.
petersilva Sep 27, 2024
362b9df
implement ebo for broker connection attempts, AMQP
petersilva Oct 2, 2024
f793c92
inadvertently changed getSetup signature
petersilva Oct 2, 2024
bbf2fe4
promote setEbo to moth parent class for re-use by mqtt
petersilva Oct 2, 2024
551276d
adding ebo to broker connections in mqtt
petersilva Oct 2, 2024
91e340c
Add logDuplicates to flags and set default to False
mshak2 Aug 20, 2024
44d2c90
Update HTTP report codes for accuracy
mshak2 Aug 22, 2024
d6aa8cb
Fix typos
mshak2 Aug 22, 2024
8f95578
Update HTTP report codes for accuracy and fix typos
mshak2 Aug 22, 2024
1bd32d7
Update HTTP report codes for accuracy
mshak2 Aug 22, 2024
0095d4d
Add 404, 406, and 425 to known_report_codes
mshak2 Aug 22, 2024
d5be40c
Check for logDuplicates in flag options and add nodupe to logEvents if
mshak2 Aug 23, 2024
8ce7e8d
Log when a message is rejected because it's a duplicate
mshak2 Aug 23, 2024
e2d32bb
Add logDuplicates to default options and set to True
mshak2 Aug 23, 2024
5374ad7
Fix duplicate check to look for report code 304
mshak2 Sep 4, 2024
48ef387
Add duplicate logging to after_work, replace relPath with getIDStr
mshak2 Sep 17, 2024
34b49fb
Update 304 codes in init.py
mshak2 Sep 24, 2024
b0ee612
Add HTTP codes 410 and 504
mshak2 Sep 26, 2024
595f935
Add back the return that was removed in 7c95a6fc07c5bb72fc9f2f8efbb1d…
reidsunderland Sep 27, 2024
9c4ecc1
QoS should apply to all instances + consumer close / cancel fix (#1245)
reidsunderland Oct 2, 2024
e42c9b4
remove sleep loop from moth/setup routines.
petersilva Sep 27, 2024
33ee470
implement ebo for broker connection attempts, AMQP
petersilva Oct 2, 2024
07caa05
promote setEbo to moth parent class for re-use by mqtt
petersilva Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,17 @@ def durationToSeconds(str_value, default=None) -> float:
206: "Partial Content: received and inserted.",
304: "Not modified (Checksum validated, unchanged, so no download resulted.)",
307: "Insertion deferred (writing to temporary part file for the moment.)",
404: "Not Found: no pattern match",
406: "Not Acceptable: file older than fileAgeMax",
410: "Gone: server data different from notification message",
417: "Expectation Failed: invalid notification message (corrupt headers)",
422: "Unprocessable Content: could not determine path to transfer to",
425: "Too Early: file younger than fileAgeMin",
499: "Failure: Not Copied. SFTP/FTP/HTTP download problem",
#FIXME : should not have 503 error code 3 times in a row
# 503: "Service unavailable. delete (File removal not currently supported.)",
503: "Unable to process: Service unavailable",
504: "Gateway Timeout: message too old"
# 503: "Unsupported transport protocol specified in posting."
}

Expand Down
8 changes: 6 additions & 2 deletions sarracenia/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __repr__(self) -> str:
'inline': False,
'inlineOnly': False,
'identity_method': 'sha512',
'logDuplicates': False,
'logFormat': '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s',
'logMetrics': False,
'logStdout': False,
Expand Down Expand Up @@ -142,7 +143,7 @@ def __repr__(self) -> str:
# all the boolean settings.
flag_options = [ 'acceptSizeWrong', 'acceptUnmatched', 'amqp_consumer', 'baseUrl_relPath', 'debug', \
'delete', 'discard', 'download', 'dry_run', 'durable', 'exchangeDeclare', 'exchangeSplit', 'logReject', 'realpathFilter', \
'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logMetrics', 'logStdout', 'logReject', 'restore', \
'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logDuplicates', 'logMetrics', 'logStdout', 'logReject', 'restore', \
'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \
'permCopy', 'persistent', 'queueBind', 'queueDeclare', 'randomize', 'recursive', 'realpathPost', \
'reconnect', 'report', 'reset', 'retry_refilter', 'retryEmptyBeforeExit', 'save',
Expand All @@ -164,7 +165,7 @@ def __repr__(self) -> str:
set_options = [ 'logEvents', 'fileEvents' ]

set_choices = {
'logEvents' : set(sarracenia.flowcb.entry_points + [ 'reject' ]),
'logEvents' : set(sarracenia.flowcb.entry_points + [ 'reject', 'nodupe' ]),
'fileEvents' : set( [ 'create', 'delete', 'link', 'mkdir', 'modify', 'rmdir' ] )
}
# FIXME: doesn't work... wonder why?
Expand Down Expand Up @@ -1607,6 +1608,9 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
setattr(self, k, isTrue(v))
if k in ['logReject'] and self.logReject:
self.logEvents = self.logEvents | set(['reject'])

if k in ['logDuplicates'] and self.logDuplicates:
self.logEvents = self.logEvents | set(['nodupe'])
return

if len(line) < 2:
Expand Down
7 changes: 3 additions & 4 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ def filter(self) -> None:
(m['fileOp']['rename']))
else:
self.reject(
m, 304, "mask=%s strip=%s url=%s" %
m, 404, "mask=%s strip=%s url=%s" %
(str(mask), strip, urlToMatch))
break

Expand Down Expand Up @@ -1105,7 +1105,7 @@ def filter(self) -> None:
self.o.flatten)
filtered_worklist.append(m)
else:
self.reject(m, 304, "unmatched pattern %s" % url)
self.reject(m, 404, "unmatched pattern %s" % url)

self.worklist.incoming = filtered_worklist

Expand Down Expand Up @@ -1479,7 +1479,6 @@ def file_should_be_downloaded(self, msg) -> bool:
logger.debug("%s file size different, so cannot be the same" %
(msg['new_path']))
return True

else:
end = 0

Expand All @@ -1499,7 +1498,7 @@ def file_should_be_downloaded(self, msg) -> bool:
pass

if new_mtime <= old_mtime:
self.reject(msg, 304,
self.reject(msg, 406,
"mtime not newer %s " % (msg['new_path']))
return False
else:
Expand Down
1 change: 1 addition & 0 deletions sarracenia/flow/winnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# Option defaults specific to the winnow component, layered over the global defaults.
default_options = {
    'acceptUnmatched': True,  # keep messages that match no accept/reject mask
    'nodupe_ttl': 300,        # seconds an entry lives in the duplicate-suppression cache
    'logDuplicates': True,    # winnow's job is finding duplicates, so log them by default
}


Expand Down
14 changes: 14 additions & 0 deletions sarracenia/flowcb/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ def after_accept(self, worklist):
(msg['relPath'], msg['report']['code'], msg['report']['message']))
else:
logger.info("rejected: %s " % self._messageAcceptStr(msg))

elif 'nodupe' in self.o.logEvents:
for msg in worklist.rejected:
if 'report' in msg and msg['report']['code'] in [ 304 ]:
logger.info(
"%s rejected: %d %s " %
(msg.getIDStr(), msg['report']['code'], msg['report']['message']))

for msg in worklist.incoming:

Expand Down Expand Up @@ -188,6 +195,13 @@ def after_work(self, worklist):
else:
logger.info("rejected: %s " % self._messageStr(msg))

elif 'nodupe' in self.o.logEvents:
for msg in worklist.rejected:
if 'report' in msg and msg['report']['code'] in [ 304 ]:
logger.info(
"%s rejected: %d %s " %
(msg.getIDStr(), msg['report']['code'], msg['report']['message']))

for msg in worklist.ok:
if 'size' in msg:
self.fileBytes += msg['size']
Expand Down
8 changes: 4 additions & 4 deletions sarracenia/flowcb/nodupe/disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,22 @@ def after_accept(self, worklist):
if mtime < min_mtime:
m['_deleteOnPost'] |= set(['reject'])
m['reject'] = f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}"
m.setReport(304, f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}" )
m.setReport(406, f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}" )
worklist.rejected.append(m)
continue
elif mtime > max_mtime:
m['_deleteOnPost'] |= set(['reject'])
m['reject'] = f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}"
m.setReport(304, f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}" )
m.setReport(425, f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}" )
worklist.rejected.append(m)
continue

if '_isRetry' in m or self.check_message(m):
new_incoming.append(m)
else:
m['_deleteOnPost'] |= set(['reject'])
m['reject'] = "not modifified 1 (nodupe check)"
m.setReport(304, 'Not modified 1 (cache check)')
m['reject'] = "not modified 1 (nodupe check)"
m.setReport(304, 'Not modified 1 (nodupe check)')
worklist.rejected.append(m)

if self.fp:
Expand Down
8 changes: 4 additions & 4 deletions sarracenia/flowcb/nodupe/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,22 @@ def after_accept(self, worklist):
if mtime < min_mtime:
m['_deleteOnPost'] |= set(['reject'])
m['reject'] = f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}"
m.setReport(304, f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}" )
m.setReport(406, f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}" )
worklist.rejected.append(m)
continue
elif mtime > max_mtime:
m['_deleteOnPost'] |= set(['reject'])
m['reject'] = f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}"
m.setReport(304, f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}" )
m.setReport(425, f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}" )
worklist.rejected.append(m)
continue

if '_isRetry' in m or self._is_new(m):
new_incoming.append(m)
else:
m['_deleteOnPost'] |= set(['reject'])
m['reject'] = "not modifified 1 (nodupe check)"
m.setReport(304, 'Not modified 1 (cache check)')
m['reject'] = "not modified 1 (nodupe check)"
m.setReport(304, 'Not modified 1 (nodupe check)')
worklist.rejected.append(m)

logger.debug("items registered in duplicate suppression cache: %d" % (len(self._redis.keys(self._rkey_base + ":*"))) )
Expand Down
21 changes: 21 additions & 0 deletions sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ def __init__(self, props=None, is_subscriber=True) -> None:
self.metrics = { 'connected': False }
self.metricsReset()

now = time.time()
self.next_connect_time = now
self.next_connect_failures = 0

if (sys.version_info.major == 3) and (sys.version_info.minor < 7):
self.o = {}
for k in default_options:
Expand Down Expand Up @@ -415,6 +419,23 @@ def cleanup(self) -> None:
else:
self.putCleanUp()

def setEbo(self, start) -> None:
    """Schedule the next broker connection attempt using exponential backoff.

    Unlike classic EBO, the delay is scaled by how long the failed attempt
    took (now - start): attempts that fail slowly should not be retried
    quickly, while quick failures can vary in duration.  Because of that
    scaling, next_try may shrink between consecutive failures when attempt
    durations vary, but it settles on a long period (capped at 600 seconds)
    as failures accumulate.

    Side effects: increments self.next_connect_failures and updates
    self.next_connect_time (both initialized in __init__).

    :param start: time.time() timestamp taken just before the connection
                  attempt began.
    """
    now = time.time()
    attempt_duration = now - start

    # Cap the failure counter: past 2**64 the multiplier cannot change
    # min(..., 600) for any realistic attempt duration, and an unbounded
    # counter would make 2**n an ever-growing big integer.
    if self.next_connect_failures < 64:
        self.next_connect_failures += 1
    ebo = 2 ** self.next_connect_failures

    # NOTE(review): if attempt_duration is ~0 (e.g. instant connection
    # refusal), next_try stays near 0 and retries are nearly immediate —
    # confirm whether a minimum delay is wanted here.
    next_try = min(attempt_duration * ebo, 600)
    self.next_connect_time = now + next_try
    logger.error(f"could not connect. next try in {next_try} seconds.")

if features['amqp']['present']:
import sarracenia.moth.amqp
import sarracenia.moth.amqpconsumer
Expand Down
Loading
Loading