Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/pyinfra/api/facts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import inspect
import re
import time
from inspect import getcallargs
from socket import error as socket_error, timeout as timeout_error
from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, Type, TypeVar, cast
Expand Down Expand Up @@ -262,12 +263,14 @@ def _get_fact(
ensure_hosts: Optional[Any] = None,
apply_failed_hosts: bool = True,
) -> Any:
fact_start = time.monotonic()
fact = cls()
name = fact.name

fact_kwargs, global_kwargs = _handle_fact_kwargs(state, host, cls, args, kwargs)

kwargs_str = get_kwargs_str(fact_kwargs)
fact_key = f"{name} ({kwargs_str})" if kwargs_str else name
logger.debug(
"Getting fact: %s (%s) (ensure_hosts: %r)",
name,
Expand Down Expand Up @@ -402,4 +405,6 @@ def _get_fact(
if apply_failed_hosts and not status and not global_kwargs["_ignore_errors"]:
state.fail_hosts({host})

state.timings.record_fact(host, fact_key, time.monotonic() - fact_start)

return data
11 changes: 8 additions & 3 deletions src/pyinfra/api/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from __future__ import annotations

import time
from functools import wraps
from inspect import signature
from io import StringIO
Expand Down Expand Up @@ -350,9 +351,13 @@ def command_generator() -> Iterator[PyinfraCommand]:
op_is_change = None
if state.should_check_for_changes():
op_is_change = False
for _ in command_generator():
op_is_change = True
break
prepare_start = time.monotonic()
try:
for _ in command_generator():
op_is_change = True
break
finally:
state.timings.record_op_prepare(op_hash, host, time.monotonic() - prepare_start)
else:
# If not calling the op function to check for change we still want to ensure the args
# are valid, so use Signature.bind to trigger any TypeError.
Expand Down
4 changes: 4 additions & 0 deletions src/pyinfra/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def _run_host_op(state: "State", host: "Host", op_hash: str) -> bool:
commands: list[PyinfraCommand] = []
all_output_lines: list[OutputLine] = []

execute_start = time.monotonic()

# Retry loop
while retry_attempt <= retries:
did_error = False
Expand Down Expand Up @@ -193,6 +195,8 @@ def _run_host_op(state: "State", host: "Host", op_hash: str) -> bool:

break

state.timings.record_op_execute(op_hash, host, time.monotonic() - execute_start)

# Handle results
op_success = return_status = not did_error
host_results = state.get_results_for_host(host)
Expand Down
70 changes: 68 additions & 2 deletions src/pyinfra/api/state.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import IntEnum
from graphlib import CycleError, TopologicalSorter
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Callable, Iterator, Optional
from typing import TYPE_CHECKING, Callable, Iterator, Optional, cast

from gevent.pool import Pool
from paramiko import PKey
Expand Down Expand Up @@ -142,6 +142,66 @@ class StateHostResults:
partial_ops = 0


def _resolve_host(host: "Host") -> "Host":
"""
Normalise a (possibly gevent-context-proxy) host to its underlying concrete
Host. Storing the proxy as a dict key works for the duration of the greenlet
but breaks downstream consumers (eg JSON output) that read after the
greenlet's local has been cleared.
"""

get_module = getattr(host, "_get_module", None)
if callable(get_module):
resolved = get_module()
if resolved is not None:
return cast("Host", resolved)
return host


@dataclass
class StateTimings:
    """
    Timing data gathered over a single deploy run. All durations are seconds
    computed from ``time.monotonic`` deltas; ``wall_start``/``wall_end`` are
    ``time.time`` stamps used for a human-readable run duration.
    """

    # Wall clock timestamps bracketing the whole run
    wall_start: Optional[float] = None
    wall_end: Optional[float] = None
    # Monotonic timestamps backing the ``elapsed`` property
    run_start: Optional[float] = None
    run_end: Optional[float] = None

    # op_hash -> {host: seconds spent generating commands during change detection}
    op_prepare: dict[str, dict["Host", float]] = field(
        default_factory=lambda: defaultdict(dict),
    )
    # op_hash -> {host: seconds spent executing commands on the remote host}
    op_execute: dict[str, dict["Host", float]] = field(
        default_factory=lambda: defaultdict(dict),
    )
    # host -> {fact_key: [seconds, ...]}; list-valued because facts may be re-fetched
    facts: dict["Host", dict[str, list[float]]] = field(
        default_factory=lambda: defaultdict(lambda: defaultdict(list)),
    )

    def record_op_prepare(self, op_hash: str, host: "Host", seconds: float) -> None:
        """Store the change-detection duration for one op on one host."""
        # Key on the concrete host rather than any greenlet-context proxy
        concrete = _resolve_host(host)
        self.op_prepare[op_hash][concrete] = seconds

    def record_op_execute(self, op_hash: str, host: "Host", seconds: float) -> None:
        """Store the remote execution duration for one op on one host."""
        concrete = _resolve_host(host)
        self.op_execute[op_hash][concrete] = seconds

    def record_fact(self, host: "Host", fact_key: str, seconds: float) -> None:
        """Append one fact-fetch duration for the given host and fact key."""
        concrete = _resolve_host(host)
        self.facts[concrete][fact_key].append(seconds)

    @property
    def elapsed(self) -> Optional[float]:
        """Monotonic run duration in seconds, or None if the run has not both started and ended."""
        start, end = self.run_start, self.run_end
        if start is None or end is None:
            return None
        return end - start


class State:
"""
Manages state for a pyinfra deploy.
Expand All @@ -158,6 +218,9 @@ class State:
# Main gevent pool
pool: "Pool"

# Per-run timing data
timings: "StateTimings"

# Current stage this state is in
current_stage: StateStage = StateStage.Setup
# Warning counters by stage
Expand Down Expand Up @@ -239,6 +302,9 @@ def init(

self.callback_handlers: list[BaseStateCallback] = []

# Per-run timing data (populated by operations.py / facts.py / cli.py)
self.timings = StateTimings()

# Setup greenlet pools
self.pool = Pool(config.PARALLEL)
self.fact_pool = Pool(config.PARALLEL)
Expand Down
48 changes: 44 additions & 4 deletions src/pyinfra_cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import sys
import time
import warnings
from fnmatch import fnmatch
from getpass import getpass
Expand Down Expand Up @@ -28,8 +29,11 @@
print_inventory,
print_meta,
print_results,
print_run_elapsed,
print_state_operations,
print_support_info,
print_timings,
print_timings_json,
)
from .util import exec_file, load_deploy_file, load_func, parse_cli_arg
from .virtualenv import init_virtualenv
Expand Down Expand Up @@ -215,6 +219,26 @@ def _print_support(ctx, param, value):
default=False,
help="Print operations after generating and exit.",
)
@click.option(
"--timings",
is_flag=True,
default=False,
help="Print a summary of the slowest operations and facts at the end of the run.",
)
@click.option(
"--timings-json",
is_flag=True,
default=False,
help="Print structured timing data as JSON to stdout at the end of the run.",
)
@click.option(
"--log-timestamps",
is_flag=True,
default=False,
help="Prefix every log line with a wall-clock timestamp.",
envvar="PYINFRA_LOG_TIMESTAMPS",
show_envvar=True,
)
@click.version_option(
version=__version__,
prog_name="pyinfra",
Expand Down Expand Up @@ -320,6 +344,9 @@ def _main(
debug_all: bool,
debug_facts: bool,
debug_operations: bool,
timings: bool = False,
timings_json: bool = False,
log_timestamps: bool = False,
support: bool = False,
):
# Setup working directory
Expand All @@ -329,7 +356,7 @@ def _main(

# Setup logging & Bootstrap/Venv
#
_setup_log_level(debug, debug_all)
_setup_log_level(debug, debug_all, log_timestamps=log_timestamps)
init_virtualenv()

# Check operations are valid and setup commands
Expand Down Expand Up @@ -445,11 +472,24 @@ def _main(

logger.info("--> Beginning operation run...")
state.set_stage(StateStage.Execute)
run_ops(state, serial=serial, no_wait=no_wait)
state.timings.run_start = time.monotonic()
state.timings.wall_start = time.time()
try:
run_ops(state, serial=serial, no_wait=no_wait)
finally:
state.timings.run_end = time.monotonic()
state.timings.wall_end = time.time()

logger.info("--> Results:")
state.set_stage(StateStage.Disconnect)
print_results(state)

print_run_elapsed(state)
if timings:
print_timings(state)
if timings_json:
print_timings_json(state)

_exit()


Expand Down Expand Up @@ -481,7 +521,7 @@ def _do_confirm(msg: str) -> bool:

# Setup
#
def _setup_log_level(debug, debug_all):
def _setup_log_level(debug, debug_all, log_timestamps: bool = False):
if not debug and not sys.warnoptions:
warnings.simplefilter("ignore")

Expand All @@ -493,7 +533,7 @@ def _setup_log_level(debug, debug_all):
if debug_all:
other_log_level = logging.DEBUG

setup_logging(log_level, other_log_level)
setup_logging(log_level, other_log_level, log_timestamps=log_timestamps)


def _validate_operations(operations, chdir):
Expand Down
17 changes: 15 additions & 2 deletions src/pyinfra_cli/log.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import os
import time

import click
from typing_extensions import override
Expand Down Expand Up @@ -27,6 +29,10 @@ class LogFormatter(logging.Formatter):
logging.CRITICAL: lambda s: click.style(s, "red", bold=True),
}

def __init__(self, log_timestamps: bool = False) -> None:
    """
    Create the CLI log formatter.

    Args:
        log_timestamps: when True, ``format`` prefixes each rendered log
            line with a wall-clock ``HH:MM:SS`` timestamp.
    """
    super().__init__()
    # Read by format() to decide whether to prepend the timestamp prefix
    self.log_timestamps = log_timestamps

@override
def format(self, record):
message = record.msg
Expand Down Expand Up @@ -57,20 +63,27 @@ def format(self, record):
if record.levelno in self.level_to_format:
message = self.level_to_format[record.levelno](message)

if self.log_timestamps:
ts = time.strftime("%H:%M:%S", time.localtime())
message = "[{0}] {1}".format(ts, message)

self.previous_was_header = "-->" in message
return message

# If not a string, pass to standard Formatter
return super().format(record)


def setup_logging(log_level, other_log_level=None):
def setup_logging(log_level, other_log_level=None, log_timestamps: bool = False):
if other_log_level:
logging.basicConfig(level=other_log_level)

if not log_timestamps and os.environ.get("PYINFRA_LOG_TIMESTAMPS"):
log_timestamps = True

logger.setLevel(log_level)
handler = LogHandler()
formatter = LogFormatter()
formatter = LogFormatter(log_timestamps=log_timestamps)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False
Loading
Loading