From fdef095ba17193dcf32c86818ada20a1d79c5090 Mon Sep 17 00:00:00 2001 From: minwoox Date: Tue, 30 Jun 2026 15:07:33 +0900 Subject: [PATCH] Store xDS resources as YAML instead of JSON Motivation: - xDS resources were stored as JSON files. YAML is more human-readable and is the preferred format for configuration in the Kubernetes/Envoy ecosystem. Modifications: - `XdsResourceManager`: `fileName()` now returns `.yaml`; `push()` uses `Change.ofYamlUpsert()`; `updateOrDelete()` falls back to the legacy `.json` path when the `.yaml` file is absent; `update()` atomically removes the old `.json` and creates the new `.yaml` in a single commit. - `XdsResourceWatchingService`: changed `handleXdsResource()` signature from `String contentAsText` to `JsonNode content`; both call sites now pass `entry.content()` / `change.content()` directly instead of `contentAsText()`. Also accepts `EntryType.YAML` and handles `UPSERT_YAML`. Result: - Newly created xDS resources are stored as `.yaml` files. - Existing `.json` files remain readable; they are atomically migrated to `.yaml` on the first update. --- .../k8s/XdsKubernetesNodeIpExtractorTest.java | 12 +- .../server/command/ContentTransformer.java | 3 +- .../git/AbstractChangesApplier.java | 23 ++ .../git/TransformingChangesApplier.java | 21 +- .../features/xds/K8sAggregatorStatus.tsx | 2 +- webapp/src/dogma/features/xds/XdsTypes.ts | 8 +- webapp/src/dogma/features/xds/xdsApiSlice.ts | 80 ++++-- .../xds/cluster/v1/XdsClusterService.java | 4 +- .../xds/endpoint/v1/XdsEndpointService.java | 4 +- .../v1/XdsEndpointUpdateScheduler.java | 117 ++++++-- .../internal/CentralDogmaXdsResources.java | 9 +- .../xds/internal/ControlPlaneService.java | 10 +- .../xds/internal/XdsEndpointReadService.java | 34 ++- .../xds/internal/XdsResourceManager.java | 98 +++++-- .../internal/XdsResourceWatchingService.java | 22 +- .../XdsKubernetesEndpointFetchingService.java | 87 ++++-- .../xds/k8s/v1/XdsKubernetesService.java | 4 +- .../xds/listener/v1/XdsListenerService.java | 4 +- .../xds/route/v1/XdsRouteService.java | 4 +- .../XdsEndpointReadPermissionTest.java | 6 +- .../XdsLegacyJsonCompatibilityTest.java | 251 ++++++++++++++++++ .../XdsResourceWatchingServiceTest.java | 11 +- .../xds/internal/XdsWritePermissionTest.java | 2 +- .../v1/AggregatingMultipleKubernetesTest.java | 19 +- .../xds/k8s/v1/XdsKubernetesServiceTest.java | 12 +- 25 files changed, 697 insertions(+), 150 deletions(-) create mode 100644 xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsLegacyJsonCompatibilityTest.java diff --git a/it/xds-k8s-node-ip-extractor/src/test/java/com/linecorp/centraldogma/it/xds/k8s/XdsKubernetesNodeIpExtractorTest.java b/it/xds-k8s-node-ip-extractor/src/test/java/com/linecorp/centraldogma/it/xds/k8s/XdsKubernetesNodeIpExtractorTest.java index 193eb8294c..59dbb10e53 100644 --- a/it/xds-k8s-node-ip-extractor/src/test/java/com/linecorp/centraldogma/it/xds/k8s/XdsKubernetesNodeIpExtractorTest.java +++ b/it/xds-k8s-node-ip-extractor/src/test/java/com/linecorp/centraldogma/it/xds/k8s/XdsKubernetesNodeIpExtractorTest.java @@ -152,15 +152,15 @@ void extractsNodeIpFromLabel() throws Exception { final Repository fooGroup = dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT) .repos().get("foo"); final Entry aggregatorEntry = - fooGroup.get(Revision.HEAD, Query.ofJson( - K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json")).join(); + fooGroup.get(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml")).join(); // KubernetesEndpointsUpdater commits the resolved endpoints in the next revision. await().until(() -> fooGroup.normalizeNow(Revision.HEAD) .equals(aggregatorEntry.revision().forward(1))); final Entry endpointEntry = fooGroup.get( - Revision.HEAD, Query.ofJson(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join(); + Revision.HEAD, Query.ofYaml(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join(); // The endpoints must use the label values resolved by LabelBasedNodeIpExtractor, // not the default InternalIP addresses (1.1.1.1 / 2.2.2.2). @@ -228,13 +228,13 @@ void fallsBackToInternalIpWhenLabelKeyIsAbsent() throws Exception { final Repository fooGroup = dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT) .repos().get("foo"); final Entry aggregatorEntry = - fooGroup.get(Revision.HEAD, Query.ofJson( - K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json")).join(); + fooGroup.get(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml")).join(); await().until(() -> fooGroup.normalizeNow(Revision.HEAD) .equals(aggregatorEntry.revision().forward(1))); final Entry endpointEntry = fooGroup.get( - Revision.HEAD, Query.ofJson(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join(); + Revision.HEAD, Query.ofYaml(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join(); assertThatJson(endpointEntry.content()).isEqualTo( '{' + " \"clusterName\": \"groups/foo/k8s/clusters/" + aggregatorId + "\"," + diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java b/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java index 3e8d75d091..30176aa767 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java @@ -40,7 +40,8 @@ public class ContentTransformer { */ public ContentTransformer(String path, EntryType entryType, BiFunction transformer) { this.path = requireNonNull(path, "path"); - checkArgument(entryType == EntryType.JSON, "entryType: %s (expected: %s)", entryType, EntryType.JSON); + checkArgument(entryType == EntryType.JSON || entryType == EntryType.YAML, + "entryType: %s (expected: %s or %s)", entryType, EntryType.JSON, EntryType.YAML); this.entryType = requireNonNull(entryType, "entryType"); this.transformer = requireNonNull(transformer, "transformer"); } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java index f4568fb8d5..774829f1e0 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java @@ -37,6 +37,7 @@ import com.linecorp.centraldogma.common.CentralDogmaException; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.internal.Jackson; +import com.linecorp.centraldogma.internal.Yaml; import com.linecorp.centraldogma.server.storage.StorageException; abstract class AbstractChangesApplier { @@ -98,6 +99,28 @@ public void apply(DirCacheEntry ent) { } } + static final class InsertYaml extends PathEdit { + private final ObjectInserter inserter; + private final JsonNode jsonNode; + + InsertYaml(String entryPath, ObjectInserter inserter, JsonNode jsonNode) { + super(entryPath); + this.inserter = inserter; + this.jsonNode = jsonNode; + } + + @Override + public void apply(DirCacheEntry ent) { + try { + final byte[] yamlBytes = Yaml.writeValueAsString(jsonNode).getBytes(UTF_8); + ent.setObjectId(inserter.insert(Constants.OBJ_BLOB, yamlBytes)); + ent.setFileMode(FileMode.REGULAR_FILE); + } catch (IOException e) { + throw new StorageException("failed to create a new YAML blob", e); + } + } + } + static final class InsertText extends PathEdit { private final ObjectInserter inserter; private final String text; diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java index 20c208852a..ec56c6ec3e 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java @@ -35,6 +35,7 @@ import com.linecorp.centraldogma.common.EntryType; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.internal.Jackson; +import com.linecorp.centraldogma.internal.Yaml; import com.linecorp.centraldogma.server.command.ContentTransformer; final class TransformingChangesApplier extends AbstractChangesApplier { @@ -42,8 +43,8 @@ final class TransformingChangesApplier extends AbstractChangesApplier { private final ContentTransformer transformer; TransformingChangesApplier(ContentTransformer transformer) { - checkArgument(transformer.entryType() == EntryType.JSON, - "transformer: %s (expected: JSON type)", transformer); + checkArgument(transformer.entryType() == EntryType.JSON || transformer.entryType() == EntryType.YAML, + "transformer: %s (expected: JSON or YAML type)", transformer); //noinspection unchecked this.transformer = (ContentTransformer) transformer; } @@ -55,13 +56,23 @@ int doApply(Revision headRevision, DirCache dirCache, final DirCacheEntry oldEntry = dirCache.getEntry(changePath); final byte[] oldContent = oldEntry != null ? reader.open(oldEntry.getObjectId()).getBytes() : null; - final JsonNode oldJsonNode = oldContent != null ? Jackson.readTree(oldContent) - : JsonNodeFactory.instance.nullNode(); + final JsonNode oldJsonNode; + if (oldContent == null) { + oldJsonNode = JsonNodeFactory.instance.nullNode(); + } else if (transformer.entryType() == EntryType.YAML) { + oldJsonNode = Yaml.readTree(oldContent); + } else { + oldJsonNode = Jackson.readTree(oldContent); + } try { final JsonNode newJsonNode = transformer.transformer().apply(headRevision, oldJsonNode.deepCopy()); requireNonNull(newJsonNode, "transformer.transformer().apply() returned null"); if (!Objects.equals(newJsonNode, oldJsonNode)) { - applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); + if (transformer.entryType() == EntryType.YAML) { + applyPathEdit(dirCache, new InsertYaml(changePath, inserter, newJsonNode)); + } else { + applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); + } return 1; } } catch (CentralDogmaException e) { diff --git a/webapp/src/dogma/features/xds/K8sAggregatorStatus.tsx b/webapp/src/dogma/features/xds/K8sAggregatorStatus.tsx index 12e7f2c0c0..a857fa0061 100644 --- a/webapp/src/dogma/features/xds/K8sAggregatorStatus.tsx +++ b/webapp/src/dogma/features/xds/K8sAggregatorStatus.tsx @@ -22,7 +22,7 @@ import { useGetGroupHistoryQuery, useGetResourceQuery } from 'dogma/features/xds // The aggregator writes its generated endpoints to this path; reading it tells us whether (and when) the // aggregator has produced endpoints from Kubernetes. -const generatedPath = (id: string) => `/k8s/endpoints/${id}.json`; +const generatedPath = (id: string) => `/k8s/endpoints/${id}.yaml`; // eslint-disable-next-line @typescript-eslint/no-explicit-any function countEndpoints(content: any): { localities: number; endpoints: number } { diff --git a/webapp/src/dogma/features/xds/XdsTypes.ts b/webapp/src/dogma/features/xds/XdsTypes.ts index 447df135cb..27c3989f9a 100644 --- a/webapp/src/dogma/features/xds/XdsTypes.ts +++ b/webapp/src/dogma/features/xds/XdsTypes.ts @@ -48,17 +48,17 @@ export interface GroupDto { export interface XdsResourceDto { // The resource id, i.e. the file path under the type directory without the - // leading '/{type}/' prefix and the trailing '.json' suffix. + // leading '/{type}/' prefix and the trailing file extension (.yaml or .json). id: string; - // The full repository path, e.g. '/clusters/foo.json'. + // The full repository path, e.g. '/clusters/foo.yaml'. path: string; revision: number; } -// The full xDS resource name derived from its repository path, e.g. group 'foo' + '/clusters/c1.json' +// The full xDS resource name derived from its repository path, e.g. group 'foo' + '/clusters/c1.yaml' // becomes 'groups/foo/clusters/c1'. This matches the `name` the server assigns to CDS/LDS/RDS resources. export function resourceName(group: string, path: string): string { - return `groups/${group}${path.replace(/\.json$/, '')}`; + return `groups/${group}${path.replace(/\.(json|yaml)$/, '')}`; } // A starter template offered when creating a new resource of each type. diff --git a/webapp/src/dogma/features/xds/xdsApiSlice.ts b/webapp/src/dogma/features/xds/xdsApiSlice.ts index a011f8e87d..4d8f2874ed 100644 --- a/webapp/src/dogma/features/xds/xdsApiSlice.ts +++ b/webapp/src/dogma/features/xds/xdsApiSlice.ts @@ -15,7 +15,7 @@ */ import { createApi } from '@reduxjs/toolkit/query/react'; -import { FetchBaseQueryError } from '@reduxjs/toolkit/query'; +import { FetchBaseQueryError, FetchArgs } from '@reduxjs/toolkit/query'; import { baseQueryWithReauth } from 'dogma/features/api/baseQuery'; import { createLoginUrl } from 'dogma/util/auth'; import { @@ -53,7 +53,7 @@ interface RawRepoDto { interface RawFileDto { path: string; - type: 'TEXT' | 'DIRECTORY' | 'JSON' | 'YML'; + type: 'TEXT' | 'DIRECTORY' | 'JSON' | 'YAML'; revision: number; } @@ -68,26 +68,26 @@ export interface FileContentDto { export type ResourceArg = { group: string; type: XdsResourceType }; // `k8s` marks an endpoint generated by a k8s aggregator (stored under '/k8s/endpoints/'); it is read-only. -export type ResourceIdArg = ResourceArg & { id: string; k8s?: boolean }; +// `path` is the full repository path (e.g. '/clusters/foo.yaml') — when provided, it is used directly to +// avoid a .yaml→.json fallback request on servers that still have legacy .json files. +export type ResourceIdArg = ResourceArg & { id: string; k8s?: boolean; path?: string }; export type CreateResourceArg = ResourceArg & { id: string; body: string }; export type UpdateResourceArg = ResourceIdArg & { body: string }; // Derives a Kubernetes endpoint aggregator id from its repository file path, e.g. -// '/k8s/endpointAggregators/foo.json' becomes 'foo'. +// '/k8s/endpointAggregators/foo.yaml' becomes 'foo'. function k8sAggregatorIdFromPath(path: string): string { let id = path; const prefix = '/k8s/endpointAggregators/'; if (id.startsWith(prefix)) { id = id.substring(prefix.length); } - if (id.endsWith('.json')) { - id = id.substring(0, id.length - '.json'.length); - } + id = id.replace(/\.(json|yaml)$/, ''); return id; } // Derives a resource id from a repository file path, e.g. -// '/clusters/foo/bar.json' with type 'clusters' becomes 'foo/bar'. +// '/clusters/foo/bar.yaml' with type 'clusters' becomes 'foo/bar'. function idFromPath(path: string, type: XdsResourceType): string { let id = path; if (id.startsWith('/')) { @@ -97,9 +97,7 @@ function idFromPath(path: string, type: XdsResourceType): string { if (id.startsWith(prefix)) { id = id.substring(prefix.length); } - if (id.endsWith('.json')) { - id = id.substring(0, id.length - '.json'.length); - } + id = id.replace(/\.(json|yaml)$/, ''); return id; } @@ -110,6 +108,40 @@ function encodeResourcePath(id: string): string { return id.split('/').map(encodeURIComponent).join('/'); } +// Fetches a file by its full repository path, falling back from .yaml to .json for legacy servers. +// Pass `path` (including extension) when known from a listing response to skip the fallback round-trip. +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type FetchFn = (arg: string | FetchArgs) => any; + +async function fetchYamlOrJson( + group: string, + basePath: string, + fetchWithBQ: FetchFn, + knownPath?: string, +): Promise<{ data: FileContentDto } | { error: FetchBaseQueryError }> { + if (knownPath) { + const direct = await fetchWithBQ( + `/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents${knownPath}?revision=head`, + ); + if (!direct.error) return { data: direct.data as FileContentDto }; + if ((direct.error as FetchBaseQueryError).status !== 404) { + return { error: direct.error as FetchBaseQueryError }; + } + } + const yamlResult = await fetchWithBQ( + `/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents${basePath}.yaml?revision=head`, + ); + if (!yamlResult.error) return { data: yamlResult.data as FileContentDto }; + if ((yamlResult.error as FetchBaseQueryError).status !== 404) { + return { error: yamlResult.error as FetchBaseQueryError }; + } + const jsonResult = await fetchWithBQ( + `/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents${basePath}.json?revision=head`, + ); + if (jsonResult.error) return { error: jsonResult.error as FetchBaseQueryError }; + return { data: jsonResult.data as FileContentDto }; +} + export const xdsApiSlice = createApi({ reducerPath: 'xdsApi', // The login page is in the main web app at the server root (outside the '/xds' basePath), so navigate out @@ -168,7 +200,7 @@ export const xdsApiSlice = createApi({ // clean id. They are flagged read-only by their path in the UI. id: type === 'endpoints' && file.path.startsWith('/k8s/endpoints/') - ? file.path.slice('/k8s/endpoints/'.length).replace(/\.json$/, '') + ? file.path.slice('/k8s/endpoints/'.length).replace(/\.(json|yaml)$/, '') : idFromPath(file.path, type), path: file.path, revision: file.revision, @@ -181,12 +213,17 @@ export const xdsApiSlice = createApi({ // Endpoints (EDS) are not access-controlled, so they are read through a dedicated ungated API instead // of the generic content API which requires the READ repository role. k8s-aggregator-generated endpoints // live under '/k8s/endpoints/' and are read via their own path. - query: ({ group, type, id, k8s }) => - type === 'endpoints' - ? k8s + async queryFn({ group, type, id, k8s, path }, _queryApi, _extraOptions, fetchWithBQ) { + if (type === 'endpoints') { + const url = k8s ? `/api/v1/xds/groups/${group}/k8s/endpoints/${id}` - : `/api/v1/xds/groups/${group}/endpoints/${id}` - : `/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents/${type}/${id}.json?revision=head`, + : `/api/v1/xds/groups/${group}/endpoints/${id}`; + const result = await fetchWithBQ(url); + if (result.error) return { error: result.error as FetchBaseQueryError }; + return { data: result.data as FileContentDto }; + } + return fetchYamlOrJson(group, `/${type}/${id}`, fetchWithBQ, path); + }, providesTags: ['Resource'], }), createResource: builder.mutation({ @@ -240,9 +277,10 @@ export const xdsApiSlice = createApi({ }, providesTags: ['K8sAggregator'], }), - getK8sAggregator: builder.query({ - query: ({ group, id }) => - `/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents/k8s/endpointAggregators/${id}.json?revision=head`, + getK8sAggregator: builder.query({ + async queryFn({ group, id, path }, _queryApi, _extraOptions, fetchWithBQ) { + return fetchYamlOrJson(group, `/k8s/endpointAggregators/${id}`, fetchWithBQ, path); + }, providesTags: ['K8sAggregator'], }), // Dry-run preview: resolves the aggregator's watchers against Kubernetes and returns the @@ -394,7 +432,7 @@ export const xdsApiSlice = createApi({ const idOf = (type: XdsResourceType, path: string): string => type === 'endpoints' && path.startsWith('/k8s/endpoints/') - ? path.slice('/k8s/endpoints/'.length).replace(/\.json$/, '') + ? path.slice('/k8s/endpoints/'.length).replace(/\.(json|yaml)$/, '') : idFromPath(path, type); const nodes: XdsGraphNode[] = []; diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java index 828d64b160..d87866aac4 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java @@ -78,8 +78,8 @@ public void createCluster(CreateClusterRequest request, StreamObserver // can be set to false via the update API. .setRespectDnsTtl(true) .build(); - xdsResourceManager.push(responseObserver, group, clusterName, CLUSTERS_DIRECTORY + clusterId + ".json", - "Create cluster: " + clusterName, cluster, currentAuthor(), true); + xdsResourceManager.push(responseObserver, group, clusterName, CLUSTERS_DIRECTORY + clusterId + ".yaml", + "Create cluster: " + clusterName, cluster, currentAuthor()); } @Override diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java index 790ffae30d..f8bc87a347 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java @@ -78,7 +78,7 @@ public void createEndpoint(CreateEndpointRequest request, .setClusterName(clusterName) .build(); xdsResourceManager.push(responseObserver, group, clusterName, fileName(endpointId), - "Create endpoint: " + clusterName, endpoint, currentAuthor(), true); + "Create endpoint: " + clusterName, endpoint, currentAuthor()); } private static String clusterName(String parent, String endpointId) { @@ -125,7 +125,7 @@ private static Matcher checkEndpointName(String endpointName) { } private static String fileName(String endpointId) { - return ENDPOINTS_DIRECTORY + endpointId + ".json"; + return ENDPOINTS_DIRECTORY + endpointId + ".yaml"; } @Override diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointUpdateScheduler.java b/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointUpdateScheduler.java index 79752c27b0..536edc5ff5 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointUpdateScheduler.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointUpdateScheduler.java @@ -43,9 +43,12 @@ import com.linecorp.armeria.internal.common.RequestContextUtil; import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.centraldogma.common.Author; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.EntryNotFoundException; import com.linecorp.centraldogma.common.EntryType; import com.linecorp.centraldogma.common.Markup; +import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.RedundantChangeException; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.internal.Jackson; @@ -170,11 +173,12 @@ private void flush() { } } - final ContentTransformer transformer = new ContentTransformer<>( - fileName, EntryType.JSON, new BatchUpdateTransformer(toRegister, toDeregister)); - + // Use a glob to find both the .yaml file and its legacy .json counterpart in one atomic call, + // avoiding a race window where a migration commit could land between two sequential find() calls. final Repository repository = xdsResourceManager.xdsProject().repos().get(group); - repository.find(Revision.HEAD, fileName, FIND_ONE_WITHOUT_CONTENT).handle((entries, cause) -> { + final String baseFileName = fileName.substring(0, fileName.length() - 5); + repository.find(Revision.HEAD, baseFileName + ".*", FIND_ONE_WITHOUT_CONTENT) + .handle((entries, cause) -> { if (cause != null) { copied.forEach(pendingUpdate -> pendingUpdate.streamObserver.onError(cause)); return null; @@ -186,42 +190,107 @@ private void flush() { copied.forEach(pendingUpdate -> pendingUpdate.streamObserver.onError(runtimeException)); return null; } + final String foundFileName = entries.keySet().iterator().next(); + if (foundFileName.endsWith(".yaml")) { + executeTransform(fileName, EntryType.YAML, copied, toRegister, toDeregister); + } else { + executeTransformWithMigration(repository, foundFileName, + copied, toRegister, toDeregister); + } + return null; + }); + } + + private void executeTransform(String targetFileName, EntryType entryType, + List copied, + List toRegister, + List toDeregister) { + final ContentTransformer transformer = new ContentTransformer<>( + targetFileName, entryType, new BatchUpdateTransformer(toRegister, toDeregister)); + final String commitMessage = + "Batch update for " + endpointName + " in group " + group + ": " + + toRegister.size() + " register, " + toDeregister.size() + " deregister"; + xdsResourceManager.commandExecutor() + .execute(Command.transform( + null, Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, group, Revision.HEAD, + commitMessage, "", Markup.PLAINTEXT, transformer)) + .handle((result, cause2) -> { + if (cause2 != null) { + final Throwable peeled = Exceptions.peel(cause2); + if (!(peeled instanceof RedundantChangeException)) { + copied.forEach(pendingUpdate -> pendingUpdate + .streamObserver.onError(peeled)); + return null; + } + // If the change is redundant, we just ignore it and complete + // the stream observer without error. + } + completeUpdates(copied); + return null; + }); + } + + private void executeTransformWithMigration(Repository repository, String legacyFileName, + List copied, + List toRegister, + List toDeregister) { + // Read the legacy .json file with content so we can transform and migrate atomically. + repository.getOrNull(Revision.HEAD, Query.ofJson(legacyFileName)).handle((entry, cause) -> { + if (cause != null) { + copied.forEach(p -> p.streamObserver.onError(cause)); + return null; + } + if (entry == null) { + // .json was removed concurrently (e.g. by updateEndpoint migration); use .yaml. + executeTransform(fileName, EntryType.YAML, copied, toRegister, toDeregister); + return null; + } + final JsonNode newContent = new BatchUpdateTransformer(toRegister, toDeregister) + .apply(entry.revision(), entry.content()); final String commitMessage = "Batch update for " + endpointName + " in group " + group + ": " + toRegister.size() + " register, " + toDeregister.size() + " deregister"; + final ImmutableList> changes = ImmutableList.of( + Change.ofRemoval(legacyFileName), Change.ofYamlUpsert(fileName, newContent)); xdsResourceManager.commandExecutor() - .execute(Command.transform( - null, Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, group, Revision.HEAD, - commitMessage, "", Markup.PLAINTEXT, transformer)) + .execute(Command.push(Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, group, + Revision.HEAD, commitMessage, "", Markup.PLAINTEXT, + changes)) .handle((result, cause2) -> { if (cause2 != null) { final Throwable peeled = Exceptions.peel(cause2); + if (peeled instanceof ChangeConflictException && + peeled.getMessage().contains("non-existent file")) { + // .json removed concurrently; retry with .yaml. + executeTransform(fileName, EntryType.YAML, + copied, toRegister, toDeregister); + return null; + } if (!(peeled instanceof RedundantChangeException)) { - copied.forEach(pendingUpdate -> pendingUpdate - .streamObserver.onError(peeled)); + copied.forEach(p -> p.streamObserver.onError(peeled)); return null; } - // If the change is redundant, we just ignore it and complete - // the stream observer without error. } - copied.forEach(pendingUpdate -> { - final StreamObserver streamObserver = pendingUpdate.streamObserver; - if (pendingUpdate.register) { - //noinspection unchecked - ((StreamObserver ) streamObserver).onNext( - pendingUpdate.endpoint); - } else { - //noinspection unchecked - ((StreamObserver) streamObserver).onNext( - Empty.getDefaultInstance()); - } - streamObserver.onCompleted(); - }); + completeUpdates(copied); return null; }); return null; }); } + + private void completeUpdates(List copied) { + copied.forEach(pendingUpdate -> { + final StreamObserver streamObserver = pendingUpdate.streamObserver; + if (pendingUpdate.register) { + //noinspection unchecked + ((StreamObserver) streamObserver).onNext(pendingUpdate.endpoint); + } else { + //noinspection unchecked + ((StreamObserver) streamObserver).onNext(Empty.getDefaultInstance()); + } + streamObserver.onCompleted(); + }); + } } private static class PendingUpdate { diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/CentralDogmaXdsResources.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/CentralDogmaXdsResources.java index 8d1b340ecb..7446f8cfdc 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/CentralDogmaXdsResources.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/CentralDogmaXdsResources.java @@ -103,7 +103,8 @@ void removeCluster(String groupName, String path) { } private static String getResourceName(String groupName, String path) { - return "groups/" + groupName + path.substring(0, path.length() - 5); // Remove .json + // Remove file extension (.json or .yaml) + return "groups/" + groupName + path.substring(0, path.length() - 5); } void removeEndpoint(String groupName, String path) { @@ -112,11 +113,11 @@ void removeEndpoint(String groupName, String path) { if (groupEndpoints == null) { return; } - // e.g. /endpoints/foo-cluster.json file with group foo -> groups/foo/clusters/foo-cluster - // e.g. /k8s/endpoints/foo-cluster.json file with group foo -> groups/foo/k8s/clusters/foo-cluster + // e.g. /endpoints/foo-cluster.yaml file with group foo -> groups/foo/clusters/foo-cluster + // e.g. /k8s/endpoints/foo-cluster.yaml file with group foo -> groups/foo/k8s/clusters/foo-cluster final String clusterName = "groups/" + groupName + - ENDPOINTS_PATTERN.matcher(path.substring(0, path.length() - 5) /* remove .json */) + ENDPOINTS_PATTERN.matcher(path.substring(0, path.length() - 5) /* remove file extension */) .replaceFirst("/clusters/"); endpointUpdated |= groupEndpoints.remove(clusterName) != null; } diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java index 376e13f1b9..45f053feef 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java @@ -338,23 +338,23 @@ protected String pathPattern() { } @Override - protected void handleXdsResource(String path, String contentAsText, String groupName) + protected void handleXdsResource(String path, JsonNode content, String groupName) throws IOException { if (path.startsWith(CLUSTERS_DIRECTORY)) { final Cluster.Builder builder = Cluster.newBuilder(); - JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(content.traverse(), builder); centralDogmaXdsResources.setCluster(groupName, builder.build()); } else if (path.startsWith(ENDPOINTS_DIRECTORY) || path.startsWith(K8S_ENDPOINTS_DIRECTORY)) { final ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder(); - JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(content.traverse(), builder); centralDogmaXdsResources.setEndpoint(groupName, builder.build()); } else if (path.startsWith(LISTENERS_DIRECTORY)) { final Listener.Builder builder = Listener.newBuilder(); - JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(content.traverse(), builder); centralDogmaXdsResources.setListener(groupName, builder.build()); } else if (path.startsWith(ROUTES_DIRECTORY)) { final RouteConfiguration.Builder builder = RouteConfiguration.newBuilder(); - JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(content.traverse(), builder); centralDogmaXdsResources.setRoute(groupName, builder.build()); } else { // ignore diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsEndpointReadService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsEndpointReadService.java index 4d4756f30b..262fd05470 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsEndpointReadService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsEndpointReadService.java @@ -19,16 +19,19 @@ import static com.linecorp.centraldogma.xds.internal.ControlPlaneService.K8S_ENDPOINTS_DIRECTORY; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.server.annotation.Get; import com.linecorp.armeria.server.annotation.Param; import com.linecorp.armeria.server.annotation.ProducesJson; import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.EntryNotFoundException; import com.linecorp.centraldogma.common.EntryType; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.server.storage.project.Project; @@ -56,12 +59,12 @@ public CompletableFuture listEndpoints(@Param String group) { .thenApply(entries -> { final ArrayNode array = JsonNodeFactory.instance.arrayNode(); for (Entry entry : entries.values()) { - if (entry.type() != EntryType.JSON) { + if (entry.type() != EntryType.JSON && entry.type() != EntryType.YAML) { continue; } final ObjectNode node = array.addObject(); node.put("path", entry.path()); - node.put("type", "JSON"); + node.put("type", entry.type().name()); node.put("revision", entry.revision().major()); } return array; @@ -73,7 +76,8 @@ public CompletableFuture listEndpoints(@Param String group) { */ @Get("/xds/groups/{group}/endpoints/{*id}") public CompletableFuture getEndpoint(@Param String group, @Param String id) { - return read(group, ENDPOINTS_DIRECTORY + id + ".json"); + return readWithFallback(group, ENDPOINTS_DIRECTORY + id + ".yaml", + ENDPOINTS_DIRECTORY + id + ".json"); } /** @@ -82,17 +86,33 @@ public CompletableFuture getEndpoint(@Param String group, @Param Strin */ @Get("/xds/groups/{group}/k8s/endpoints/{*id}") public CompletableFuture getK8sEndpoint(@Param String group, @Param String id) { - return read(group, K8S_ENDPOINTS_DIRECTORY + id + ".json"); + return readWithFallback(group, K8S_ENDPOINTS_DIRECTORY + id + ".yaml", + K8S_ENDPOINTS_DIRECTORY + id + ".json"); } - private CompletableFuture read(String group, String path) { - return xdsProject.repos().get(group).get(Revision.HEAD, path).thenApply(XdsEndpointReadService::toNode); + private CompletableFuture readWithFallback(String group, String yamlPath, String jsonPath) { + final Repository repository = xdsProject.repos().get(group); + return repository.get(Revision.HEAD, yamlPath) + .thenApply(XdsEndpointReadService::toNode) + .handle((result, cause) -> { + if (cause == null) { + return CompletableFuture.completedFuture(result); + } + if (Exceptions.peel(cause) instanceof EntryNotFoundException) { + return repository.get(Revision.HEAD, jsonPath) + .thenApply(XdsEndpointReadService::toNode); + } + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(cause); + return failed; + }) + .thenCompose(Function.identity()); } private static JsonNode toNode(Entry entry) { final ObjectNode node = JsonNodeFactory.instance.objectNode(); node.put("path", entry.path()); - node.put("type", "JSON"); + node.put("type", entry.type().name()); node.put("revision", entry.revision().major()); node.set("content", (JsonNode) entry.content()); return node; diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java index 03fcc10464..327d481d76 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.function.Consumer; import java.util.regex.Pattern; import org.curioswitch.common.protobuf.json.MessageMarshaller; +import org.jspecify.annotations.Nullable; import org.reflections.Reflections; import org.reflections.scanners.SubTypesScanner; @@ -178,31 +180,61 @@ private void checkWritePermission0(String group) { public void push( StreamObserver responseObserver, String group, String resourceName, String fileName, - String summary, T resource, Author author, boolean create) { + String summary, T resource, Author author) { final Change change; try { final String jsonText = JSON_MESSAGE_MARSHALLER.writeValueAsString(resource); final JsonNode jsonNode = Jackson.readTree(jsonText); - if (create) { - change = Change.ofJsonPatch(fileName, null, jsonNode); - } else { - change = Change.ofJsonUpsert(fileName, jsonNode); - } + // ofJsonPatch with null oldJsonNode means "expect the file to not exist yet", giving us an atomic + // create-or-fail that prevents a concurrent create from silently overwriting the first writer. + // APPLY_JSON_PATCH works on .yaml files (see Change.ofJsonPatch javadoc). + change = Change.ofJsonPatch(fileName, null, jsonNode); } catch (IOException e) { // This could happen when the message has a type that isn't registered to JSON_MESSAGE_MARSHALLER. responseObserver.onError(Status.INTERNAL.withCause(new IllegalStateException( "failed to convert message to JSON: " + resource, e)).asRuntimeException()); return; } + // We still need to check for a legacy .json file explicitly, because the atomic ofJsonPatch above + // only guards against a concurrent .yaml creation — it won't detect an existing .json counterpart. + final Repository repository = xdsProject.repos().get(group); + final String baseFileName = fileName.substring(0, fileName.length() - 5); + repository.find(Revision.HEAD, baseFileName + ".*", FIND_ONE_WITHOUT_CONTENT) + .handle((entries, cause) -> { + if (cause != null) { + responseObserver.onError(cause); + return null; + } + if (!entries.isEmpty()) { + responseObserver.onError( + Status.ALREADY_EXISTS + .withDescription("Resource already exists: " + resourceName) + .asRuntimeException()); + return null; + } + executePush(responseObserver, group, resourceName, summary, author, resource, + change, true, null); + return null; + }); + } + + private void executePush( + StreamObserver responseObserver, String group, String resourceName, + String summary, Author author, T resource, Change change, + boolean create, @Nullable String legacyFileToRemove) { + final ImmutableList> changes = + legacyFileToRemove != null + ? ImmutableList.of(Change.ofRemoval(legacyFileToRemove), change) + : ImmutableList.of(change); commandExecutor.execute(Command.push(author, XDS_CENTRAL_DOGMA_PROJECT, group, Revision.HEAD, - summary, "", Markup.PLAINTEXT, ImmutableList.of(change))) + summary, "", Markup.PLAINTEXT, changes)) .handle((unused, cause) -> { if (cause != null) { final Throwable peeled = Exceptions.peel(cause); if (create && peeled instanceof ChangeConflictException) { + // A concurrent create raced past the find() check and created the file first. responseObserver.onError( Status.ALREADY_EXISTS - .withCause(peeled) .withDescription("Resource already exists: " + resourceName) .asRuntimeException()); return null; @@ -213,7 +245,6 @@ public void push( responseObserver.onCompleted(); return null; } - responseObserver.onError(cause); return null; } @@ -225,7 +256,11 @@ public void push( public static String fileName(String group, String resourceName) { // Remove groups/{group} - return resourceName.substring(7 + group.length()) + ".json"; + return resourceName.substring(7 + group.length()) + ".yaml"; + } + + private static String toLegacyJsonFileName(String yamlFileName) { + return yamlFileName.substring(0, yamlFileName.length() - 5) + ".json"; } public void update(StreamObserver responseObserver, String group, @@ -236,9 +271,22 @@ public void update(StreamObserver responseObserver, Strin public void update(StreamObserver responseObserver, String group, String resourceName, String fileName, String summary, T resource, Author author) { - updateOrDelete(responseObserver, group, resourceName, fileName, - () -> push(responseObserver, group, resourceName, fileName, - summary, resource, author, false)); + updateOrDelete(responseObserver, group, resourceName, fileName, foundFileName -> { + final String legacyFileToRemove = foundFileName.endsWith(".json") ? foundFileName : null; + final String targetFileName = legacyFileToRemove != null ? fileName : foundFileName; + final Change change; + try { + final String jsonText = JSON_MESSAGE_MARSHALLER.writeValueAsString(resource); + final JsonNode jsonNode = Jackson.readTree(jsonText); + change = Change.ofYamlUpsert(targetFileName, jsonNode); + } catch (IOException e) { + responseObserver.onError(Status.INTERNAL.withCause(new IllegalStateException( + "failed to convert message to JSON: " + resource, e)).asRuntimeException()); + return; + } + executePush(responseObserver, group, resourceName, summary, author, resource, + change, false, legacyFileToRemove); + }); } public void delete(StreamObserver responseObserver, String group, @@ -248,10 +296,10 @@ public void delete(StreamObserver responseObserver, String group, public void delete(StreamObserver responseObserver, String group, String resourceName, String fileName, String summary, Author author) { - final Runnable deleteTask = () -> + updateOrDelete(responseObserver, group, resourceName, fileName, foundFileName -> commandExecutor.execute(Command.push(author, XDS_CENTRAL_DOGMA_PROJECT, group, Revision.HEAD, summary, "", Markup.PLAINTEXT, - ImmutableList.of(Change.ofRemoval(fileName)))) + ImmutableList.of(Change.ofRemoval(foundFileName)))) .handle((unused, cause) -> { if (cause != null) { responseObserver.onError( @@ -261,25 +309,29 @@ public void delete(StreamObserver responseObserver, String group, responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); return null; - }); - updateOrDelete(responseObserver, group, resourceName, fileName, deleteTask); + })); } public void updateOrDelete(StreamObserver responseObserver, String group, String resourceName, - String fileName, Runnable task) { + String fileName, Consumer task) { + // Use a glob to find both the .yaml file and its legacy .json counterpart in one atomic call. + // Two sequential find() calls would have a race window: a migration commit that atomically removes + // .json and creates .yaml could land between the two calls, making the resource appear missing. final Repository repository = xdsProject.repos().get(group); - repository.find(Revision.HEAD, fileName, FIND_ONE_WITHOUT_CONTENT).handle((entries, cause) -> { + final String baseFileName = fileName.substring(0, fileName.length() - 5); + repository.find(Revision.HEAD, baseFileName + ".*", FIND_ONE_WITHOUT_CONTENT) + .handle((entries, cause) -> { if (cause != null) { responseObserver.onError(cause); return null; } if (entries.isEmpty()) { - // TODO(minwoox): implement allowMissing. - responseObserver.onError(Status.NOT_FOUND.withDescription("Resource not found: " + resourceName) - .asRuntimeException()); + responseObserver.onError( + Status.NOT_FOUND.withDescription("Resource not found: " + resourceName) + .asRuntimeException()); return null; } - task.run(); + task.accept(entries.keySet().iterator().next()); return null; }); } diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java index b0fb041604..5081849e03 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java @@ -26,11 +26,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.Sets; import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.ChangeType; import com.linecorp.centraldogma.common.Entry; import com.linecorp.centraldogma.common.EntryType; import com.linecorp.centraldogma.common.RepositoryNotFoundException; @@ -70,7 +72,7 @@ protected Project xdsProject() { protected abstract String pathPattern(); - protected abstract void handleXdsResource(String path, String contentAsText, String groupName) + protected abstract void handleXdsResource(String path, JsonNode content, String groupName) throws IOException; protected abstract void onGroupRemoved(String groupName); @@ -110,13 +112,13 @@ protected void init() { " at revision: " + normalizedRevision, cause); } for (Entry entry : entries.values()) { - if (entry.type() != EntryType.JSON || !entry.hasContent()) { + if ((entry.type() != EntryType.JSON && entry.type() != EntryType.YAML) || + !entry.hasContent()) { continue; } final String path = entry.path(); - final String contentAsText = entry.contentAsText(); try { - handleXdsResource(path, contentAsText, groupName); + handleXdsResource(path, (JsonNode) entry.content(), groupName); } catch (Throwable t) { logger.warn("Unexpected exception while building an xDS resource from {}.", groupName + path, t); @@ -221,15 +223,25 @@ private void handleDiff(String groupName, Revision newRevision, for (Change change : changes.values()) { final String path = change.path(); switch (change.type()) { + case UPSERT_YAML: case UPSERT_JSON: try { - handleXdsResource(path, change.contentAsText(), groupName); + handleXdsResource(path, (JsonNode) change.content(), groupName); } catch (Throwable t) { logger.warn("Unexpected exception while handling an xDS resource from {}.", groupName + path, t); } break; case REMOVE: + // Skip if this is an atomic .json → .yaml migration commit: the .json is being + // removed while the same resource is upserted as .yaml in the same diff. + if (path.endsWith(".json")) { + final String base = path.substring(0, path.length() - 5); + final Change counterpart = changes.get(base + ".yaml"); + if (counterpart != null && counterpart.type() != ChangeType.REMOVE) { + break; + } + } onFileRemoved(groupName, path); break; default: diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java index 2bd7ac7c69..bb9c78391d 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java @@ -16,6 +16,9 @@ package com.linecorp.centraldogma.xds.k8s.v1; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITHOUT_CONTENT; + +import com.google.common.collect.ImmutableList; import static com.linecorp.centraldogma.xds.internal.ControlPlanePlugin.XDS_CENTRAL_DOGMA_PROJECT; import static com.linecorp.centraldogma.xds.internal.ControlPlaneService.K8S_ENDPOINTS_DIRECTORY; import static com.linecorp.centraldogma.xds.internal.XdsResourceManager.JSON_MESSAGE_MARSHALLER; @@ -59,6 +62,7 @@ import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommandExecutor; import com.linecorp.centraldogma.server.storage.project.Project; +import com.linecorp.centraldogma.server.storage.repository.Repository; import com.linecorp.centraldogma.xds.internal.XdsResourceWatchingService; import io.envoyproxy.envoy.config.core.v3.Address; @@ -120,15 +124,15 @@ protected String pathPattern() { } @Override - protected void handleXdsResource(String path, String contentAsText, String groupName) + protected void handleXdsResource(String path, JsonNode content, String groupName) throws InvalidProtocolBufferException { final KubernetesEndpointAggregator.Builder aggregatorBuilder = KubernetesEndpointAggregator.newBuilder(); try { - JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, aggregatorBuilder); + JSON_MESSAGE_MARSHALLER.mergeValue(content.traverse(), aggregatorBuilder); } catch (IOException e) { logger.warn("Failed to parse a KubernetesEndpointAggregator at {}{}. content: {}", - groupName, path, contentAsText, e); + groupName, path, content, e); return; } @@ -179,8 +183,8 @@ protected void onGroupRemoved(String groupName) { protected void onFileRemoved(String groupName, String path) { final Map updaters = kubernetesEndpointsUpdaters.get(groupName); // e.g. groups/foo/k8s/endpointAggregators/foo-cluster - final String aggregatorName = - "groups/" + groupName + path.substring(0, path.length() - 5); // Remove .json + final String aggregatorName = "groups/" + groupName + path.substring( + 0, path.length() - 5); // Remove file extension (.json or .yaml) if (updaters != null) { final KubernetesEndpointsUpdater updater = updaters.get(aggregatorName); if (updater != null) { @@ -188,22 +192,40 @@ protected void onFileRemoved(String groupName, String path) { } } - // Remove corresponding endpoints. - final String endpointPath = AGGREGATORS_REPLCACE_PATTERN.matcher(path).replaceFirst("/endpoints/"); - logger.info("Removing {} from {}. aggregatorName: {}", endpointPath, groupName, aggregatorName); + // Find the endpoint file (either .yaml or legacy .json) and remove it in a single commit. + final String endpointBase = AGGREGATORS_REPLCACE_PATTERN.matcher( + path.substring(0, path.length() - 5)).replaceFirst("/endpoints/"); + logger.info("Removing endpoint for {} from {}. aggregatorName: {}", endpointBase, groupName, + aggregatorName); + final Repository repository = xdsProject().repos().get(groupName); + repository.find(Revision.HEAD, endpointBase + ".*", FIND_ONE_WITHOUT_CONTENT) + .handle((entries, findCause) -> { + if (findCause != null) { + logger.warn("Failed to find endpoint file for {} in {}", endpointBase, groupName, findCause); + return null; + } + final String actualPath = entries.keySet().stream() + .filter(p -> p.endsWith(".yaml") || p.endsWith(".json")) + .findFirst().orElse(null); + if (actualPath == null) { + return null; // Already removed or never created. + } + removeEndpointFile(groupName, actualPath); + return null; + }); + } + + private void removeEndpointFile(String groupName, String endpointPath) { commandExecutor.execute( Command.push(Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, groupName, Revision.HEAD, "Remove " + endpointPath, "", Markup.PLAINTEXT, Change.ofRemoval(endpointPath))).handle((unused, cause) -> { if (cause != null) { final Throwable peeled = Exceptions.peel(cause); - if (peeled instanceof ChangeConflictException && - peeled.getMessage().contains("non-existent file")) { - // TODO(minwoox): Provide a type to ChangeConflictException to distinguish this case. + if (peeled instanceof ChangeConflictException) { // Could happen if deleteKubernetesEndpointAggregator is called before the file is created. return null; } - logger.warn("Failed to remove {} from {}", endpointPath, groupName, cause); } return null; @@ -228,6 +250,7 @@ private static class KubernetesEndpointsUpdater { @Nullable private ScheduledFuture scheduledFuture; private boolean closing; + private volatile boolean legacyJsonMigrated; KubernetesEndpointsUpdater( CommandExecutor commandExecutor, @@ -316,7 +339,7 @@ private void pushK8sEndpoints() { final boolean matches = matcher.matches(); assert matches; final String aggregatorId = matcher.group(2); - final String fileName = K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json"; + final String fileName = K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml"; final JsonNode jsonNode; try { jsonNode = Jackson.readTree(json); @@ -324,7 +347,42 @@ private void pushK8sEndpoints() { // Should never reach here as it is already validated. throw new IllegalStateException(e); } - final Change change = Change.ofJsonUpsert(fileName, jsonNode); + final Change yamlChange = Change.ofYamlUpsert(fileName, jsonNode); + if (legacyJsonMigrated) { + pushYamlChange(yamlChange); + return; + } + // Attempt atomic migration: remove legacy .json and upsert .yaml in one commit. + final String legacyFileName = K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json"; + final List> changes = ImmutableList.of(Change.ofRemoval(legacyFileName), yamlChange); + commandExecutor.execute( + Command.push(Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, groupName, Revision.HEAD, + "Add " + aggregator.getClusterName() + '.', "", + Markup.PLAINTEXT, changes)).handle((unused, cause) -> { + if (cause != null) { + final Throwable peeled = Exceptions.peel(cause); + if (peeled instanceof RedundantChangeException) { + // Repository state is unchanged: .json is already absent and .yaml already has the + // same content, so migration is effectively done. + legacyJsonMigrated = true; + return null; + } + if (peeled instanceof ChangeConflictException) { + // No legacy .json file exists; mark as migrated and push only the .yaml. + legacyJsonMigrated = true; + pushYamlChange(yamlChange); + return null; + } + logger.warn("Failed to push {} to {}", changes, groupName, peeled); + return null; + } + // Migration succeeded; no need to attempt removal on future pushes. + legacyJsonMigrated = true; + return null; + }); + } + + private void pushYamlChange(Change change) { commandExecutor.execute( Command.push(Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, groupName, Revision.HEAD, "Add " + aggregator.getClusterName() + '.', "", @@ -332,7 +390,6 @@ private void pushK8sEndpoints() { if (cause != null) { final Throwable peeled = Exceptions.peel(cause); if (peeled instanceof RedundantChangeException) { - // ignore return null; } logger.warn("Failed to push {} to {}", change, groupName, peeled); diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java index f697ecd8b7..46f6c7f7a0 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java @@ -141,13 +141,13 @@ public void createKubernetesEndpointAggregator( .asRuntimeException(); } final Author author = currentAuthor(); - final String fileName = K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json"; + final String fileName = K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml"; validateKubernetesEndpointAndPush( responseObserver, kubernetesLocalityLbEndpointsList, group, fileName, () -> xdsResourceManager.push( responseObserver, group, kubernetesEndpointName, fileName, - "Create kubernetes endpoint: " + kubernetesEndpointName, aggregator, author, true)); + "Create kubernetes endpoint: " + kubernetesEndpointName, aggregator, author)); } private void validateKubernetesEndpointAndPush( diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java index 8e407d7d6f..3ed02a1278 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java @@ -69,8 +69,8 @@ public void createListener(CreateListenerRequest request, StreamObserver void pushLegacyJson(String path, T resource) throws IOException { + final JsonNode jsonNode = Jackson.readTree(JSON_MESSAGE_MARSHALLER.writeValueAsString(resource)); + dogma.client() + .forRepo(XDS_CENTRAL_DOGMA_PROJECT, GROUP) + .commit("Add legacy " + path, Change.ofJsonUpsert(path, jsonNode)) + .push() + .join(); + } + + private static Repository repo() { + return dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT).repos().get(GROUP); + } + + private static void assertFileExists(Repository repo, String path) { + final Query query = path.endsWith(".yaml") ? Query.ofYaml(path) : Query.ofJson(path); + assertThat(repo.getOrNull(Revision.HEAD, query).join()) + .as("expected file to exist: %s", path) + .isNotNull(); + } + + private static void assertFileAbsent(Repository repo, String path) { + final Query query = path.endsWith(".yaml") ? Query.ofYaml(path) : Query.ofJson(path); + assertThat(repo.getOrNull(Revision.HEAD, query).join()) + .as("expected file to be absent: %s", path) + .isNull(); + } + + private static AggregatedHttpResponse deleteResource(String path) { + return dogma.httpClient() + .execute(RequestHeaders.builder(HttpMethod.DELETE, path) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .build()) + .aggregate() + .join(); + } + + private static AggregatedHttpResponse updateEndpoint(String endpointId, + ClusterLoadAssignment endpoint) throws IOException { + return dogma.httpClient() + .execute(RequestHeaders.builder(HttpMethod.PATCH, + "/api/v1/xds/groups/" + GROUP + "/endpoints/" + + endpointId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8) + .build(), + JSON_MESSAGE_MARSHALLER.writeValueAsString(endpoint)) + .aggregate() + .join(); + } + + private static void assertOk(AggregatedHttpResponse response) { + assertThat(response.status()).isSameAs(HttpStatus.OK); + assertThat(response.headers().get("grpc-status")).isEqualTo("0"); + } + + private static void assertAlreadyExists(AggregatedHttpResponse response) { + assertThat(response.status()).isSameAs(HttpStatus.CONFLICT); + assertThat(response.headers().get("grpc-status")) + .isEqualTo(Integer.toString(Status.ALREADY_EXISTS.getCode().value())); + } +} diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingServiceTest.java index 82862d56e2..28fe8d370c 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingServiceTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.fasterxml.jackson.databind.JsonNode; + import com.linecorp.centraldogma.client.CentralDogma; import com.linecorp.centraldogma.common.Change; import com.linecorp.centraldogma.server.storage.project.Project; @@ -68,6 +70,13 @@ void foo() throws InterruptedException { .push().join(); assertThat(queue.take()).isEqualTo("/a.json removed"); assertThat(queue.take()).isEqualTo("diff handled: bar"); + + // YAML files are also handled (new format). + client.forRepo("foo", "bar").commit("Add a YAML file", Change.ofYamlUpsert("/c.yaml", "key: value")) + .push().join(); + assertThat(queue.take()).isEqualTo("handleXdsResource: /c.yaml"); + assertThat(queue.take()).isEqualTo("diff handled: bar"); + client.removeRepository("foo", "bar").join(); assertThat(queue.take()).isEqualTo("bar removed"); } @@ -91,7 +100,7 @@ protected String pathPattern() { } @Override - protected void handleXdsResource(String path, String contentAsText, String groupName) + protected void handleXdsResource(String path, JsonNode content, String groupName) throws IOException { queue.add("handleXdsResource: " + path); } diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsWritePermissionTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsWritePermissionTest.java index 4580d7c5fb..4d8e98a897 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsWritePermissionTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsWritePermissionTest.java @@ -119,7 +119,7 @@ void readRoleIsInsufficientAndAdminCanWrite() throws Exception { private static HttpStatus getClusterStatus(WebClient client, String group, String clusterId) { return client.get("/api/v1/projects/@xds/repos/" + group + "/contents/clusters/" + clusterId + - ".json?revision=head") + ".yaml?revision=head") .aggregate().join().status(); } diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/AggregatingMultipleKubernetesTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/AggregatingMultipleKubernetesTest.java index 79fa008973..081ac6c3a9 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/AggregatingMultipleKubernetesTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/AggregatingMultipleKubernetesTest.java @@ -49,6 +49,7 @@ import com.linecorp.centraldogma.common.Entry; import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.Revision; +import com.linecorp.centraldogma.internal.Jackson; import com.linecorp.centraldogma.server.storage.repository.Repository; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; @@ -129,14 +130,14 @@ void aggregateMultipleKubernetes() throws Exception { final Repository fooGroup = dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT).repos().get("foo"); final Entry aggregatorEntry = - fooGroup.get(Revision.HEAD, Query.ofJson( - K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json")).join(); - assertAggregator(aggregatorEntry.contentAsText(), expectedAggregator); + fooGroup.get(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml")).join(); + assertAggregator(Jackson.writeValueAsString(aggregatorEntry.content()), expectedAggregator); await().until(() -> fooGroup.normalizeNow(Revision.HEAD).equals(aggregatorEntry.revision().forward(1))); - final Entry endpointEntry = fooGroup.getOrNull(Revision.HEAD, Query.ofJson( - K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join(); + final Entry endpointEntry = fooGroup.getOrNull(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join(); assertThatJson(endpointEntry.content()).isEqualTo( '{' + " \"clusterName\": \"groups/foo/k8s/clusters/foo-k8s-cluster/1\"," + @@ -208,8 +209,8 @@ void aggregateMultipleKubernetes() throws Exception { // the debouncing logic. await().until(() -> fooGroup.normalizeNow(Revision.HEAD).equals( endpointEntry.revision().forward(1))); - final Entry endpointEntry1 = fooGroup.getOrNull(Revision.HEAD, Query.ofJson( - K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join(); + final Entry endpointEntry1 = fooGroup.getOrNull(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join(); assertThatJson(endpointEntry1.content()).isEqualTo( '{' + " \"clusterName\": \"groups/foo/k8s/clusters/foo-k8s-cluster/1\"," + @@ -274,8 +275,8 @@ void aggregateMultipleKubernetes() throws Exception { await().until(() -> fooGroup.normalizeNow(Revision.HEAD).equals( // 1 + 2 because of the aggregator update and endpoint update endpointEntry.revision().forward(3))); - final Entry endpointEntry2 = fooGroup.getOrNull(Revision.HEAD, Query.ofJson( - K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join(); + final Entry endpointEntry2 = fooGroup.getOrNull(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join(); assertThatJson(endpointEntry2.content()).isEqualTo( '{' + " \"clusterName\": \"groups/foo/k8s/clusters/foo-k8s-cluster/1\"," + diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java index 0eab9c1982..620513d08c 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java @@ -56,6 +56,7 @@ import com.linecorp.centraldogma.common.Entry; import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.Revision; +import com.linecorp.centraldogma.internal.Jackson; import com.linecorp.centraldogma.server.credential.CreateCredentialRequest; import com.linecorp.centraldogma.server.internal.credential.AccessTokenCredential; import com.linecorp.centraldogma.server.storage.repository.Repository; @@ -221,9 +222,9 @@ void createEndpointAggregatorsRequest(String credentialId) throws IOException { assertAggregator(json, expectedAggregator); final Repository fooGroup = dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT).repos().get("foo"); final Entry entry = - fooGroup.get(Revision.HEAD, Query.ofJson( - K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json")).join(); - assertAggregator(entry.contentAsText(), expectedAggregator); + fooGroup.get(Revision.HEAD, Query.ofYaml( + K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml")).join(); + assertAggregator(Jackson.writeValueAsString(entry.content()), expectedAggregator); final ClusterLoadAssignment loadAssignment = clusterLoadAssignment(clusterName, 30000); checkEndpointsViaDiscoveryRequest(dogma.httpClient().uri(), loadAssignment, clusterName); @@ -232,10 +233,11 @@ void createEndpointAggregatorsRequest(String credentialId) throws IOException { // so that endpoints are not updated one by one. final Entry clusterEntry = fooGroup.get(entry.revision().forward(1), - Query.ofJson("/k8s/endpoints/" + aggregatorId + ".json")) + Query.ofYaml("/k8s/endpoints/" + aggregatorId + ".yaml")) .join(); final ClusterLoadAssignment.Builder clusterLoadAssignmentBuilder = ClusterLoadAssignment.newBuilder(); - JSON_MESSAGE_MARSHALLER.mergeValue(clusterEntry.contentAsText(), clusterLoadAssignmentBuilder); + JSON_MESSAGE_MARSHALLER.mergeValue(Jackson.writeValueAsString(clusterEntry.content()), + clusterLoadAssignmentBuilder); assertThat(clusterLoadAssignmentBuilder.build()).isEqualTo(loadAssignment); dispatcher.queue().forEach(req -> {