-
Notifications
You must be signed in to change notification settings - Fork 6
/
test_gst_videoroom.py
159 lines (126 loc) · 4.35 KB
/
test_gst_videoroom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from __future__ import annotations
import ssl
import asyncio
import pathlib
from concurrent.futures import TimeoutError
from janus_client import JanusClient, JanusAdminMonitorClient
from janus_client.experiments.plugin_video_room_gst import JanusVideoRoomPlugin
from typing import TYPE_CHECKING, Type
if TYPE_CHECKING:
from janus_client import JanusSession
import gi
gi.require_version("GLib", "2.0")
gi.require_version("GObject", "2.0")
gi.require_version("Gst", "1.0")
from gi.repository import Gst
# Certificate file expected to sit next to this script; the server is
# verified against it.
localhost_pem = pathlib.Path(__file__).with_name("lt_limmengkiat_name_my.crt")

# Client-side TLS context shared by every connection opened in this file.
ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(cafile=localhost_pem)

# To switch certificate verification off (insecure, local debugging only),
# enable both lines below, in this order:
# ssl_context.check_hostname = False
# ssl_context.verify_mode = ssl.CERT_NONE
async def publish_some_video(session: JanusSession):
    """Publish a stream through the video room plugin for 60 seconds.

    Creates a ``JanusVideoRoomPlugin`` handle on *session*, joins with the
    hard-coded arguments ``(1234, 333, "qweqwe")``, publishes, waits 60
    seconds and unpublishes.  The plugin handle is destroyed on every exit
    path, including errors and task cancellation.

    Args:
        session: Janus session used to create the plugin handle.
    """
    # Create plugin
    plugin_handle: JanusVideoRoomPlugin = await session.create_plugin_handle(
        JanusVideoRoomPlugin
    )
    try:
        # NOTE(review): presumably room id, publisher id and display name --
        # confirm against JanusVideoRoomPlugin.join.
        await plugin_handle.join(1234, 333, "qweqwe")
        await plugin_handle.publish()
        print("Let it stream for 60 seconds")
        await asyncio.sleep(60)
        print("Stop streaming")
        await plugin_handle.unpublish()
        print("Stream unpublished")
    finally:
        # Destroy plugin even if joining/publishing failed or the sleep was
        # cancelled, so the handle is not left attached to the session.
        await plugin_handle.destroy()
async def subscribe_to_a_feed(session: JanusSession):
    """Subscribe to the first listed participant of room 1234 for 30 seconds.

    Creates a ``JanusVideoRoomPlugin`` handle on *session*, lists and prints
    the participants of room 1234 and, when there is at least one, subscribes
    to the first entry's ``"id"``, waits 30 seconds and unsubscribes.  The
    plugin handle is destroyed on every exit path, including errors.

    Args:
        session: Janus session used to create the plugin handle.
    """
    # Create plugin
    plugin_handle: JanusVideoRoomPlugin = await session.create_plugin_handle(
        JanusVideoRoomPlugin
    )
    try:
        participants = await plugin_handle.list_participants(1234)
        print(participants)
        if participants:
            # Publishers available: take the first one listed
            participant_id = participants[0]["id"]
            await plugin_handle.subscribe(1234, participant_id)
            await asyncio.sleep(30)
            await plugin_handle.unsubscribe()
    finally:
        # Destroy plugin regardless of how the steps above ended
        await plugin_handle.destroy()
# The API secret is used when talking to Janus as a server, for example
# when another server wraps Janus requests on behalf of its own clients.
# NOTE(review): hard-coded credential; fine for a local demo only.
api_secret = "janusrocks"
async def main():
    """Run the publishing demo end to end against the Janus server.

    Connects the regular client and the admin/monitor client, registers the
    token ``"ccc"`` for the videoroom plugin, creates a session and runs
    ``publish_some_video`` on it.  Each teardown step is registered as soon
    as its setup step succeeds and runs in reverse order (destroy session,
    remove token, disconnect admin client, disconnect client), also when a
    later step raises or the task is cancelled.
    """
    # Function-scope import keeps this block self-contained.
    from contextlib import AsyncExitStack

    async with AsyncExitStack() as cleanup:
        # Start connection
        client = JanusClient(
            uri="wss://lt.limmengkiat.name.my:8989/",
            api_secret=api_secret,
            token="111",
        )
        await client.connect(ssl=ssl_context)
        cleanup.push_async_callback(client.disconnect)

        admin_client = JanusAdminMonitorClient(
            "wss://lt.limmengkiat.name.my:7989/", "janusoverlord"
        )
        await admin_client.connect(ssl=ssl_context)
        cleanup.push_async_callback(admin_client.disconnect)

        # Authentication
        token = "ccc"
        # Original author's note on behaviour that is not documented by
        # Janus: adding a token that already exists is fine, and the token's
        # plugin access scope then becomes the union of its existing scope
        # and the newly supplied one.  Being explicit about the scope here
        # is therefore the safer choice.
        await admin_client.add_token(token, ["janus.plugin.videoroom"])

        async def revoke_token():
            # Delete the token on the server, then forget it locally
            await admin_client.remove_token(token)
            client.token = None

        cleanup.push_async_callback(revoke_token)
        client.token = token

        # Create session
        session = await client.create_session()
        cleanup.push_async_callback(session.destroy)

        # Swap in subscribe_to_a_feed(session) to run the subscriber demo
        # instead of the publisher demo.
        await publish_some_video(session)

    print("End of main")
async def main2():
    """Exercise token management on the admin/monitor API.

    Connects the admin client, prints the current token list, adds the token
    ``"cccs"`` scoped to the voicemail and audiobridge plugins, lists the
    tokens again and removes the token.  The token is removed and the
    connection closed even when an intermediate call raises.
    """
    admin_client = JanusAdminMonitorClient(
        "wss://lt.limmengkiat.name.my:7989/", "janusoverlord"
    )
    await admin_client.connect(ssl=ssl_context)
    try:
        # Other admin calls that can be tried here:
        # print(await admin_client.info())
        # print(await admin_client.ping())
        print(await admin_client.list_tokens())

        token = "cccs"
        await admin_client.add_token(
            token, ["janus.plugin.voicemail", "janus.plugin.audiobridge"]
        )
        try:
            # NOTE(review): the result of this second listing is discarded,
            # as in the original; print it if the goal is to see the token.
            await admin_client.list_tokens()
        finally:
            await admin_client.remove_token(token)
    finally:
        # Destroy connection
        await admin_client.disconnect()
    print("End of main")
def check_plugins() -> bool:
    """Check that every GStreamer plugin this script relies on is installed.

    Looks each required plugin name up in the GStreamer registry and prints
    the names that could not be found.

    Returns:
        True when all required plugins are present, False otherwise.
    """
    needed = [
        "opus",
        "vpx",
        "nice",
        "webrtc",
        "dtls",
        "srtp",
        "rtp",
        "rtpmanager",
        "videotestsrc",
        "audiotestsrc",
    ]
    # Fetch the registry once instead of once per plugin name
    registry = Gst.Registry.get()
    missing = [name for name in needed if registry.find_plugin(name) is None]
    if missing:
        print("Missing gstreamer plugins:", missing)
        return False
    return True
# Script entry: initialise GStreamer, verify the required plugins, then run
# the demo coroutine.
# NOTE(review): these statements run at import time as well; consider an
# ``if __name__ == "__main__":`` guard if this module is ever imported.
Gst.init(None)
if not check_plugins():
    # check_plugins() has already printed which plugins are missing; stop
    # here instead of starting a demo that depends on them.
    raise SystemExit(1)
asyncio.run(main())