Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Detect and recycle broken connections on non-GCM devices #62

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,39 @@ public class RealtimeSleepTimer implements SleepTimer {

private final AlarmReceiver alarmReceiver;
private final Context context;
private boolean armed;

public RealtimeSleepTimer(Context context) {
this.context = context;
alarmReceiver = new RealtimeSleepTimer.AlarmReceiver();
}

@Override
public void sleep(long millis) {
context.registerReceiver(alarmReceiver,
new IntentFilter(AlarmReceiver.WAKE_UP_THREAD_ACTION));

final long startTime = System.currentTimeMillis();
alarmReceiver.setAlarm(millis);

while (System.currentTimeMillis() - startTime < millis) {
try {
synchronized (this) {
wait(millis - System.currentTimeMillis() + startTime);
}
} catch (InterruptedException e) {
Log.w(TAG, e);
}
public void sleep(long millis) throws InterruptedException {
boolean arm;

synchronized (this) {
if (!armed) {
armed = true;
arm = true;
} else {
arm = false;
}
}

context.unregisterReceiver(alarmReceiver);
if (arm) {
context.registerReceiver(alarmReceiver,
new IntentFilter(AlarmReceiver.WAKE_UP_THREAD_ACTION));

alarmReceiver.setAlarm(millis);
synchronized (this) {
wait(millis);
}
} else {
synchronized (this) {
wait();
}
}
}

private class AlarmReceiver extends BroadcastReceiver {
Expand Down Expand Up @@ -81,9 +89,11 @@ public void onReceive(Context context, Intent intent) {
Log.w(TAG, "Waking up.");

synchronized (RealtimeSleepTimer.this) {
RealtimeSleepTimer.this.context.unregisterReceiver(this);

RealtimeSleepTimer.this.armed = false;
RealtimeSleepTimer.this.notifyAll();
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SignalServiceMessagePipe {
* @throws TimeoutException
*/
public SignalServiceEnvelope read(long timeout, TimeUnit unit)
throws InvalidVersionException, IOException, TimeoutException
throws InvalidVersionException, IOException, TimeoutException, InterruptedException
{
return read(timeout, unit, new NullMessagePipeCallback());
}
Expand All @@ -91,7 +91,7 @@ public SignalServiceEnvelope read(long timeout, TimeUnit unit)
* @throws InvalidVersionException
*/
public SignalServiceEnvelope read(long timeout, TimeUnit unit, MessagePipeCallback callback)
throws TimeoutException, IOException, InvalidVersionException
throws TimeoutException, IOException, InvalidVersionException, InterruptedException
{
if (!credentialsProvider.isPresent()) {
throw new IllegalArgumentException("You can't read messages if you haven't specified credentials");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ public class SignalServiceMessageSender {

private static final String TAG = SignalServiceMessageSender.class.getSimpleName();

private final PushServiceSocket socket;
private final SignalProtocolStore store;
private final SignalServiceAddress localAddress;
private final Optional<EventListener> eventListener;
private final PushServiceSocket socket;
private final SignalProtocolStore store;
private final SignalServiceAddress localAddress;
private final Optional<EventListener> eventListener;

private final AtomicReference<Optional<SignalServiceMessagePipe>> pipe;
private final AtomicReference<Optional<SignalServiceMessagePipe>> unidentifiedPipe;
private final AtomicBoolean isMultiDevice;
private final AtomicReference<SignalServiceMessagePipe> pipe;
private final AtomicReference<SignalServiceMessagePipe> unidentifiedPipe;
private final AtomicBoolean isMultiDevice;

/**
* Construct a SignalServiceMessageSender.
Expand All @@ -113,8 +113,8 @@ public SignalServiceMessageSender(SignalServiceConfiguration urls,
SignalProtocolStore store,
String userAgent,
boolean isMultiDevice,
Optional<SignalServiceMessagePipe> pipe,
Optional<SignalServiceMessagePipe> unidentifiedPipe,
AtomicReference<SignalServiceMessagePipe> pipe,
AtomicReference<SignalServiceMessagePipe> unidentifiedPipe,
Optional<EventListener> eventListener)
{
this(urls, new StaticCredentialsProvider(user, password, null), store, userAgent, isMultiDevice, pipe, unidentifiedPipe, eventListener);
Expand All @@ -125,15 +125,15 @@ public SignalServiceMessageSender(SignalServiceConfiguration urls,
SignalProtocolStore store,
String userAgent,
boolean isMultiDevice,
Optional<SignalServiceMessagePipe> pipe,
Optional<SignalServiceMessagePipe> unidentifiedPipe,
AtomicReference<SignalServiceMessagePipe> pipe,
AtomicReference<SignalServiceMessagePipe> unidentifiedPipe,
Optional<EventListener> eventListener)
{
this.socket = new PushServiceSocket(urls, credentialsProvider, userAgent);
this.store = store;
this.localAddress = new SignalServiceAddress(credentialsProvider.getUser());
this.pipe = new AtomicReference<>(pipe);
this.unidentifiedPipe = new AtomicReference<>(unidentifiedPipe);
this.pipe = pipe;
this.unidentifiedPipe = unidentifiedPipe;
this.isMultiDevice = new AtomicBoolean(isMultiDevice);
this.eventListener = eventListener;
}
Expand Down Expand Up @@ -301,11 +301,6 @@ public void cancelInFlightRequests() {
socket.cancelInFlightRequests();
}

public void setMessagePipe(SignalServiceMessagePipe pipe, SignalServiceMessagePipe unidentifiedPipe) {
this.pipe.set(Optional.fromNullable(pipe));
this.unidentifiedPipe.set(Optional.fromNullable(unidentifiedPipe));
}

public void setIsMultiDevice(boolean isMultiDevice) {
this.isMultiDevice.set(isMultiDevice);
}
Expand Down Expand Up @@ -824,22 +819,22 @@ private SendMessageResult sendMessage(SignalServiceAddress recipient,
for (int i=0;i<4;i++) {
try {
OutgoingPushMessageList messages = getEncryptedMessages(socket, recipient, unidentifiedAccess, timestamp, content, online);
Optional<SignalServiceMessagePipe> pipe = this.pipe.get();
Optional<SignalServiceMessagePipe> unidentifiedPipe = this.unidentifiedPipe.get();
SignalServiceMessagePipe pipe = this.pipe.get();
SignalServiceMessagePipe unidentifiedPipe = this.unidentifiedPipe.get();

if (pipe.isPresent() && !unidentifiedAccess.isPresent()) {
if (pipe != null && !unidentifiedAccess.isPresent()) {
try {
Log.w(TAG, "Transmitting over pipe...");
SendMessageResponse response = pipe.get().send(messages, Optional.<UnidentifiedAccess>absent());
SendMessageResponse response = pipe.send(messages, Optional.<UnidentifiedAccess>absent());
return SendMessageResult.success(recipient, false, response.getNeedsSync());
} catch (IOException e) {
Log.w(TAG, e);
Log.w(TAG, "Falling back to new connection...");
}
} else if (unidentifiedPipe.isPresent() && unidentifiedAccess.isPresent()) {
} else if (unidentifiedPipe != null && unidentifiedAccess.isPresent()) {
try {
Log.w(TAG, "Transmitting over unidentified pipe...");
SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess);
SendMessageResponse response = unidentifiedPipe.send(messages, unidentifiedAccess);
return SendMessageResult.success(recipient, true, response.getNeedsSync());
} catch (IOException e) {
Log.w(TAG, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -127,22 +128,20 @@ public synchronized void disconnect() {
}

if (keepAliveSender != null) {
keepAliveSender.shutdown();
keepAliveSender.interrupt();
keepAliveSender = null;
}
}

public synchronized WebSocketRequestMessage readRequest(long timeoutMillis)
throws TimeoutException, IOException
throws TimeoutException, IOException, InterruptedException
{
if (client == null) {
throw new IOException("Connection closed!");
}

long startTime = System.currentTimeMillis();

while (client != null && incomingRequests.isEmpty() && elapsedTime(startTime) < timeoutMillis) {
Util.wait(this, Math.max(1, timeoutMillis - elapsedTime(startTime)));
if (client != null && incomingRequests.isEmpty()) {
wait(timeoutMillis);
}

if (incomingRequests.isEmpty() && client == null) throw new IOException("Connection closed!");
Expand Down Expand Up @@ -183,20 +182,16 @@ public synchronized void sendResponse(WebSocketResponseMessage response) throws
}
}

private synchronized void sendKeepAlive() throws IOException {
private synchronized Future<Pair<Integer, String>> sendKeepAlive() throws IOException {
if (keepAliveSender != null && client != null) {
byte[] message = WebSocketMessage.newBuilder()
.setType(WebSocketMessage.Type.REQUEST)
.setRequest(WebSocketRequestMessage.newBuilder()
.setId(System.currentTimeMillis())
.setPath("/v1/keepalive")
.setVerb("GET")
.build()).build()
.toByteArray();

if (!client.send(ByteString.of(message))) {
throw new IOException("Write failed!");
}
WebSocketRequestMessage request = WebSocketRequestMessage.newBuilder()
.setId(System.currentTimeMillis())
.setPath("/v1/keepalive")
.setVerb("GET")
.build();
return sendRequest(request);
} else {
return null;
}
}

Expand Down Expand Up @@ -249,7 +244,7 @@ public synchronized void onClosed(WebSocket webSocket, int code, String reason)
}

if (keepAliveSender != null) {
keepAliveSender.shutdown();
keepAliveSender.interrupt();
keepAliveSender = null;
}

Expand Down Expand Up @@ -312,23 +307,43 @@ private Pair<SSLSocketFactory, X509TrustManager> createTlsSocketFactory(TrustSto

private class KeepAliveSender extends Thread {

private AtomicBoolean stop = new AtomicBoolean(false);

public void run() {
while (!stop.get()) {
Future future = null;
boolean severed = false;

while (!interrupted()) {
try {
sleepTimer.sleep(TimeUnit.SECONDS.toMillis(KEEPALIVE_TIMEOUT_SECONDS));

Log.w(TAG, "Sending keep alive...");
sendKeepAlive();
} catch (Throwable e) {
Log.w(TAG, e);
if (future != null) {
try {
future.get(0L, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e){
severed = true;
}
}
} catch (InterruptedException e) {
Log.d(TAG, "Keep alive sender interrupted; exiting loop.");
break;
}
}
}

public void shutdown() {
stop.set(true);
if (severed) {
Log.d(TAG, "No response to previous keep-alive; forcing new connection.");

disconnect();
synchronized(WebSocketConnection.this) {
WebSocketConnection.this.notifyAll();
}
} else {
Log.d(TAG, "Sending keep alive...");

try {
future = sendKeepAlive();
} catch (IOException e) {
Log.d(TAG, "Failed to send keep alive: " + e.getMessage());
}
}
}
}
}

Expand Down