Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/constants"
require "elastic_graph/graphql/scalar_coercion_adapters/valid_time_zones"
require "elastic_graph/json_ingestion/schema_definition/factory_extension"
require "elastic_graph/json_ingestion/schema_definition/indexing/field_type/enum_extension"
require "elastic_graph/json_ingestion/schema_definition/indexing/field_type/object_extension"
require "elastic_graph/json_ingestion/schema_definition/indexing/field_type/scalar_extension"
require "elastic_graph/json_ingestion/schema_definition/indexing/field_type/union_extension"
require "elastic_graph/json_ingestion/schema_definition/state_extension"
require "elastic_graph/schema_definition/indexing/field_type/enum"
require "elastic_graph/schema_definition/indexing/field_type/object"
require "elastic_graph/schema_definition/indexing/field_type/scalar"
require "elastic_graph/schema_definition/indexing/field_type/union"

module ElasticGraph
module JSONIngestion
# Namespace for all JSON Schema schema definition support.
#
# {SchemaDefinition::APIExtension} is the primary entry point and should be used as a schema definition extension module.
module SchemaDefinition
# Module designed to be extended onto an {ElasticGraph::SchemaDefinition::API} instance
# to add JSON Schema ingestion serializer capabilities.
module APIExtension
# Default JSON schema options applied to ElasticGraph's built-in scalar types when this extension
# is loaded. Keyed by the un-overridden type name; the lookup at runtime maps each key through
# `type_name_overrides` so renamed built-ins still receive the right options.
BUILT_IN_SCALAR_JSON_SCHEMA_OPTIONS_BY_NAME = {
"Boolean" => {type: "boolean"},
"Float" => {type: "number"},
"ID" => {type: "string"},
"Int" => {type: "integer", minimum: INT_MIN, maximum: INT_MAX},
"String" => {type: "string"},
"Cursor" => {type: "string"},
"Date" => {type: "string", format: "date"},
"DateTime" => {type: "string", format: "date-time"},
"LocalTime" => {type: "string", pattern: VALID_LOCAL_TIME_JSON_SCHEMA_PATTERN},
"TimeZone" => {type: "string", enum: GraphQL::ScalarCoercionAdapters::VALID_TIME_ZONES.to_a.freeze},
"Untyped" => {type: ["array", "boolean", "integer", "number", "object", "string"].freeze},
"JsonSafeLong" => {type: "integer", minimum: JSON_SAFE_LONG_MIN, maximum: JSON_SAFE_LONG_MAX},
"LongString" => {type: "integer", minimum: LONG_STRING_MIN, maximum: LONG_STRING_MAX}
}.freeze

# Wires up the factory extension when this module is extended onto an API instance.
#
# @param api [ElasticGraph::SchemaDefinition::API] the API instance to extend
# @return [void]
# @api private
def self.extended(api)
# Prepend our indexing-field-type extensions onto the core classes so they participate in
# `to_json_schema` / `format_field_json_schema_customizations` / `json_schema_field_metadata_by_field_name`.
# Guarded so re-extending an already-extended API instance is a no-op.
ElasticGraph::SchemaDefinition::Indexing::FieldType::Enum.prepend(Indexing::FieldType::EnumExtension) unless ElasticGraph::SchemaDefinition::Indexing::FieldType::Enum < Indexing::FieldType::EnumExtension
ElasticGraph::SchemaDefinition::Indexing::FieldType::Object.prepend(Indexing::FieldType::ObjectExtension) unless ElasticGraph::SchemaDefinition::Indexing::FieldType::Object < Indexing::FieldType::ObjectExtension
ElasticGraph::SchemaDefinition::Indexing::FieldType::Scalar.prepend(Indexing::FieldType::ScalarExtension) unless ElasticGraph::SchemaDefinition::Indexing::FieldType::Scalar < Indexing::FieldType::ScalarExtension
ElasticGraph::SchemaDefinition::Indexing::FieldType::Union.prepend(Indexing::FieldType::UnionExtension) unless ElasticGraph::SchemaDefinition::Indexing::FieldType::Union < Indexing::FieldType::UnionExtension

state = api.state.extend(StateExtension) # : ElasticGraph::SchemaDefinition::State & StateExtension
state.reserved_type_names << EVENT_ENVELOPE_JSON_SCHEMA_NAME
api.factory.extend FactoryExtension

# Build a lookup from final (post-`type_name_overrides`) names to JSON schema options. We can't
# key directly on `type.name` because users may have overridden the names of built-in scalars
# (e.g. `Cursor` → `PreCursor`); the keys in `BUILT_IN_SCALAR_JSON_SCHEMA_OPTIONS_BY_NAME` are
# always the un-overridden names.
options_by_final_name = BUILT_IN_SCALAR_JSON_SCHEMA_OPTIONS_BY_NAME.to_h do |name, options|
[api.state.type_ref(name).to_final_form.name, options]
end

api.on_built_in_types do |type|
if (options = options_by_final_name[type.name])
scalar_type = type # : ElasticGraph::SchemaDefinition::SchemaElements::ScalarType & SchemaElements::ScalarTypeExtension
scalar_type.json_schema(**options)
elsif type.name == api.state.type_ref("GeoLocation").to_final_form.name
# @type var geo_location_type: ElasticGraph::SchemaDefinition::SchemaElements::TypeWithSubfields & SchemaElements::ObjectInterfaceExtension
geo_location_type = _ = type
names = api.state.schema_elements

# We use `nullable: false` because `GeoLocation` is indexed as a single `geo_point` field,
# and therefore can't support a `latitude` without a `longitude` or vice-versa.
latitude = geo_location_type.graphql_fields_by_name.fetch(names.latitude) # : ElasticGraph::SchemaDefinition::SchemaElements::Field & SchemaElements::FieldExtension
longitude = geo_location_type.graphql_fields_by_name.fetch(names.longitude) # : ElasticGraph::SchemaDefinition::SchemaElements::Field & SchemaElements::FieldExtension
latitude.json_schema minimum: -90, maximum: 90, nullable: false
longitude.json_schema minimum: -180, maximum: 180, nullable: false
end
end
end

# Defines the version number of the current JSON schema. Importantly, every time a change is made that impacts the JSON schema
# artifact, the version number must be incremented to ensure that each different version of the JSON schema is identified by a unique
# version number. The publisher will then include this version number in published events to identify the version of the schema it
# was using. This avoids the need to deploy the publisher and ElasticGraph indexer at the same time to keep them in sync.
#
# @note While this is an important part of how ElasticGraph is designed to support schema evolution, it can be annoying constantly
# have to increment this while rapidly changing the schema during prototyping. You can disable the requirement to increment this
# on every JSON schema change by setting `enforce_json_schema_version` to `false` in your `Rakefile`.
#
# @param version [Integer] current version number of the JSON schema artifact
# @return [void]
# @see Local::RakeTasks#enforce_json_schema_version
def json_schema_version(version)
state = json_ingestion_state

if !version.is_a?(Integer) || version < 1
raise Errors::SchemaError, "`json_schema_version` must be a positive integer. Specified version: #{version}"
end

if state.json_schema_version
raise Errors::SchemaError, "`json_schema_version` can only be set once on a schema. Previously-set version: #{state.json_schema_version}"
end

state.json_schema_version = version
state.json_schema_version_setter_location = caller_locations(1, 1).to_a.first
nil
end

# Defines strictness of the JSON schema validation. By default, the JSON schema will require all fields to be provided by the
# publisher (but they can be nullable) and will ignore extra fields that are not defined in the schema. Use this method to
# configure this behavior.
#
# @param allow_omitted_fields [bool] Whether nullable fields can be omitted from indexing events.
# @param allow_extra_fields [bool] Whether extra fields (e.g. beyond fields defined in the schema) can be included in indexing events.
# @return [void]
#
# @note If you allow both omitted fields and extra fields, ElasticGraph's JSON schema validation will allow (and ignore) misspelled
# field names in indexing events. For example, if the ElasticGraph schema has a nullable field named `parentId` but the publisher
# accidentally provides it as `parent_id`, ElasticGraph would happily ignore the `parent_id` field entirely, because `parentId`
# is allowed to be omitted and `parent_id` would be treated as an extra field. Therefore, we recommend that you only set one of
# these to `true` (or none).
def json_schema_strictness(allow_omitted_fields: false, allow_extra_fields: true)
state = json_ingestion_state

unless [true, false].include?(allow_omitted_fields)
raise Errors::SchemaError, "`allow_omitted_fields` must be true or false"
end

unless [true, false].include?(allow_extra_fields)
raise Errors::SchemaError, "`allow_extra_fields` must be true or false"
end

state.allow_omitted_json_schema_fields = allow_omitted_fields
state.allow_extra_json_schema_fields = allow_extra_fields
nil
end

private

# Returns the API's `state` narrowed to include this gem's `StateExtension`. Centralizes
# the Steep cast that's needed because Steep can't see the `extend(StateExtension)` applied
# at runtime in `extended`.
def json_ingestion_state
state # : ElasticGraph::SchemaDefinition::State & StateExtension
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/json_ingestion/schema_definition/indexing/index_extension"
require "elastic_graph/json_ingestion/schema_definition/results_extension"
require "elastic_graph/json_ingestion/schema_definition/schema_artifact_manager_extension"
require "elastic_graph/json_ingestion/schema_definition/schema_elements/enum_type_extension"
require "elastic_graph/json_ingestion/schema_definition/schema_elements/field_extension"
require "elastic_graph/json_ingestion/schema_definition/schema_elements/object_interface_extension"
require "elastic_graph/json_ingestion/schema_definition/schema_elements/scalar_type_extension"
require "elastic_graph/json_ingestion/schema_definition/schema_elements/type_reference_extension"

module ElasticGraph
module JSONIngestion
module SchemaDefinition
# Extension module applied to `ElasticGraph::SchemaDefinition::Factory` to wire up
# JSON Schema support on Results and SchemaArtifactManager instances.
#
# @api private
module FactoryExtension
# @private
def new_enum_type(name)
super(name) do |type|
type.extend SchemaElements::EnumTypeExtension
yield type if block_given?
end
end

# @private
def new_field(**kwargs, &block)
super(**kwargs) do |field|
field.extend SchemaElements::FieldExtension
block&.call(field)
end
end

# @private
def new_index(name, settings, type, &block)
super(name, settings, type) do |index|
index.extend Indexing::IndexExtension
index.require_id_in_json_schema
block&.call(index)
end
end

# @private
def new_interface_type(name)
super(name) do |type|
type.extend SchemaElements::ObjectInterfaceExtension
yield type if block_given?
end
end

# @private
def new_object_type(name)
super(name) do |type|
type.extend SchemaElements::ObjectInterfaceExtension
yield type if block_given?
end
end

# @private
def new_scalar_type(name)
super(name) do |type|
type.extend SchemaElements::ScalarTypeExtension
yield type if block_given?
type.validate_json_schema_configuration! unless state.initially_registered_built_in_types.empty?
end
end

# @private
def new_type_reference(name)
super(name).extend(SchemaElements::TypeReferenceExtension)
end

# Creates a new Results instance with JSON Schema extensions.
#
# @return [ElasticGraph::SchemaDefinition::Results] the created results instance
def new_results
super.extend(ResultsExtension)
end

# Creates a new SchemaArtifactManager instance with JSON Schema extensions.
#
# @return [ElasticGraph::SchemaDefinition::SchemaArtifactManager] the created artifact manager
def new_schema_artifact_manager(...)
super.extend(SchemaArtifactManagerExtension)
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/constants"
require "elastic_graph/json_ingestion/schema_definition/indexing/json_schema_field_metadata"
require "elastic_graph/support/hash_util"

module ElasticGraph
module JSONIngestion
module SchemaDefinition
# Namespace for JSON-schema-aware indexing components.
module Indexing
# Extends indexing fields with JSON schema generation behavior.
#
# @api private
module FieldExtension
# JSON schema overrides that automatically apply to specific mapping types so that the JSON schema
# validation will reject values which cannot be indexed into fields of a specific mapping type.
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html Elasticsearch numeric field type documentation
# @note We don't handle `integer` here because it's the default numeric type (handled by our definition of the `Int` scalar type).
# @note Likewise, we don't handle `long` here because a custom scalar type must be used for that since GraphQL's `Int` type can't handle long values.
JSON_SCHEMA_OVERRIDES_BY_MAPPING_TYPE = {
"byte" => {"minimum" => -(2**7), "maximum" => (2**7) - 1},
"short" => {"minimum" => -(2**15), "maximum" => (2**15) - 1},
"keyword" => {"maxLength" => DEFAULT_MAX_KEYWORD_LENGTH},
"text" => {"maxLength" => DEFAULT_MAX_TEXT_LENGTH}
}

# @return [Hash<Symbol, Object>] user-specified JSON schema customizations for this field
def json_schema_customizations
@json_schema_customizations
end

# @private
def with_json_schema(json_schema_layers:, json_schema_customizations:)
@json_schema_layers = json_schema_layers
@json_schema_customizations = json_schema_customizations
self
end

# Returns the JSON schema definition for this field.
#
# @return [Hash<String, Object>] the JSON schema hash
def json_schema
@json_schema ||=
json_schema_layers
.reverse # resolve layers from innermost to outermost wrappings
.reduce(inner_json_schema) { |acc, layer| process_layer(layer, acc) }
.merge(outer_json_schema_customizations)
.merge({"description" => doc_comment}.compact)
.then { |hash| Support::HashUtil.stringify_keys(hash) }
end

# @return [JSONSchemaFieldMetadata] metadata about this field for inclusion in the JSON schema
def json_schema_metadata
JSONSchemaFieldMetadata.new(type: type.name, name_in_index: name_in_index)
end

def nullable?
json_schema_layers.include?(:nullable)
end

private

def json_schema_layers
@json_schema_layers
end

def inner_json_schema
user_specified_customizations =
if user_specified_json_schema_customizations_go_on_outside?
{} # : ::Hash[::String, untyped]
else
Support::HashUtil.stringify_keys(json_schema_customizations)
end

customizations_from_mapping = JSON_SCHEMA_OVERRIDES_BY_MAPPING_TYPE[mapping["type"]] || {}
customizations = customizations_from_mapping.merge(user_specified_customizations)
# @type var field_type: _JSONFieldType
field_type = _ = indexing_field_type
customizations = field_type.format_field_json_schema_customizations(customizations)

ref = {"$ref" => "#/$defs/#{type.unwrapped_name}"}
return ref if customizations.empty?

# Combine any customizations with the type ref under an "allOf" subschema:
# all of these properties must hold true for the type to be valid.
#
# Note that if we simply combine the customizations with the `$ref`
# at the same level, it will not work, because other subschema
# properties are ignored when they are in the same object as a `$ref`:
# https://github.com/json-schema-org/JSON-Schema-Test-Suite/blob/2.0.0/tests/draft7/ref.json#L165-L168
{"allOf" => [ref, customizations]}
end

def outer_json_schema_customizations
return {} unless user_specified_json_schema_customizations_go_on_outside?
Support::HashUtil.stringify_keys(json_schema_customizations)
end

# Indicates if the user-specified JSON schema customizations should go on the inside
# (where they normally go) or on the outside. They only go on the outside when it's
# an array field, because then they apply to the array itself instead of the items in the
# array.
def user_specified_json_schema_customizations_go_on_outside?
json_schema_layers.include?(:array)
end

def process_layer(layer, schema)
case layer
when :nullable
# Here we use "anyOf" to ensure that JSON can either match the schema OR null.
#
# (Using "oneOf" would mean that if we had a schema that also allowed null,
# null would never be allowed, since "oneOf" must match exactly one subschema).
{
"anyOf" => [
schema,
{"type" => "null"}
]
}
when :array
{"type" => "array", "items" => schema}
end
end
end
end
end
end
end
Loading
Loading