Source code for default_profile.startup.file_logger

#!/usr/bin/env python
# -*- coding: utf-8 -*-
r"""Establish a file-logger for IPython.

Collects both the input and output of every command run through the IPython
interpreter, prepends a timestamp to the commands, and saves the untransformed
output to a file.

.. todo:: Logging TODOs

    - Truncate output if it exceeds a certain threshold.
        - Run **dir(np)** or **dir(pd)** a couple of times and the logs
          become swamped.
    - Possibly change that section under the shebang to also include 3
      double quotes and in the comment add system info like py version, venv,
      conda, any of the 1000000 things you could add.

.. note:: Windows Users

    Simultaneous IPython processes may throw.

.. code-block:: py3tb

   PermissionError: [WinError 32]
   The process cannot access the file because it is being used by another process:
   'C:\\Users\\fac\\.ipython\\profile_default\\log\\log-2020-02-26.py' ->
   'C:\\Users\\fac\\.ipython\\profile_default\\log\\log-2020-02-26.py.001~'

"""
import codecs
import functools
import json
import logging
import os
import pprint
import reprlib
import sqlite3
import sys
import time
import traceback

from datetime import datetime
from itertools import groupby
from logging.handlers import MemoryHandler
from pathlib import Path
try:
    from _queue import SimpleQueue
except ImportError:
    from queue import _PySimpleQueue as SimpleQueue

from IPython.core.getipython import get_ipython
from IPython.core.history import HistoryAccessor

# from IPython.core.history import HistorySavinThread, HistoryManager
from IPython.paths import get_ipython_dir
from traitlets.config.configurable import LoggingConfigurable
from traitlets.config.application import LevelFormatter
from traitlets.traitlets import Instance

from default_profile.ipython_config import UsageError


class NoUnNamedLoggers(NotImplementedError):
    """Raise this error if the logger a function was called with was anonymous.

    Parameters
    ----------
    *args
        Forwarded to :exc:`NotImplementedError`. When omitted, a default
        message is used so the exception is never raised without text.
    **kwargs
        Accepted for signature compatibility and ignored; the built-in
        exception constructors do not take keyword arguments.
    """

    default_message = "You did not provide a name for the logger."

    def __init__(self, *args, **kwargs):
        if not args:
            args = (self.default_message,)
        # Do not pass ``self`` explicitly: ``super()`` already binds it, and an
        # exception stored inside its own ``args`` recurses when stringified.
        super().__init__(*args)

    def __call__(self):
        """Return the default explanation for this error."""
        return self.default_message
# TODO: isn't working
def get_history(session_number=None, raw=True, profile=None):
    """Write the input history of one session to stdout and return it.

    Parameters
    ----------
    session_number : int, optional
        Session to read from the history database. Defaults to 1.
    raw : bool, optional
        Passed through to :meth:`HistoryAccessor.get_range`. Defaults to True.
    profile : str, optional
        Name of the IPython profile whose history is read.
        Defaults to ``"default"``.

    Returns
    -------
    list of dict
        One ``{session: {lineno: cell}}`` mapping per history entry, in the
        order they were yielded by the accessor.
    """
    if profile is None:
        profile = "default"
    if session_number is None:
        session_number = 1

    sys.stdout.write("# coding: utf-8\n")

    # Profiles other than 'default' can be specified here with a profile= argument:
    hist = HistoryAccessor(profile)

    running_tally = []
    for session, lineno, cell in hist.get_range(session=session_number, raw=raw):
        # ``cell`` is already text on Python 3; encoding it to bytes first made
        # the concatenation below raise TypeError.
        sys.stdout.write(cell + "\n")
        running_tally.append({session: {lineno: cell}})
    return running_tally
def betterConfig(name=None, parent=None):
    """Similar to :func:`logging.basicConfig`.

    Parameters
    ----------
    name : str, optional
        Name of the logger to return. Defaults to ``"log_mod"``.
    parent : str or :class:`logging.Logger`, optional
        Parent logger, given by name or as an instance. When provided, the
        returned logger is the child ``<parent>.<name>``.
        Defaults to None.

    Returns
    -------
    :class:`logging.Logger()`
        Logger with a stream handler and a buffering memory handler attached,
        all set to ``logging.WARNING``.

    Notes
    -----
    If a :class:`logging.Filter` class is initialized with no args, it's
    default behavior is to allow all :class:`logging.LogRecords` to pass.

    Every handler already attached to the logger is also switched to the
    WARNING level and to this function's formatter.
    """
    BASIC_FORMAT = (
        "[ %(name)s %(relativeCreated)d ] %(levelname)s %(module)s %(message)s "
    )
    log_level = logging.WARNING
    name = "log_mod" if name is None else name

    if parent:
        if isinstance(parent, logging.Logger):
            # we needed the parent's name not the instance
            parent = parent.name
        # The child hangs off the parent, not the other way round.
        better_logger = logging.getLogger(parent).getChild(name)
    else:
        better_logger = logging.getLogger(name=name)

    better_formatter = LevelFormatter(BASIC_FORMAT + "%(highlevel)s")

    # Now let's set up a couple handlers
    better_stream = logging.StreamHandler()
    better_logger.addHandler(better_stream)

    # MemoryHandler forwards buffered records with ``target.handle(record)``,
    # so the target has to be a handler; a bare stream such as sys.stderr
    # raises AttributeError as soon as the buffer is flushed.
    # NOTE(review): together with ``better_stream`` this replays buffered
    # records on stderr once the buffer flushes -- confirm both are wanted.
    flush_target = logging.StreamHandler(sys.stderr)
    flush_target.setFormatter(better_formatter)
    # literally no idea if this is a good number or not
    memory_handler = MemoryHandler(100, target=flush_target)
    better_logger.addHandler(memory_handler)

    for attached in better_logger.handlers:
        attached.setLevel(log_level)
        attached.setFormatter(better_formatter)

    better_logger.setLevel(logging.WARNING)
    # A name-less filter lets every record through. The previous
    # ``logging.Filter("logger")`` rejected every record whose logger name did
    # not start with "logger", i.e. everything this logger produced.
    better_logger.addFilter(logging.Filter())
    return better_logger
class ConfigurableLogger(LoggingConfigurable, logging.Logger):
    """Configurable object that forwards messages to a wrapped logger.

    Parameters
    ----------
    logger : :class:`logging.Logger`, optional
        Logger that :meth:`log` writes to. Stored as ``self.logger``.
    **kwargs
        Passed on to the superclass initializer.
    """

    # NOTE(review): ``log`` below presumably shadows the ``log`` attribute that
    # LoggingConfigurable provides -- confirm this is intended.
    logger: logging.Logger
    name = __name__
    level = 30

    def __init__(self, logger=None, **kwargs):
        super().__init__(**kwargs)
        self.logger = logger

    def log(self, msg, level=None):
        """Override the superclasses implementation of log.

        Allow level to be a keyword argument.

        Parameters
        ----------
        msg : str
            Message to send to the logger
        level : int, optional
            Log level. Defaults to logging.WARNING or 30.

        """
        if level is None:
            level = logging.WARNING
        target = self.logger
        if target is None:
            # No logger was supplied at construction time; fall back to the
            # standard logger registered under this object's name.
            target = logging.getLogger(self.name)
        # logging.Logger.log takes the level first; passing ``msg`` positionally
        # together with ``level=`` raised "multiple values for argument".
        target.log(level, msg)

    def __repr__(self):
        """Pretty print the truncated results of :meth:`traits`."""
        return f"<{self.__class__.__name__}> {reprlib.Repr().repr(self.traits())}"
def logging_decorator(f):
    """Create a decorator to be used around functions that need to be debugged.

    Parameters
    ----------
    f : function
        The function to decorate

    Returns
    -------
    function
        Wrapper that logs entry and exit (with the elapsed time) at DEBUG
        level on the root logger and returns whatever ``f`` returns.

    Notes
    -----
    Calls a logger

    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        logging.debug("Entering %s", f.__name__)
        start = time.time()
        # Keep the result so decorating a function does not change what it
        # returns to its callers.
        result = f(*args, **kwargs)
        end = time.time()
        logging.debug("Exiting %s in %-5.2f secs", f.__name__, end - start)
        return result

    return wrapper
def ipython_logger():
    """Saves all commands run in the interactive namespace as valid IPython code.

    .. warning:: This is not necessarily valid python code.

    The commands are appended to a file in the log directory of the running
    shell's profile. This file is named based on the date
    (``log-YYYY-MM-DD.py``).

    Returns
    -------
    logger or None
        The shell's logger once logging has started. None when there is no
        running IPython shell, when the logger is already active
        (``RuntimeError`` from ``logstart``) or when the log file cannot be
        opened (``PermissionError``).
    """
    shell = get_ipython()
    if shell is None:
        return None

    # Setup for the logger
    shell.logger.logmode = "append"
    log_dir = shell.profile_dir.log_dir
    fname = "log-" + time.strftime("%Y-%m-%d") + ".py"
    filename = os.path.join(log_dir, fname)
    # Checked before logstart so we know whether a file header is needed.
    notnew = os.path.exists(filename)
    logger = shell.logger

    try:
        logger.logstart(filename)
    except RuntimeError:
        print(" Already logging to " + logger.logfname)
        return None
    except PermissionError as e:
        # ``winerror`` is only set for Windows system errors, e.g. the
        # "file is being used by another process" failure raised when several
        # IPython processes share one log file.
        if getattr(e, "winerror", None) is not None:
            print(" Already logging to " + logger.logfname)
        else:
            traceback.print_exc()
        return None

    # TODO: optionally attach an extra logger here, e.g.
    # betterConfig(name=filename, parent=STARTUP_LOGGER.name)

    if notnew:
        logger.log_write("# =================================\n")
    else:
        logger.log_write("#!/usr/bin/env python\n")
        logger.log_write("# " + fname + "\n")
        logger.log_write("# IPython automatic logging file\n")
        logger.log_write("# " + time.strftime("%H:%M:%S") + "\n")
        logger.log_write("# =================================\n")
    return logger
class MyHistoryAccessor(HistoryAccessor):
    """
    Modified HistoryAccessor to fetch the whole ipython history across all
    sessions
    """

    def get_tail(self, n=10, raw=True, output=False, include_latest=False):
        """Get every line from the history database, oldest first.

        Unlike the method it overrides, this never limits the number of rows:
        the query has no ``LIMIT`` clause.

        Parameters
        ----------
        n : int, optional
            Accepted for signature compatibility only; it has no effect on
            the result.
        raw, output : bool
            See :meth:`get_range`
        include_latest : bool
            If False (default), the most recent line is discarded. This is
            intended to be used where the function is called by a user
            command, which it should not return.

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        self.writeout_cache()
        # The rows come back newest first, so the entry to drop is at index 0.
        cur = self._run_sql(
            "ORDER BY session DESC, line DESC", (), raw=raw, output=output
        )
        rows = list(cur)
        if not include_latest:
            rows = rows[1:]
        return reversed(rows)
def rem_dups(lst):
    """Collapse each run of consecutive equal items in *lst* into one item.

    Only adjacent duplicates are merged; an item that shows up again later,
    after a different item, is kept. The order of the input is preserved.
    """
    # OrderedSet would be nicer, but for simplicity we use this approach
    collapsed = []
    for run_key, _run in groupby(lst):
        collapsed.append(run_key)
    return collapsed
def access_all_history(*args):
    """Print the input history of all sessions, skipping repeated commands.

    Syntax: ipythonhist [LAST X ITEMS]

    Parameters
    ----------
    *args
        Optional. The first value is converted with :func:`int` and limits
        the output to that many most recent commands. Zero or a negative
        number prints nothing.
    """
    hist = MyHistoryAccessor()
    # Compare the command text only (third field of each history tuple).
    # Whole tuples also carry the session and line number, so they never
    # compare equal and de-duplicating them removed nothing.
    commands = rem_dups([entry[2] for entry in hist.get_tail()])

    if len(args) > 0:
        head = int(args[0])
        # ``commands[-0:]`` would be the entire list rather than nothing.
        commands = commands[-head:] if head > 0 else []

    for command in commands:
        print(command)
def stream_logger(logger, log_level=logging.INFO, msg_format=None):
    """Set up a :class:`logging.Logger` instance, add a stream handler.

    Parameters
    ----------
    logger : str or :class:`logging.Logger`
        Name of the logger to configure, or the logger itself.
        See example below.
    log_level : int or str, optional
        Level of log records, either numeric or a level name such as
        ``"debug"`` (case-insensitive). Defaults to 20.
    msg_format : str, optional
        Representation of logging messages. Uses standard :kbd:`%` style
        string formatting. Defaults to ``%(asctime)s %(levelname)s %(message)s``
        followed by a newline.

    Returns
    -------
    logger : :class:`logging.Logger()` instance
        The configured logger with a new stderr handler attached.

    Raises
    ------
    NoUnNamedLoggers
        If ``logger`` is neither a string nor a :class:`logging.Logger`.
    ValueError, TypeError
        Raised by :mod:`logging` when ``log_level`` is not a valid level.

    Examples
    --------
    >>> import logging
    >>> from default_profile.startup.file_logger import stream_logger
    >>> LOGGER = stream_logger(logging.getLogger(name=__name__))

    """
    if isinstance(logger, str):
        logger = logging.getLogger(logger)
    elif not isinstance(logger, logging.Logger):
        raise NoUnNamedLoggers()

    # logging validates levels itself and understands upper-case level names,
    # so string levels only need their case normalised. Previously anything
    # that was not an int left ``level`` unbound.
    level = log_level.upper() if isinstance(log_level, str) else log_level
    logger.setLevel(level)

    handler = logging.StreamHandler(stream=sys.stderr)
    handler.setLevel(level)
    if msg_format is None:
        msg_format = "%(asctime)s %(levelname)s %(message)s\n"
    handler.setFormatter(logging.Formatter(msg_format))
    logger.addHandler(handler)
    return logger
def file_logger(
    filename, logger=None, shell=None, log_level=logging.INFO, msg_format=None
):
    """Attach a file handler writing into the profile's log directory.

    Parameters
    ----------
    filename : str
        Name of the log file, created inside ``shell.profile_dir.log_dir``.
    logger : :class:`logging.Logger`, optional
        Logger to configure. Defaults to the logger named after this module.
    shell : object, optional
        IPython shell used to locate the log directory. Defaults to the
        running instance returned by ``get_ipython()``.
    log_level : int, optional
        Level applied to the handler and the logger. Defaults to 20.
    msg_format : str, optional
        :kbd:`%` style format string.
        Defaults to ``%(asctime)s : %(levelname)s : %(message)s : ``.

    Returns
    -------
    :class:`logging.Logger`
        The configured logger.

    Raises
    ------
    RuntimeError
        If no shell was given and no IPython instance is running.
    """
    # The previous ``assert isinstance(shell, (IPython...., None))`` could never
    # pass: ``IPython`` is not a name imported by this module and ``None`` is
    # not a valid class for isinstance().
    if shell is None:
        shell = get_ipython()
    if shell is None:
        raise RuntimeError("file_logger requires a running IPython shell.")

    logdir = shell.profile_dir.log_dir
    log_file = Path(logdir).joinpath(filename)
    handler = logging.FileHandler(log_file)
    handler.setLevel(log_level)

    if logger is None:
        logger = logging.getLogger(name=__name__)
    logger.setLevel(log_level)

    if msg_format is not None:
        formatter = logging.Formatter(msg_format)
    else:
        formatter = logging.Formatter("%(asctime)s : %(levelname)s : %(message)s : ")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
def json_logger(logger=None, json_formatter=None):
    """Attach a DEBUG-level stream handler with a JSON formatter to a logger.

    Parameters
    ----------
    logger : str or :class:`logging.Logger`, optional
        Logger to configure, by name or instance. Any falsy value selects
        the root logger.
    json_formatter : :class:`logging.Formatter`, optional
        Formatter for the new handler. Defaults to a new ``JsonFormatter``.

    Returns
    -------
    :class:`logging.Logger`
        The configured logger, set to ``logging.DEBUG``.

    Raises
    ------
    TypeError
        If ``logger`` is neither a string nor a :class:`logging.Logger`.
    """
    if not logger:
        root_logger = logging.getLogger()
    elif isinstance(logger, str):
        root_logger = logging.getLogger(name=logger)
    elif isinstance(logger, logging.Logger):
        root_logger = logger
    else:
        # TypeError is still an Exception, so existing ``except Exception``
        # callers keep working, but the failure now explains itself.
        raise TypeError(
            "logger must be a str or logging.Logger, not "
            + type(logger).__name__
        )

    fmt = json_formatter if json_formatter else JsonFormatter()

    handler = logging.StreamHandler()
    handler.setFormatter(fmt)
    handler.setLevel(logging.DEBUG)

    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(handler)
    return root_logger
class JsonFormatter(logging.Formatter):
    """Return valid :mod:`json` for a configured handler."""

    def format(self, record):
        """Serialize a :class:`logging.LogRecord()` to a JSON object string.

        Parameters
        ----------
        record : :class:`logging.LogRecord`
            The record to format.

        Returns
        -------
        str
            JSON object with the keys ``msg``, ``timestamp`` (UTC, ISO 8601
            with a trailing ``Z``), ``func``, ``level``, ``module``,
            ``process_id``, ``thread_id`` and ``exception`` (list of
            traceback lines, or null when the record carries no exception).
        """
        from datetime import timezone

        if record.exc_info:
            exc = traceback.format_exception(*record.exc_info)
        else:
            exc = None

        # Convert in UTC, then drop the tzinfo so isoformat() gives the same
        # naive text as before and the "Z" suffix can be appended.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        timestamp = created.replace(tzinfo=None).isoformat() + "Z"

        return json.dumps(
            {
                # getMessage() only interpolates when args are present and
                # stringifies non-str messages, so a literal "%" in an
                # argument-less message no longer raises.
                "msg": record.getMessage(),
                "timestamp": timestamp,
                "func": record.funcName,
                "level": record.levelname,
                "module": record.module,
                "process_id": record.process,
                "thread_id": record.thread,
                "exception": exc,
            }
        )
if __name__ == "__main__":
    # Start the per-day log file; None means no shell is running or logging
    # could not be started.
    ipy_logger = ipython_logger()

    if hasattr(ipy_logger, "logmode") and ipy_logger is not None:
        # Append to today's file and record output as well as input.
        logmode = "append"
        log_output = True

        ipy_logger.logmode = logmode
        ipy_logger.log_output = log_output
        ipy_logger.timestamp = True
        # I think this has to be set
        ipy_logger.log_active = True

    # Don't use name=__name__ here or it'll dump a log file in your cwd
    # better_logger = betterConfig(parent=STARTUP_LOGGER)