Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement proxy mode #860

Merged
merged 25 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

* Update favicon to be the Elastic Package Registry logo. [#858](https://github.com/elastic/package-registry/pull/858)
* Implement proxy mode. [#860](https://github.com/elastic/package-registry/pull/860)

### Deprecated

Expand Down
20 changes: 17 additions & 3 deletions artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"

"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/proxymode"
"github.com/elastic/package-registry/util"
)

Expand All @@ -22,6 +23,10 @@ const artifactsRouterPath = "/epr/{packageName}/{packageName:[a-z0-9_]+}-{packag
var errArtifactNotFound = errors.New("artifact not found")

func artifactsHandler(indexer Indexer, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
return artifactsHandlerWithProxyMode(indexer, proxymode.NoProxy(), cacheTime)
}

func artifactsHandlerWithProxyMode(indexer Indexer, proxyMode *proxymode.ProxyMode, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
logger := util.Logger()
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
Expand All @@ -44,7 +49,7 @@ func artifactsHandler(indexer Indexer, cacheTime time.Duration) func(w http.Resp
}

opts := packages.NameVersionFilter(packageName, packageVersion)
packageList, err := indexer.Get(r.Context(), &opts)
pkgs, err := indexer.Get(r.Context(), &opts)
if err != nil {
logger.Error("getting package path failed",
zap.String("package.name", packageName),
Expand All @@ -53,12 +58,21 @@ func artifactsHandler(indexer Indexer, cacheTime time.Duration) func(w http.Resp
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
if len(packageList) == 0 {
if len(pkgs) == 0 && proxyMode.Enabled() {
proxiedPackage, err := proxyMode.Package(r)
if err != nil {
logger.Error("proxy mode: package failed", zap.Error(err))
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
pkgs = pkgs.Join(packages.Packages{proxiedPackage})
}
if len(pkgs) == 0 {
notFoundError(w, errArtifactNotFound)
return
}

cacheHeaders(w, cacheTime)
packages.ServePackage(w, r, packageList[0])
packages.ServePackage(w, r, pkgs[0])
}
}
40 changes: 26 additions & 14 deletions categories.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ import (
"strconv"
"time"

"go.uber.org/zap"

"github.com/Masterminds/semver/v3"
"go.elastic.co/apm"

"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/proxymode"
"github.com/elastic/package-registry/util"
)

type Category struct {
Id string `yaml:"id" json:"id"`
Title string `yaml:"title" json:"title"`
Count int `yaml:"count" json:"count"`
// categoriesHandler is a dynamic handler as it will also allow filtering in the future.
func categoriesHandler(indexer Indexer, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
return categoriesHandlerWithProxyMode(indexer, proxymode.NoProxy(), cacheTime)
}

// categoriesHandler is a dynamic handler as it will also allow filtering in the future.
func categoriesHandler(indexer Indexer, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
func categoriesHandlerWithProxyMode(indexer Indexer, proxyMode *proxymode.ProxyMode, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
logger := util.Logger()
return func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()

Expand All @@ -49,13 +52,22 @@ func categoriesHandler(indexer Indexer, cacheTime time.Duration) func(w http.Res
opts := packages.GetOptions{
Filter: filter,
}
packages, err := indexer.Get(r.Context(), &opts)
pkgs, err := indexer.Get(r.Context(), &opts)
if err != nil {
notFoundError(w, err)
return
}
categories := getCategories(r.Context(), pkgs, includePolicyTemplates)

categories := getCategories(r.Context(), packages, includePolicyTemplates)
if proxyMode.Enabled() {
proxiedCategories, err := proxyMode.Categories(r)
if err != nil {
logger.Error("proxy mode: categories failed", zap.Error(err))
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
categories = proxiedCategories // FIXME merge categories instead of replacing
}

data, err := getCategoriesOutput(r.Context(), categories)
if err != nil {
Expand Down Expand Up @@ -108,16 +120,16 @@ func newCategoriesFilterFromQuery(query url.Values) (*packages.Filter, error) {
return &filter, nil
}

func getCategories(ctx context.Context, packages packages.Packages, includePolicyTemplates bool) map[string]*Category {
func getCategories(ctx context.Context, pkgs packages.Packages, includePolicyTemplates bool) map[string]*packages.Category {
span, ctx := apm.StartSpan(ctx, "FilterCategories", "app")
defer span.End()

categories := map[string]*Category{}
categories := map[string]*packages.Category{}

for _, p := range packages {
for _, p := range pkgs {
for _, c := range p.Categories {
if _, ok := categories[c]; !ok {
categories[c] = &Category{
categories[c] = &packages.Category{
Id: c,
Title: c,
Count: 0,
Expand Down Expand Up @@ -146,7 +158,7 @@ func getCategories(ctx context.Context, packages packages.Packages, includePolic
// Add policy template level categories.
for _, c := range t.Categories {
if _, ok := categories[c]; !ok {
categories[c] = &Category{
categories[c] = &packages.Category{
Id: c,
Title: c,
Count: 0,
Expand All @@ -167,7 +179,7 @@ func getCategories(ctx context.Context, packages packages.Packages, includePolic
return categories
}

func getCategoriesOutput(ctx context.Context, categories map[string]*Category) ([]byte, error) {
func getCategoriesOutput(ctx context.Context, categories map[string]*packages.Category) ([]byte, error) {
span, ctx := apm.StartSpan(ctx, "GetCategoriesOutput", "app")
defer span.End()

Expand All @@ -177,7 +189,7 @@ func getCategoriesOutput(ctx context.Context, categories map[string]*Category) (
}
sort.Strings(keys)

var outputCategories []*Category
var outputCategories []*packages.Category
for _, k := range keys {
c := categories[k]
if title, ok := packages.CategoryTitles[c.Title]; ok {
Expand Down
51 changes: 34 additions & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/elastic/package-registry/metrics"
"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/proxymode"
"github.com/elastic/package-registry/storage"
"github.com/elastic/package-registry/util"
)
Expand Down Expand Up @@ -57,6 +58,9 @@ var (
storageEndpoint string
storageIndexerWatchInterval time.Duration

featureProxyMode bool
proxyTo string

defaultConfig = Config{
CacheTimeIndex: 10 * time.Second,
CacheTimeSearch: 10 * time.Minute,
Expand All @@ -82,6 +86,9 @@ func init() {
flag.StringVar(&storageEndpoint, "storage-endpoint", "https://package-storage.elastic.co/", "Package Storage public endpoint.")
flag.DurationVar(&storageIndexerWatchInterval, "storage-indexer-watch-interval", 1*time.Minute, "Address of the package-registry service.")

// The following proxy-indexer related flags are technical preview and might be removed in the future or renamed
flag.BoolVar(&featureProxyMode, "feature-proxy-mode", false, "Enable proxy mode to include packages from other endpoint (technical preview).")
flag.StringVar(&proxyTo, "proxy-to", "https://epr-v2.ea-web.elastic.dev/", "Proxy-to endpoint")
}

type Config struct {
Expand Down Expand Up @@ -180,26 +187,26 @@ func initMetricsServer(logger *zap.Logger) {
func initIndexer(ctx context.Context, logger *zap.Logger, config *Config) Indexer {
packagesBasePaths := getPackagesBasePaths(config)

var indexer Indexer
var combined CombinedIndexer

if featureStorageIndexer {
storageClient, err := gstorage.NewClient(ctx)
if err != nil {
logger.Fatal("can't initialize storage client", zap.Error(err))
}
indexer = storage.NewIndexer(storageClient, storage.IndexerOptions{
combined = append(combined, storage.NewIndexer(storageClient, storage.IndexerOptions{
PackageStorageBucketInternal: storageIndexerBucketInternal,
PackageStorageEndpoint: storageEndpoint,
WatchInterval: storageIndexerWatchInterval,
})
} else {
indexer = NewCombinedIndexer(
packages.NewZipFileSystemIndexer(packagesBasePaths...),
packages.NewFileSystemIndexer(packagesBasePaths...),
)
}))
}
ensurePackagesAvailable(ctx, logger, indexer)

return indexer
combined = append(combined,
packages.NewZipFileSystemIndexer(packagesBasePaths...),
packages.NewFileSystemIndexer(packagesBasePaths...),
)
ensurePackagesAvailable(ctx, logger, combined)
return combined
}

func initServer(logger *zap.Logger, config *Config) *http.Server {
Expand Down Expand Up @@ -310,8 +317,16 @@ func mustLoadRouter(logger *zap.Logger, config *Config, indexer Indexer) *mux.Ro
}

func getRouter(logger *zap.Logger, config *Config, indexer Indexer) (*mux.Router, error) {
artifactsHandler := artifactsHandler(indexer, config.CacheTimeCatchAll)
signaturesHandler := signaturesHandler(indexer, config.CacheTimeCatchAll)
proxyMode, err := proxymode.NewProxyMode(proxymode.ProxyOptions{
Enabled: featureProxyMode,
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
ProxyTo: proxyTo,
})
if err != nil {
return nil, errors.Wrapf(err, "can't create proxy mode")
}

artifactsHandler := artifactsHandlerWithProxyMode(indexer, proxyMode, config.CacheTimeCatchAll)
signaturesHandler := signaturesHandlerWithProxyMode(indexer, proxyMode, config.CacheTimeCatchAll)
faviconHandleFunc, err := faviconHandler(config.CacheTimeCatchAll)
if err != nil {
return nil, err
Expand All @@ -321,14 +336,16 @@ func getRouter(logger *zap.Logger, config *Config, indexer Indexer) (*mux.Router
return nil, err
}

packageIndexHandler := packageIndexHandler(indexer, config.CacheTimeCatchAll)
staticHandler := staticHandler(indexer, config.CacheTimeCatchAll)
categoriesHandler := categoriesHandlerWithProxyMode(indexer, proxyMode, config.CacheTimeCategories)
packageIndexHandler := packageIndexHandlerWithProxyMode(indexer, proxyMode, config.CacheTimeCatchAll)
searchHandler := searchHandlerWithProxyMode(indexer, proxyMode, config.CacheTimeSearch)
staticHandler := staticHandlerWithProxyMode(indexer, proxyMode, config.CacheTimeCatchAll)

router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", indexHandlerFunc)
router.HandleFunc("/index.json", indexHandlerFunc)
router.HandleFunc("/search", searchHandler(indexer, config.CacheTimeSearch))
router.HandleFunc("/categories", categoriesHandler(indexer, config.CacheTimeCategories))
router.HandleFunc("/search", searchHandler)
router.HandleFunc("/categories", categoriesHandler)
router.HandleFunc("/health", healthHandler)
router.HandleFunc("/favicon.ico", faviconHandleFunc)
router.HandleFunc(artifactsRouterPath, artifactsHandler)
Expand All @@ -339,7 +356,7 @@ func getRouter(logger *zap.Logger, config *Config, indexer Indexer) (*mux.Router
if metricsAddress != "" {
router.Use(metrics.MetricsMiddleware())
}
router.NotFoundHandler = http.Handler(notFoundHandler(fmt.Errorf("404 page not found")))
router.NotFoundHandler = notFoundHandler(fmt.Errorf("404 page not found"))
return router, nil
}

Expand Down
22 changes: 18 additions & 4 deletions package_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gorilla/mux"

"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/proxymode"
"github.com/elastic/package-registry/util"
)

Expand All @@ -25,6 +26,10 @@ const (
var errPackageRevisionNotFound = errors.New("package revision not found")

func packageIndexHandler(indexer Indexer, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
return packageIndexHandlerWithProxyMode(indexer, proxymode.NoProxy(), cacheTime)
}

func packageIndexHandlerWithProxyMode(indexer Indexer, proxyMode *proxymode.ProxyMode, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
logger := util.Logger()
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
Expand All @@ -47,24 +52,33 @@ func packageIndexHandler(indexer Indexer, cacheTime time.Duration) func(w http.R
}

opts := packages.NameVersionFilter(packageName, packageVersion)
packages, err := indexer.Get(r.Context(), &opts)
pkgs, err := indexer.Get(r.Context(), &opts)
if err != nil {
logger.Error("getting package path failed", zap.Error(err))
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
if len(packages) == 0 {
if len(pkgs) == 0 && proxyMode.Enabled() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like so much that we need to add logic to all handlers for the proxy mode.

I guess that another option would be to implement it as an indexer, but as indexers are now we would be making unneeded calls to the remote registry if there is already a local package that matches. Is this the reason to don't use indexers?

Maybe a way to overcome this could be to add to the indexer interface a GetFirst() (*packages.Package, error) method, that could be used by indexers to optimize the [several] cases where we only want the first package. Most indexers would implement GetFirst() as returning the first package in the result of Get(), but the CombinedIndexer could implement it as breaking the loop once it finds a package in one indexer.
Or a similar approach would be to add a new MaxResults option to GetOptions, to return only the first n packages found. But an option can be ignored/forgotten, while a method in the interface needs to be implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that another option would be to implement it as an indexer, but as indexers are now we would be making unneeded calls to the remote registry if there is already a local package that matches. Is this the reason to don't use indexers?

I analyzed this case. If we want to implement an indexer then we will have to call the search endpoint (with all=true) on every API call. Responses would be heavy and heavier in the future. I wouldn't like to implement an artificial cache that syncs periodically with the remote Package Registry, but I'm happy to discuss other options.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I analyzed this case. If we want to implement an indexer then we will have to call the search endpoint (with all=true) on every API call.

Would this still be true with the GetFirst() approach? The logic would be similar to what is done here in each handler, but it would be centralized in the CombinedIndexer: it would return the first package found without needing to look for all indexers, so it wouldn't call the proxy for packages available locally. And when it calls the proxy, it would be with the parameters of the request, not with all=true.

Another advantage of using an indexer is that we can add it only when the feature is enabled, without needing to check on every call if the proxy is enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't get me wrong, I'd love to use a dedicated indexer here, but I'm not sure if it's possible to apply the concept here.

The problem with an indexer is that it isn't aware of the calling context and it can't return correct/full package metadata. Consider following flows:

Search packages:

  1. Call indexer.Get(options):
    1.1 Mashal the filter to query.
    1.2 Make a call /search?all=true (heavy HTTP response)
    1.3 Apply the filter.
    1.4 Fill private (fsBuilder, resolver, semver) or skipped fields (BasePath)

Problem: Unfortunately, returned packages don't contain all properties of policy templates (Package vs BasePackage), so the result returned by indexer is always incorrect.

Package resources/signature:

  1. Call indexer.Get(options):
    1.1 Mashal the filter to query - based on the filter we should figure out to which endpoint we should proxy request?
    1.2 Make a call, for example: /package/apm/8.4.0/
    1.3 Apply the filter.
    1.4 Fill private (fsBuilder, resolver, semver) or skipped fields (BasePath)

Categories:

  1. Call indexer.Get(options):
    1.1 Mashal the filter to query.
    1.2 Make a call /search?all=true (heavy HTTP response)
    ...

Problem: Now, we need to pull all packages to map them into categories. We need to visit also categories in policy templates, BUT the /search endpoint don't expose them (all categories are group together in categories: [ ... ]. For the sake of /categories, we could use those buggy packages, but they would work accidentally (we don't analyze policy templates).

I might have missed something while I tried to implement this approach. Feel free to share your thoughts!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to use a dedicated indexer here, but I'm not sure if it's possible to apply the concept here.

Yeah, it is a bit beyond of their concept to use indexers for this. More than specifically using them, I was wondering about a way to separate more the proxy mode without needing to modify all the handlers, so when disabled no additional code is executed.

To search packages I don't think that we would need to search all and filter, we could translate the options.Filter into an equivalent search query, and wouldn't be needed to filter afterwards. From the result the private fields would be filled as they are now in this PR. https://github.com/elastic/package-registry/pull/860/files#diff-d3d83ca5e1802bcf5d721013c2c366abb0d682c3e100e4307630c34f85cf6f89R93-R100

Though for categories it is true that we would need to search for all packages, and even this could be inaccurate as you mention.

Two more ideas to separate the proxy mode:

  • Use a middleware in the router. This middleware would need to be able to handle every API endpoint, capture the response from the actual handlers in the response writer and interpret it, decide if needed to make a request to the proxified registry, and merge the results. May be cumbersome and not sure if always possible, but would allow to inject the proxy only when enabled.
  • Follow an approach similar to the storage indexer, each registry has a new API to expose an "index" of the packages it serves. Proxy registries can periodically query this API and update their local index.

But we can continue with the current approach, every option seems to have its cons. Thanks for the discussion!

proxiedPackage, err := proxyMode.Package(r)
if err != nil {
logger.Error("proxy mode: package failed", zap.Error(err))
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
pkgs = pkgs.Join(packages.Packages{proxiedPackage})
}
if len(pkgs) == 0 {
notFoundError(w, errPackageRevisionNotFound)
return
}

w.Header().Set("Content-Type", "application/json")
cacheHeaders(w, cacheTime)

err = util.WriteJSONPretty(w, packages[0])
err = util.WriteJSONPretty(w, pkgs[0])
if err != nil {
logger.Error("marshaling package index failed",
zap.String("package.path", packages[0].BasePath),
zap.String("package.path", pkgs[0].BasePath),
zap.Error(err))
http.Error(w, "internal server error", http.StatusInternalServerError)
return
Expand Down
11 changes: 11 additions & 0 deletions packages/category.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package packages

type Category struct {
Id string `yaml:"id" json:"id"`
Title string `yaml:"title" json:"title"`
Count int `yaml:"count" json:"count"`
}
Loading