Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,15 @@ void extractsNodeIpFromLabel() throws Exception {
final Repository fooGroup = dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT)
.repos().get("foo");
final Entry<JsonNode> aggregatorEntry =
fooGroup.get(Revision.HEAD, Query.ofJson(
K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json")).join();
fooGroup.get(Revision.HEAD, Query.ofYaml(
K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml")).join();

// KubernetesEndpointsUpdater commits the resolved endpoints in the next revision.
await().until(() -> fooGroup.normalizeNow(Revision.HEAD)
.equals(aggregatorEntry.revision().forward(1)));

final Entry<JsonNode> endpointEntry = fooGroup.get(
Revision.HEAD, Query.ofJson(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join();
Revision.HEAD, Query.ofYaml(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join();

// The endpoints must use the label values resolved by LabelBasedNodeIpExtractor,
// not the default InternalIP addresses (1.1.1.1 / 2.2.2.2).
Expand Down Expand Up @@ -228,13 +228,13 @@ void fallsBackToInternalIpWhenLabelKeyIsAbsent() throws Exception {
final Repository fooGroup = dogma.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT)
.repos().get("foo");
final Entry<JsonNode> aggregatorEntry =
fooGroup.get(Revision.HEAD, Query.ofJson(
K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".json")).join();
fooGroup.get(Revision.HEAD, Query.ofYaml(
K8S_ENDPOINT_AGGREGATORS_DIRECTORY + aggregatorId + ".yaml")).join();
await().until(() -> fooGroup.normalizeNow(Revision.HEAD)
.equals(aggregatorEntry.revision().forward(1)));

final Entry<JsonNode> endpointEntry = fooGroup.get(
Revision.HEAD, Query.ofJson(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".json")).join();
Revision.HEAD, Query.ofYaml(K8S_ENDPOINTS_DIRECTORY + aggregatorId + ".yaml")).join();
assertThatJson(endpointEntry.content()).isEqualTo(
'{' +
" \"clusterName\": \"groups/foo/k8s/clusters/" + aggregatorId + "\"," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class ContentTransformer<T> {
*/
public ContentTransformer(String path, EntryType entryType, BiFunction<Revision, T, T> transformer) {
this.path = requireNonNull(path, "path");
checkArgument(entryType == EntryType.JSON, "entryType: %s (expected: %s)", entryType, EntryType.JSON);
checkArgument(entryType == EntryType.JSON || entryType == EntryType.YAML,
"entryType: %s (expected: %s or %s)", entryType, EntryType.JSON, EntryType.YAML);
this.entryType = requireNonNull(entryType, "entryType");
this.transformer = requireNonNull(transformer, "transformer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.linecorp.centraldogma.common.CentralDogmaException;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.Yaml;
import com.linecorp.centraldogma.server.storage.StorageException;

abstract class AbstractChangesApplier {
Expand Down Expand Up @@ -98,6 +99,28 @@ public void apply(DirCacheEntry ent) {
}
}

static final class InsertYaml extends PathEdit {
private final ObjectInserter inserter;
private final JsonNode jsonNode;

InsertYaml(String entryPath, ObjectInserter inserter, JsonNode jsonNode) {
super(entryPath);
this.inserter = inserter;
this.jsonNode = jsonNode;
}

@Override
public void apply(DirCacheEntry ent) {
try {
final byte[] yamlBytes = Yaml.writeValueAsString(jsonNode).getBytes(UTF_8);
ent.setObjectId(inserter.insert(Constants.OBJ_BLOB, yamlBytes));
ent.setFileMode(FileMode.REGULAR_FILE);
} catch (IOException e) {
throw new StorageException("failed to create a new YAML blob", e);
}
}
}

static final class InsertText extends PathEdit {
private final ObjectInserter inserter;
private final String text;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@
import com.linecorp.centraldogma.common.EntryType;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.Yaml;
import com.linecorp.centraldogma.server.command.ContentTransformer;

final class TransformingChangesApplier extends AbstractChangesApplier {

private final ContentTransformer<JsonNode> transformer;

TransformingChangesApplier(ContentTransformer<?> transformer) {
checkArgument(transformer.entryType() == EntryType.JSON,
"transformer: %s (expected: JSON type)", transformer);
checkArgument(transformer.entryType() == EntryType.JSON || transformer.entryType() == EntryType.YAML,
"transformer: %s (expected: JSON or YAML type)", transformer);
//noinspection unchecked
this.transformer = (ContentTransformer<JsonNode>) transformer;
}
Expand All @@ -55,13 +56,23 @@ int doApply(Revision headRevision, DirCache dirCache,
final DirCacheEntry oldEntry = dirCache.getEntry(changePath);
final byte[] oldContent = oldEntry != null ? reader.open(oldEntry.getObjectId()).getBytes()
: null;
final JsonNode oldJsonNode = oldContent != null ? Jackson.readTree(oldContent)
: JsonNodeFactory.instance.nullNode();
final JsonNode oldJsonNode;
if (oldContent == null) {
oldJsonNode = JsonNodeFactory.instance.nullNode();
} else if (transformer.entryType() == EntryType.YAML) {
oldJsonNode = Yaml.readTree(oldContent);
} else {
oldJsonNode = Jackson.readTree(oldContent);
}
try {
final JsonNode newJsonNode = transformer.transformer().apply(headRevision, oldJsonNode.deepCopy());
requireNonNull(newJsonNode, "transformer.transformer().apply() returned null");
if (!Objects.equals(newJsonNode, oldJsonNode)) {
applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode));
if (transformer.entryType() == EntryType.YAML) {
applyPathEdit(dirCache, new InsertYaml(changePath, inserter, newJsonNode));
} else {
applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode));
}
return 1;
}
} catch (CentralDogmaException e) {
Expand Down
2 changes: 1 addition & 1 deletion webapp/src/dogma/features/xds/K8sAggregatorStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { useGetGroupHistoryQuery, useGetResourceQuery } from 'dogma/features/xds

// The aggregator writes its generated endpoints to this path; reading it tells us whether (and when) the
// aggregator has produced endpoints from Kubernetes.
const generatedPath = (id: string) => `/k8s/endpoints/${id}.json`;
const generatedPath = (id: string) => `/k8s/endpoints/${id}.yaml`;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function countEndpoints(content: any): { localities: number; endpoints: number } {
Expand Down
8 changes: 4 additions & 4 deletions webapp/src/dogma/features/xds/XdsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ export interface GroupDto {

export interface XdsResourceDto {
// The resource id, i.e. the file path under the type directory without the
// leading '/{type}/' prefix and the trailing '.json' suffix.
// leading '/{type}/' prefix and the trailing file extension (.yaml or .json).
id: string;
// The full repository path, e.g. '/clusters/foo.json'.
// The full repository path, e.g. '/clusters/foo.yaml'.
path: string;
revision: number;
}

// The full xDS resource name derived from its repository path, e.g. group 'foo' + '/clusters/c1.json'
// The full xDS resource name derived from its repository path, e.g. group 'foo' + '/clusters/c1.yaml'
// becomes 'groups/foo/clusters/c1'. This matches the `name` the server assigns to CDS/LDS/RDS resources.
export function resourceName(group: string, path: string): string {
return `groups/${group}${path.replace(/\.json$/, '')}`;
return `groups/${group}${path.replace(/\.(json|yaml)$/, '')}`;
}

// A starter template offered when creating a new resource of each type.
Expand Down
80 changes: 59 additions & 21 deletions webapp/src/dogma/features/xds/xdsApiSlice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { createApi } from '@reduxjs/toolkit/query/react';
import { FetchBaseQueryError } from '@reduxjs/toolkit/query';
import { FetchBaseQueryError, FetchArgs } from '@reduxjs/toolkit/query';
import { baseQueryWithReauth } from 'dogma/features/api/baseQuery';
import { createLoginUrl } from 'dogma/util/auth';
import {
Expand Down Expand Up @@ -53,7 +53,7 @@ interface RawRepoDto {

interface RawFileDto {
path: string;
type: 'TEXT' | 'DIRECTORY' | 'JSON' | 'YML';
type: 'TEXT' | 'DIRECTORY' | 'JSON' | 'YAML';
revision: number;
}

Expand All @@ -68,26 +68,26 @@ export interface FileContentDto {

export type ResourceArg = { group: string; type: XdsResourceType };
// `k8s` marks an endpoint generated by a k8s aggregator (stored under '/k8s/endpoints/'); it is read-only.
export type ResourceIdArg = ResourceArg & { id: string; k8s?: boolean };
// `path` is the full repository path (e.g. '/clusters/foo.yaml') — when provided, it is used directly to
// avoid a .yaml→.json fallback request on servers that still have legacy .json files.
export type ResourceIdArg = ResourceArg & { id: string; k8s?: boolean; path?: string };
export type CreateResourceArg = ResourceArg & { id: string; body: string };
export type UpdateResourceArg = ResourceIdArg & { body: string };

// Derives a Kubernetes endpoint aggregator id from its repository file path, e.g.
// '/k8s/endpointAggregators/foo.json' becomes 'foo'.
// '/k8s/endpointAggregators/foo.yaml' becomes 'foo'.
function k8sAggregatorIdFromPath(path: string): string {
let id = path;
const prefix = '/k8s/endpointAggregators/';
if (id.startsWith(prefix)) {
id = id.substring(prefix.length);
}
if (id.endsWith('.json')) {
id = id.substring(0, id.length - '.json'.length);
}
id = id.replace(/\.(json|yaml)$/, '');
return id;
}

// Derives a resource id from a repository file path, e.g.
// '/clusters/foo/bar.json' with type 'clusters' becomes 'foo/bar'.
// '/clusters/foo/bar.yaml' with type 'clusters' becomes 'foo/bar'.
function idFromPath(path: string, type: XdsResourceType): string {
let id = path;
if (id.startsWith('/')) {
Expand All @@ -97,9 +97,7 @@ function idFromPath(path: string, type: XdsResourceType): string {
if (id.startsWith(prefix)) {
id = id.substring(prefix.length);
}
if (id.endsWith('.json')) {
id = id.substring(0, id.length - '.json'.length);
}
id = id.replace(/\.(json|yaml)$/, '');
return id;
}

Expand All @@ -110,6 +108,40 @@ function encodeResourcePath(id: string): string {
return id.split('/').map(encodeURIComponent).join('/');
}

// Fetches a file by its full repository path, falling back from .yaml to .json for legacy servers.
// Pass `path` (including extension) when known from a listing response to skip the fallback round-trip.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type FetchFn = (arg: string | FetchArgs) => any;

async function fetchYamlOrJson(
group: string,
basePath: string,
fetchWithBQ: FetchFn,
knownPath?: string,
): Promise<{ data: FileContentDto } | { error: FetchBaseQueryError }> {
if (knownPath) {
const direct = await fetchWithBQ(
`/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents${knownPath}?revision=head`,
);
if (!direct.error) return { data: direct.data as FileContentDto };
if ((direct.error as FetchBaseQueryError).status !== 404) {
return { error: direct.error as FetchBaseQueryError };
}
}
const yamlResult = await fetchWithBQ(
`/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents${basePath}.yaml?revision=head`,
);
if (!yamlResult.error) return { data: yamlResult.data as FileContentDto };
if ((yamlResult.error as FetchBaseQueryError).status !== 404) {
return { error: yamlResult.error as FetchBaseQueryError };
}
const jsonResult = await fetchWithBQ(
`/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents${basePath}.json?revision=head`,
);
if (jsonResult.error) return { error: jsonResult.error as FetchBaseQueryError };
return { data: jsonResult.data as FileContentDto };
}

export const xdsApiSlice = createApi({
reducerPath: 'xdsApi',
// The login page is in the main web app at the server root (outside the '/xds' basePath), so navigate out
Expand Down Expand Up @@ -168,7 +200,7 @@ export const xdsApiSlice = createApi({
// clean id. They are flagged read-only by their path in the UI.
id:
type === 'endpoints' && file.path.startsWith('/k8s/endpoints/')
? file.path.slice('/k8s/endpoints/'.length).replace(/\.json$/, '')
? file.path.slice('/k8s/endpoints/'.length).replace(/\.(json|yaml)$/, '')
: idFromPath(file.path, type),
path: file.path,
revision: file.revision,
Expand All @@ -181,12 +213,17 @@ export const xdsApiSlice = createApi({
// Endpoints (EDS) are not access-controlled, so they are read through a dedicated ungated API instead
// of the generic content API which requires the READ repository role. k8s-aggregator-generated endpoints
// live under '/k8s/endpoints/' and are read via their own path.
query: ({ group, type, id, k8s }) =>
type === 'endpoints'
? k8s
async queryFn({ group, type, id, k8s, path }, _queryApi, _extraOptions, fetchWithBQ) {
if (type === 'endpoints') {
const url = k8s
? `/api/v1/xds/groups/${group}/k8s/endpoints/${id}`
: `/api/v1/xds/groups/${group}/endpoints/${id}`
: `/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents/${type}/${id}.json?revision=head`,
: `/api/v1/xds/groups/${group}/endpoints/${id}`;
const result = await fetchWithBQ(url);
if (result.error) return { error: result.error as FetchBaseQueryError };
return { data: result.data as FileContentDto };
}
return fetchYamlOrJson(group, `/${type}/${id}`, fetchWithBQ, path);
},
providesTags: ['Resource'],
}),
createResource: builder.mutation<unknown, CreateResourceArg>({
Expand Down Expand Up @@ -240,9 +277,10 @@ export const xdsApiSlice = createApi({
},
providesTags: ['K8sAggregator'],
}),
getK8sAggregator: builder.query<FileContentDto, { group: string; id: string }>({
query: ({ group, id }) =>
`/api/v1/projects/${XDS_PROJECT}/repos/${group}/contents/k8s/endpointAggregators/${id}.json?revision=head`,
getK8sAggregator: builder.query<FileContentDto, { group: string; id: string; path?: string }>({
async queryFn({ group, id, path }, _queryApi, _extraOptions, fetchWithBQ) {
return fetchYamlOrJson(group, `/k8s/endpointAggregators/${id}`, fetchWithBQ, path);
},
providesTags: ['K8sAggregator'],
}),
// Dry-run preview: resolves the aggregator's watchers against Kubernetes and returns the
Expand Down Expand Up @@ -394,7 +432,7 @@ export const xdsApiSlice = createApi({

const idOf = (type: XdsResourceType, path: string): string =>
type === 'endpoints' && path.startsWith('/k8s/endpoints/')
? path.slice('/k8s/endpoints/'.length).replace(/\.json$/, '')
? path.slice('/k8s/endpoints/'.length).replace(/\.(json|yaml)$/, '')
: idFromPath(path, type);

const nodes: XdsGraphNode[] = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public void createCluster(CreateClusterRequest request, StreamObserver<Cluster>
// can be set to false via the update API.
.setRespectDnsTtl(true)
.build();
xdsResourceManager.push(responseObserver, group, clusterName, CLUSTERS_DIRECTORY + clusterId + ".json",
"Create cluster: " + clusterName, cluster, currentAuthor(), true);
xdsResourceManager.push(responseObserver, group, clusterName, CLUSTERS_DIRECTORY + clusterId + ".yaml",
"Create cluster: " + clusterName, cluster, currentAuthor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void createEndpoint(CreateEndpointRequest request,
.setClusterName(clusterName)
.build();
xdsResourceManager.push(responseObserver, group, clusterName, fileName(endpointId),
"Create endpoint: " + clusterName, endpoint, currentAuthor(), true);
"Create endpoint: " + clusterName, endpoint, currentAuthor());
}

private static String clusterName(String parent, String endpointId) {
Expand Down Expand Up @@ -125,7 +125,7 @@ private static Matcher checkEndpointName(String endpointName) {
}

private static String fileName(String endpointId) {
return ENDPOINTS_DIRECTORY + endpointId + ".json";
return ENDPOINTS_DIRECTORY + endpointId + ".yaml";
}

@Override
Expand Down
Loading
Loading