The Internet of Things (IoT) is an important source of data streams in the modern digital world.
The “Things” are huge in number and emit messages at very high frequency, flooding the
global internet. Hence, efficient stream processing is essential.
One use case is a network of distributed weather stations. Each “weather station” emits
readings of the current weather status to the “central base station” for persistence and
analysis. This project contains an implementation of the architecture of such a weather
monitoring system.
Ahmed Aboeleid
Mohamed Salama
Youssef Bazina
- You need to install Java (19 Minimum), Kafka, Kafka Image, Docker, K8s, and Elasticsearch Image.
- Run the Kafka commands in the order listed:

  Service | Command |
  ---|---|
  ZooKeeper | `bin/zookeeper-server-start.sh config/zookeeper.properties` |
  Kafka Server | `bin/kafka-server-start.sh config/server.properties` |
  Create Weather Status Messages Topic | `bin/kafka-topics.sh --create --topic weather-status-messages --bootstrap-server localhost:9092` |
  Create Raining Status Messages Topic | `bin/kafka-topics.sh --create --topic raining-status-messages --bootstrap-server localhost:9092` |
  Run Kafka Consumer | `bin/kafka-console-consumer.sh --topic weather-status-messages --from-beginning --bootstrap-server localhost:9092` |

- Run Bitcask.
- Run Base Central Station.
- Run Weather Station. Run multiple instances with the arguments `station_id latitude longitude` to simulate multiple stations, e.g. `1 30.0444 31.2357`.
- Run Kafka Processor.
Multiple weather stations feed their readings into a message queueing service (Kafka).
- Implemented in Weather Station.
- Each weather station fetches its data from the Open-Meteo API for a given latitude and longitude.
- Data is fetched every 1 second.
- Messages carry a battery status distributed as 30% low, 40% medium, and 30% high, and 10% of messages are dropped.
- Built using the Adapter integration pattern to connect our app to Open-Meteo and to receive data in the required form.
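The battery distribution and the 10% drop rate above can be sketched as two probabilistic draws. This is a minimal illustration, not the project's code: the class `ReadingSampler` and its method names are hypothetical, and where exactly the drop decision is taken (station or processor) is left open here.

```java
import java.util.Random;

/** Hypothetical helper: samples a battery status and the 10% drop decision. */
final class ReadingSampler {
    private final Random random;

    ReadingSampler(Random random) {
        this.random = random;
    }

    /** Returns "low" (30%), "medium" (40%) or "high" (30%). */
    String nextBatteryStatus() {
        double p = random.nextDouble(); // uniform in [0, 1)
        if (p < 0.30) return "low";
        if (p < 0.70) return "medium";
        return "high";
    }

    /** Returns true for roughly 10% of calls. */
    boolean shouldDrop() {
        return random.nextDouble() < 0.10;
    }
}
```

Passing the `Random` in through the constructor keeps the sampler deterministic under a fixed seed, which makes the distribution easy to check in a test.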
- Implemented in Kafka Processor.
- There are two types of processing:
Processing Type | Description |
---|---|
Dropping Messages | Samples messages probabilistically and drops 10% of them |
Raining Areas | Detects rain when humidity > 70%, then passes a new message to the raining topic. Kafka Streams and filters are used to do this |
- Kafka Streams produces messages to the Weather Topic, and records with humidity > 70% go to the Raining Topic (Pipes and Filters pattern).
- Built using the Envelope Wrapper pattern: Kafka Streams unwraps each message, processes it, then wraps it again as a raining status message.
- Dropped messages go to the Invalid Channel, which is a RocksDB store.
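The filter-then-rewrap step can be illustrated in plain Java, without the Kafka dependency. This is only a sketch of the logic: the record types, field names, and message text are assumptions, and in a Kafka Streams topology the same shape would typically be expressed with `KStream#filter` followed by `KStream#mapValues`.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical envelope around a weather reading. */
record WeatherMessage(long stationId, long sequenceNumber, int humidity) {}

/** Hypothetical message published to the raining topic. */
record RainingMessage(long stationId, long sequenceNumber, String text) {}

final class RainDetector {
    static final int HUMIDITY_THRESHOLD = 70;

    /** Filter step: keep only readings with humidity above the threshold. */
    static boolean isRaining(WeatherMessage message) {
        return message.humidity() > HUMIDITY_THRESHOLD;
    }

    /** Unwraps the reading and re-wraps it as a raining status message. */
    static RainingMessage toRainingMessage(WeatherMessage message) {
        return new RainingMessage(message.stationId(), message.sequenceNumber(),
                "Rain detected, humidity " + message.humidity() + "%");
    }

    /** Pipe: filter, then map each surviving reading to a raining message. */
    static List<RainingMessage> process(List<WeatherMessage> input) {
        return input.stream()
                .filter(RainDetector::isRaining)
                .map(RainDetector::toRainingMessage)
                .collect(Collectors.toList());
    }
}
```

Note that the comparison is strict, so a reading of exactly 70% humidity is not forwarded.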
- Data processing and archiving are implemented in Base Central Station.
- It consumes messages from the Weather Topic and the Raining Topic.
- It then writes them to Parquet files. The Parquet writer buffers records and flushes them to the file every 10k records.
- Files are partitioned by day and station_id.
- If the writer shuts down and restarts on the same day, it creates a new file for the same station with a new version number.
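The batching and partitioning described above might look roughly like the following. This is a sketch under stated assumptions: `BatchBuffer` and `ArchivePaths` are hypothetical names, the flush target stands in for the real Parquet writer, and the directory layout and file name are illustrative only.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer that hands records to a writer in fixed-size batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flushTarget; // stands in for a Parquet writer
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flushTarget) {
        this.batchSize = batchSize;
        this.flushTarget = flushTarget;
    }

    /** Buffers one item and flushes once the batch is full. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) flush();
    }

    /** Hands a copy of the buffered items to the target and clears the buffer. */
    void flush() {
        if (pending.isEmpty()) return;
        flushTarget.accept(new ArrayList<>(pending));
        pending.clear();
    }
}

final class ArchivePaths {
    /** Builds a day/station partition path (layout and file name are assumptions). */
    static String partitionPath(long epochMillis, long stationId, int version) {
        LocalDate day = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
        return "day=" + day + "/station_id=" + stationId + "/part-v" + version + ".parquet";
    }
}
```

With a batch size of 10,000 the buffer matches the flush interval described above. Calling `flush()` on shutdown writes the partial batch, and the next start would pick a higher `version` for the same day and station.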
- Implemented in bitCask, documented with JavaDocs.
- We implemented the Riak Bitcask log-structured store to maintain an up-to-date store of each station's status, as discussed in this paper.
- Scheduled compaction over replica files to avoid disrupting active readers.
- Tombstones mark deleted entries so that they are skipped during compaction.
- The entry structure in active and replica files is as follows:

  ENTRY | timestamp | key size | value size | key | value |
  ---|---|---|---|---|---|
  SIZE | 8 bytes | 4 bytes | 4 bytes | key size | value size |

- The entry structure in hint files is as follows:

  ENTRY | timestamp | key size | value size | value position | key |
  ---|---|---|---|---|---|
  SIZE | 8 bytes | 4 bytes | 4 bytes | 8 bytes | key size |
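The active/replica entry layout maps directly onto a `ByteBuffer`. The following is a minimal sketch of that layout, not the project's code: `EntryCodec` is a hypothetical name, and big-endian byte order (the `ByteBuffer` default) is an assumption.

```java
import java.nio.ByteBuffer;

/** Hypothetical codec for the active/replica entry layout. */
final class EntryCodec {
    static final int HEADER_BYTES = 8 + 4 + 4; // timestamp + key size + value size

    /** Serialises one entry as: timestamp, key size, value size, key, value. */
    static byte[] encode(long timestamp, byte[] key, byte[] value) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_BYTES + key.length + value.length);
        buffer.putLong(timestamp);    // 8 bytes
        buffer.putInt(key.length);    // 4 bytes
        buffer.putInt(value.length);  // 4 bytes
        buffer.put(key);
        buffer.put(value);
        return buffer.array();
    }

    /** Reads the value back out of an encoded entry. */
    static byte[] decodeValue(byte[] entry) {
        ByteBuffer buffer = ByteBuffer.wrap(entry);
        buffer.getLong(); // skip the timestamp
        int keySize = buffer.getInt();
        int valueSize = buffer.getInt();
        buffer.position(HEADER_BYTES + keySize); // jump past the key
        byte[] value = new byte[valueSize];
        buffer.get(value);
        return value;
    }
}
```

A hint-file entry would follow the same pattern, with an 8-byte value position written in place of the value itself.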
- On startup, the keydir is rebuilt as follows:
  - Create a new in-memory structure called keydir.
  - Read hint files, if found, from start to end and fill the keydir with key-value pairs.
  - If no hint file is found for a specific timestamp, read the active file from start to end and fill the keydir with key-value pairs.
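Rebuilding the keydir from a data file amounts to one sequential scan in which later entries overwrite earlier ones. The sketch below assumes the active-file layout (8-byte timestamp, 4-byte key size, 4-byte value size, key, value), UTF-8 keys, and a file small enough to hold in a byte array; `KeydirLoader` and `ValueLocation` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class KeydirLoader {
    /** Hypothetical keydir slot: where the latest value of a key lives. */
    record ValueLocation(long timestamp, int valuePosition, int valueSize) {}

    /** Scans entries from start to end; later entries overwrite earlier ones. */
    static Map<String, ValueLocation> scan(byte[] fileBytes) {
        Map<String, ValueLocation> keydir = new HashMap<>();
        ByteBuffer buffer = ByteBuffer.wrap(fileBytes);
        while (buffer.remaining() >= 16) { // at least one full header left
            long timestamp = buffer.getLong();
            int keySize = buffer.getInt();
            int valueSize = buffer.getInt();
            byte[] key = new byte[keySize];
            buffer.get(key);
            int valuePosition = buffer.position();
            buffer.position(valuePosition + valueSize); // skip over the value bytes
            keydir.put(new String(key, StandardCharsets.UTF_8),
                    new ValueLocation(timestamp, valuePosition, valueSize));
        }
        return keydir;
    }
}
```

Reading a hint file is cheaper because the value position is stored in the entry, so the scan never has to skip over value bytes.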
- Compaction works as follows:
  - Loop over all replica files, read each one from start to end, and add its key-value pairs to a hash map.
  - Loop over the keydir and write each key-value pair as an entry in a compacted file.
  - Delete the replica files.
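The compaction steps can be sketched in memory as a merge that keeps the newest entry per key and then leaves out tombstones. This is an illustration only: `Compactor` and `Entry` are hypothetical names, replica files are modelled as lists ordered oldest first, and a `null` value stands in for however the project actually encodes a tombstone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class Compactor {
    /** Hypothetical in-memory entry; a null value stands in for a tombstone. */
    record Entry(long timestamp, String key, String value) {
        boolean isTombstone() {
            return value == null;
        }
    }

    /** Merges replica files (oldest first) and keeps the newest live entry per key. */
    static List<Entry> compact(List<List<Entry>> replicaFiles) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (List<Entry> file : replicaFiles) {
            for (Entry entry : file) {
                latest.put(entry.key(), entry); // later entries overwrite earlier ones
            }
        }
        List<Entry> compacted = new ArrayList<>();
        for (Entry entry : latest.values()) {
            if (!entry.isTombstone()) compacted.add(entry); // deleted keys are skipped
        }
        return compacted;
    }
}
```

The returned list corresponds to the contents of the compacted file; once it is written, the replica files it was built from can be deleted.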
- Concurrency:
  - One writer at a time; other writers wait until the lock is released.
  - Multiple readers can read at the same time.
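A single-writer, multiple-reader policy like this is commonly expressed in Java with `ReentrantReadWriteLock`. Whether the project uses this class is an assumption; `LockedStore` is a hypothetical name, and a plain `HashMap` stands in for the store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical store guarded by a read-write lock. */
final class LockedStore {
    private final Map<String, String> data = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Only one writer may hold the write lock; other writers block here. */
    void put(String key, String value) {
        lock.writeLock().lock();
        try {
            data.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Any number of readers may hold the read lock at the same time. */
    String get(String key) {
        lock.readLock().lock();
        try {
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Releasing the lock in a `finally` block ensures a failed write does not leave other writers waiting forever.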
- No checksums are implemented to detect errors.
- Implemented in elastic-search-and-kibana.
- A command is run to loop over the Parquet files and index them into Elasticsearch.
- Kibana visualisations confirm the battery status distribution for some stations.
- Kibana visualisations calculate the percentage of dropped messages from the stations, confirming the required 10%.