Skip to content

Commit

Permalink
Merge pull request #657 from ClusterHQ/incremental-push-implementatio…
Browse files Browse the repository at this point in the history
…n-46

Add support for generating incremental streams when pushing volumes.
  • Loading branch information
exarkun committed Sep 5, 2014
2 parents 9f6577b + 66829b0 commit 07fa05d
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 31 deletions.
9 changes: 9 additions & 0 deletions flocker/volume/filesystems/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def get_path():
:return: The path as a ``FilePath``.
"""

def snapshots():
    """
    Report the snapshots which have been taken of this filesystem.

    :return: A ``Deferred`` whose result is a ``list`` of ``Snapshot``
        instances, one per existing snapshot of this filesystem, sorted
        from the oldest snapshot to the newest.
    """

def reader():
"""Context manager that allows reading the contents of the filesystem.
Expand Down
7 changes: 7 additions & 0 deletions flocker/volume/filesystems/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ class DirectoryFilesystem(object):
def get_path(self):
return self.path

def snapshots(self):
    """
    Report the snapshots of this filesystem, of which there are never any.

    ``DirectoryFilesystem`` has no snapshotting support, so the reported
    ``list`` is always empty.

    :return: The result of ``succeed([])``.
    """
    # NOTE(review): ``succeed`` is imported outside this view; presumably it
    # is Twisted's helper that wraps a value in an already-fired Deferred.
    no_snapshots = []
    return succeed(no_snapshots)

@contextmanager
def reader(self):
"""Package up filesystem contents as a tarball."""
Expand Down
185 changes: 163 additions & 22 deletions flocker/volume/filesystems/zfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import os
from contextlib import contextmanager
from uuid import uuid4
from subprocess import STDOUT, PIPE, Popen, check_call
from subprocess import STDOUT, PIPE, Popen, check_call, check_output

from characteristic import with_cmp, with_repr
from characteristic import attributes, with_cmp, with_repr

from zope.interface import implementer

Expand Down Expand Up @@ -130,6 +130,40 @@ def _sync_command_error_squashed(arguments, logger):
message.write(logger)


@attributes(["name"])
class Snapshot(object):
    """
    Describe one snapshot of a ZFS filesystem.

    :ivar unicode name: The snapshot's name (the part after the ``@`` in
        the full ``dataset@snapshot`` identifier).
    """
    # TODO: ``name`` should probably be a ``SnapshotName`` rather than
    # unicode.  ``SnapshotName`` enforces a naming convention we may not
    # want to keep, though, so sort out the convention before adopting it
    # here.
    # https://github.com/ClusterHQ/flocker/issues/668
    #
    # NOTE(review): ``_parse_snapshots`` builds instances from pieces of
    # ``bytes`` output, so ``name`` may in practice be ``bytes`` -- confirm.


def _latest_common_snapshot(some, others):
"""
Pick the most recent snapshot that is common to two snapshot lists.
:param list some: One ``list`` of ``Snapshot`` instances to consider,
ordered from oldest to newest.
:param list others: Another ``list`` of ``Snapshot`` instances to consider,
ordered from oldest to newest.
:return: The ``Snapshot`` instance which occurs closest to the end of both
``some`` and ``others`` If no ``Snapshot`` appears in both, ``None`` is
returned.
"""
others_set = set(others)
for snapshot in reversed(some):
if snapshot in others_set:
return snapshot
return None


@implementer(IFilesystem)
@with_cmp(["pool", "dataset"])
@with_repr(["pool", "dataset"])
Expand All @@ -140,7 +174,7 @@ class Filesystem(object):
filesystem. This will likely grow into a more sophisticated
implementation over time.
"""
def __init__(self, pool, dataset, mountpoint=None):
def __init__(self, pool, dataset, mountpoint=None, reactor=None):
"""
:param pool: The filesystem's pool name, e.g. ``b"hpool"``.
Expand All @@ -153,6 +187,12 @@ def __init__(self, pool, dataset, mountpoint=None):
self.pool = pool
self.dataset = dataset
self._mountpoint = mountpoint
if reactor is None:
from twisted.internet import reactor
self._reactor = reactor

def snapshots(self):
    """
    List the snapshots of this filesystem.

    :return: Whatever ``_list_snapshots`` returns when given this
        filesystem and the reactor stored on it.
    """
    reactor = self._reactor
    return _list_snapshots(reactor, self)

@property
def name(self):
Expand All @@ -165,8 +205,15 @@ def get_path(self):
return self._mountpoint

@contextmanager
def reader(self):
"""Send zfs stream of contents."""
def reader(self, remote_snapshots=None):
"""
Send zfs stream of contents.
:param list remote_snapshots: ``Snapshot`` instances, ordered from
oldest to newest, which are available on the writer. The reader
may generate a partial stream which relies on one of these
snapshots in order to minimize the data to be transferred.
"""
# The existing snapshot code uses Twisted, so we're not using it
# in this iteration. What's worse, though, is that it's not clear
# if the current snapshot naming scheme makes any sense, and
Expand All @@ -175,7 +222,31 @@ def reader(self):
# clearer as we iterate.
snapshot = b"%s@%s" % (self.name, uuid4())
check_call([b"zfs", b"snapshot", snapshot])
process = Popen([b"zfs", b"send", snapshot], stdout=PIPE)

# Determine whether there is a shared snapshot which can be used as the
# basis for an incremental send.
local_snapshots = _parse_snapshots(
check_output([b"zfs"] + _list_snapshots_command(self)),
self
)

if remote_snapshots is None:
remote_snapshots = []

latest_common_snapshot = _latest_common_snapshot(
remote_snapshots, local_snapshots)

if latest_common_snapshot is None:
identifier = [snapshot]
else:
identifier = [
b"-i",
u"{}@{}".format(
self.name, latest_common_snapshot.name).encode("ascii"),
snapshot,
]

process = Popen([b"zfs", b"send"] + identifier, stdout=PIPE)
try:
yield process.stdout
finally:
Expand Down Expand Up @@ -223,30 +294,100 @@ def create(self, name):
return d

def list(self):
"""List ZFS snapshots known to the volume manager.
"""
List ZFS snapshots known to the volume manager.
Snapshots whose names cannot be decoded are presumed not to be
related to Flocker, and therefore will not be included in the
result.
"""
d = zfs_command(self._reactor,
[b"list", b"-H", b"-r", b"-t", b"snapshot", b"-o",
b"name", b"-s", b"name", self._filesystem.pool])

def parse_snapshots(data):
result = []
for line in data.splitlines():
pool, encoded_name = line.split(b'@', 1)
if pool == self._filesystem.pool:
try:
result.append(SnapshotName.from_bytes(encoded_name))
except ValueError:
pass
return result
d.addCallback(parse_snapshots)
d = _list_snapshots(self._reactor, self._filesystem)

def convert(snapshots):
results = []
for snapshot in snapshots:
try:
results.append(SnapshotName.from_bytes(snapshot.name))
except ValueError:
pass
return results

d.addCallback(convert)
return d


def _list_snapshots_command(filesystem):
"""
Construct a ``zfs`` command which will output the names of the snapshots of
the given filesystem.
:param Filesystem filesystem: The ZFS filesystem the snapshots of which to
list.
:return list: An argument list (of ``bytes``) which can be passed to
``zfs`` to produce the desired list of snapshots. ``zfs`` is not
included as the first element.
"""
return [
b"list",
# Format the output without a header.
b"-H",
# Recurse to datasets beneath the named dataset.
b"-r",
# Only output datasets of type snapshot.
b"-t", b"snapshot",
# Only output the name of each dataset encountered. The name is the
# only thing we currently store in our snapshot model.
b"-o", b"name",
# Sort by the creation property. This gives us the snapshots in the
# order they were taken.
b"-s", b"creation",
# Start with this the dataset we're interested in.
filesystem.name,
]


def _parse_snapshots(data, filesystem):
"""
Parse the output of a ``zfs list`` command (like the one defined by
``_list_snapshots_command`` into a ``list`` of ``Snapshot`` instances.
:param bytes data: The output to parse.
:param Filesystem filesystem: The filesystem from which to extract
snapshots. If the output includes snapshots for other filesystems (eg
siblings or children) they are excluded from the result.
:return list: A ``list`` of ``Snapshot`` instances corresponding to the
names of the snapshots in the output. The order of the list is the
same as the order of the snapshots in the data being parsed.
"""
result = []
for line in data.splitlines():
dataset, snapshot = line.split(b'@', 1)
if dataset == filesystem.name:
result.append(Snapshot(name=snapshot))
return result


def _list_snapshots(reactor, filesystem):
    """
    Retrieve the snapshots of the given filesystem.

    :param IReactorProcess reactor: The reactor with which the ``zfs`` child
        process is launched.
    :param Filesystem filesystem: The filesystem whose snapshots should be
        retrieved.

    :return: A ``Deferred`` which fires with a ``list`` of ``Snapshot``
        instances describing the snapshots of ``filesystem``.
    """
    command = _list_snapshots_command(filesystem)
    listing = zfs_command(reactor, command)
    # Once the raw output is available, parse it and keep only the
    # snapshots which belong to ``filesystem`` itself.
    listing.addCallback(_parse_snapshots, filesystem)
    return listing


def volume_to_dataset(volume):
"""Convert a volume to a dataset name.
Expand Down
Loading

0 comments on commit 07fa05d

Please sign in to comment.