From a22e0264be431d8f3c5be50f4c8b3614109d4a61 Mon Sep 17 00:00:00 2001 From: maaikelimper Date: Mon, 7 Aug 2023 14:06:47 +0000 Subject: [PATCH] subscribe to wis2box/# on internal broker --- default.env | 7 +++--- docker-compose.dev.yml | 2 +- docker-compose.yml | 2 +- wis2box-broker/acl.conf | 1 - .../wis2box/pubsub/subscribe.py | 23 ++++++++----------- .../mqtt_metrics_collector.py | 4 ++-- 6 files changed, 16 insertions(+), 23 deletions(-) diff --git a/default.env b/default.env index b424fd49..43dd7e99 100644 --- a/default.env +++ b/default.env @@ -56,7 +56,6 @@ MINIO_NOTIFY_MQTT_ENABLE_WIS2BOX=on MINIO_NOTIFY_MQTT_USERNAME_WIS2BOX=${WIS2BOX_BROKER_USERNAME} MINIO_NOTIFY_MQTT_PASSWORD_WIS2BOX=${WIS2BOX_BROKER_PASSWORD} MINIO_NOTIFY_MQTT_BROKER_WIS2BOX=tcp://${WIS2BOX_BROKER_HOST}:${WIS2BOX_BROKER_PORT} -MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box-storage/minio -MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box-storage/minio -MINIO_NOTIFY_MQTT_QOS_WIS2BOX=1 - +MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box/storage +MINIO_NOTIFY_MQTT_TOPIC_WIS2BOX=wis2box/storage +MINIO_NOTIFY_MQTT_QOS_WIS2BOX=1 \ No newline at end of file diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 3d219b63..2a63b6b6 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -4,7 +4,7 @@ services: - ${WIS2BOX_HOST_DATADIR}:/data/wis2box:rw - ./wis2box-management/wis2box/wis2box.cron:/etc/cron.d/wis2box:ro - ./wis2box-management/wis2box:/usr/local/lib/python3.9/site-packages/wis2box-0.3.dev0-py3.9.egg/wis2box - command: ["wis2box", "pubsub" , "subscribe", "--broker", "http://wis2box-minio:9000", "--topic", "wis2box-storage/#", "--verbosity", "INFO"] + command: ["wis2box", "pubsub" , "subscribe", "--verbosity", "INFO"] # wis2box-api: # volumes: diff --git a/docker-compose.yml b/docker-compose.yml index d77b02c8..9cae07a4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -139,7 +139,7 @@ services: condition: service_started wis2box-api: condition: service_healthy - command: ["wis2box", "pubsub" , "subscribe", "--broker", "http://wis2box-minio:9000", "--topic", "wis2box-storage/#"] + command: ["wis2box", "pubsub" , "subscribe"] wis2box-auth: container_name: wis2box-auth diff --git a/wis2box-broker/acl.conf b/wis2box-broker/acl.conf index f7a82e5a..64f9e36b 100644 --- a/wis2box-broker/acl.conf +++ b/wis2box-broker/acl.conf @@ -4,6 +4,5 @@ topic read origin/# user _WIS2BOX_BROKER_USERNAME topic readwrite origin/# topic readwrite wis2box/# -topic readwrite wis2box-storage/# topic readwrite data-incoming/# topic read $SYS/# \ No newline at end of file diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index 2feb3fe0..7450ac41 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -85,26 +85,21 @@ def on_message_handler(client, userdata, msg): else: LOGGER.warning('message payload could not be parsed') return - - while len(mp.active_children()) == mp.cpu_count(): - sleep(0.1) - - p = mp.Process(target=handle, args=(filepath,)) - p.start() + + while len(mp.active_children()) == mp.cpu_count(): + sleep(0.1) + p = mp.Process(target=handle, args=(filepath,)) + p.start() @click.command() @click.pass_context -@click.option('--broker', '-b', help='URL to broker') -@click.option('--topic', '-t', help='topic to subscribe to') @cli_helpers.OPTION_VERBOSITY -def subscribe(ctx, broker, topic, verbosity): - """Subscribe to a broker/topic""" +def subscribe(ctx, verbosity): + """Subscribe to the internal broker and process incoming messages""" click.echo('Adding messages collection') setup_collection(meta=gcm()) - click.echo(f'Subscribing to broker {broker}, topic {topic}') - defs = { 'codepath': PLUGINS['pubsub']['mqtt']['plugin'], 'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}', # noqa @@ -115,5 +110,5 @@ def subscribe(ctx, broker, topic, verbosity): broker.bind('on_message', on_message_handler) - broker.sub(topic) - broker.sub('wis2box/notifications') + click.echo(f'Subscribing to internal broker on topic wis2box/#') + broker.sub('wis2box/#') diff --git a/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py b/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py index b3d001da..7e2aa6c4 100644 --- a/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py +++ b/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py @@ -128,7 +128,7 @@ def sub_connect(client, userdata, flags, rc, properties=None): """ logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}") - for s in ["wis2box/#", "wis2box-storage/#", '$SYS/broker/messages/#']: + for s in ["wis2box/#", '$SYS/broker/messages/#']: print(f'subscribe to: {s}') client.subscribe(s, qos=0) @@ -174,7 +174,7 @@ def sub_mqtt_metrics(client, userdata, msg): failure_descr_wsi_total.labels(descr, wsi).inc(1) failure_wsi_total.labels(wsi).inc(1) failure_total.inc(1) - elif str(msg.topic).startswith('wis2box-storage'): + elif str(msg.topic).startswith('wis2box/storage'): if str(m["Key"]).startswith('wis2box-incoming'): storage_incoming_total.inc(1) if str(m["Key"]).startswith('wis2box-public'):