How to implement custom JanusTextRoomPlugin? #3

Open
deaffella opened this issue Jun 8, 2024 · 10 comments

@deaffella

Hi! I successfully launched JanusVideoCallPlugin and it works great. However, I'm confused about how to write my own JanusTextRoomPlugin. Could you help me with an example? I would like to receive and send messages to the text room.

@josephlim94
Owner

Hi! Great to hear that.

import asyncio
import logging

from janus_client import (
    JanusSession,
    JanusPlugin,
)
from janus_client.message_transaction import is_subset

logger = logging.getLogger(__name__)


class JanusTextRoomPlugin(JanusPlugin):
    """Janus TextRoom plugin implementation"""

    name = "janus.plugin.textroom"

    async def on_receive(self, response: dict):
        print(f"Received message: {response}")

    async def list(
        self,
    ) -> dict:
        """List available rooms."""

        success_matcher = {
            "janus": "success",
            "plugindata": {
                "plugin": self.name,
                "data": {
                    "textroom": "event",
                },
            },
        }

        body = {
            "textroom": "list",
        }

        message_transaction = await self.send(
            message={
                "janus": "message",
                "body": body,
            },
        )
        response = await message_transaction.get(matcher=success_matcher, timeout=15)
        await message_transaction.done()

        if is_subset(response, {"janus": "error", "error": {}}):
            raise Exception(f"Janus error: {response}")

        return response
    

async def main():
    session = JanusSession(
        base_url="yoururlhere.com/janus",
    )

    plugin_textroom = JanusTextRoomPlugin()

    await plugin_textroom.attach(session=session)

    response = await plugin_textroom.list()

    print(response)
    print("--- Everything done ---")

    await plugin_textroom.destroy()

    await session.destroy()

asyncio.run(main())

You could start with this and refer to the documentation to implement other APIs for the plugin. Here: https://janus.conf.meetecho.com/docs/textroom.html

Let me know if you have more questions.
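
For readers wondering what `success_matcher` does in the example above: each incoming response is compared against it with a subset check. The sketch below is a hypothetical re-implementation for illustration only (`dict_is_subset` is an invented name, not the library's `is_subset`). It assumes a matcher matches when every key it names is present in the response with an equal value, recursing into nested dicts, so an empty dict matches any dict.

```python
from typing import Any


def dict_is_subset(response: Any, matcher: Any) -> bool:
    """Return True if every key/value named in `matcher` appears in `response`.

    Hypothetical helper for illustration; not janus_client's `is_subset`.
    """
    if isinstance(matcher, dict):
        if not isinstance(response, dict):
            return False
        return all(
            key in response and dict_is_subset(response[key], value)
            for key, value in matcher.items()
        )
    # Non-dict leaves are compared by plain equality.
    return response == matcher


# Example response shaped like the ones discussed in this thread.
response = {
    "janus": "success",
    "transaction": "abc123",
    "plugindata": {
        "plugin": "janus.plugin.textroom",
        "data": {"textroom": "success"},
    },
}

matcher = {"janus": "success", "plugindata": {"data": {"textroom": "success"}}}
print(dict_is_subset(response, matcher))
print(dict_is_subset(response, {"janus": "error", "error": {}}))
```

Extra keys in the response (such as `transaction`) are ignored, which is why one matcher can serve many responses.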

@deaffella
Author

I managed to create a session and get a list of rooms and even a list of participants in the room. However, when I try to join or send a message to a room, I get an "invalid request join" error. I tried to send different types of requests according to Janus documentation, but I can't join the room.

@deaffella
Author

Ok, I managed to join the room and send a message! The Janus gateway documentation does not say that you should include 'request': 'list' in the body to join the room or send messages to it.

{
    "janus": "message",
    "body": {
        "textroom": "message",
        "username": **username**,
        "room": **room**,
        "text": **text**,
        "request": "list"
    }
}

Now I'm faced with the fact that I can't receive messages. I tried to reimplement the method as in your example above, but this method is simply not called when messages are sent to the text room. Nothing happens on the Python side when I send messages to the same text room from the browser.

async def on_receive(self, response: dict):
    print(f"Received message: {response}")

@josephlim94
Owner

A common mistake here is blocking the main thread while waiting to receive the message. Did you do something like using time.sleep(100) instead of await asyncio.sleep(100)? Please verify.
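
To illustrate the difference, here is a small self-contained sketch using only the standard library. The `ticker` task is a hypothetical stand-in for the library's receive loop (it is not janus_client code); it shows that a background task makes no progress while `time.sleep` holds the event loop, but keeps running during `await asyncio.sleep`.

```python
import asyncio
import time


async def ticker(ticks: list) -> None:
    """Stand-in for a receive loop: records a tick roughly every 10 ms."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(blocking: bool) -> int:
    """Return how many ticks the background task produced while we waited."""
    ticks: list = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # let the ticker run its first iteration
    before = len(ticks)
    if blocking:
        time.sleep(0.2)  # blocks the whole event loop; ticker cannot run
    else:
        await asyncio.sleep(0.2)  # yields control; ticker keeps running
    after = len(ticks)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return after - before


print("ticks during time.sleep:", asyncio.run(count_ticks(blocking=True)))
print("ticks during asyncio.sleep:", asyncio.run(count_ticks(blocking=False)))
```

The first count is zero because nothing else can be scheduled while the loop is blocked; the second is positive.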

@deaffella
Author

deaffella commented Jun 9, 2024

Let me show you what I'm doing.
Here is the plugin_text_room.py:

import asyncio
from janus_client import JanusPlugin
from typing import Dict, Any


class JanusTextRoomPlugin(JanusPlugin):
  """Janus TextRoom plugin implementation"""

  name = "janus.plugin.textroom"
  __webrtcup_event: asyncio.Event

  def __init__(self) -> None:
      super().__init__()

      self.__webrtcup_event = asyncio.Event()

  async def on_receive(self, response: dict):
      print(f'[textroom_plugin] response: {response}')

  async def send_request(self, request: Dict[str, Any]):
      full_message = {"janus": "message"}
      full_message["body"] = request
      message_transaction = await self.send(message=full_message)
      return message_transaction

And run.py:

import asyncio
import json
import random
import time

from janus_client import JanusSession, JanusEchoTestPlugin, JanusVideoRoomPlugin
from plugins.plugin_text_room import JanusTextRoomPlugin


class JanusTextRoomClient:
    def __init__(self,
                 url: str,
                 username: str = f'usernameTest-{random.randint(0, 100)}',
                 ):
        self._url = url
        self._username = username
        self.session: JanusSession = None
        self.plugin_handle: JanusTextRoomPlugin = None

    @property
    def url(self):
        return self._url

    @property
    def username(self):
        return self._username

    async def connect(self):
        self.session = JanusSession(base_url=self._url)
        self.plugin_handle = JanusTextRoomPlugin()
        await self.plugin_handle.attach(session=self.session)

    async def disconnect(self):
        await self.plugin_handle.destroy()
        await self.session.destroy()

    async def on_receive(self, response: dict):
        print(f"New message: {response}")

    def run(self):
        # asyncio.run creates and closes the event loop for us.
        asyncio.run(self._main_loop())

    async def get_room_list(self):
        request = {'request': 'list'}
        return await self.plugin_handle.send_request(request=request)

    async def get_participants_list(self, room: int):
        request = {'request': 'listparticipants', "room": room}
        return await self.plugin_handle.send_request(request=request)

    async def join_room(self, room: int):
        join_room_msg = {
            "janus": "message",
            "body": {
                "textroom": "join",
                "username": self.username,
                "room": room,
                'request': 'list'
            }
        }
        return await self.plugin_handle.send(message=join_room_msg)

    async def send_message(self, room: int, text: str):
        text_msg = {
            "janus": "message",
            "body": {
                "textroom": "message",
                "username": self.username,
                "room": room,
                "text": text,
                "request": "list"
            }
        }
        return await self.plugin_handle.send(message=text_msg)

    async def _main_loop(self):
        await self.connect()

        # join
        print(f'Plugin handle ID:\t{self.plugin_handle.id}')
        print(f'room_list:\n{json.dumps(await self.get_room_list(), indent=4)}')
        print(f'participants_list:\n{json.dumps(await self.get_participants_list(room=1234), indent=4)}')
        print(f'join_room:\n{json.dumps(await self.join_room(room=1234), indent=4)}')

        # send test message
        print(f'result:\n{json.dumps(await self.send_message(room=1234, text=str(time.time())), indent=4)}')

        # wait for new?
        await asyncio.sleep(100)

        await self.disconnect()


if __name__ == "__main__":
    base_url = "ws://***:****/janus"

    client = JanusTextRoomClient(url=base_url)
    print(client.url)
    client.run()

@josephlim94
Owner

Looks good to me. I'll try when I have time.

@josephlim94
Owner

Hi, I tried. It seems that we need to connect a DataChannel to receive messages. janus-client uses aiortc to create the RTCPeerConnection; you can refer to their documentation (https://github.com/aiortc/aiortc?tab=readme-ov-file) and take a look at the code for JanusEchoTestPlugin (https://github.com/josephlim94/python_janus_client/blob/master/janus_client/plugin_echotest.py) to find out how to make a WebRTC connection.

import asyncio
import logging

from janus_client import (
    JanusSession,
    JanusPlugin,
)
from janus_client.message_transaction import is_subset

logger = logging.getLogger(__name__)


class JanusTextRoomPlugin(JanusPlugin):
    """Janus TextRoom plugin implementation"""

    name = "janus.plugin.textroom"

    async def on_receive(self, response: dict):
        print(f"Received message: {response}")

    async def send_wrapper(self, message: dict, matcher: dict) -> dict:
        def function_matcher(response: dict):
            return (
                is_subset(
                    response,
                    {
                        "janus": "success",
                        "plugindata": {
                            "plugin": self.name,
                            "data": matcher,
                        },
                    },
                )
                or is_subset(
                    response,
                    {
                        "janus": "success",
                        "plugindata": {
                            "plugin": self.name,
                            "data": {
                                "textroom": "event",
                            },
                        },
                    },
                )
                or is_subset(response, {"janus": "error", "error": {}})
            )

        message_transaction = await self.send(
            message={
                "janus": "message",
                "body": message,
            },
        )
        message_response = await message_transaction.get(
            matcher=function_matcher, timeout=15
        )
        await message_transaction.done()

        if is_subset(message_response, {"janus": "error", "error": {}}):
            raise Exception(f"Janus error: {message_response}")

        # Return the matched response so callers receive the declared dict.
        return message_response

    async def list(
        self,
    ) -> dict:
        """List available rooms."""

        return await self.send_wrapper(
            message={
                "request": "list",
            },
            matcher={
                "textroom": "success",
                "list": [],
            },
        )

    async def get_participants_list(self, room: int):
        """List participants in a specific room"""

        return await self.send_wrapper(
            message={
                "request": "listparticipants",
                "room": room,
            },
            matcher={
                "room": room,
                "participants": [],
            },
        )

    async def join_room(self, room: int):
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "join",
                "username": "test_username",
                "room": room,
            },
            matcher={
                "textroom": "success",
                "participants": [],
            },
        )

    async def message(self, room: int, text: str, ack: bool = True):
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "message",
                "room": room,
                "text": text,
                "ack": ack,
            },
            matcher={
                "textroom": "success",
            },
        )

    async def leave(self, room: int):
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "leave",
                "room": room,
            },
            matcher={
                "textroom": "success",
            },
        )

    async def announcement(self, room: int, text: str) -> dict:
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "announcement",
                "room": room,
                "secret": "adminpwd",
                "text": text,
            },
            matcher={
                "textroom": "success",
            },
        )


async def main():
    session = JanusSession(
        base_url="wss://janusmy.josephgetmyip.com/janusbasews/janus",
        api_secret="janusrocks",
    )

    plugin_textroom = JanusTextRoomPlugin()

    await plugin_textroom.attach(session=session)

    response = await plugin_textroom.list()

    response = await plugin_textroom.get_participants_list(1234)

    response = await plugin_textroom.join_room(1234)

    response = await plugin_textroom.message(1234, "test msg")

    await asyncio.sleep(20)
    print("--- Wait for awhile ---")

    response = await plugin_textroom.leave(1234)

    response = await plugin_textroom.announcement(1234, "test announcement")

    print(response)
    print("--- Everything done ---")

    await plugin_textroom.destroy()

    await session.destroy()


asyncio.run(main())

This is the code I have now. I very much welcome contributions 😃 .

@iamletenkov

I finally managed to connect to the textroom. I can send and receive messages. Unfortunately, I was unable to integrate this approach into your code. Could you please help me write the correct plugin for python_janus_client?

Here is my code:

import asyncio
import datetime
import logging
import string
import sys
import random
import time
import json
import websockets as ws
from queue import Queue
from threading import Thread
from aiortc.rtcdatachannel import RTCDataChannel
from aiortc import RTCPeerConnection, RTCSessionDescription

logger = logging.getLogger('echo')


class WebSocketClient():
    def __init__(self, url='ws://localhost:8188/'):
        self._url = url
        self.connection = None
        self._transactions = {}

    async def connect(self):
        self.connection = await ws.connect(self._url,
                                           subprotocols=['janus-protocol'],
                                           ping_interval=10,
                                           ping_timeout=10,
                                           compression=None)
        if self.connection.open:
            asyncio.ensure_future(self.receiveMessage())
            logger.info('WebSocket connected')
            return self

    def transaction_id(self):
        return ''.join(random.choice(string.ascii_letters) for x in range(12))

    async def send(self, message):
        tx_id = self.transaction_id()
        message.update({'transaction': tx_id})
        tx = asyncio.get_event_loop().create_future()
        tx_in = {'tx': tx, 'request': message['janus']}
        self._transactions[tx_id] = tx_in
        try:
            await asyncio.gather(self.connection.send(json.dumps(message)), tx)
        except Exception as e:
            tx.set_result(e)
        return tx.result()

    async def receiveMessage(self):
        try:
            async for message in self.connection:
                data = json.loads(message)
                tx_id = data.get('transaction')
                response = data['janus']

                # Handle ACK
                if tx_id and response == 'ack':
                    logger.debug(f'Received ACK for transaction {tx_id}')
                    if tx_id in self._transactions:
                        tx_in = self._transactions[tx_id]
                        if tx_in['request'] == 'keepalive':
                            tx = tx_in['tx']
                            tx.set_result(data)
                            del self._transactions[tx_id]
                            logger.debug(f'Closed transaction {tx_id}'
                                         f' with {response}')
                    continue

                # Handle Success / Event / Error
                if response not in {'success', 'error'}:
                    logger.info(f'Janus Event --> {response}')

                if tx_id and tx_id in self._transactions:
                    tx_in = self._transactions[tx_id]
                    tx = tx_in['tx']
                    tx.set_result(data)
                    del self._transactions[tx_id]
                    logger.debug(f'Closed transaction {tx_id}'
                                 f' with {response}')
        except Exception:
            logger.error('WebSocket failure')
        logger.info('Connection closed')

    async def close(self):
        if self.connection:
            await self.connection.close()
            self.connection = None
        self._transactions = {}


class JanusPlugin:
    def __init__(self, session, handle_id):
        self._session = session
        self._handle_id = handle_id

    async def sendMessage(self, message):
        logger.info('Sending message to the plugin')
        message.update({'janus': 'message', 'handle_id': self._handle_id})
        response = await self._session.send(message)
        return response


class JanusSession:
    def __init__(self, url='ws://localhost:8188/'):
        self._websocket = None
        self._url = url
        self._handles = {}
        self._session_id = None
        self._ka_interval = 15
        self._ka_task = None

    async def send(self, message):
        message.update({'session_id': self._session_id})
        response = await self._websocket.send(message)
        return response

    async def create(self):
        logger.info('Creating session')
        self._websocket = await WebSocketClient(self._url).connect()
        message = {'janus': 'create'}
        response = await self.send(message)
        assert response['janus'] == 'success'
        session_id = response['data']['id']
        self._session_id = session_id
        self._ka_task = asyncio.ensure_future(self._keepalive())
        logger.info('Session created')

    async def attach(self, plugin):
        logger.info('Attaching handle')
        message = {'janus': 'attach', 'plugin': plugin}
        response = await self.send(message)
        assert response['janus'] == 'success'
        handle_id = response['data']['id']
        handle = JanusPlugin(self, handle_id)
        self._handles[handle_id] = handle
        logger.info('Handle attached')
        return handle

    async def destroy(self):
        logger.info('Destroying session')
        if self._session_id:
            message = {'janus': 'destroy'}
            await self.send(message)
            self._session_id = None
        if self._ka_task:
            self._ka_task.cancel()
            try:
                await self._ka_task
            except asyncio.CancelledError:
                pass
            self._ka_task = None
        self._handles = {}
        logger.info('Session destroyed')

        logger.info('Closing WebSocket')
        if self._websocket:
            await self._websocket.close()
            self._websocket = None

    async def _keepalive(self):
        while True:
            logger.info('Sending keepalive')
            message = {'janus': 'keepalive'}
            await self.send(message)
            logger.info('Keepalive OK')
            await asyncio.sleep(self._ka_interval)


class JanusTextRoomPlugin:
    name = "janus.plugin.textroom"
    plugin: JanusPlugin
    data_channel: RTCDataChannel

    def __init__(self,
                 url: str = 'ws://localhost:8188/',
                 room: int = 1234,
                 username: str = f"user-{random.randint(0, 100)}",
                 ):
        self.url = url
        self.room = room
        self.username = username
        self.session = JanusSession(self.url)
        self.pc = RTCPeerConnection()

        self.send_queue = Queue()
        self.send_thread = Thread(target=self.run_send_loop, daemon=True, name='send_thread')

    async def _create_data_channel(self):
        # create session
        await self.session.create()

        # attach to the textroom plugin
        self.plugin = await self.session.attach(self.name)

        # create data-channel
        self.data_channel = self.pc.createDataChannel('JanusDataChannel')
        print(f'DataChannel ({self.data_channel.label}) created')

        @self.data_channel.on('open')
        def on_open():
            print(f'DataChannel ({self.data_channel.label}) open')

        @self.data_channel.on('close')
        def on_close():
            print(f'DataChannel ({self.data_channel.label}) closed')

        @self.data_channel.on('message')
        def on_message(message):
            print(f'DataChannel ({self.data_channel.label}) received: {message}')

        setup_response = await self.plugin.sendMessage({'body': {'request': 'setup'}})
        print()
        print('setup_response')
        print(setup_response)
        print()
        offer = RTCSessionDescription(sdp=setup_response['jsep']['sdp'], type=setup_response['jsep']['type'])
        await self.pc.setRemoteDescription(offer)
        await self.pc.setLocalDescription(await self.pc.createAnswer())

    async def join(self):
        response = await self.plugin.sendMessage({'body': {
            'request': 'list',
            'textroom': 'join',
            'room': self.room,
            'display': self.username,
            'username': self.username,
        },
            'jsep': {
                'sdp': self.pc.localDescription.sdp,
                'trickle': True,
                'type': self.pc.localDescription.type,
            },
        })
        await asyncio.sleep(4)

    async def _send_loop(self):
        await self._create_data_channel()
        await self.join()
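        while True:
            if self.send_queue.empty():
                # Yield to the event loop so keepalives and data channel
                # callbacks can run instead of busy-waiting.
                await asyncio.sleep(0.1)
                continue
            message = self.send_queue.get()
            await self.plugin.sendMessage({'body': {
                'request': 'list',
                'textroom': 'message',
                'room': self.room,
                "text": str(message),
                "ack": True,
            }})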

    def run_send_loop(self):
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(
                self._send_loop()
            )
            sys.exit(0)
        except Exception:
            sys.exit(1)


    def start(self):
        self.send_thread.start()

    def add_message_to_queue(self, message: str):
        self.send_queue.put(message)

if __name__ == '__main__':
    url = 'ws://*.*.*.*:8188/janus'
    plugin = JanusTextRoomPlugin(url=url, room=1234)
    plugin.start()

    while True:
        time.sleep(1)
        plugin.add_message_to_queue(
            f'test message{datetime.datetime.now()}'
        )

I also noticed some strange behavior when reading and sending messages. I can receive messages in the browser using the standard Janus gateway interface only if full_trickle = true is set in janus.jcfg. However, with full_trickle = true, my Python client is not able to read messages, only send them. Only with full_trickle = false can I exchange messages between Python clients.

@josephlim94
Owner

Hi @iamletenkov , thank you for your effort. It seems that the DataChannel is not connected.

In your script you create the RTCDataChannel and then register event handlers on it. Since we must send the setup request and wait for an offer, instead of generating the offer ourselves, I think it's more correct to let RTCPeerConnection create the DataChannel on its own and only register the event handlers for it.
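
A minimal sketch of that idea, assuming the peer connection exposes an `on(event)` decorator and emits a `datachannel` event carrying the remotely created channel, as aiortc's `RTCPeerConnection` does. `register_datachannel_handlers`, `on_text` and `FakeEmitter` are hypothetical names introduced here, not part of python_janus_client, and the example payload is illustrative. This only helps if the remote side actually opens a channel, which this thread does not confirm.

```python
import json
from typing import Any, Callable


def register_datachannel_handlers(pc: Any, on_text: Callable[[dict], None]) -> None:
    """Attach a message handler to any data channel the remote side opens.

    `pc` only needs an `on(event)` decorator, like aiortc's RTCPeerConnection.
    """

    @pc.on("datachannel")
    def on_datachannel(channel: Any) -> None:
        @channel.on("message")
        def on_message(message: Any) -> None:
            # Assumption: TextRoom payloads arrive as JSON text.
            if isinstance(message, bytes):
                message = message.decode("utf-8")
            try:
                on_text(json.loads(message))
            except json.JSONDecodeError:
                on_text({"raw": message})


class FakeEmitter:
    """Tiny stand-in event emitter so the sketch runs without aiortc."""

    def __init__(self) -> None:
        self._handlers: dict = {}

    def on(self, event: str):
        def decorator(func):
            self._handlers.setdefault(event, []).append(func)
            return func

        return decorator

    def emit(self, event: str, *args: Any) -> None:
        for handler in self._handlers.get(event, []):
            handler(*args)


received: list = []
pc = FakeEmitter()
register_datachannel_handlers(pc, received.append)

channel = FakeEmitter()
pc.emit("datachannel", channel)  # simulate the remote side opening a channel
channel.emit("message", '{"textroom": "message", "text": "hi"}')
print(received)
```

With a real `RTCPeerConnection`, the call to `register_datachannel_handlers` would go before `setRemoteDescription`, so the handler is in place when the channel appears.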

When full_trickle = true, the peer connection can't be established because python_janus_client doesn't support trickle. However, in my case the DataChannel can't be connected whether full_trickle is true or false, and I still can't receive messages from the browser. With full_trickle = false it should work between Python clients, but full_trickle = true will need more work.

In my script I tried to implement support for trickle (it's still done in the plugin handler), but I didn't manage to get it to work. I haven't tested it between Python clients yet, though.

I've created the branch implement_plugin_text_room and added the work we have so far to https://github.com/josephlim94/python_janus_client/blob/implement_plugin_text_room/plugin_textroom.py
I hope that we can work together on this branch. It helps to give credit for your contribution.

@josephlim94
Owner

Hi @iamletenkov , I finally managed to set up the data channel.

  1. I was wrong to assume the JSEP answer should be sent with the "join" request. I still need to send some request to carry the JSEP answer, so I've chosen to send it with an "ack" request instead.
  2. aiortc uses aioice, which I think supports "half trickle" (see "Add support for ICE trickle", aiortc/aioice#4). To work with Janus full_trickle=true, we need to wait for Janus to finish sending ICE candidates in trickle messages. For now I just made the code wait for a while and assume Janus has finished sending, and only then create the JSEP answer and set the local description (await self._pc.setLocalDescription(await self._pc.createAnswer())).
  3. The JSEP answer created by the aiortc peer connection always contains all ICE candidates, and Janus can work with that, so no worries there.

I've pushed my changes. However, I'm not satisfied with the hardcoded wait for the trickle ICE candidates, so I'm going to take my time to find a solution for this.
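
One possible direction for replacing the fixed wait, sketched under assumptions: feed incoming events into a queue and stop collecting once a completion marker arrives, with a timeout as a fallback. The message shape assumed here (`{"janus": "trickle", "candidate": {...}}`, ending with `"candidate": {"completed": true}`) should be checked against the Janus API documentation, and `wait_for_trickle_complete` is a hypothetical helper, not code from the branch. The candidate strings in the demo are placeholders.

```python
import asyncio
from typing import List


async def wait_for_trickle_complete(
    events: asyncio.Queue, timeout: float = 5.0
) -> List[dict]:
    """Collect remote ICE candidates until a completion marker or a timeout."""
    candidates: List[dict] = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # overall time budget is spent
        try:
            event = await asyncio.wait_for(events.get(), timeout=remaining)
        except asyncio.TimeoutError:
            break  # nothing more arrived in time
        if event.get("janus") != "trickle":
            continue  # ignore unrelated events
        candidate = event.get("candidate") or {}
        if candidate.get("completed"):
            break  # assumed end-of-candidates marker
        candidates.append(candidate)
    return candidates


async def demo() -> List[dict]:
    events: asyncio.Queue = asyncio.Queue()
    for event in (
        {"janus": "trickle", "candidate": {"candidate": "placeholder-1"}},
        {"janus": "event"},
        {"janus": "trickle", "candidate": {"candidate": "placeholder-2"}},
        {"janus": "trickle", "candidate": {"completed": True}},
    ):
        events.put_nowait(event)
    return await wait_for_trickle_complete(events, timeout=1.0)


print(asyncio.run(demo()))
```

The collected candidates would then be added to the peer connection before creating the answer; the timeout keeps the behaviour no worse than the current fixed wait if the marker never arrives.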

@josephlim94 josephlim94 self-assigned this Jul 13, 2024