Watchtower + Multiprocessing #31

Open
Audace opened this issue Nov 24, 2016 · 10 comments

Comments

@Audace

Audace commented Nov 24, 2016

The logger does not write to CloudWatch when using multiprocessing. To check whether my configuration was at fault, I dropped the watchtower handler and used a file handler instead. That logged perfectly; however, when I switched back to the watchtower handler, only the messages logged before and after outputs = pool.map(worker, inputs) reached CloudWatch.

Any idea how to fix this? Setting use_queues to True didn't help.

Sample code:

import watchtower
import logging
from multiprocessing import Lock, Process, Queue, current_process, Manager, Pool

def worker(var):
    logger.debug("Incoming variable: %s" % var)
    logger.debug("Outgoing variable: %s" % (var+1))
    return var+1

def main():
    inputs = []
    for i in xrange(1000):
        inputs.append(i)

    logger.debug("Starting run now!")
    pool = Pool(processes=3)
    outputs = pool.map(worker, inputs)
    pool.close()
    pool.join()
    logger.debug("Just finished run")

if __name__ == "__main__":
    logger = logging.getLogger("multi")
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("test.log")
    fh.setLevel(logging.DEBUG)
    logger.addHandler(fh)

    wt_project_handler = watchtower.CloudWatchLogHandler(stream_name="test",
                                                         use_queues=True)
    wt_project_handler.setLevel(logging.DEBUG)
    logger.addHandler(wt_project_handler)

    main()
@Audace
Author

Audace commented Nov 24, 2016

This now works when I set use_queues to False. However, I now get the following two errors:

TypeError: ('__init__() takes exactly 3 arguments (2 given)',
<class 'botocore.exceptions.ClientError'>, (u'An error occurred (InvalidSequenceTokenException)
when calling the PutLogEvents operation: The given sequenceToken is invalid. The next expected
sequenceToken is: 49566986361376648647772148512500932485923247621329154002',))

and

An error occurred (ThrottlingException) when calling the PutLogEvents operation (reached max
retries: 4): Rate exceeded

@Audace
Author

Audace commented Nov 25, 2016

Solved this issue by using this repo: https://github.com/jruere/multiprocessing-logging, which was spun out of this post: http://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python.

The fix amounted to importing multiprocessing_logging and then calling multiprocessing_logging.install_mp_handler(logger).

@Audace Audace closed this as completed Nov 25, 2016
@kislyuk kislyuk reopened this Mar 26, 2017
@kislyuk
Owner

kislyuk commented Mar 26, 2017

Thanks, I probably need to add docs on how to deal with this, so reopening this issue to keep track of that.

@spetoolio

Just experienced this issue, thanks for the fix @Audace!

@kislyuk it may be worth updating the docs to include a reference to that library. I'm working with django-rq, and any logging within a worker process was not making it into the watchtower batch. I'd assume other worker libraries like Celery would also experience this issue. Similar to @Audace's answer, I updated my Django app's ready() function, which guarantees that install_mp_handler() is called after logging is set up and before any logs are sent:

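from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'MyApp'

    def ready(self):
        # Imported here so the handler is installed only once the app is ready,
        # i.e. after logging is configured and before any records are sent.
        import logging
        import multiprocessing_logging
        multiprocessing_logging.install_mp_handler(logging.getLogger("my_apps_primary_logger"))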

@spetoolio

spetoolio commented Dec 3, 2018

Just wanted to add that although this seemed solved, I still encounter situations where all logging stops reaching CloudWatch suddenly, but continues to successfully log to local log files. I do not see any obvious causes, and after restarting my workers everything is back to normal as if nothing was wrong. If anyone has any thoughts, please feel free to share, and I will update if I come across a solution. @Audace not sure if you saw anything similar after implementing your solution?

@redixhumayun

@kislyuk Is this the suggested way to deal with multi-process logging for this library?

@kislyuk
Owner

kislyuk commented Jan 1, 2021

The suggested way to use logging in multiprocessing pools is to share nothing. Use one logger per worker process (or thread) and initialize the logger after forking. A shared logger will not work correctly with multiprocessing due to the stateful nature of the logger and race conditions that will arise between different copies of the logger in the different processes.
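
A minimal sketch of the share-nothing approach described above, assuming a Pool whose initializer configures a fresh logger inside each worker after it has been started. The names init_worker_logging, handler_factory, and run are hypothetical; the sketch uses a plain StreamHandler so it runs without AWS credentials, and a factory that builds a watchtower.CloudWatchLogHandler could be passed instead.

```python
import logging
import multiprocessing
import os


def init_worker_logging(handler_factory=logging.StreamHandler):
    """Pool initializer: runs once inside each worker process."""
    logger = logging.getLogger("multi")
    logger.setLevel(logging.DEBUG)
    # Drop handlers inherited from the parent so no handler state is shared.
    for inherited in list(logger.handlers):
        logger.removeHandler(inherited)
    # Hypothetical hook: pass a factory that returns a CloudWatch handler here.
    logger.addHandler(handler_factory())


def worker(var):
    # Looks up the per-process logger configured by the initializer.
    logger = logging.getLogger("multi")
    logger.debug("pid %d incoming variable: %s", os.getpid(), var)
    return var + 1


def run(inputs, processes=3):
    pool = multiprocessing.Pool(processes=processes,
                                initializer=init_worker_logging)
    try:
        return pool.map(worker, inputs)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(run(range(5)))
```

A handler that buffers records, as a CloudWatch handler typically does, may also need an explicit flush before each worker exits; that step is not shown here.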

@redixhumayun

@kislyuk So, I'm not using a multiprocessing pool, just manually creating all the child processes I want.

If I pass a separate logger instance to each child process, does each logger instance log to a separate file / stream? Is there a way to collate everything based on some metric (like timestamp)?

@kislyuk
Owner

kislyuk commented Jan 2, 2021

@redixhumayun that is for you to configure. Check the project documentation for how to configure the log stream names. Check the CloudWatch documentation for how to collate logs.
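
One way to give each child process its own stream is to derive the stream name from the process itself. The helper below is a hypothetical sketch, not part of watchtower; the resulting string would be passed as the handler's stream name (stream_name in the sample at the top of this issue). It replaces ':' and '*' on the assumption that CloudWatch rejects those characters in log stream names.

```python
import multiprocessing
import os


def per_process_stream_name(base):
    """Build a stream name such as 'test/MainProcess/12345' (hypothetical helper)."""
    proc = multiprocessing.current_process()
    name = "{}/{}/{}".format(base, proc.name, os.getpid())
    # Assumption: ':' and '*' are not allowed in CloudWatch log stream names.
    return name.replace(":", "_").replace("*", "_")


if __name__ == "__main__":
    print(per_process_stream_name("test"))
```

Because every process gets a distinct name, no two processes write to the same stream; collating the streams afterwards is left to CloudWatch.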

@redixhumayun

I wanted to post my solution here, just in case it is useful to anyone else.

The Python Logging Cookbook has some great examples of how to log to a single file from multiple processes; a sample can be found here.

The sample linked above makes use of the QueueHandler class, which puts log records from multiple processes onto a shared queue. A listener reads the records from the queue and passes them to its own handlers. Since it is a queue, records are handled in the order in which they were enqueued.

Integrating with watchtower is as easy as adding an additional handler at the output end of the queue.

You can use the sample code from the link above and just modify the listener_configurer function as follows:

import logging
import logging.handlers

import watchtower

def listener_configurer():
    root = logging.getLogger()
    # Local rotating file, as in the cookbook sample.
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    # Additional handler that sends the same records to CloudWatch.
    watchtower_handler = watchtower.CloudWatchLogHandler()
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    watchtower_handler.setFormatter(f)
    root.addHandler(h)
    root.addHandler(watchtower_handler)
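
For readers who want to see the queue pattern in isolation, here is a self-contained sketch. It uses the standard library's QueueListener in place of the hand-written listener loop from the cookbook sample, and a hypothetical ListHandler stands in for the file and CloudWatch handlers so the example runs anywhere. The names start_listener and make_producer_logger are also hypothetical.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical stand-in for the real output handlers; keeps messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def start_listener(log_queue, *handlers):
    # The listener thread is the only place where the output handlers run.
    listener = logging.handlers.QueueListener(log_queue, *handlers)
    listener.start()
    return listener


def make_producer_logger(log_queue, name):
    # Producers only enqueue records; they never touch the output handlers.
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    return logger


if __name__ == "__main__":
    # Use multiprocessing.Queue() when the producers are separate processes.
    q = queue.Queue()
    sink = ListHandler()
    listener = start_listener(q, sink)
    log = make_producer_logger(q, "producer-demo")
    for i in range(3):
        log.info("message %d", i)
    listener.stop()  # processes everything still in the queue, then returns
    print(sink.messages)
```

With this layout only one handler instance ever talks to the destination, however many producers exist.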
