-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
How to implement custom JanusTextRoomPlugin? #3
Comments
Hi! Great to hear that. import asyncio
import logging
from janus_client import (
JanusSession,
JanusPlugin,
)
from janus_client.message_transaction import is_subset
logger = logging.getLogger(__name__)
class JanusTextRoomPlugin(JanusPlugin):
"""Janus TextRoom plugin implementation"""
name = "janus.plugin.textroom"
async def on_receive(self, response: dict):
print(f"Received message: {response}")
async def list(
self,
) -> dict:
"""List available rooms."""
success_matcher = {
"janus": "success",
"plugindata": {
"plugin": self.name,
"data": {
"textroom": "event",
},
},
}
body = {
"textroom": "list",
}
message_transaction = await self.send(
message={
"janus": "message",
"body": body,
},
)
response = await message_transaction.get(matcher=success_matcher, timeout=15)
await message_transaction.done()
if is_subset(response, {"janus": "error", "error": {}}):
raise Exception(f"Janus error: {response}")
return response
async def main():
session = JanusSession(
base_url="yoururlhere.com/janus",
)
plugin_textroom = JanusTextRoomPlugin()
await plugin_textroom.attach(session=session),
response = await plugin_textroom.list()
print(response)
print("--- Everything done ---")
await plugin_textroom.destroy()
await session.destroy()
asyncio.run(main()) You could start with this and refer to the documentation to implement other APIs for the plugin. Here: https://janus.conf.meetecho.com/docs/textroom.html Let me know if you have more questions. |
I managed to create a session and get a list of rooms and even a list of participants in the room. However, when I try to join or send a message to a room, I get an "invalid request join" error. I tried to send different types of requests according to Janus documentation, but I can't join the room. |
Ok, I managed to join the room and send a message! The janus gateway documentation does not say that yo ushould use
Now I'm faced with the fact that I can't receive messages. I tried to reimplement the method as in your example above, but this method is simply not called when sending messages to the text room. Nothing happens at python side when i send messages to the same text room in browser.
|
A common mistake here is that you blocked the main thread when waiting to receive the message. Did you do something like using |
Let me show you what I'm doing.
And
|
Looks good to me. I'll try when I have time. |
Hi, I tried. It seems that we need to connect a DataChannel to receive messages. import asyncio
import logging
from janus_client import (
JanusSession,
JanusPlugin,
)
from janus_client.message_transaction import is_subset
logger = logging.getLogger(__name__)
class JanusTextRoomPlugin(JanusPlugin):
"""Janus TextRoom plugin implementation"""
name = "janus.plugin.textroom"
async def on_receive(self, response: dict):
print(f"Received message: {response}")
async def send_wrapper(self, message: dict, matcher: dict) -> dict:
def function_matcher(response: dict):
return (
is_subset(
response,
{
"janus": "success",
"plugindata": {
"plugin": self.name,
"data": matcher,
},
},
)
or is_subset(
response,
{
"janus": "success",
"plugindata": {
"plugin": self.name,
"data": {
"textroom": "event",
},
},
},
)
or is_subset(response, {"janus": "error", "error": {}})
)
message_transaction = await self.send(
message={
"janus": "message",
"body": message,
},
)
message_response = await message_transaction.get(
matcher=function_matcher, timeout=15
)
await message_transaction.done()
if is_subset(message_response, {"janus": "error", "error": {}}):
raise Exception(f"Janus error: {message_response}")
async def list(
self,
) -> dict:
"""List available rooms."""
return await self.send_wrapper(
message={
"request": "list",
},
matcher={
"textroom": "success",
"list": [],
},
)
async def get_participants_list(self, room: int):
"""List participants in a specific room"""
return await self.send_wrapper(
message={
"request": "listparticipants",
"room": room,
},
matcher={
"room": room,
"participants": [],
},
)
async def join_room(self, room: int):
return await self.send_wrapper(
message={
"request": "list",
"textroom": "join",
"username": "test_username",
"room": room,
},
matcher={
"textroom": "success",
"participants": [],
},
)
async def message(self, room: int, text: str, ack: bool = True):
return await self.send_wrapper(
message={
"request": "list",
"textroom": "message",
"room": room,
"text": text,
"ack": ack,
},
matcher={
"textroom": "success",
},
)
async def leave(self, room: int):
return await self.send_wrapper(
message={
"request": "list",
"textroom": "leave",
"room": room,
},
matcher={
"textroom": "success",
},
)
async def announcement(self, room: int, text: str) -> dict:
return await self.send_wrapper(
message={
"request": "list",
"textroom": "announcement",
"room": room,
"secret": "adminpwd",
"text": text,
},
matcher={
"textroom": "success",
},
)
async def main():
session = JanusSession(
base_url="wss://janusmy.josephgetmyip.com/janusbasews/janus",
api_secret="janusrocks",
)
plugin_textroom = JanusTextRoomPlugin()
await plugin_textroom.attach(session=session),
response = await plugin_textroom.list()
response = await plugin_textroom.get_participants_list(1234)
response = await plugin_textroom.join_room(1234)
response = await plugin_textroom.message(1234, "test msg")
await asyncio.sleep(20)
print("--- Wait for awhile ---")
response = await plugin_textroom.leave(1234)
response = await plugin_textroom.announcement(1234, "test announcement")
print(response)
print("--- Everything done ---")
await plugin_textroom.destroy()
await session.destroy()
asyncio.run(main()) This is the code I have now. I very much welcome contributions 😃 . |
I finally managed to connect to the textroom. I can send and receive messages. Unfortunately, I was unable to integrate this approach into your code. Could you please help me write the correct plugin for here is my code: import asyncio
import datetime
import logging
import string
import sys
import random
import time
import json
import websockets as ws
from queue import Queue
from threading import Thread
from aiortc.rtcdatachannel import RTCDataChannel
from aiortc import RTCPeerConnection, RTCSessionDescription
logger = logging.getLogger('echo')
class WebSocketClient():
def __init__(self, url='ws://localhost:8188/'):
self._url = url
self.connection = None
self._transactions = {}
async def connect(self):
self.connection = await ws.connect(self._url,
subprotocols=['janus-protocol'],
ping_interval=10,
ping_timeout=10,
compression=None)
if self.connection.open:
asyncio.ensure_future(self.receiveMessage())
logger.info('WebSocket connected')
return self
def transaction_id(self):
return ''.join(random.choice(string.ascii_letters) for x in range(12))
async def send(self, message):
tx_id = self.transaction_id()
message.update({'transaction': tx_id})
tx = asyncio.get_event_loop().create_future()
tx_in = {'tx': tx, 'request': message['janus']}
self._transactions[tx_id] = tx_in
try:
await asyncio.gather(self.connection.send(json.dumps(message)), tx)
except Exception as e:
tx.set_result(e)
return tx.result()
async def receiveMessage(self):
try:
async for message in self.connection:
data = json.loads(message)
tx_id = data.get('transaction')
response = data['janus']
# Handle ACK
if tx_id and response == 'ack':
logger.debug(f'Received ACK for transaction {tx_id}')
if tx_id in self._transactions:
tx_in = self._transactions[tx_id]
if tx_in['request'] == 'keepalive':
tx = tx_in['tx']
tx.set_result(data)
del self._transactions[tx_id]
logger.debug(f'Closed transaction {tx_id}'
f' with {response}')
continue
# Handle Success / Event / Error
if response not in {'success', 'error'}:
logger.info(f'Janus Event --> {response}')
if tx_id and tx_id in self._transactions:
tx_in = self._transactions[tx_id]
tx = tx_in['tx']
tx.set_result(data)
del self._transactions[tx_id]
logger.debug(f'Closed transaction {tx_id}'
f' with {response}')
except Exception:
logger.error('WebSocket failure')
logger.info('Connection closed')
async def close(self):
if self.connection:
await self.connection.close()
self.connection = None
self._transactions = {}
class JanusPlugin:
def __init__(self, session, handle_id):
self._session = session
self._handle_id = handle_id
async def sendMessage(self, message):
logger.info('Sending message to the plugin')
message.update({'janus': 'message', 'handle_id': self._handle_id})
response = await self._session.send(message)
return response
class JanusSession:
def __init__(self, url='ws://localhost:8188/'):
self._websocket = None
self._url = url
self._handles = {}
self._session_id = None
self._ka_interval = 15
self._ka_task = None
async def send(self, message):
message.update({'session_id': self._session_id})
response = await self._websocket.send(message)
return response
async def create(self):
logger.info('Creating session')
self._websocket = await WebSocketClient(self._url).connect()
message = {'janus': 'create'}
response = await self.send(message)
assert response['janus'] == 'success'
session_id = response['data']['id']
self._session_id = session_id
self._ka_task = asyncio.ensure_future(self._keepalive())
logger.info('Session created')
async def attach(self, plugin):
logger.info('Attaching handle')
message = {'janus': 'attach', 'plugin': plugin}
response = await self.send(message)
assert response['janus'] == 'success'
handle_id = response['data']['id']
handle = JanusPlugin(self, handle_id)
self._handles[handle_id] = handle
logger.info('Handle attached')
return handle
async def destroy(self):
logger.info('Destroying session')
if self._session_id:
message = {'janus': 'destroy'}
await self.send(message)
self._session_id = None
if self._ka_task:
self._ka_task.cancel()
try:
await self._ka_task
except asyncio.CancelledError:
pass
self._ka_task = None
self._handles = {}
logger.info('Session destroyed')
logger.info('Closing WebSocket')
if self._websocket:
await self._websocket.close()
self._websocket = None
async def _keepalive(self):
while True:
logger.info('Sending keepalive')
message = {'janus': 'keepalive'}
await self.send(message)
logger.info('Keepalive OK')
await asyncio.sleep(self._ka_interval)
class JanusTextRoomPlugin:
name = "janus.plugin.textroom"
plugin: JanusPlugin
data_channel: RTCDataChannel
def __init__(self,
url: str = 'ws://localhost:8188/',
room: int = 1234,
username: str = f"user-{random.randint(0, 100)}",
):
self.url = url
self.room = room
self.username = username
self.session = JanusSession(self.url)
self.pc = RTCPeerConnection()
self.send_queue = Queue()
self.send_thread = Thread(target=self.run_send_loop, daemon=True, name='send_thread')
async def _create_data_channel(self):
# create session
await self.session.create()
# attach to echotest plugin
self.plugin = await self.session.attach(self.name)
# create data-channel
self.data_channel = self.pc.createDataChannel('JanusDataChannel')
print(f'DataChannel ({self.data_channel.label}) created')
@self.data_channel.on('open')
def on_open():
print(f'DataChannel ({self.data_channel.label}) open')
@self.data_channel.on('close')
def on_close():
print(f'DataChannel ({self.data_channel.label}) closed')
@self.data_channel.on('message')
def on_message(message):
print(f'DataChannel ({self.data_channel.label}) received: {message}')
setup_response = await self.plugin.sendMessage({'body': {'request': 'setup'}})
print()
print('setup_response')
print(setup_response)
print()
offer = RTCSessionDescription(sdp=setup_response['jsep']['sdp'], type=setup_response['jsep']['type'])
await self.pc.setRemoteDescription(offer)
await self.pc.setLocalDescription(await self.pc.createAnswer())
async def join(self):
response = await self.plugin.sendMessage({'body': {
'request': 'list',
'textroom': 'join',
'room': self.room,
'display': self.username,
'username': self.username,
},
'jsep': {
'sdp': self.pc.localDescription.sdp,
'trickle': True,
'type': self.pc.localDescription.type,
},
})
await asyncio.sleep(4)
async def _send_loop(self):
await self._create_data_channel()
await self.join()
while True:
if not self.send_queue.empty():
message = self.send_queue.get()
await self.plugin.sendMessage({'body': {
'request': 'list',
'textroom': 'message',
'room': self.room,
"text": str(message),
"ack": True,
}})
def run_send_loop(self):
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(
self._send_loop()
)
sys.exit(0)
except Exception:
sys.exit(1)
def run_send_loop(self):
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(
self._send_loop()
)
sys.exit(0)
except Exception:
sys.exit(1)
def start(self):
self.send_thread.start()
def add_message_to_queue(self, message: str):
self.send_queue.put(message)
if __name__ == '__main__':
url = 'ws://*.*.*.*:8188/janus'
plugin = JanusTextRoomPlugin(url=url, room=1234)
plugin.start()
while True:
time.sleep(1)
plugin.add_message_to_queue(
f'test message{datetime.datetime.now()}'
) I also noticed some strange behavior when reading and sending messages. I can receive messages in the browser using the standard janus gateway interface only if inside janus.jcfg has set the value of the variable |
Hi @iamletenkov , thank you for your effort. It seems that the DataChannel is not connected. In your script you create the When In my script I tried to implement support for trickle, it's still done in the plugin handler, but I didn't manage to get it to work. Not tested between Python clients yet though. I've created the branch |
Hi @iamletenkov , I finally managed to setup the data channel.
I've pushed my changes. Now I can't accept the hardcoded wait for the trickle ice candidates. I'm going to take my time to find a solution for this. |
Hi! I successfully launched JanusVideoCallPlugin and it works great. However, I'm confused about how to write my own JanusTextRoomPlugin. Could you help me with an example? I would like to receive and send messages to the text room.
The text was updated successfully, but these errors were encountered: