Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "38069",
"modification": 40
"modification": 41
}
11 changes: 3 additions & 8 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import overload

Expand All @@ -68,7 +64,6 @@
from apache_beam.utils import windowed_value

if TYPE_CHECKING:
from apache_beam.coders.typecoders import CoderRegistry
from apache_beam.runners.pipeline_context import PipelineContext

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
Expand Down Expand Up @@ -122,7 +117,7 @@
T = TypeVar('T')
CoderT = TypeVar('CoderT', bound='Coder')
ProtoCoderT = TypeVar('ProtoCoderT', bound='ProtoCoder')
ConstructorFn = Callable[[Optional[Any], List['Coder'], 'PipelineContext'], Any]
ConstructorFn = Callable[[Optional[Any], list['Coder'], 'PipelineContext'], Any]


def serialize_coder(coder):
Expand Down Expand Up @@ -1508,7 +1503,7 @@ def __hash__(self):

class _OrderedUnionCoder(FastCoder):
def __init__(
self, *coder_types: Tuple[type, Coder], fallback_coder: Optional[Coder]):
self, *coder_types: tuple[type, Coder], fallback_coder: Optional[Coder]):
self._coder_types = coder_types
self._fallback_coder = fallback_coder

Expand Down Expand Up @@ -1816,7 +1811,7 @@ def _create_impl(self):
def to_type_hint(self):
return self._window_coder.to_type_hint()

def _get_component_coders(self) -> List[Coder]:
def _get_component_coders(self) -> list[Coder]:
return [self._window_coder]

def is_deterministic(self) -> bool:
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import unittest
from decimal import Decimal
from typing import Any
from typing import List
from typing import NamedTuple

import pytest
Expand Down Expand Up @@ -145,7 +144,7 @@ class CodersTest(unittest.TestCase):
# nested and unnested context.

# Common test values representing Python's built-in types.
test_values_deterministic: List[Any] = [
test_values_deterministic: list[Any] = [
None,
1,
-1,
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/coders/observable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import logging
import unittest
from typing import List
from typing import Optional

from apache_beam.coders import observable
Expand All @@ -29,7 +28,7 @@
class ObservableMixinTest(unittest.TestCase):
observed_count = 0
observed_sum = 0
observed_keys: List[Optional[str]] = []
observed_keys: list[Optional[str]] = []

def observer(self, value, key=None):
self.observed_count += 1
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/coders/row_coder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
("name", str),
("age", np.int32),
("address", typing.Optional[str]),
("aliases", typing.List[str]),
("aliases", list[str]),
("knows_javascript", bool),
("payload", typing.Optional[bytes]),
("custom_metadata", typing.Mapping[str, int]),
Expand All @@ -53,7 +53,7 @@
NullablePerson = typing.NamedTuple(
"NullablePerson",
[("name", typing.Optional[str]), ("age", np.int32),
("address", typing.Optional[str]), ("aliases", typing.List[str]),
("address", typing.Optional[str]), ("aliases", list[str]),
("knows_javascript", bool), ("payload", typing.Optional[bytes]),
("custom_metadata", typing.Mapping[str, int]),
("favorite_time", typing.Optional[Timestamp]),
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/coders/slow_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
# pytype: skip-file

import struct
from typing import List


class OutputStream(object):
"""For internal use only; no backwards-compatibility guarantees.

A pure Python implementation of stream.OutputStream."""
def __init__(self):
self.data: List[bytes] = []
self.data: list[bytes] = []
self.byte_count = 0

def write(self, b: bytes, nested: bool = False) -> None:
Expand Down
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/coders/standard_coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import sys
import unittest
from copy import deepcopy
from typing import Dict
from typing import Tuple

import numpy as np
import yaml
Expand Down Expand Up @@ -283,7 +281,7 @@ def json_value_parser(self, coder_spec):
# Used when --fix is passed.

fix = False
to_fix: Dict[Tuple[int, bytes], bytes] = {}
to_fix: dict[tuple[int, bytes], bytes] = {}

@classmethod
def tearDownClass(cls):
Expand Down
13 changes: 5 additions & 8 deletions sdks/python/apache_beam/coders/typecoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ def MakeXyzs(v):

# pytype: skip-file
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Type

from apache_beam.coders import coders
from apache_beam.typehints import typehints
Expand All @@ -81,8 +78,8 @@ def MakeXyzs(v):
class CoderRegistry(object):
"""A coder registry for typehint/coder associations."""
def __init__(self, fallback_coder=None):
self._coders: Dict[Any, Type[coders.Coder]] = {}
self.custom_types: List[Any] = []
self._coders: dict[Any, type[coders.Coder]] = {}
self.custom_types: list[Any] = []
self.register_standard_coders(fallback_coder)

def register_standard_coders(self, fallback_coder):
Expand Down Expand Up @@ -110,7 +107,7 @@ def register_fallback_coder(self, fallback_coder):

def _register_coder_internal(
self, typehint_type: Any,
typehint_coder_class: Type[coders.Coder]) -> None:
typehint_coder_class: type[coders.Coder]) -> None:
self._coders[typehint_type] = typehint_coder_class

@staticmethod
Expand All @@ -123,7 +120,7 @@ def _normalize_typehint_type(typehint_type):

def register_coder(
self, typehint_type: Any,
typehint_coder_class: Type[coders.Coder]) -> None:
typehint_coder_class: type[coders.Coder]) -> None:
"""
Register a user type with a coder.

Expand Down Expand Up @@ -244,7 +241,7 @@ class FirstOf(object):
"""For internal use only; no backwards-compatibility guarantees.

A class used to get the first matching coder from a list of coders."""
def __init__(self, coders: Iterable[Type[coders.Coder]]) -> None:
def __init__(self, coders: Iterable[type[coders.Coder]]) -> None:
self._coders = coders

def from_type_hint(self, typehint, registry):
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/dataframe/frame_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from inspect import unwrap
from typing import Any
from typing import Optional
from typing import Tuple
from typing import Union

import pandas as pd
Expand Down Expand Up @@ -163,7 +162,7 @@ def binop(self, other):
DeferredBase._pandas_type_map[None] = _DeferredScalar


def name_and_func(method: Union[str, Callable]) -> Tuple[str, Callable]:
def name_and_func(method: Union[str, Callable]) -> tuple[str, Callable]:
"""For the given method name or method, return the method name and the method
itself.

Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/dataframe/schemas_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def check_df_pcoll_equal(actual):
# pd.Series([b'abc'], dtype=bytes).dtype != 'S'
# pd.Series([b'abc'], dtype=bytes).astype(bytes).dtype == 'S'
# (test data, pandas_type, column_name, beam_type)
COLUMNS: typing.List[typing.Tuple[typing.List[typing.Any],
COLUMNS: list[tuple[list[typing.Any],
typing.Any,
str,
typing.Any]] = [
Expand Down Expand Up @@ -125,7 +125,7 @@ def check_df_pcoll_equal(actual):
SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr, beam_type)
for (arr, dtype, name, beam_type) in COLUMNS]

_TEST_ARRAYS: typing.List[typing.List[typing.Any]] = [
_TEST_ARRAYS: list[list[typing.Any]] = [
arr for (arr, _, _, _) in COLUMNS
]
DF_RESULT = list(zip(*_TEST_ARRAYS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#

import logging
import signal
import typing

import apache_beam as beam
Expand All @@ -24,7 +23,6 @@
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from apache_beam.transforms import ptransform
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
from transformers import BertConfig
from transformers import BertForMaskedLM
from transformers import BertTokenizer
Expand Down Expand Up @@ -84,7 +82,7 @@ def __init__(self, bert_tokenizer):
self.bert_tokenizer = bert_tokenizer
logging.info('Starting Postprocess')

def process(self, element: typing.Tuple[str, PredictionResult]) \
def process(self, element: tuple[str, PredictionResult]) \
-> typing.Iterable[str]:
text, prediction_result = element
inputs = prediction_result.example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

# Intended only for internal testing.

from typing import Dict
from typing import Optional

import tensorflow as tf
Expand Down Expand Up @@ -114,7 +113,7 @@ def save_tf_model_with_signature(
model=None,
preprocess_input=None,
input_dtype=tf.float32,
feature_description: Optional[Dict] = None,
feature_description: Optional[dict] = None,
**kwargs,
):
"""
Expand Down
13 changes: 6 additions & 7 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,12 @@ def process(self, element):
# One can assert outputs and apply them to transforms as well.
# Helps document the contract and checks it at pipeline construction time.
# [START type_hints_transform]
from typing import Tuple, TypeVar
from typing import TypeVar

T = TypeVar('T')

@beam.typehints.with_input_types(T)
@beam.typehints.with_output_types(Tuple[int, T])
@beam.typehints.with_output_types(tuple[int, T])
class MyTransform(beam.PTransform):
def expand(self, pcoll):
return pcoll | beam.Map(lambda x: (len(x), x))
Expand All @@ -351,7 +351,7 @@ def expand(self, pcoll):

# pylint: disable=expression-not-assigned
with self.assertRaises(typehints.TypeCheckError):
words_with_lens | beam.Map(lambda x: x).with_input_types(Tuple[int, int])
words_with_lens | beam.Map(lambda x: x).with_input_types(tuple[int, int])

def test_bad_types_annotations(self):
p = TestPipeline(options=PipelineOptions(pipeline_type_check=True))
Expand Down Expand Up @@ -394,10 +394,10 @@ def process(self, element: int) -> Iterable[int]:
# annotation has an additional Optional for the else clause.
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_do_fn_annotations_optional]
from typing import List, Optional
from typing import Optional

class FilterEvensDoubleDoFn(beam.DoFn):
def process(self, element: int) -> Optional[List[int]]:
def process(self, element: int) -> Optional[list[int]]:
if element % 2 == 0:
return [element, element]
return None
Expand Down Expand Up @@ -461,7 +461,6 @@ def test_deterministic_key(self):
global Player # pylint: disable=global-variable-not-assigned

# [START type_hints_deterministic_key]
from typing import Tuple

class Player(object):
def __init__(self, team, name):
Expand All @@ -487,7 +486,7 @@ def parse_player_and_score(csv):
totals = (
lines
| beam.Map(parse_player_and_score)
| beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))
| beam.CombinePerKey(sum).with_input_types(tuple[Player, int]))
# [END type_hints_deterministic_key]

assert_that(
Expand Down
6 changes: 2 additions & 4 deletions sdks/python/apache_beam/internal/dill_pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import types
import zlib
from typing import Any
from typing import Dict
from typing import Tuple

import dill

Expand All @@ -50,7 +48,7 @@

settings = {'dill_byref': None}

patch_save_code = sys.version_info >= (3, 10) and dill.__version__ == "0.3.1.1"
patch_save_code = dill.__version__ == "0.3.1.1"

if patch_save_code:
# The following function is based on 'save_code' from 'dill'
Expand Down Expand Up @@ -315,7 +313,7 @@ def save_module(pickler, obj):
# Pickle module dictionaries (commonly found in lambda's globals)
# by referencing their module.
old_save_module_dict = dill.dill.save_module_dict
known_module_dicts: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]] = {}
known_module_dicts: dict[int, tuple[types.ModuleType, dict[str, Any]]] = {}

@dill.dill.register(dict)
def new_save_module_dict(pickler, obj):
Expand Down
Loading
Loading