Skip to content

Commit

Permalink
Merge pull request #45 from eupakhomov/0.5_json_stable_under_load
Browse files Browse the repository at this point in the history
0.5 json stable under load
  • Loading branch information
TVolden authored Apr 17, 2018
2 parents 8486082 + 4203764 commit 4c8ba16
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ synchronized public void sendCall(String uniqueId, String action, Request reques
radio.send(call);
}
} catch (NotConnectedException ex) {
logger.warn("sendCall() failed", ex);
logger.warn("sendCall() failed: not connected");
if (request.transactionRelated()) {
transactionQueue.add(call);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package eu.chargetime.ocpp;/*
package eu.chargetime.ocpp;

/*
ChargeTime.eu - Java-OCA-OCPP
MIT License
Expand Down
4 changes: 4 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import eu.chargetime.ocpp.model.Request;

import java.util.UUID;

/*
ChargeTime.eu - Java-OCA-OCPP
Expand All @@ -28,6 +30,8 @@ of this software and associated documentation files (the "Software"), to deal
SOFTWARE.
*/
public interface ISession {
UUID getSessionId();

void open(String uri, SessionEvents eventHandler);

void accept(SessionEvents eventHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;


import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/*
ChargeTime.eu - Java-OCA-OCPP
Expand Down Expand Up @@ -35,10 +35,10 @@ of this software and associated documentation files (the "Software"), to deal
*/
public class PromiseRepository implements IPromiseRepository {

private HashMap<String, CompletableFuture<Confirmation>> promises;
private Map<String, CompletableFuture<Confirmation>> promises;

public PromiseRepository() {
this.promises = new HashMap<>();
this.promises = new ConcurrentHashMap<>();
}

/**
Expand Down
9 changes: 6 additions & 3 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/*
ChargeTime.eu - Java-OCA-OCPP
Expand Down Expand Up @@ -44,10 +45,12 @@ public class Queue
{
private static final Logger logger = LoggerFactory.getLogger(Queue.class);

private HashMap<String, Request> requestQueue;
public static final int REQUEST_QUEUE_INITIAL_CAPACITY = 1000;

private Map<String, Request> requestQueue;

public Queue () {
requestQueue = new HashMap<>();
requestQueue = new ConcurrentHashMap<>(REQUEST_QUEUE_INITIAL_CAPACITY);
}

/**
Expand Down
3 changes: 2 additions & 1 deletion ocpp-common/src/main/java/eu/chargetime/ocpp/Receiver.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package eu.chargetime.ocpp;/*
package eu.chargetime.ocpp;
/*
ChargeTime.eu - Java-OCA-OCPP
MIT License
Expand Down
41 changes: 27 additions & 14 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import eu.chargetime.ocpp.model.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/*
ChargeTime.eu - Java-OCA-OCPP
Expand Down Expand Up @@ -45,7 +45,10 @@ of this software and associated documentation files (the "Software"), to deal
public class Server {

private static final Logger logger = LoggerFactory.getLogger(Server.class);
private HashMap<UUID, ISession> sessions;

public static final int INITIAL_SESSIONS_NUMBER = 1000;

private Map<UUID, ISession> sessions;
private Listener listener;
private final IFeatureRepository featureRepository;
private final IPromiseRepository promiseRepository;
Expand All @@ -59,7 +62,7 @@ public Server(Listener listener, IFeatureRepository featureRepository, IPromiseR
this.listener = listener;
this.featureRepository = featureRepository;
this.promiseRepository = promiseRepository;
this.sessions = new HashMap();
this.sessions = new ConcurrentHashMap<>(INITIAL_SESSIONS_NUMBER);
}

/**
Expand Down Expand Up @@ -126,22 +129,25 @@ public void handleConnectionOpened() {

}
});
sessions.put(UUID.randomUUID(), session);
serverEvents.newSession(getSessionID(session).get(), information);

sessions.put(session.getSessionId(), session);

Optional<UUID> sessionIdOptional = getSessionID(session);
if(sessionIdOptional.isPresent()) {
serverEvents.newSession(sessionIdOptional.get(), information);
logger.debug("Session created: {}", session.getSessionId());
} else {
throw new IllegalStateException("Failed to create a session");
}
});
}

private Optional<UUID> getSessionID(ISession session) {

if (!sessions.containsValue(session))
if (!sessions.containsValue(session)) {
return Optional.empty();

for (Map.Entry<UUID, ISession> entry : sessions.entrySet()) {
if (entry.getValue() == session)
return Optional.of(entry.getKey());
}

return Optional.empty();
return Optional.of(session.getSessionId());
}

/**
Expand All @@ -160,7 +166,7 @@ public void close() {
* @throws UnsupportedFeatureException Thrown if the feature isn't among the list of supported featured.
* @throws OccurenceConstraintException Thrown if the request isn't valid.
*/
public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request) throws UnsupportedFeatureException, OccurenceConstraintException {
public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request) throws UnsupportedFeatureException, OccurenceConstraintException, NotConnectedException {
Optional<Feature> featureOptional = featureRepository.findFeature(request);
if (!featureOptional.isPresent()) {
throw new UnsupportedFeatureException();
Expand All @@ -172,6 +178,12 @@ public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request)

ISession session = sessions.get(sessionIndex);

if(session == null) {
logger.warn("Session not found by index: {}", sessionIndex);

// No session found means client disconnected and request should be cancelled
throw new NotConnectedException();
}

String id = session.storeRequest(request);
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(id);
Expand All @@ -186,8 +198,9 @@ public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request)
*/
public void closeSession(UUID sessionIndex) {
ISession session = sessions.get(sessionIndex);
if (session != null)
if (session != null) {
session.close();
}
}

public void setAsyncRequestHandler(boolean asyncRequestHandler) {
Expand Down
34 changes: 34 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import eu.chargetime.ocpp.feature.Feature;
import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;
import eu.chargetime.ocpp.utilities.MoreObjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/*
Expand Down Expand Up @@ -44,6 +46,7 @@ public class Session implements ISession {

private static final Logger logger = LoggerFactory.getLogger(Session.class);

private final UUID sessionId = UUID.randomUUID();
private final Communicator communicator;
private final Queue queue;
private final RequestDispatcher dispatcher;
Expand All @@ -63,6 +66,15 @@ public Session(Communicator communicator, Queue queue, PromiseFulfiller fulfille
this.featureRepository = featureRepository;
}

/**
* Get a unique session {@link UUID} identifier.
*
* @return the unique session {@link UUID} identifier
*/
public UUID getSessionId() {
return sessionId;
}

/**
* Send a {@link Request}.
*
Expand Down Expand Up @@ -214,4 +226,26 @@ public void onConnected() {
}

}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Session session = (Session) o;
return MoreObjects.equals(sessionId, session.sessionId);
}

@Override
public int hashCode() {
return MoreObjects.hash(sessionId);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("sessionId", sessionId)
.add("queue", queue)
.add("featureRepository", featureRepository)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ public class ServerTest extends TestUtilities {

@Before
public void setup() {
UUID sessionId = UUID.randomUUID();
when(request.validate()).thenReturn(true);
when(session.getSessionId()).thenReturn(sessionId);
doAnswer(invocation -> listenerEvents = invocation.getArgumentAt(2, ListenerEvents.class)).when(listener).open(anyString(), anyInt(), any());
doAnswer(invocation -> sessionEvents = invocation.getArgumentAt(0, SessionEvents.class)).when(session).accept(any());
doAnswer(invocation -> sessionIndex = invocation.getArgumentAt(0, UUID.class)).when(serverEvents).newSession(any(), any());
Expand Down
3 changes: 2 additions & 1 deletion ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IServerAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ public interface IServerAPI {
void close();

boolean isClosed();
CompletionStage<Confirmation> send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException;

CompletionStage<Confirmation> send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException;
}
16 changes: 15 additions & 1 deletion ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@
import eu.chargetime.ocpp.model.Request;
import eu.chargetime.ocpp.wss.BaseWssSocketBuilder;
import eu.chargetime.ocpp.wss.WssSocketBuilder;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.protocols.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletionStage;

/*
Expand Down Expand Up @@ -41,6 +48,11 @@
*/
public class JSONClient implements IClientAPI {

private static final Logger logger = LoggerFactory.getLogger(JSONClient.class);

public final Draft draftOcppOnly =
new Draft_6455(Collections.emptyList(),
Collections.singletonList(new Protocol("ocpp1.6")));
private final WebSocketTransmitter transmitter;
private final FeatureRepository featureRepository;
private final Client client;
Expand All @@ -59,7 +71,7 @@ public JSONClient(ClientCoreProfile coreProfile) {
*/
public JSONClient(ClientCoreProfile coreProfile, String identity) {
this.identity = identity;
transmitter = new WebSocketTransmitter();
transmitter = new WebSocketTransmitter(draftOcppOnly);
JSONCommunicator communicator = new JSONCommunicator(transmitter);
AsyncPromiseFulfilerDecorator promiseFulfiler = new AsyncPromiseFulfilerDecorator(new SimplePromiseFulfiller());
featureRepository = new FeatureRepository();
Expand Down Expand Up @@ -116,6 +128,8 @@ public void addFeatureProfile(Profile profile) {

@Override
public void connect(String url, ClientEvents clientEvents) {
logger.debug("Feature repository: {}", featureRepository);

String identityUrl = (identity != null) ? String.format("%s/%s", url, identity) : url;
client.connect(identityUrl, clientEvents);
}
Expand Down
23 changes: 18 additions & 5 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,28 @@ of this software and associated documentation files (the "Software"), to deal
import eu.chargetime.ocpp.model.Request;
import eu.chargetime.ocpp.wss.BaseWssFactoryBuilder;
import eu.chargetime.ocpp.wss.WssFactoryBuilder;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.protocols.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletionStage;

public class JSONServer implements IServerAPI {

private WebSocketListener listener;
private Server server;
private FeatureRepository featureRepository;
private static final Logger logger = LoggerFactory.getLogger(JSONServer.class);

public final Draft draftOcppOnly =
new Draft_6455(Collections.emptyList(),
Collections.singletonList(new Protocol("ocpp1.6")));
private final WebSocketListener listener;
private final Server server;
private final FeatureRepository featureRepository;

/**
* The core feature profile is required as a minimum.
Expand All @@ -52,7 +63,7 @@ public class JSONServer implements IServerAPI {
public JSONServer(ServerCoreProfile coreProfile) {
featureRepository = new FeatureRepository();
ServerSessionFactory sessionFactory = new ServerSessionFactory(featureRepository);
this.listener = new WebSocketListener(sessionFactory);
this.listener = new WebSocketListener(sessionFactory, draftOcppOnly);
server = new Server(this.listener, featureRepository, new PromiseRepository());
featureRepository.addFeatureProfile(coreProfile);
}
Expand Down Expand Up @@ -108,6 +119,8 @@ public void closeSession(UUID session) {

@Override
public void open(String host, int port, ServerEvents serverEvents) {
logger.info("Feature repository: {}", featureRepository);

server.open(host, port, serverEvents);
}

Expand All @@ -122,7 +135,7 @@ public boolean isClosed() {
}

@Override
public CompletionStage<Confirmation> send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException {
public CompletionStage<Confirmation> send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException {
return server.send(session, request);
}
}
2 changes: 1 addition & 1 deletion ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean isClosed() {
}

@Override
public CompletionStage<Confirmation> send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException {
public CompletionStage<Confirmation> send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException {
return server.send(session, request);
}
}
Loading

0 comments on commit 4c8ba16

Please sign in to comment.