Skip to content

Commit

Permalink
Refactor caching of package archive content
Browse files Browse the repository at this point in the history
  • Loading branch information
skh committed Sep 24, 2020
1 parent e6a8236 commit e0baec6
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 67 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ export class PackageNotFoundError extends IngestManagerError {}
export class PackageOutdatedError extends IngestManagerError {}
export class PackageUnsupportedMediaTypeError extends IngestManagerError {}
export class PackageInvalidArchiveError extends IngestManagerError {}
export class PackageCacheError extends IngestManagerError {}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ export async function installIndexPatterns(
// TODO: move to install package
// cache all installed packages if they don't exist
const packagePromises = installedPackages.map((pkg) =>
Registry.ensureCachedArchiveInfo(pkg.pkgName, pkg.pkgVersion)
// TODO: this hard-codes 'registry' as installSource, so uploaded packages are ignored
// and their fields will be removed from the generated index patterns after this runs.
Registry.ensureCachedArchiveInfo(pkg.pkgName, pkg.pkgVersion, 'registry')
);
await Promise.all(packagePromises);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import { RegistryPackage } from '../../../types';
import * as Registry from '../registry';
import { ensureCachedArchiveInfo } from '../registry';

// paths from RegistryPackage are routes to the assets on EPR
// e.g. `/package/nginx/1.2.0/dataset/access/fields/fields.yml`
Expand Down Expand Up @@ -58,7 +57,7 @@ export async function getAssetsData(
): Promise<Registry.ArchiveEntry[]> {
// TODO: Needs to be called to fill the cache but should not be required

await ensureCachedArchiveInfo(packageInfo.name, packageInfo.version);
await Registry.ensureCachedArchiveInfo(packageInfo.name, packageInfo.version, 'registry');

// Gather all asset data
const assets = getAssets(packageInfo, filter, datasetName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ export async function getPackageInfo(options: {
pkgVersion: string;
}): Promise<PackageInfo> {
const { savedObjectsClient, pkgName, pkgVersion } = options;
const [item, savedObject, latestPackage, assets] = await Promise.all([
Registry.fetchInfo(pkgName, pkgVersion),
const [
savedObject,
latestPackage,
{ paths: assets, registryPackageInfo: item },
] = await Promise.all([
getInstallationObject({ savedObjectsClient, pkgName }),
Registry.fetchFindLatestPackage(pkgName),
Registry.getArchiveInfo(pkgName, pkgVersion),
Registry.loadRegistryPackage(pkgName, pkgVersion),
]);

// add properties that aren't (or aren't yet) on Registry response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ export async function installPackageFromRegistry({
if (semver.lt(pkgVersion, latestPackage.version) && !force && !installOutOfDateVersionOk) {
throw new PackageOutdatedError(`${pkgkey} is out-of-date and cannot be installed or updated`);
}
const paths = await Registry.getArchiveInfo(pkgName, pkgVersion);
const registryPackageInfo = await Registry.fetchInfo(pkgName, pkgVersion);

const { paths, registryPackageInfo } = await Registry.loadRegistryPackage(pkgName, pkgVersion);

const removable = !isRequiredPackage(pkgName);
const { internal = false } = registryPackageInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { deletePipeline } from '../elasticsearch/ingest_pipeline/';
import { installIndexPatterns } from '../kibana/index_pattern/install';
import { deleteTransforms } from '../elasticsearch/transform/remove';
import { packagePolicyService, appContextService } from '../..';
import { splitPkgKey, deletePackageCache, getArchiveInfo } from '../registry';
import { splitPkgKey, deletePackageCache } from '../registry';

export async function removeInstallation(options: {
savedObjectsClient: SavedObjectsClientContract;
Expand Down Expand Up @@ -57,8 +57,7 @@ export async function removeInstallation(options: {

// remove the package archive and its contents from the cache so that a reinstall fetches
// a fresh copy from the registry
const paths = await getArchiveInfo(pkgName, pkgVersion);
deletePackageCache(pkgName, pkgVersion, paths);
deletePackageCache(pkgName, pkgVersion);

// successful delete's in SO client return {}. return something more useful
return installedAssets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ export const cacheHas = (key: string) => cache.has(key);
export const cacheClear = () => cache.clear();
export const cacheDelete = (key: string) => cache.delete(key);

const archiveLocationCache: Map<string, string> = new Map();
export const getArchiveLocation = (name: string, version: string) =>
archiveLocationCache.get(pkgToPkgKey({ name, version }));

export const setArchiveLocation = (name: string, version: string, location: string) =>
archiveLocationCache.set(pkgToPkgKey({ name, version }), location);

export const deleteArchiveLocation = (name: string, version: string) =>
archiveLocationCache.delete(pkgToPkgKey({ name, version }));

const archiveFilelistCache: Map<string, string[]> = new Map();
export const getArchiveFilelist = (name: string, version: string) =>
archiveFilelistCache.get(pkgToPkgKey({ name, version }));
Expand Down
98 changes: 52 additions & 46 deletions x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
AssetsGroupedByServiceByType,
CategoryId,
CategorySummaryList,
InstallSource,
KibanaAssetType,
RegistryPackage,
RegistrySearchResults,
Expand All @@ -21,17 +22,15 @@ import {
cacheGet,
cacheSet,
cacheDelete,
cacheHas,
getArchiveLocation,
setArchiveLocation,
deleteArchiveLocation,
getArchiveFilelist,
deleteArchiveFilelist,
} from './cache';
import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract';
import { fetchUrl, getResponse, getResponseStream } from './requests';
import { streamToBuffer } from './streams';
import { getRegistryUrl } from './registry_url';
import { appContextService } from '../..';
import { PackageNotFoundError } from '../../../errors';
import { PackageNotFoundError, PackageCacheError } from '../../../errors';

export { ArchiveEntry } from './extract';

Expand Down Expand Up @@ -132,14 +131,14 @@ export async function fetchCategories(params?: CategoriesParams): Promise<Catego
return fetchUrl(url.toString()).then(JSON.parse);
}

export async function getArchiveInfo(
export async function unpackRegistryPackageToCache(
pkgName: string,
pkgVersion: string,
filter = (entry: ArchiveEntry): boolean => true
): Promise<string[]> {
const paths: string[] = [];
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
const bufferExtractor = getBufferExtractor(pkgName, pkgVersion);
const { archiveBuffer, archivePath } = await fetchArchiveBuffer(pkgName, pkgVersion);
const bufferExtractor = getBufferExtractor(archivePath);
await bufferExtractor(archiveBuffer, filter, (entry: ArchiveEntry) => {
const { path, buffer } = entry;
const { file } = pathParts(path);
Expand All @@ -153,6 +152,21 @@ export async function getArchiveInfo(
return paths;
}

export async function loadRegistryPackage(
pkgName: string,
pkgVersion: string
): Promise<{ paths: string[]; registryPackageInfo: RegistryPackage }> {
let paths = getArchiveFilelist(pkgName, pkgVersion);
if (!paths || paths.length === 0) {
paths = await unpackRegistryPackageToCache(pkgName, pkgVersion);
}

// TODO: cache this as well?
const registryPackageInfo = await fetchInfo(pkgName, pkgVersion);

return { paths, registryPackageInfo };
}

export function pathParts(path: string): AssetParts {
let dataset;

Expand Down Expand Up @@ -183,45 +197,39 @@ export function pathParts(path: string): AssetParts {
} as AssetParts;
}

export function getBufferExtractor(pkgName: string, pkgVersion: string) {
const archiveLocation = getArchiveLocation(pkgName, pkgVersion);
if (!archiveLocation) throw new Error(`no archive location for ${pkgName} ${pkgVersion}`);
const isZip = archiveLocation.endsWith('.zip');
export function getBufferExtractor(archivePath: string) {
const isZip = archivePath.endsWith('.zip');
const bufferExtractor = isZip ? unzipBuffer : untarBuffer;

return bufferExtractor;
}

async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
const key = getArchiveLocation(pkgName, pkgVersion);
let buffer = key && cacheGet(key);
if (!buffer) {
buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
}

if (buffer) {
return buffer;
export async function ensureCachedArchiveInfo(
name: string,
version: string,
installSource: InstallSource = 'registry'
) {
const paths = getArchiveFilelist(name, version);
if (!paths || paths.length === 0) {
if (installSource === 'registry') {
await loadRegistryPackage(name, version);
}
} else {
throw new Error(`no archive buffer for ${key}`);
throw new PackageCacheError(
`Package ${name}-${version} not cached. If it was uploaded, try uninstalling and reinstalling manually.`
);
}
}

export async function ensureCachedArchiveInfo(name: string, version: string) {
const pkgkey = getArchiveLocation(name, version);
if (!pkgkey || !cacheHas(pkgkey)) {
await getArchiveInfo(name, version);
}
}

async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
async function fetchArchiveBuffer(
pkgName: string,
pkgVersion: string
): Promise<{ archiveBuffer: Buffer; archivePath: string }> {
const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
const archiveUrl = `${getRegistryUrl()}${archivePath}`;
const buffer = await getResponseStream(archiveUrl).then(streamToBuffer);
const archiveBuffer = await getResponseStream(archiveUrl).then(streamToBuffer);

setArchiveLocation(pkgName, pkgVersion, archivePath);
cacheSet(archivePath, buffer);

return buffer;
return { archiveBuffer, archivePath };
}

export function getAsset(key: string) {
Expand Down Expand Up @@ -250,16 +258,14 @@ export function groupPathsByService(paths: string[]): AssetsGroupedByServiceByTy
};
}

export const deletePackageCache = (name: string, version: string, paths: string[]) => {
const archiveLocation = getArchiveLocation(name, version);
if (archiveLocation) {
// delete cached archive
cacheDelete(archiveLocation);
export const deletePackageCache = (name: string, version: string) => {
// get cached archive filelist
const paths = getArchiveFilelist(name, version);

// delete cached archive location
deleteArchiveLocation(name, version);
}
// delete cached archive contents
// this has been populated in Registry.getArchiveInfo()
paths.forEach((path) => cacheDelete(path));
// delete cached archive filelist
deleteArchiveFilelist(name, version);

// delete cached archive files
// this has been populated in unpackRegistryPackageToCache()
paths?.forEach((path) => cacheDelete(path));
};
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/types/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export {
Settings,
SettingsSOAttributes,
InstallType,
InstallSource,
// Agent Request types
PostAgentEnrollRequest,
PostAgentCheckinRequest,
Expand Down

0 comments on commit e0baec6

Please sign in to comment.