Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
# Install: pip install -r requirements.txt
# Recommended: use a virtualenv at .venv/ (already gitignored)

pysnc>=1.1.0
# Pinned to the fork branch carrying the upstream GlideAggregate contribution
# (ServiceNow/PySNC#146) while that PR is in review. Revert to a released
# version pin (e.g. pysnc>=1.3.0) once GlideAggregate ships upstream.
pysnc @ git+https://github.com/orwellsanimal/PySNC.git@feat/glide-aggregate
python-dotenv>=1.0.0
180 changes: 90 additions & 90 deletions scripts/python/aggregate.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#!/usr/bin/env python3
"""
GlideAggregate-style queries for PySNC.
"""GlideAggregate-style queries for PySNC — convenience layer + CLI.

PySNC (https://github.com/ServiceNow/pysnc) wraps the ServiceNow Table API
(CRUD) but doesn't expose the Stats/Aggregate API at /api/now/stats/{table},
which is the REST equivalent of the server-side GlideAggregate class. This
module fills that gap by reusing PySNC's authenticated session against the
Stats endpoint.
PySNC's `GlideAggregate` class wraps the ServiceNow Stats API
(`/api/now/stats/{table}`), the REST equivalent of the server-side
`GlideAggregate`. This module is a thin convenience layer on top of it:
`sn_count` / `sn_aggregate` return plain Python ints / dicts that are easy to
consume from scripts, the export pipeline, and LLM tools, plus a CLI for
ad-hoc queries.

Designed to be a clean companion to PySNC — no dependencies beyond
`pysnc` and `requests` (already a transitive dep of pysnc). Could be
contributed upstream to pysnc as a GlideAggregate sibling.
Previously this module hand-rolled the Stats API directly (PySNC had no
native aggregate support). That plumbing now lives upstream in
`pysnc.GlideAggregate`; these functions delegate to it. See
ServiceNow/PySNC#146 for the upstream contribution.

Library usage:
from pysnc import ServiceNowClient
Expand Down Expand Up @@ -40,55 +41,37 @@
python scripts/python/aggregate.py sys_user --group-by identity_type

Reads SN_INSTANCE / SN_USER / SN_PASSWORD from .env when invoked as a CLI.

Grounded against ServiceNow's Aggregate API docs:
servicenow-docs/markdown/api-reference/rest-apis/c_AggregateAPI.md
"""

from __future__ import annotations

from typing import Union

from pysnc import ServiceNowClient
from pysnc.aggregate import GlideAggregate


# ---------------------------------------------------------------------------
# Internal
# ---------------------------------------------------------------------------

def _stats_request(client: ServiceNowClient, table: str, params: dict) -> dict:
"""Call /api/now/stats/{table} using PySNC's authenticated session.

Reuses the client's session so auth, retry, proxy, and cert config
are all inherited — no separate HTTP setup needed.
"""
url = f"{client.instance}/api/now/stats/{table}"
r = client.session.get(url, params=params, headers={"Accept": "application/json"})
r.raise_for_status()
return r.json()

def _split_fields(csv: str) -> list[str]:
"""Split a comma-separated field list, dropping blanks/whitespace."""
return [f.strip() for f in csv.split(",") if f.strip()]

def _parse_stats(stats: dict) -> dict:
"""Extract count + agg fields from a stats block, casting numerics."""
parsed: dict = {}
if "count" in stats:
try:
parsed["count"] = int(stats["count"])
except (TypeError, ValueError):
parsed["count"] = stats["count"]
for agg in ("avg", "sum", "min", "max"):
if agg in stats:
# Stats API returns these as string maps {field: stringValue}.
# Keep as strings — caller decides on float casting since
# not all fields are numeric (e.g. min/max on dates).
parsed[agg] = stats[agg]
return parsed

def _row_group_key(row: GlideAggregate, group_fields: list[str], display_value: bool) -> str:
"""Build a stable ', '-joined key from the current row's group-by values.

def _group_key(entry: dict) -> str:
"""Build a stable string key from groupby_fields[].value entries."""
vals = [gf.get("value", "") or "(empty)" for gf in entry.get("groupby_fields", [])]
return ", ".join(vals)
When display_value is True we use the human-readable label
(get_display_value), otherwise the raw value (get_value) — matching the
semantics callers relied on before this delegated to GlideAggregate.
"""
parts = []
for f in group_fields:
v = row.get_display_value(f) if display_value else row.get_value(f)
parts.append(v or "(empty)")
return ", ".join(parts)


# ---------------------------------------------------------------------------
Expand All @@ -112,32 +95,33 @@ def sn_count(
for reference and choice fields. Set False to get raw choice values or
sys_ids — useful when the keys will be used in further encoded queries.
"""
params: dict = {"sysparm_count": "true"}
if query:
params["sysparm_query"] = query
if group_by:
params["sysparm_group_by"] = group_by
if display_value:
params["sysparm_display_value"] = "true"
group_fields = _split_fields(group_by)

data = _stats_request(client, table, params)
result = data.get("result", {})
ga = client.GlideAggregate(table)
ga.add_aggregate("COUNT")
if query:
ga.add_encoded_query(query)
for f in group_fields:
ga.group_by(f)
if group_fields and display_value:
ga.set_display_value(True)
ga.query()

if not group_fields:
for row in ga:
try:
return int(row.get_aggregate("COUNT"))
except (TypeError, ValueError):
return 0
return 0

if not group_by:
# Ungrouped: result is an object {stats: {count: "N"}}
grouped: dict[str, int] = {}
for row in ga:
key = _row_group_key(row, group_fields, display_value)
try:
return int(result.get("stats", {}).get("count", 0))
grouped[key] = int(row.get_aggregate("COUNT"))
except (TypeError, ValueError):
return 0

# Grouped: result is a list of {stats, groupby_fields}
grouped: dict[str, int] = {}
if isinstance(result, list):
for entry in result:
try:
grouped[_group_key(entry)] = int(entry.get("stats", {}).get("count", 0))
except (TypeError, ValueError):
grouped[_group_key(entry)] = 0
grouped[key] = 0
return grouped


Expand Down Expand Up @@ -167,36 +151,52 @@ def sn_aggregate(
"sum_fields, avg_fields, min_fields, or max_fields."
)

params: dict = {}
group_fields = _split_fields(group_by)
# (api key, csv) pairs in the order the legacy parser emitted them.
field_aggs = [
("avg", avg_fields),
("sum", sum_fields),
("min", min_fields),
("max", max_fields),
]

ga = client.GlideAggregate(table)
if count:
params["sysparm_count"] = "true"
ga.add_aggregate("COUNT")
for api_key, csv in field_aggs:
for f in _split_fields(csv):
ga.add_aggregate(api_key.upper(), f)
if query:
params["sysparm_query"] = query
if group_by:
params["sysparm_group_by"] = group_by
if display_value:
params["sysparm_display_value"] = "true"
if sum_fields:
params["sysparm_sum_fields"] = sum_fields
if avg_fields:
params["sysparm_avg_fields"] = avg_fields
if min_fields:
params["sysparm_min_fields"] = min_fields
if max_fields:
params["sysparm_max_fields"] = max_fields

data = _stats_request(client, table, params)
result = data.get("result", {})

if not group_by:
if isinstance(result, dict):
return _parse_stats(result.get("stats", {}))
ga.add_encoded_query(query)
for f in group_fields:
ga.group_by(f)
if group_fields and display_value:
ga.set_display_value(True)
ga.query()

def parse_row(row: GlideAggregate) -> dict:
parsed: dict = {}
if count:
try:
parsed["count"] = int(row.get_aggregate("COUNT"))
except (TypeError, ValueError):
parsed["count"] = row.get_aggregate("COUNT")
for api_key, csv in field_aggs:
fields = _split_fields(csv)
if fields:
# Keep as strings — caller decides on float casting since not
# all fields are numeric (e.g. min/max on dates).
parsed[api_key] = {f: row.get_aggregate(api_key.upper(), f) for f in fields}
return parsed

if not group_fields:
for row in ga:
return parse_row(row)
return {}

grouped: dict[str, dict] = {}
if isinstance(result, list):
for entry in result:
grouped[_group_key(entry)] = _parse_stats(entry.get("stats", {}))
for row in ga:
grouped[_row_group_key(row, group_fields, display_value)] = parse_row(row)
return grouped


Expand Down
Loading