Skip to content

Commit

Permalink
Remove snapshot restore functionality (#823)
Browse files Browse the repository at this point in the history
* Remove snapshot restore functionality (#fixes 789)
  • Loading branch information
bartv committed Dec 10, 2018
1 parent 1dbd782 commit fe90aa3
Show file tree
Hide file tree
Showing 11 changed files with 14 additions and 1,045 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
v 2018.4 (2018-12-xx)
Changes in this release:
- Various bugfixes and performance enhancements
- Dependency updates
- Removal of snapshot and restore functionality from the server (#789)

v 2018.3 (2018-12-07)
Changes in this release:
- Various bugfixes and performance enhancements
Expand Down
162 changes: 0 additions & 162 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
Contact: code@inmanta.com
"""

import base64
from concurrent.futures.thread import ThreadPoolExecutor
import datetime
import hashlib
import logging
import os
import random
Expand Down Expand Up @@ -598,144 +596,6 @@ def do_run_dryrun(self, version, dry_run_id):

self._cache.close_version(version)

@gen.coroutine
def do_restore(self, restore_id, snapshot_id, resources):
with (yield self.ratelimiter.acquire()):

LOGGER.info("Start a restore %s", restore_id)

yield self.process._ensure_code(self._env_id, resources[0][1]["model"],
[res[1]["resource_type"] for res in resources])

version = resources[0][1]["model"]
self._cache.open_version(version)

for restore, resource in resources:
start = datetime.datetime.now()
provider = None
try:
data = resource["attributes"]
data["id"] = resource["id"]
resource_obj = Resource.deserialize(data)
provider = yield self.get_provider(resource_obj)
if not hasattr(resource_obj, "allow_restore") or not resource_obj.allow_restore:
yield self.get_client().update_restore(tid=self._env_id,
id=restore_id,
resource_id=str(resource_obj.id),
start=start,
stop=datetime.datetime.now(),
success=False,
error=False,
msg="Resource %s does not allow restore" % resource["id"])
continue

try:
yield self.thread_pool.submit(provider.restore, resource_obj, restore["content_hash"])
yield self.get_client().update_restore(tid=self._env_id, id=restore_id,
resource_id=str(resource_obj.id),
success=True, error=False,
start=start, stop=datetime.datetime.now(), msg="")
except NotImplementedError:
yield self.get_client().update_restore(tid=self._env_id, id=restore_id,
resource_id=str(resource_obj.id),
success=False, error=False,
start=start, stop=datetime.datetime.now(),
msg="The handler for resource "
"%s does not support restores" % resource["id"])

except Exception:
LOGGER.exception("Unable to find a handler for %s", resource["id"])
yield self.get_client().update_restore(tid=self._env_id, id=restore_id,
resource_id=resource_obj.id.resource_str(),
success=False, error=False,
start=start, stop=datetime.datetime.now(),
msg="Unable to find a handler to restore a snapshot of resource %s" %
resource["id"])
finally:
if provider is not None:
provider.close()
self._cache.close_version(version)

return 200

@gen.coroutine
def do_snapshot(self, snapshot_id, resources):
with (yield self.ratelimiter.acquire()):
LOGGER.info("Start snapshot %s", snapshot_id)

yield self.process._ensure_code(self._env_id, resources[0]["model"],
[res["resource_type"] for res in resources])

version = resources[0]["model"]
self._cache.open_version(version)

for resource in resources:
start = datetime.datetime.now()
provider = None
try:
data = resource["attributes"]
data["id"] = resource["id"]
resource_obj = Resource.deserialize(data)
provider = yield self.get_provider(resource_obj)

if not hasattr(resource_obj, "allow_snapshot") or not resource_obj.allow_snapshot:
yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id,
resource_id=resource_obj.id.resource_str(), snapshot_data="",
start=start, stop=datetime.datetime.now(), size=0,
success=False, error=False,
msg="Resource %s does not allow snapshots" % resource["id"])
continue

try:
result = yield self.thread_pool.submit(provider.snapshot, resource_obj)
if result is not None:
sha1sum = hashlib.sha1()
sha1sum.update(result)
content_id = sha1sum.hexdigest()
yield self.get_client().upload_file(id=content_id, content=base64.b64encode(result).decode("ascii"))

yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id,
resource_id=resource_obj.id.resource_str(),
snapshot_data=content_id,
start=start, stop=datetime.datetime.now(),
size=len(result), success=True, error=False,
msg="")
else:
raise Exception("Snapshot returned no data")

except NotImplementedError:
yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id, error=False,
resource_id=resource_obj.id.resource_str(),
snapshot_data="",
start=start, stop=datetime.datetime.now(),
size=0, success=False,
msg="The handler for resource "
"%s does not support snapshots" % resource["id"])
except Exception:
LOGGER.exception("An exception occurred while creating the snapshot of %s", resource["id"])
yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id, snapshot_data="",
resource_id=resource_obj.id.resource_str(), error=True,
start=start,
stop=datetime.datetime.now(),
size=0, success=False,
msg="The handler for resource "
"%s does not support snapshots" % resource["id"])

except Exception:
LOGGER.exception("Unable to find a handler for %s", resource["id"])
yield self.get_client().update_snapshot(tid=self._env_id,
id=snapshot_id, snapshot_data="",
resource_id=resource_obj.id.resource_str(), error=False,
start=start, stop=datetime.datetime.now(),
size=0, success=False,
msg="Unable to find a handler for %s" % resource["id"])
finally:
if provider is not None:
provider.close()

self._cache.close_version(version)
return 200

@gen.coroutine
def get_facts(self, resource):
with (yield self.ratelimiter.acquire()):
Expand Down Expand Up @@ -977,28 +837,6 @@ def check_storage(self):

return dir_map

@protocol.handle(methods.AgentRestore.do_restore, env="tid")
@gen.coroutine
def do_restore(self, env, agent, restore_id, snapshot_id, resources):
"""
Restore a snapshot
"""
if agent not in self._instances:
return 200

return (yield self._instances[agent].do_restore(restore_id, snapshot_id, resources))

@protocol.handle(methods.AgentSnapshot.do_snapshot, env="tid")
@gen.coroutine
def do_snapshot(self, env, agent, snapshot_id, resources):
"""
Create a snapshot of stateful resources managed by this agent
"""
if agent not in self._instances:
return 200

return (yield self._instances[agent].do_snapshot(snapshot_id, resources))

@protocol.handle(methods.AgentParameterMethod.get_parameter, env="tid")
@gen.coroutine
def get_facts(self, env, agent, resource):
Expand Down
20 changes: 0 additions & 20 deletions src/inmanta/agent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,26 +584,6 @@ def available(self, resource: resources.Resource) -> bool:
"""
return True

def snapshot(self, resource: resources.Resource) -> bytes:
"""
Create a new snapshot and upload it to the server
:param resource: The state of the resource for which a snapshot is created
:return: The data that needs to be uploaded to the server. This data is passed back to the restore method on
snapshot restore.
"""
raise NotImplementedError()

def restore(self, resource: resources.Resource, snapshot_id: str) -> None:
"""
Restore a resource from a snapshot.
:param resource: The resource for which a snapshot needs to be restored.
:param snapshot_id: The id of the "file" on the server that contains the snapshot data. This data can be retrieved
with the :func:`~inmanta.agent.handler.ResourceHandler.get_file` method
"""
raise NotImplementedError()

def get_file(self, hash_id) -> bytes:
"""
Retrieve a file from the fileserver identified with the given id. The convention is to use the sha1sum of the
Expand Down
2 changes: 0 additions & 2 deletions src/inmanta/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ class ResourceAction(Enum):
pull = 3
deploy = 4
dryrun = 5
snapshot = 6
restore = 7
other = 8


Expand Down
122 changes: 1 addition & 121 deletions src/inmanta/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,20 +1260,6 @@ def get(cls, environment, resource_version_id):
if value is not None:
return cls(from_mongo=True, **value)

@classmethod
@gen.coroutine
def get_with_state(cls, environment, version):
"""
Get all resources from the given version that have "state_id" defined
"""
cursor = cls._coll.find({"environment": environment, "model": version, "attributes.state_id": {"$exists": True}})

resources = []
while (yield cursor.fetch_next):
resources.append(cls(from_mongo=True, **cursor.next_object()))

return resources

@classmethod
def new(cls, environment, resource_version_id, **kwargs):
vid = Id.parse_id(resource_version_id)
Expand Down Expand Up @@ -1477,9 +1463,6 @@ def delete_cascade(self):
resources = yield Resource.get_list(environment=self.environment, model=self.version)
for res in resources:
yield res.delete_cascade()
snaps = yield Snapshot.get_list(environment=self.environment, model=self.version)
for snap in snaps:
yield snap.delete_cascade()
yield UnknownParameter.delete_all(environment=self.environment, version=self.version)
yield Code.delete_all(environment=self.environment, version=self.version)
yield DryRun.delete_all(environment=self.environment, model=self.version)
Expand Down Expand Up @@ -1624,111 +1607,8 @@ def to_dict(self):
return dict_result


class ResourceSnapshot(BaseDocument):
"""
Snapshot of a resource
:param error Indicates if an error made the snapshot fail
"""
environment = Field(field_type=uuid.UUID, required=True)
snapshot = Field(field_type=uuid.UUID, required=True)
resource_id = Field(field_type=str, required=True)
state_id = Field(field_type=str, required=True)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
content_hash = Field(field_type=str)
success = Field(field_type=bool)
error = Field(field_type=bool)
msg = Field(field_type=str)
size = Field(field_type=int)


class ResourceRestore(BaseDocument):
"""
A restore of a resource from a snapshot
"""
environment = Field(field_type=uuid.UUID, required=True)
restore = Field(field_type=uuid.UUID, required=True)
state_id = Field(field_type=str)
resource_id = Field(field_type=str)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
success = Field(field_type=bool)
error = Field(field_type=bool)
msg = Field(field_type=str)


class SnapshotRestore(BaseDocument):
"""
Information about a snapshot restore
"""
environment = Field(field_type=uuid.UUID, required=True)
snapshot = Field(field_type=uuid.UUID, required=True)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
resources_todo = Field(field_type=int, default=0)

@gen.coroutine
def delete_cascade(self):
yield ResourceRestore.delete_all(restore=self.id)
yield self.delete()

@gen.coroutine
def resource_updated(self):
yield SnapshotRestore._coll.update_one({"_id": self.id}, {"$inc": {"resources_todo": int(-1)}})
self.resources_todo -= 1

now = datetime.datetime.now()
result = yield SnapshotRestore._coll.update_one({"_id": self.id, "resources_todo": 0}, {"$set": {"finished": now}})
if result.matched_count == 1 and (result.modified_count == 1 or result.modified_count is None):
# modified_count is None for mongodb < 2.6
self.finished = now


class Snapshot(BaseDocument):
"""
A snapshot of an environment
:param id The id of the snapshot
:param environment A reference to the environment
:param started When was this snapshot started
:param finished When was this snapshot finished
:param total_size The total size of this snapshot
"""
environment = Field(field_type=uuid.UUID, required=True)
model = Field(field_type=int, required=True)
name = Field(field_type=str)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
total_size = Field(field_type=int, default=0)
resources_todo = Field(field_type=int, default=0)

@gen.coroutine
def delete_cascade(self):
yield ResourceSnapshot.delete_all(snapshot=self.id)
restores = yield SnapshotRestore.get_list(snapshot=self.id)
for restore in restores:
yield restore.delete_cascade()

yield self.delete()

@gen.coroutine
def resource_updated(self, size):
yield Snapshot._coll.update_one({"_id": self.id},
{"$inc": {"resources_todo": int(-1), "total_size": size}})
self.total_size += size
self.resources_todo -= 1

now = datetime.datetime.now()
result = yield Snapshot._coll.update_one({"_id": self.id, "resources_todo": 0}, {"$set": {"finished": now}})
if result.matched_count == 1 and (result.modified_count == 1 or result.modified_count is None):
# modified_count is None for mongodb < 2.6
self.finished = now


_classes = [Project, Environment, Parameter, UnknownParameter, AgentProcess, AgentInstance, Agent, Report, Compile, Form,
FormRecord, Resource, ResourceAction, ConfigurationModel, Code, DryRun, ResourceSnapshot, ResourceRestore,
SnapshotRestore, Snapshot]
FormRecord, Resource, ResourceAction, ConfigurationModel, Code, DryRun]


def use_motor(motor):
Expand Down
Loading

0 comments on commit fe90aa3

Please sign in to comment.