Skip to content

Commit

Permalink
subscribe to wis2box/# on internal broker
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Aug 7, 2023
1 parent a99c22f commit a22e026
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 23 deletions.
7 changes: 3 additions & 4 deletions default.env
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ MINIO_NOTIFY_MQTT_ENABLE_WIS2BOX=on
MINIO_NOTIFY_MQTT_USERNAME_WIS2BOX=${WIS2BOX_BROKER_USERNAME}
MINIO_NOTIFY_MQTT_PASSWORD_WIS2BOX=${WIS2BOX_BROKER_PASSWORD}
MINIO_NOTIFY_MQTT_BROKER_WIS2BOX=tcp://${WIS2BOX_BROKER_HOST}:${WIS2BOX_BROKER_PORT}
MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box-storage/minio
MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box-storage/minio
MINIO_NOTIFY_MQTT_QOS_WIS2BOX=1

MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box/storage
MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box/storage
MINIO_NOTIFY_MQTT_QOS_WIS2BOX=1
2 changes: 1 addition & 1 deletion docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
- ${WIS2BOX_HOST_DATADIR}:/data/wis2box:rw
- ./wis2box-management/wis2box/wis2box.cron:/etc/cron.d/wis2box:ro
- ./wis2box-management/wis2box:/usr/local/lib/python3.9/site-packages/wis2box-0.3.dev0-py3.9.egg/wis2box
command: ["wis2box", "pubsub" , "subscribe", "--broker", "http://wis2box-minio:9000", "--topic", "wis2box-storage/#", "--verbosity", "INFO"]
command: ["wis2box", "pubsub" , "subscribe", "--verbosity", "INFO"]

# wis2box-api:
# volumes:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ services:
condition: service_started
wis2box-api:
condition: service_healthy
command: ["wis2box", "pubsub" , "subscribe", "--broker", "http://wis2box-minio:9000", "--topic", "wis2box-storage/#"]
command: ["wis2box", "pubsub" , "subscribe"]

wis2box-auth:
container_name: wis2box-auth
Expand Down
1 change: 0 additions & 1 deletion wis2box-broker/acl.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ topic read origin/#
user _WIS2BOX_BROKER_USERNAME
topic readwrite origin/#
topic readwrite wis2box/#
topic readwrite wis2box-storage/#
topic readwrite data-incoming/#
topic read $SYS/#
23 changes: 9 additions & 14 deletions wis2box-management/wis2box/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,21 @@ def on_message_handler(client, userdata, msg):
else:
LOGGER.warning('message payload could not be parsed')
return

while len(mp.active_children()) == mp.cpu_count():
sleep(0.1)

p = mp.Process(target=handle, args=(filepath,))
p.start()

while len(mp.active_children()) == mp.cpu_count():
sleep(0.1)
p = mp.Process(target=handle, args=(filepath,))
p.start()


@click.command()
@click.pass_context
@click.option('--broker', '-b', help='URL to broker')
@click.option('--topic', '-t', help='topic to subscribe to')
@cli_helpers.OPTION_VERBOSITY
def subscribe(ctx, broker, topic, verbosity):
"""Subscribe to a broker/topic"""
def subscribe(ctx, verbosity):
"""Subscribe to the internal broker and process incoming messages"""
click.echo('Adding messages collection')
setup_collection(meta=gcm())

click.echo(f'Subscribing to broker {broker}, topic {topic}')

defs = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}', # noqa
Expand All @@ -115,5 +110,5 @@ def subscribe(ctx, broker, topic, verbosity):

broker.bind('on_message', on_message_handler)

broker.sub(topic)
broker.sub('wis2box/notifications')
click.echo(f'Subscribing to internal broker on topic wis2box/#')
broker.sub('wis2box/#')
4 changes: 2 additions & 2 deletions wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def sub_connect(client, userdata, flags, rc, properties=None):
"""

logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}")
for s in ["wis2box/#", "wis2box-storage/#", '$SYS/broker/messages/#']:
for s in ["wis2box/#", '$SYS/broker/messages/#']:
print(f'subscribe to: {s}')
client.subscribe(s, qos=0)

Expand Down Expand Up @@ -174,7 +174,7 @@ def sub_mqtt_metrics(client, userdata, msg):
failure_descr_wsi_total.labels(descr, wsi).inc(1)
failure_wsi_total.labels(wsi).inc(1)
failure_total.inc(1)
elif str(msg.topic).startswith('wis2box-storage'):
elif str(msg.topic).startswith('wis2box/storage'):
if str(m["Key"]).startswith('wis2box-incoming'):
storage_incoming_total.inc(1)
if str(m["Key"]).startswith('wis2box-public'):
Expand Down

0 comments on commit a22e026

Please sign in to comment.