Skip to content

Commit

Permalink
Refactor session invalidation to fix hashicorp#7
Browse files Browse the repository at this point in the history
Fixes hashicorp#7 by refactoring the Renew method to better approximate the Go method
with a try/finally block.

Also makes sure the session renew operation has finished before returning on
lock/semaphore release.
  • Loading branch information
Tiru Srikantha committed May 22, 2015
1 parent 79de14c commit 02acc81
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 155 deletions.
2 changes: 2 additions & 0 deletions Consul.Test/EventTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// </copyright>
// -----------------------------------------------------------------------

using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Consul.Test
Expand All @@ -35,6 +36,7 @@ public void Event_FireList()

var res = c.Event.Fire(p);

Thread.Sleep(100);
Assert.AreNotEqual(0, res.RequestTime);
Assert.IsFalse(string.IsNullOrEmpty(res.Response));

Expand Down
78 changes: 40 additions & 38 deletions Consul.Test/LockTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ public void Lock_AcquireRelease()
public void Lock_EphemeralAcquireRelease()
{
var c = ClientTest.MakeClient();
using (var l = c.AcquireLock(new LockOptions("test/ephemerallock") { SessionBehavior = SessionBehavior.Delete }))
var s = c.Session.Create(new SessionEntry { Behavior = SessionBehavior.Delete });
using (var l = c.AcquireLock(new LockOptions("test/ephemerallock") { Session = s.Response }))
{
Assert.IsTrue(l.IsHeld);
c.Session.Destroy(s.Response);
}
Assert.IsNull(c.KV.Get("test/ephemerallock").Response);
}
Expand Down Expand Up @@ -217,43 +219,6 @@ public void Lock_RunAction()
}));
}

[TestMethod]
public void Lock_SemaphoreConflict()
{
var c = ClientTest.MakeClient();
var sema = c.Semaphore("test/lock", 2);

sema.Acquire(CancellationToken.None);

Assert.IsTrue(sema.IsHeld);

var lockKey = c.CreateLock("test/lock/.lock");
try
{
lockKey.Acquire(CancellationToken.None);
}
catch (LockConflictException ex)
{
Assert.IsInstanceOfType(ex, typeof(LockConflictException));
}
catch (Exception ex)
{
Assert.Fail(ex.ToString());
}
try
{
lockKey.Destroy();
}
catch (LockConflictException ex)
{
Assert.IsInstanceOfType(ex, typeof(LockConflictException));
}
catch (Exception ex)
{
Assert.Fail(ex.ToString());
}
sema.Release();
}
[TestMethod]
public void Lock_ReclaimLock()
{
Expand Down Expand Up @@ -338,6 +303,43 @@ public void Lock_ReclaimLock()
c.Session.Destroy(sess);
}

[TestMethod]
public void Lock_SemaphoreConflict()
{
var c = ClientTest.MakeClient();
var sema = c.Semaphore("test/lock", 2);

sema.Acquire(CancellationToken.None);

Assert.IsTrue(sema.IsHeld);

var lockKey = c.CreateLock("test/lock/.lock");
try
{
lockKey.Acquire(CancellationToken.None);
}
catch (LockConflictException ex)
{
Assert.IsInstanceOfType(ex, typeof(LockConflictException));
}
catch (Exception ex)
{
Assert.Fail(ex.ToString());
}
try
{
lockKey.Destroy();
}
catch (LockConflictException ex)
{
Assert.IsInstanceOfType(ex, typeof(LockConflictException));
}
catch (Exception ex)
{
Assert.Fail(ex.ToString());
}
sema.Release();
}
[TestMethod]
public void Lock_ForceInvalidate()
{
Expand Down
26 changes: 10 additions & 16 deletions Consul/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -391,30 +391,24 @@ public QueryResult<T> Execute()
{
throw new ApplicationException("Unexpected HTTP exception calling Consul", ex);
}
else if (res.StatusCode == HttpStatusCode.NotFound)
if (res.StatusCode == HttpStatusCode.NotFound)
{
var result = new QueryResult<T>();
ParseQueryHeaders(res, ref result);
stopwatch.Stop();
result.RequestTime = stopwatch.Elapsed;
return result;
}
else
var stream = res.GetResponseStream();
if (stream == null)
{
var stream = res.GetResponseStream();
if (stream == null)
{
throw new ArgumentException(string.Format("Unexpected response code {0}",
res.StatusCode));
}
else
{
using (var sr = new StreamReader(stream))
{
throw new ArgumentException(string.Format("Unexpected response code {0}: {1}",
res.StatusCode, sr.ReadToEnd()));
}
}
throw new ArgumentException(string.Format("Unexpected response code {0}",
res.StatusCode));
}
using (var sr = new StreamReader(stream))
{
throw new ArgumentException(string.Format("Unexpected response code {0}: {1}",
res.StatusCode, sr.ReadToEnd()));
}
}
}
Expand Down
67 changes: 33 additions & 34 deletions Consul/Lock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,17 @@ public CancellationToken Acquire(CancellationToken ct)
if (ct.IsCancellationRequested || (!IsHeld && !string.IsNullOrEmpty(Opts.Session)))
{
_cts.Cancel();
if (_sessionRenewTask != null)
{
try
{
_sessionRenewTask.Wait();
}
catch (AggregateException)
{
// Ignore AggregateExceptions from the tasks during Release, since if the Renew task died, the developer will be Super Confused if they see the exception during Release.
}
}
}
}
}
Expand All @@ -335,45 +346,36 @@ public void Release()
{
lock (_lock)
{
if (!IsHeld)
{
throw new LockNotHeldException();
}

IsHeld = false;

var lockEnt = LockEntry(Opts.Session);
Opts.Session = null;

_cts.Cancel();

if (Equals(Opts.SessionBehavior, SessionBehavior.Delete))
try
{

try
{
if (_sessionRenewTask != null)
_sessionRenewTask.Wait();
}
catch (AggregateException)
if (!IsHeld)
{
// Ignore AggregateExceptions from the tasks during Release, since if the Renew task died, the developer will be Super Confused if they see the exception during Release.
throw new LockNotHeldException();
}
_cts.Cancel();
IsHeld = false;

var lockEnt = LockEntry(Opts.Session);

Opts.Session = null;
_client.KV.Release(lockEnt);
}
else
finally
{
_client.KV.Release(lockEnt);
try
if (_sessionRenewTask != null)
{
if (_sessionRenewTask != null)
_sessionRenewTask.Wait();
}
catch (AggregateException)
{
// Ignore AggregateExceptions from the tasks during Release, since if the Renew task died, the developer will be Super Confused if they see the exception during Release.
{
try
{
_sessionRenewTask.Wait();
}
catch (AggregateException)
{
// Ignore AggregateExceptions from the tasks during Release, since if the Renew task died, the developer will be Super Confused if they see the exception during Release.
}
}
}

}
}
}
Expand Down Expand Up @@ -467,8 +469,7 @@ private string CreateSession()
var se = new SessionEntry
{
Name = Opts.SessionName,
TTL = Opts.SessionTTL,
Behavior = Opts.SessionBehavior
TTL = Opts.SessionTTL
};
return _client.Session.Create(se).Response;
}
Expand Down Expand Up @@ -534,14 +535,12 @@ public class LockOptions
public string Session { get; set; }
public string SessionName { get; set; }
public TimeSpan SessionTTL { get; set; }
public SessionBehavior SessionBehavior { get; set; }

public LockOptions(string key)
{
Key = key;
SessionName = DefaultLockSessionName;
SessionTTL = DefaultLockSessionTTL;
SessionBehavior = SessionBehavior.Release;
}
}

Expand Down
89 changes: 59 additions & 30 deletions Consul/Semaphore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,17 @@ public CancellationToken Acquire(CancellationToken ct)
{
_cts.Cancel();
_client.KV.Delete(ContenderEntry(LockSession).Key);
if (_sessionRenewTask != null)
{
try
{
_sessionRenewTask.Wait();
}
catch (AggregateException)
{
// Ignore AggregateExceptions from the tasks during Release, since if the Renew task died, the developer will be Super Confused if they see the exception during Release.
}
}
}
}
}
Expand All @@ -397,48 +408,65 @@ public void Release()
{
lock (_lock)
{
if (!IsHeld)
try
{
throw new SemaphoreNotHeldException();
}

IsHeld = false;
_cts.Cancel();
_cts.Cancel();

var lockSession = LockSession;
LockSession = null;
if (!IsHeld)
{
throw new SemaphoreNotHeldException();
}
IsHeld = false;

var key = string.Join("/", Opts.Prefix, DefaultSemaphoreKey);
var lockSession = LockSession;
LockSession = null;

var didSet = false;
var key = string.Join("/", Opts.Prefix, DefaultSemaphoreKey);

while (!didSet)
{
var pair = _client.KV.Get(key);
var didSet = false;

if (pair.Response == null)
while (!didSet)
{
pair.Response = new KVPair(key);
}
var pair = _client.KV.Get(key);

var semaphoreLock = DecodeLock(pair.Response);
if (pair.Response == null)
{
pair.Response = new KVPair(key);
}

if (semaphoreLock.Holders.ContainsKey(lockSession))
{
semaphoreLock.Holders.Remove(lockSession);
var newLock = EncodeLock(semaphoreLock, pair.Response.ModifyIndex);
var semaphoreLock = DecodeLock(pair.Response);

if (semaphoreLock.Holders.ContainsKey(lockSession))
{
semaphoreLock.Holders.Remove(lockSession);
var newLock = EncodeLock(semaphoreLock, pair.Response.ModifyIndex);

didSet = _client.KV.CAS(newLock).Response;
didSet = _client.KV.CAS(newLock).Response;
}
else
{
break;
}
}
else

var contenderKey = string.Join("/", Opts.Prefix, lockSession);

_client.KV.Delete(contenderKey);
}
finally
{
if (_sessionRenewTask != null)
{
break;
try
{
_sessionRenewTask.Wait();
}
catch (AggregateException)
{
// Ignore AggregateExceptions from the tasks during Release, since if the Renew task died, the developer will be Super Confused if they see the exception during Release.
}
}
}

var contenderKey = string.Join("/", Opts.Prefix, lockSession);

_client.KV.Delete(contenderKey);
}
}

Expand Down Expand Up @@ -504,7 +532,7 @@ private Task MonitorLock(string lockSession)
{
try
{
var opts = new QueryOptions() {Consistency = ConsistencyMode.Consistent};
var opts = new QueryOptions() { Consistency = ConsistencyMode.Consistent };
while (IsHeld && !_cts.Token.IsCancellationRequested)
{
var pairs = _client.KV.List(Opts.Prefix, opts);
Expand Down Expand Up @@ -653,7 +681,8 @@ private static void PruneDeadHolders(SemaphoreLock l, IEnumerable<KVPair> pairs)
public class AutoSemaphore : Semaphore, IDisposable
{
public CancellationToken CancellationToken { get; private set; }
internal AutoSemaphore(Client c, SemaphoreOptions opts) : base(c)
internal AutoSemaphore(Client c, SemaphoreOptions opts)
: base(c)
{
Opts = opts;
CancellationToken = Acquire();
Expand Down
Loading

0 comments on commit 02acc81

Please sign in to comment.