Skip to content

Commit

Permalink
Latency logger added.
Browse files Browse the repository at this point in the history
  • Loading branch information
UellingtonDamasceno committed Sep 13, 2023
1 parent b676253 commit 3f4576f
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 177 deletions.
137 changes: 71 additions & 66 deletions src/main/java/com/device/fot/virtual/app/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import org.json.JSONArray;

import com.device.fot.virtual.controller.BrokerUpdateCallback;
import com.device.fot.virtual.controller.DataController;
import com.device.fot.virtual.controller.LatencyLogController;
import com.device.fot.virtual.controller.MessageLogController;
import com.device.fot.virtual.model.BrokerSettings;
import com.device.fot.virtual.model.BrokerSettingsBuilder;
import com.device.fot.virtual.model.FoTDevice;
Expand All @@ -30,72 +31,76 @@
*/
public class Main {

public static void main(String[] args) {
try (InputStream input = Main.class.getResourceAsStream("broker.properties")) {
if (input == null) {
System.err.println("Sorry, unable to find config.properties.");
return;
}
Properties props = new Properties();
props.load(input);
String deviceId = CLI.getDeviceId(args)
.orElse(UUID.randomUUID().toString());

String brokerIp = CLI.getBrokerIp(args)
.orElse(props.getProperty("brokerIp"));

String port = CLI.getPort(args)
.orElse(props.getProperty("port"));

String password = CLI.getPassword(args)
.orElse(props.getProperty("password"));

String user = CLI.getUsername(args)
.orElse(props.getProperty("username"));

String timeout = CLI.getTimeout(args)
.orElse("10000");

BrokerSettings brokerSettings = BrokerSettingsBuilder
.builder()
.setBrokerIp(brokerIp)
.setPort(port)
.setPassword(password)
.setUsername(user)
.deviceId(deviceId)
.build();

if (CLI.hasParam("-ps", args)) {
DataController.getInstance().createAndSetDataFile(deviceId + ".csv");
DataController.getInstance().start();
DataController.getInstance().setCanSaveData(true);
}

List<Sensor> sensors = readSensors("sensors.json", deviceId)
.stream()
.map(Sensor.class::cast)
.collect(toList());

FoTDevice device = new FoTDevice(deviceId, sensors);
BrokerUpdateCallback callback = new BrokerUpdateCallback(device);
callback.startUpdateBroker(brokerSettings, Long.parseLong(timeout), true);

} catch (IOException ex) {
System.err.println("Sorry, unable to find sensors.json or not create pesistence file.");
}
public static void main(String[] args) {
try (InputStream input = Main.class.getResourceAsStream("broker.properties")) {
if (input == null) {
System.err.println("Sorry, unable to find config.properties.");
return;
}
Properties props = new Properties();
props.load(input);
String deviceId = CLI.getDeviceId(args)
.orElse(UUID.randomUUID().toString());

String brokerIp = CLI.getBrokerIp(args)
.orElse(props.getProperty("brokerIp"));

String port = CLI.getPort(args)
.orElse(props.getProperty("port"));

String password = CLI.getPassword(args)
.orElse(props.getProperty("password"));

String user = CLI.getUsername(args)
.orElse(props.getProperty("username"));

String timeout = CLI.getTimeout(args)
.orElse("10000");

BrokerSettings brokerSettings = BrokerSettingsBuilder
.builder()
.setBrokerIp(brokerIp)
.setPort(port)
.setPassword(password)
.setUsername(user)
.deviceId(deviceId)
.build();

if (CLI.hasParam("-ps", args)) {
MessageLogController.getInstance().createAndUpdateFileName(deviceId + "_messages_log.csv");
MessageLogController.getInstance().start();
MessageLogController.getInstance().setCanSaveData(true);
}

if(CLI.hasParam("-ll", args)){
LatencyLogController.getInstance().createAndUpdateFileName(deviceId + "_latency_log.csv");
LatencyLogController.getInstance().start();
LatencyLogController.getInstance().setCanSaveData(true);
}

List<Sensor> sensors = readSensors("sensors.json", deviceId)
.stream()
.map(Sensor.class::cast)
.collect(toList());

FoTDevice device = new FoTDevice(deviceId, sensors);
BrokerUpdateCallback callback = new BrokerUpdateCallback(device);
callback.startUpdateBroker(brokerSettings, Long.parseLong(timeout), true);

} catch (IOException ex) {
System.err.println("Sorry, unable to find sensors.json or not create pesistence file.");
}
}

private static List<FoTSensor> readSensors(String fileName, String deviceName) throws IOException {
try (var inputStream = Main.class.getResourceAsStream(fileName);
var inputReader = new InputStreamReader(inputStream);
var bufferedReader = new BufferedReader(inputReader)) {

String textFile = bufferedReader.lines().collect(joining());
JSONArray sensorsArray = new JSONArray(textFile);
return SensorWrapper.getAllSensors(sensorsArray)
.stream()
.map(sensor -> new FoTSensor(deviceName, sensor))
.collect(toList());
}
private static List<FoTSensor> readSensors(String fileName, String deviceName) throws IOException {
try (var inputStream = Main.class.getResourceAsStream(fileName); var inputReader = new InputStreamReader(inputStream); var bufferedReader = new BufferedReader(inputReader)) {

String textFile = bufferedReader.lines().collect(joining());
JSONArray sensorsArray = new JSONArray(textFile);
return SensorWrapper.getAllSensors(sensorsArray)
.stream()
.map(sensor -> new FoTSensor(deviceName, sensor))
.collect(toList());
}
}
}
104 changes: 0 additions & 104 deletions src/main/java/com/device/fot/virtual/controller/DataController.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.device.fot.virtual.controller;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
Expand All @@ -16,6 +13,9 @@

import extended.tatu.wrapper.model.TATUMessage;
import extended.tatu.wrapper.util.TATUWrapper;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;

/**
*
Expand Down Expand Up @@ -95,11 +95,31 @@ public void messageArrived(String topic, MqttMessage mqttMessage) throws Excepti

@Override
public void deliveryComplete(IMqttDeliveryToken imdt) {

MqttMessage deliveredMessage;
try {
deliveredMessage = imdt.getMessage();
String messageContent = new String(deliveredMessage.getPayload());
long customTimestamp = TATUWrapper.getMessageTimestamp(messageContent);
if(customTimestamp == 0){
System.out.println("The message"+ messageContent +" don't have timestamp");
}

long latency = System.currentTimeMillis() - customTimestamp;
LatencyLogController.getInstance().putLatency(latency);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("excep "+me);
Logger.getLogger(DefaultFlowCallback.class.getName()).log(Level.SEVERE, null, me);
} catch (InterruptedException ex) {
Logger.getLogger(DefaultFlowCallback.class.getName()).log(Level.SEVERE, null, ex);
}
}

@Override
public void connectionLost(Throwable cause) {
Logger.getLogger(DefaultFlowCallback.class.getName()).log(Level.SEVERE, null, cause);
Logger.getLogger(DefaultFlowCallback.class
.getName()).log(Level.SEVERE, null, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.device.fot.virtual.controller;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;

public class LatencyLogController extends PersistenceController<Long> {

private static LatencyLogController latencyLogController = new LatencyLogController();

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss.SSS");

private LatencyLogController() {
super("latency_log.csv");
}

public synchronized static LatencyLogController getInstance() {
return latencyLogController;
}

public void putLatency(Long latency) throws InterruptedException {
if (canSaveData) {
buffer.put(latency);
}
}

private String buildLogLatencyLine(Long latency) {
LocalDateTime currentTime = LocalDateTime.now();
String formattedTime = currentTime.format(formatter);
return String.format("%s,%d", formattedTime, latency);
}

@Override
public void run() {
running = true;
var latencyLines = new ArrayList<String>(bufferSize);
while (running) {
try {
if (!buffer.isEmpty()) {
latencyLines.add(this.buildLogLatencyLine(buffer.take()));
if (latencyLines.size() >= bufferSize) {
this.write(latencyLines);
latencyLines.clear();
}
}
} catch (InterruptedException ex) {
this.write(latencyLines);
this.running = false;
}
}
}

@Override
public String getThreadName() {
return "LATENCY_LOG_WRITER";
}
}
Loading

0 comments on commit 3f4576f

Please sign in to comment.