diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java index 4192d3ef4..b2a489c19 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java @@ -171,7 +171,7 @@ synchronized public void sendCall(String uniqueId, String action, Request reques radio.send(call); } } catch (NotConnectedException ex) { - logger.warn("sendCall() failed", ex); + logger.warn("sendCall() failed: not connected"); if (request.transactionRelated()) { transactionQueue.add(call); } else { diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/ConfirmatinHandler.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/ConfirmatinHandler.java index 0ef896dab..b739c9a55 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/ConfirmatinHandler.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/ConfirmatinHandler.java @@ -1,4 +1,6 @@ -package eu.chargetime.ocpp;/* +package eu.chargetime.ocpp; + +/* ChargeTime.eu - Java-OCA-OCPP MIT License diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java index 1cd99a24d..71592f13c 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java @@ -2,6 +2,8 @@ import eu.chargetime.ocpp.model.Request; +import java.util.UUID; + /* ChargeTime.eu - Java-OCA-OCPP @@ -28,6 +30,8 @@ of this software and associated documentation files (the "Software"), to deal SOFTWARE. */ public interface ISession { + UUID getSessionId(); + void open(String uri, SessionEvents eventHandler); void accept(SessionEvents eventHandler); diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/PromiseRepository.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/PromiseRepository.java index ed19bce93..ffb423899 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/PromiseRepository.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/PromiseRepository.java @@ -3,10 +3,10 @@ import eu.chargetime.ocpp.model.Confirmation; import eu.chargetime.ocpp.model.Request; - -import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /* ChargeTime.eu - Java-OCA-OCPP @@ -35,10 +35,10 @@ of this software and associated documentation files (the "Software"), to deal */ public class PromiseRepository implements IPromiseRepository { - private HashMap> promises; + private Map> promises; public PromiseRepository() { - this.promises = new HashMap<>(); + this.promises = new ConcurrentHashMap<>(); } /** diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java index b251f332c..8c7d58f64 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java @@ -6,9 +6,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; /* ChargeTime.eu - Java-OCA-OCPP @@ -44,10 +45,12 @@ public class Queue { private static final Logger logger = LoggerFactory.getLogger(Queue.class); - private HashMap requestQueue; + public static final int REQUEST_QUEUE_INITIAL_CAPACITY = 1000; + + private Map requestQueue; public Queue () { - requestQueue = new HashMap<>(); + requestQueue = new ConcurrentHashMap<>(REQUEST_QUEUE_INITIAL_CAPACITY); } /** diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Receiver.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Receiver.java index a0080a864..fd04c5f9f 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Receiver.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Receiver.java @@ -1,4 +1,5 @@ -package eu.chargetime.ocpp;/* +package eu.chargetime.ocpp; +/* ChargeTime.eu - Java-OCA-OCPP MIT License diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java index 7a396598f..8e80b379f 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java @@ -5,11 +5,11 @@ import eu.chargetime.ocpp.model.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /* ChargeTime.eu - Java-OCA-OCPP @@ -45,7 +45,10 @@ of this software and associated documentation files (the "Software"), to deal public class Server { private static final Logger logger = LoggerFactory.getLogger(Server.class); - private HashMap sessions; + + public static final int INITIAL_SESSIONS_NUMBER = 1000; + + private Map sessions; private Listener listener; private final IFeatureRepository featureRepository; private final IPromiseRepository promiseRepository; @@ -59,7 +62,7 @@ public Server(Listener listener, IFeatureRepository featureRepository, IPromiseR this.listener = listener; this.featureRepository = featureRepository; this.promiseRepository = promiseRepository; - this.sessions = new HashMap(); + this.sessions = new ConcurrentHashMap<>(INITIAL_SESSIONS_NUMBER); } /** @@ -126,22 +129,25 @@ public void handleConnectionOpened() { } }); - sessions.put(UUID.randomUUID(), session); - serverEvents.newSession(getSessionID(session).get(), information); + + sessions.put(session.getSessionId(), session); + + Optional sessionIdOptional = getSessionID(session); + if(sessionIdOptional.isPresent()) { + serverEvents.newSession(sessionIdOptional.get(), information); + logger.debug("Session created: {}", session.getSessionId()); + } else { + throw new IllegalStateException("Failed to create a session"); + } }); } private Optional getSessionID(ISession session) { - - if (!sessions.containsValue(session)) + if (!sessions.containsValue(session)) { return Optional.empty(); - - for (Map.Entry entry : sessions.entrySet()) { - if (entry.getValue() == session) - return Optional.of(entry.getKey()); } - return Optional.empty(); + return Optional.of(session.getSessionId()); } /** @@ -160,7 +166,7 @@ public void close() { * @throws UnsupportedFeatureException Thrown if the feature isn't among the list of supported featured. * @throws OccurenceConstraintException Thrown if the request isn't valid. */ - public CompletableFuture send(UUID sessionIndex, Request request) throws UnsupportedFeatureException, OccurenceConstraintException { + public CompletableFuture send(UUID sessionIndex, Request request) throws UnsupportedFeatureException, OccurenceConstraintException, NotConnectedException { Optional featureOptional = featureRepository.findFeature(request); if (!featureOptional.isPresent()) { throw new UnsupportedFeatureException(); @@ -172,6 +178,12 @@ public CompletableFuture send(UUID sessionIndex, Request request) ISession session = sessions.get(sessionIndex); + if(session == null) { + logger.warn("Session not found by index: {}", sessionIndex); + + // No session found means client disconnected and request should be cancelled + throw new NotConnectedException(); + } String id = session.storeRequest(request); CompletableFuture promise = promiseRepository.createPromise(id); @@ -186,8 +198,9 @@ public CompletableFuture send(UUID sessionIndex, Request request) */ public void closeSession(UUID sessionIndex) { ISession session = sessions.get(sessionIndex); - if (session != null) + if (session != null) { session.close(); + } } public void setAsyncRequestHandler(boolean asyncRequestHandler) { diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java index b79a31f7a..18b93a524 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java @@ -3,10 +3,12 @@ import eu.chargetime.ocpp.feature.Feature; import eu.chargetime.ocpp.model.Confirmation; import eu.chargetime.ocpp.model.Request; +import eu.chargetime.ocpp.utilities.MoreObjects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; /* @@ -44,6 +46,7 @@ public class Session implements ISession { private static final Logger logger = LoggerFactory.getLogger(Session.class); + private final UUID sessionId = UUID.randomUUID(); private final Communicator communicator; private final Queue queue; private final RequestDispatcher dispatcher; @@ -63,6 +66,15 @@ public Session(Communicator communicator, Queue queue, PromiseFulfiller fulfille this.featureRepository = featureRepository; } + /** + * Get a unique session {@link UUID} identifier. + * + * @return the unique session {@link UUID} identifier + */ + public UUID getSessionId() { + return sessionId; + } + /** * Send a {@link Request}. * @@ -214,4 +226,26 @@ public void onConnected() { } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Session session = (Session) o; + return MoreObjects.equals(sessionId, session.sessionId); + } + + @Override + public int hashCode() { + return MoreObjects.hash(sessionId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("sessionId", sessionId) + .add("queue", queue) + .add("featureRepository", featureRepository) + .toString(); + } } diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java index 6c01a51be..82119c348 100644 --- a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java @@ -72,7 +72,9 @@ public class ServerTest extends TestUtilities { @Before public void setup() { + UUID sessionId = UUID.randomUUID(); when(request.validate()).thenReturn(true); + when(session.getSessionId()).thenReturn(sessionId); doAnswer(invocation -> listenerEvents = invocation.getArgumentAt(2, ListenerEvents.class)).when(listener).open(anyString(), anyInt(), any()); doAnswer(invocation -> sessionEvents = invocation.getArgumentAt(0, SessionEvents.class)).when(session).accept(any()); doAnswer(invocation -> sessionIndex = invocation.getArgumentAt(0, UUID.class)).when(serverEvents).newSession(any(), any()); diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IServerAPI.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IServerAPI.java index 436c96a90..82c1849c8 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IServerAPI.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IServerAPI.java @@ -42,5 +42,6 @@ public interface IServerAPI { void close(); boolean isClosed(); - CompletionStage send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException; + + CompletionStage send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException; } \ No newline at end of file diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONClient.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONClient.java index f73cbc097..ad1927077 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONClient.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONClient.java @@ -6,8 +6,15 @@ import eu.chargetime.ocpp.model.Request; import eu.chargetime.ocpp.wss.BaseWssSocketBuilder; import eu.chargetime.ocpp.wss.WssSocketBuilder; +import org.java_websocket.drafts.Draft; +import org.java_websocket.drafts.Draft_6455; +import org.java_websocket.protocols.Protocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.net.ssl.SSLContext; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CompletionStage; /* @@ -41,6 +48,11 @@ */ public class JSONClient implements IClientAPI { + private static final Logger logger = LoggerFactory.getLogger(JSONClient.class); + + public final Draft draftOcppOnly = + new Draft_6455(Collections.emptyList(), + Collections.singletonList(new Protocol("ocpp1.6"))); private final WebSocketTransmitter transmitter; private final FeatureRepository featureRepository; private final Client client; @@ -59,7 +71,7 @@ public JSONClient(ClientCoreProfile coreProfile) { */ public JSONClient(ClientCoreProfile coreProfile, String identity) { this.identity = identity; - transmitter = new WebSocketTransmitter(); + transmitter = new WebSocketTransmitter(draftOcppOnly); JSONCommunicator communicator = new JSONCommunicator(transmitter); AsyncPromiseFulfilerDecorator promiseFulfiler = new AsyncPromiseFulfilerDecorator(new SimplePromiseFulfiller()); featureRepository = new FeatureRepository(); @@ -116,6 +128,8 @@ public void addFeatureProfile(Profile profile) { @Override public void connect(String url, ClientEvents clientEvents) { + logger.debug("Feature repository: {}", featureRepository); + String identityUrl = (identity != null) ? String.format("%s/%s", url, identity) : url; client.connect(identityUrl, clientEvents); } diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONServer.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONServer.java index 5b0303ffa..098ff18bf 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONServer.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONServer.java @@ -31,7 +31,13 @@ of this software and associated documentation files (the "Software"), to deal import eu.chargetime.ocpp.model.Request; import eu.chargetime.ocpp.wss.BaseWssFactoryBuilder; import eu.chargetime.ocpp.wss.WssFactoryBuilder; +import org.java_websocket.drafts.Draft; +import org.java_websocket.drafts.Draft_6455; +import org.java_websocket.protocols.Protocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; import javax.net.ssl.SSLContext; import java.io.IOException; import java.util.UUID; @@ -39,9 +45,14 @@ of this software and associated documentation files (the "Software"), to deal public class JSONServer implements IServerAPI { - private WebSocketListener listener; - private Server server; - private FeatureRepository featureRepository; + private static final Logger logger = LoggerFactory.getLogger(JSONServer.class); + + public final Draft draftOcppOnly = + new Draft_6455(Collections.emptyList(), + Collections.singletonList(new Protocol("ocpp1.6"))); + private final WebSocketListener listener; + private final Server server; + private final FeatureRepository featureRepository; /** * The core feature profile is required as a minimum. @@ -52,7 +63,7 @@ public class JSONServer implements IServerAPI { public JSONServer(ServerCoreProfile coreProfile) { featureRepository = new FeatureRepository(); ServerSessionFactory sessionFactory = new ServerSessionFactory(featureRepository); - this.listener = new WebSocketListener(sessionFactory); + this.listener = new WebSocketListener(sessionFactory, draftOcppOnly); server = new Server(this.listener, featureRepository, new PromiseRepository()); featureRepository.addFeatureProfile(coreProfile); } @@ -108,6 +119,8 @@ public void closeSession(UUID session) { @Override public void open(String host, int port, ServerEvents serverEvents) { + logger.info("Feature repository: {}", featureRepository); + server.open(host, port, serverEvents); } @@ -122,7 +135,7 @@ public boolean isClosed() { } @Override - public CompletionStage send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException { + public CompletionStage send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException { return server.send(session, request); } } diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPServer.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPServer.java index 8171a10b5..5fe99f77d 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPServer.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPServer.java @@ -79,7 +79,7 @@ public boolean isClosed() { } @Override - public CompletionStage send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException { + public CompletionStage send(UUID session, Request request) throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException { return server.send(session, request); } } diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java index 618ffe521..bc7def040 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java @@ -31,6 +31,8 @@ of this software and associated documentation files (the "Software"), to deal import eu.chargetime.ocpp.model.core.RegistrationStatus; import eu.chargetime.ocpp.utilities.TimeoutTimer; +import java.util.UUID; + public class TimeoutSessionDecorator implements ISession { private TimeoutTimer timeoutTimer; @@ -66,6 +68,11 @@ private void startTimer() { timeoutTimer.begin(); } + @Override + public UUID getSessionId() { + return session.getSessionId(); + } + @Override public void open(String uri, SessionEvents eventHandler) { SessionEvents events = createEventHandler(eventHandler); diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketListener.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketListener.java index a417b1c15..024489a49 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketListener.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketListener.java @@ -29,45 +29,47 @@ of this software and associated documentation files (the "Software"), to deal import eu.chargetime.ocpp.wss.WssFactoryBuilder; import org.java_websocket.WebSocket; import org.java_websocket.drafts.Draft; -import org.java_websocket.drafts.Draft_6455; -import org.java_websocket.extensions.IExtension; import org.java_websocket.handshake.ClientHandshake; -import org.java_websocket.protocols.IProtocol; -import org.java_websocket.protocols.Protocol; -import org.java_websocket.server.DefaultSSLWebSocketServerFactory; import org.java_websocket.server.WebSocketServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; +import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class WebSocketListener implements Listener { private static final Logger logger = LoggerFactory.getLogger(WebSocketListener.class); + + private static final int TIMEOUT_IN_MILLIS = 1; + private final IServerSessionFactory sessionFactory; + private final List drafts; // In seconds private int pingInterval = 60; private WebSocketServer server; private WssFactoryBuilder wssFactoryBuilder; - private HashMap sockets; + private Map sockets; private volatile boolean closed = true; private boolean handleRequestAsync; - public WebSocketListener(IServerSessionFactory sessionFactory) { + public WebSocketListener(IServerSessionFactory sessionFactory, Draft... drafts) { this.sessionFactory = sessionFactory; - this.sockets = new HashMap<>(); + this.drafts = Arrays.asList(drafts); + this.sockets = new ConcurrentHashMap<>(); } @Override public void open(String hostname, int port, ListenerEvents handler) { - Draft_6455 draft = new Draft_6455(Collections.emptyList(), Collections.singletonList(new Protocol("ocpp1.6"))); - server = new WebSocketServer(new InetSocketAddress(hostname, port), Collections.singletonList(draft)) { + server = new WebSocketServer(new InetSocketAddress(hostname, port), drafts) { @Override - public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) { + public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) { + logger.debug("On connection open (resource descriptor: {})", clientHandshake.getResourceDescriptor()); WebSocketReceiver receiver = new WebSocketReceiver( new WebSocketReceiverEvents() { @@ -91,24 +93,30 @@ public void relay(String message) { } @Override - public void onClose(WebSocket webSocket, int i, String s, boolean b) { + public void onClose(WebSocket webSocket, int code, String reason, boolean remote) { + logger.debug("On connection close (resource descriptor: {}, code: {}, reason: {}, remote: {})", webSocket.getResourceDescriptor(), code, reason, remote); + sockets.get(webSocket).disconnect(); sockets.remove(webSocket); } @Override - public void onMessage(WebSocket webSocket, String s) { - sockets.get(webSocket).relay(s); + public void onMessage(WebSocket webSocket, String message) { + sockets.get(webSocket).relay(message); } @Override - public void onError(WebSocket webSocket, Exception e) { - + public void onError(WebSocket webSocket, Exception ex) { + if(ex instanceof ConnectException) { + logger.error("On error (resource descriptor: " + webSocket.getResourceDescriptor() + ") triggered caused by:", ex); + } else { + logger.error("On error (resource descriptor: " + webSocket.getResourceDescriptor() + ") triggered:", ex); + } } @Override public void onStart() { - + logger.debug("On start"); } }; @@ -142,13 +150,12 @@ public void setPingInterval(int interval) { public void close() { try { closed = true; - for (WebSocket ws : sockets.keySet()) + for (WebSocket ws : sockets.keySet()) { ws.close(); + } sockets.clear(); - - server.stop(1); - + server.stop(TIMEOUT_IN_MILLIS); } catch (InterruptedException e) { logger.error("Failed to close listener", e); } finally { diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketTransmitter.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketTransmitter.java index 15939d481..a095d849b 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketTransmitter.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketTransmitter.java @@ -29,24 +29,15 @@ of this software and associated documentation files (the "Software"), to deal import eu.chargetime.ocpp.wss.WssSocketBuilder; import org.java_websocket.client.WebSocketClient; -import org.java_websocket.drafts.Draft_6455; +import org.java_websocket.drafts.Draft; import org.java_websocket.exceptions.WebsocketNotConnectedException; -import org.java_websocket.extensions.IExtension; import org.java_websocket.handshake.ServerHandshake; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.java_websocket.protocols.IProtocol; -import org.java_websocket.protocols.Protocol; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; import java.io.IOException; import java.net.ConnectException; import java.net.URI; -import java.util.Collections; - /** * Web Socket implementation of the Transmitter. */ @@ -55,46 +46,47 @@ public class WebSocketTransmitter implements Transmitter private static final Logger logger = LoggerFactory.getLogger(WebSocketTransmitter.class); public static final String WSS_SCHEME = "wss"; + private final Draft draft; + // In seconds private int pingInterval = 60; private volatile boolean closed = true; private WebSocketClient client; private WssSocketBuilder wssSocketBuilder; - public WebSocketTransmitter() { + public WebSocketTransmitter(Draft draft) { + this.draft = draft; } @Override public void connect(String uri, RadioEvents events) { final URI resource = URI.create(uri); - Draft_6455 draft = new Draft_6455(Collections.emptyList(), Collections.singletonList(new Protocol("ocpp1.6"))); + client = new WebSocketClient(resource, draft) { @Override - public void onOpen(ServerHandshake serverHandshake) - { + public void onOpen(ServerHandshake serverHandshake) { + logger.debug("On connection open (HTTP status: {})", serverHandshake.getHttpStatus()); events.connected(); } @Override - public void onMessage(String s) - { - events.receivedMessage(s); + public void onMessage(String message) { + events.receivedMessage(message); } @Override - public void onClose(int i, String s, boolean b) - { - logger.debug("WebSocketClient.onClose: code = " + i + ", message = " + s + ", host closed = " + b); + public void onClose(int code, String reason, boolean remote) { + logger.debug("On connection close (code: {}, reason: {}, remote: {})", code, reason, remote); + events.disconnected(); } @Override - public void onError(Exception ex) - { + public void onError(Exception ex) { if(ex instanceof ConnectException) { - logger.warn("onError() triggered caused by: " + ex); + logger.error("On error triggered caused by:", ex); } else { - logger.warn("onError() triggered", ex); + logger.error("On error triggered:", ex); } } }; @@ -159,7 +151,10 @@ public void disconnect() @Override public void send(Object request) throws NotConnectedException { - logger.debug("Sending: " + request); + if(client == null) { + throw new NotConnectedException(); + } + try { client.send(request.toString()); } catch (WebsocketNotConnectedException ex) {