diff --git a/components/infra-proxy-service/server/orgs.go b/components/infra-proxy-service/server/orgs.go index cf42c41dcd9..2071cca608f 100644 --- a/components/infra-proxy-service/server/orgs.go +++ b/components/infra-proxy-service/server/orgs.go @@ -2,6 +2,7 @@ package server import ( "context" + "encoding/json" "errors" "github.com/chef/automate/api/external/common/query" @@ -229,9 +230,20 @@ func (s *Server) GetInfraServerOrgs(ctx context.Context, req *request.GetInfraSe setMigrationStatus(true) defer setMigrationStatus(false) - //Store the status in migration table as in progress - migration, err := s.service.Migration.StartMigration(ctx, uuid.Must(uuid.NewV4()).String(), req.ServerId) - migration, err = s.service.Migration.StartOrgMigration(ctx, uuid.Must(uuid.NewV4()).String(), req.ServerId) + // This is only for reference to use the migration_stage storage functions + parsedData, err := json.Marshal(&storage.Org{ID: "1234", Name: "demo_org"}) + if err != nil { + return nil, err + } + _, err = s.service.Migration.StoreMigrationStage(ctx, uuid.Must(uuid.NewV4()).String(), parsedData) + if err != nil { + return nil, err + + } + _, _ = s.service.Migration.GetMigrationStage(ctx, "bfc4ebe1-1256-4cf1-aab8-557edcc48658") + _, _ = s.service.Migration.DeleteMigrationStage(ctx, "bfc4ebe1-1256-4cf1-aab8-557edcc48658") + _, _ = s.service.Migration.GetMigrationStage(ctx, "bfc4ebe1-1256-4cf1-aab8-557edcc48658") + migration, err := s.service.Migration.StartOrgMigration(ctx, uuid.Must(uuid.NewV4()).String(), req.ServerId) if err != nil { return nil, err } diff --git a/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.down.sql b/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.down.sql index 2c4d81fba2d..84090acd77c 100644 --- a/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.down.sql +++ b/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.down.sql @@ -8,7 +8,22 @@ DROP TABLE IF EXISTS migration_type; DROP TABLE IF EXISTS migration_status; -- drop function insert_migration -DROP FUNCTION IF EXISTS insert_migration; +DROP FUNCTION IF EXISTS insert_migration(TEXT,TEXT,INT,INT,INT,INT,INT,TEXT); -- drop type migration_records DROP TYPE IF EXISTS migration_records; + +-- drop table migration_stage +DROP TABLE IF EXISTS migration_stage; + +-- drop function insert_migration_stage +DROP FUNCTION IF EXISTS insert_migration_stage(TEXT,json); + +-- drop function get_migration_stage +DROP FUNCTION IF EXISTS get_migration_stage(TEXT); + +-- drop function delete_migration_stage +DROP FUNCTION IF EXISTS delete_migration_stage(TEXT); + +-- drop type migration_stage_records +DROP TYPE IF EXISTS migration_stage_records; diff --git a/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.up.sql b/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.up.sql index 814796af0af..239547bc057 100644 --- a/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.up.sql +++ b/components/infra-proxy-service/storage/postgres/migration/sql/07_migration.up.sql @@ -98,3 +98,77 @@ BEGIN END; $$ LANGUAGE plpgsql; + +-- Create table migration_stage +CREATE TABLE IF NOT EXISTS migration_stage ( + id SERIAL PRIMARY KEY, + migration_id TEXT NOT NULL, + parsed_data json NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Created type migration_stage_records +CREATE TYPE migration_stage_records AS ( + id TEXT, + migration_id TEXT, + parsed_data json, + created_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ +); + +-- Insert entry into migration_stage table +CREATE OR REPLACE FUNCTION insert_migration_stage(migrationId TEXT, parsedData json) +RETURNS json AS +$$ +DECLARE result_record migration_stage_records; +BEGIN + INSERT INTO migration_stage ( + migration_id, parsed_data, + created_at, updated_at) + VALUES (migrationId, parsedData, now(), now()) + RETURNING id, migration_id, parsed_data ,created_at, updated_at + INTO + result_record.id,result_record.migration_id,result_record.parsed_data, + result_record.created_at,result_record.updated_at; + + RETURN row_to_json(result_record); + +END; +$$ +LANGUAGE plpgsql; + +-- Get entry from migration_stage table +CREATE OR REPLACE FUNCTION get_migration_stage(migrationId TEXT) +RETURNS json AS +$$ +DECLARE result_record migration_stage_records; +BEGIN + SELECT * FROM migration_stage where migration_id = migrationId + INTO + result_record.id,result_record.migration_id,result_record.parsed_data, + result_record.created_at,result_record.updated_at; + + RETURN row_to_json(result_record); + +END; +$$ +LANGUAGE plpgsql; + +-- Delete entry from migration_stage table +CREATE OR REPLACE FUNCTION delete_migration_stage(migrationId TEXT) +RETURNS json AS +$$ +DECLARE result_record migration_stage_records; +BEGIN + DELETE FROM migration_stage where migration_id = migrationId + RETURNING id, migration_id, parsed_data ,created_at, updated_at + INTO + result_record.id,result_record.migration_id,result_record.parsed_data, + result_record.created_at,result_record.updated_at; + + RETURN row_to_json(result_record); + +END; +$$ +LANGUAGE plpgsql; diff --git a/components/infra-proxy-service/storage/postgres/migrations.go b/components/infra-proxy-service/storage/postgres/migrations.go index 631491b9205..fe0d42893ab 100644 --- a/components/infra-proxy-service/storage/postgres/migrations.go +++ b/components/infra-proxy-service/storage/postgres/migrations.go @@ -3,6 +3,7 @@ package postgres import ( "context" "encoding/json" + "errors" "github.com/chef/automate/components/infra-proxy-service/constants" "github.com/chef/automate/components/infra-proxy-service/storage" @@ -143,3 +144,79 @@ func (p *postgres) insertMigration(ctx context.Context, migrationId, serverId, m return m, nil } + +//StoreMigrationStage Inserts an entry to the migration_stage +// To use this function, make sure that you should pass the searialized parsed data in []byte +func (p *postgres) StoreMigrationStage(ctx context.Context, migrationId string, parsedData interface{}) (storage.MigrationStage, error) { + return p.insertMigrationStage(ctx, migrationId, parsedData) +} + +//GetMigrationStage Get entry to the migration_stage +func (p *postgres) GetMigrationStage(ctx context.Context, migrationId string) (storage.MigrationStage, error) { + return p.getMigrationStage(ctx, migrationId) +} + +//DeleteMigrationStage Delete entry from migration_stage +func (p *postgres) DeleteMigrationStage(ctx context.Context, migrationId string) (storage.MigrationStage, error) { + return p.deleteMigrationStage(ctx, migrationId) +} + +//insertMigrationStage Inserts an entry to the migration_stage +func (p *postgres) insertMigrationStage(ctx context.Context, migrationId string, parsedData interface{}) (storage.MigrationStage, error) { + + var m storage.MigrationStage + var mByte []byte + var ok bool + if mByte, ok = parsedData.([]byte); !ok { + return m, errors.New("Cannot parse the data") + } + query := "SELECT insert_migration_stage($1, $2)" + row := p.db.QueryRowContext(ctx, query, migrationId, mByte) + err := row.Scan(&mByte) + if err != nil { + return storage.MigrationStage{}, err + } + err = json.Unmarshal(mByte, &m) + if err != nil { + return storage.MigrationStage{}, err + } + return m, nil +} + +//getMigrationStage Get an entry from migration_stage +func (p *postgres) getMigrationStage(ctx context.Context, migrationId string) (storage.MigrationStage, error) { + + var m storage.MigrationStage + var mByte []byte + + query := "SELECT get_migration_stage($1)" + row := p.db.QueryRowContext(ctx, query, migrationId) + err := row.Scan(&mByte) + if err != nil { + return storage.MigrationStage{}, err + } + err = json.Unmarshal(mByte, &m) + if err != nil { + return storage.MigrationStage{}, err + } + return m, nil +} + +//deleteMigrationStage Delete an entry from migration_stage +func (p *postgres) deleteMigrationStage(ctx context.Context, migrationId string) (storage.MigrationStage, error) { + + var m storage.MigrationStage + var mByte []byte + + query := "SELECT delete_migration_stage($1)" + row := p.db.QueryRowContext(ctx, query, migrationId) + err := row.Scan(&mByte) + if err != nil { + return storage.MigrationStage{}, err + } + err = json.Unmarshal(mByte, &m) + if err != nil { + return storage.MigrationStage{}, err + } + return m, nil +} diff --git a/components/infra-proxy-service/storage/postgres/postgres.go b/components/infra-proxy-service/storage/postgres/postgres.go index 2163bb901f5..9a576048e4e 100644 --- a/components/infra-proxy-service/storage/postgres/postgres.go +++ b/components/infra-proxy-service/storage/postgres/postgres.go @@ -40,21 +40,6 @@ func New(logger logger.Logger, migrationConfig migration.Config, authzClient aut return &postgres{db, logger, authzClient}, &postgres{db, logger, authzClient}, nil } -// New instantiates and returns a postgres migration implementation -func NewMigration(logger logger.Logger, migrationConfig migration.Config, authzClient authz.AuthorizationServiceClient) (storage.MigrationStorage, error) { - - db, err := initPostgresDB(migrationConfig.PGURL.String()) - if err != nil { - return nil, errors.Wrap(err, "initialize database") - } - - // if err := migrationConfig.Migrate(); err != nil { - // return nil, errors.Wrap(err, "database migrations") - // } - - return &postgres{db, logger, authzClient}, nil -} - func initPostgresDB(pgURL string) (*sql.DB, error) { d, err := db.PGOpen(pgURL) if err != nil { diff --git a/components/infra-proxy-service/storage/storage.go b/components/infra-proxy-service/storage/storage.go index ac2b3b2368b..8bbd2b7d211 100644 --- a/components/infra-proxy-service/storage/storage.go +++ b/components/infra-proxy-service/storage/storage.go @@ -53,6 +53,10 @@ type MigrationStorage interface { CompletePermissionMigration(ctx context.Context, migrationId, serverId string, totalSucceeded, totalSkipped, totalFailed int64) (Migration, error) FailedPermissionMigration(ctx context.Context, migrationId, serverId, message string, totalSucceeded, totalSkipped, totalFailed int64) (Migration, error) CompleteMigration(ctx context.Context, migrationId, serverId string, totalSucceeded, totalSkipped, totalFailed int64) (Migration, error) + + StoreMigrationStage(ctx context.Context, migrationId string, parsedData interface{}) (MigrationStage, error) + GetMigrationStage(ctx context.Context, migrationId string) (MigrationStage, error) + DeleteMigrationStage(ctx context.Context, migrationId string) (MigrationStage, error) } // Resetter is, if exposed, used for tests to reset the storage backend to a @@ -112,6 +116,14 @@ type Migration struct { UpdatedTimestamp time.Time `json:"updated_timestamp"` } +type MigrationStage struct { + ID string `json:"id"` + MigrationID string `json:"migration_id"` + ParsedData interface{} `json:"parsed_data"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + // Errors returned from the backend var ( // ErrNotFound is returned when a requested server wasn't found