Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions boto3/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,19 @@ class PythonDeprecationWarning(Warning):
pass


class CredentialSecurityWarning(UserWarning):
"""
Emitted when boto3 detects that long-term AWS credentials (access key IDs
prefixed with ``AKIA``) are in use. Long-term credentials do not expire
automatically and carry a higher risk than temporary credentials obtained
via IAM roles or AWS IAM Identity Center.

To suppress this warning, set the environment variable
``AWS_SUPPRESS_CREDENTIAL_WARNINGS=1``.
"""

pass


class InvalidCrtTransferConfigError(Boto3Error):
pass
56 changes: 55 additions & 1 deletion boto3/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import copy
import os
import warnings

import botocore.session
from botocore.client import Config
Expand All @@ -24,7 +25,11 @@

import boto3
import boto3.utils
from boto3.exceptions import ResourceNotExistsError, UnknownAPIVersionError
from boto3.exceptions import (
CredentialSecurityWarning,
ResourceNotExistsError,
UnknownAPIVersionError,
)

from .resources.factory import ResourceFactory

Expand Down Expand Up @@ -105,6 +110,7 @@ def __init__(
)
self._setup_loader()
self._register_default_handlers()
self._long_term_credential_warning_issued = False

def __repr__(self):
return '{}(region_name={})'.format(
Expand Down Expand Up @@ -334,6 +340,9 @@ def client(
# botocore version mismatches in AWS Lambda.
del create_client_kwargs['aws_account_id']

self._warn_if_long_term_credentials(
explicit_access_key=aws_access_key_id
)
return self._session.create_client(
service_name, **create_client_kwargs
)
Expand Down Expand Up @@ -506,6 +515,51 @@ def resource(

return cls(client=client)

def _warn_if_long_term_credentials(self, explicit_access_key=None):
"""Emit a warning when long-term IAM user credentials are detected.

Long-term credentials (access keys prefixed with ``AKIA``) carry a
higher risk than temporary credentials because they never expire
automatically. This warning is emitted at most once per
:class:`Session` instance and can be silenced by setting the
``AWS_SUPPRESS_CREDENTIAL_WARNINGS`` environment variable to ``1``.

The warning is skipped when the caller explicitly supplies an access
key to :meth:`client` or :meth:`resource`, because the caller is
assumed to be intentionally overriding session-level credentials.
"""
if self._long_term_credential_warning_issued:
return
if os.environ.get('AWS_SUPPRESS_CREDENTIAL_WARNINGS') == '1':
return
if explicit_access_key is not None:
return

credentials = self.get_credentials()
if credentials is None:
return

resolved = credentials.resolve()
if resolved is None or not resolved.access_key:
return

if resolved.access_key.startswith('AKIA'):
self._long_term_credential_warning_issued = True
warnings.warn(
"boto3 detected long-term AWS credentials (access key ID "
"starting with 'AKIA'). Long-term credentials do not expire "
"automatically and may pose a security risk if compromised. "
"Consider switching to a safer alternative:\n"
" - IAM roles (recommended for EC2 / Lambda / ECS / EKS)\n"
" - IAM Identity Center (SSO) for local development: "
"`aws sso login`\n"
" - AWS STS AssumeRole for cross-account access\n"
"To suppress this warning, set the environment variable "
"AWS_SUPPRESS_CREDENTIAL_WARNINGS=1.",
CredentialSecurityWarning,
stacklevel=3,
)

def _register_default_handlers(self):
# S3 customizations
self._session.register(
Expand Down
81 changes: 80 additions & 1 deletion tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from botocore.exceptions import NoCredentialsError, UnknownServiceError

from boto3 import __version__
from boto3.exceptions import ResourceNotExistsError
from boto3.exceptions import CredentialSecurityWarning, ResourceNotExistsError
from boto3.session import Session
from tests import BaseTestCase, mock

Expand Down Expand Up @@ -458,3 +458,82 @@ def test_can_reach_events(self):
session = Session(botocore_session=mock_bc_session)
session.events
mock_bc_session.get_component.assert_called_with('event_emitter')


class TestLongTermCredentialWarning(BaseTestCase):
"""Tests for Session._warn_if_long_term_credentials."""

def _make_session_with_key(self, access_key):
"""Return a Session whose get_credentials() yields the given key."""
mock_bc_session = self.bc_session_cls()
session = Session(botocore_session=mock_bc_session)

mock_creds = mock.Mock()
mock_resolved = mock.Mock()
mock_resolved.access_key = access_key
mock_creds.resolve.return_value = mock_resolved
session.get_credentials = mock.Mock(return_value=mock_creds)

return session

def _assert_no_credential_warning(self, func):
"""Assert that func() emits no CredentialSecurityWarning."""
import warnings as _warnings

with _warnings.catch_warnings(record=True) as record:
_warnings.simplefilter('always')
func()
cred_warnings = [
w
for w in record
if issubclass(w.category, CredentialSecurityWarning)
]
assert len(cred_warnings) == 0, (
f"Unexpected CredentialSecurityWarning: {cred_warnings}"
)

def test_warns_for_long_term_akia_credentials(self):
session = self._make_session_with_key('AKIAIOSFODNN7EXAMPLE')
with pytest.warns(CredentialSecurityWarning, match='AKIA'):
session._warn_if_long_term_credentials()

def test_no_warning_for_temporary_asia_credentials(self):
session = self._make_session_with_key('ASIAIOSFODNN7EXAMPLE')
self._assert_no_credential_warning(
session._warn_if_long_term_credentials
)

def test_no_warning_when_explicit_access_key_provided(self):
"""Explicit credentials passed to client() suppress the warning."""
session = self._make_session_with_key('AKIAIOSFODNN7EXAMPLE')
self._assert_no_credential_warning(
lambda: session._warn_if_long_term_credentials(
explicit_access_key='AKIAIOSFODNN7EXAMPLE'
)
)

def test_warning_issued_only_once_per_session(self):
session = self._make_session_with_key('AKIAIOSFODNN7EXAMPLE')
with pytest.warns(CredentialSecurityWarning):
session._warn_if_long_term_credentials()
# Second call must not produce another warning.
self._assert_no_credential_warning(
session._warn_if_long_term_credentials
)

def test_no_warning_when_env_var_suppresses(self):
with mock.patch.dict(
'os.environ', {'AWS_SUPPRESS_CREDENTIAL_WARNINGS': '1'}
):
session = self._make_session_with_key('AKIAIOSFODNN7EXAMPLE')
self._assert_no_credential_warning(
session._warn_if_long_term_credentials
)

def test_no_warning_when_credentials_are_none(self):
mock_bc_session = self.bc_session_cls()
session = Session(botocore_session=mock_bc_session)
session.get_credentials = mock.Mock(return_value=None)
self._assert_no_credential_warning(
session._warn_if_long_term_credentials
)