這是kurento tutorial中的一個例子(groupCall),用於多人音視頻通話,效果如下:
登錄界面:
聊天界面:
運行方法:
2、idea里啟用這個項目
3、瀏覽器里輸入https://localhost:8443/ 輸入用戶名、房間號,然后再開一個瀏覽器tab頁,輸入一個不同的用戶名,房間號與第1個tab相同,正常情況下,這2個tab頁就能聊上了,還可以再加更多tab模擬多人視頻(注:docker容器性能有限,mac本上實測,越過4個人,就很不穩定了)
下面是該項目的一些代碼和邏輯分析:
一、主要模型的類圖如下:
UserSession類:代表每個連接進來的用戶會話信息。
Room類:即房間,1個房間可能有多個UserSession實例。
RoomManager類:房間管理,用於創建或銷毀房間。
UserRegistry類:用戶注冊類,即管理用戶。
二、主要代碼邏輯:
1、創建房間入口
public Room getRoom(String roomName) { log.debug("Searching for room {}", roomName); Room room = rooms.get(roomName); if (room == null) { log.debug("Room {} not existent. Will create now!", roomName); room = new Room(roomName, kurento.createMediaPipeline()); rooms.put(roomName, room); } log.debug("Room {} found!", roomName); return room; }
注:第7行,每個房間實例創建時,都綁定了一個對應的MediaPipeline(用於隔離不同房間的媒體信息等)
2、創建用戶實例入口
public UserSession(final String name, String roomName, final WebSocketSession session, MediaPipeline pipeline) { this.pipeline = pipeline; this.name = name; this.session = session; this.roomName = roomName; this.outgoingMedia = new WebRtcEndpoint.Builder(pipeline).build(); this.outgoingMedia.addIceCandidateFoundListener(event -> { JsonObject response = new JsonObject(); response.addProperty("id", "iceCandidate"); response.addProperty("name", name); response.add("candidate", JsonUtils.toJsonObject(event.getCandidate())); try { synchronized (session) { session.sendMessage(new TextMessage(response.toString())); } } catch (IOException e) { log.debug(e.getMessage()); } }); }
UserSession的構造函數上,把房間實例的pipeline做為入參傳進來,然后上行傳輸的WebRtcEndPoint實例outgoingMedia又跟pipeline綁定(第8行)。這樣:"用戶實例--pipeline實例--房間實例" 就串起來了。
用戶加入房間的代碼:
public UserSession join(String userName, WebSocketSession session) throws IOException { log.info("ROOM {}: adding participant {}", this.name, userName); final UserSession participant = new UserSession(userName, this.name, session, this.pipeline); //示例工程上,沒考慮“相同用戶名”的人進入同1個房間的情況,這里加上了“用戶名重名”檢測 if (participants.containsKey(userName)) { final JsonObject jsonFailMsg = new JsonObject(); final JsonArray jsonFailArray = new JsonArray(); jsonFailArray.add(userName + " exist!"); jsonFailMsg.addProperty("id", "joinFail"); jsonFailMsg.add("data", jsonFailArray); participant.sendMessage(jsonFailMsg); participant.close(); return null; } joinRoom(participant); participants.put(participant.getName(), participant); sendParticipantNames(participant); return participant; }
原代碼沒考慮到用戶名重名的問題,我加上了這段檢測,倒數第2行代碼,sendParticipantNames在加入成功后,給房間里的其它人發通知。
3、SDP交換的入口
kurento-group-call/src/main/resources/static/js/conferenceroom.js 中有一段監聽websocket的代碼:
ws.onmessage = function (message) { let parsedMessage = JSON.parse(message.data); console.info('Received message: ' + message.data); switch (parsedMessage.id) { case 'existingParticipants': onExistingParticipants(parsedMessage); break; case 'newParticipantArrived': onNewParticipant(parsedMessage); break; case 'participantLeft': onParticipantLeft(parsedMessage); break; case 'receiveVideoAnswer': receiveVideoResponse(parsedMessage); break; case 'iceCandidate': participants[parsedMessage.name].rtcPeer.addIceCandidate(parsedMessage.candidate, function (error) { if (error) { console.error("Error adding candidate: " + error); return; } }); break; case 'joinFail': alert(parsedMessage.data[0]); window.location.reload(); break; default: console.error('Unrecognized message', parsedMessage); } }
服務端在剛才提到的sendParticipantNames后,會給js發送各種消息,existingParticipants(其它人加入)、newParticipantArrived(新人加入) 這二類消息,就會觸發generateOffer,開始向服務端發送SDP
function onExistingParticipants(msg) { const constraints = { audio: true, video: { mandatory: { maxWidth: 320, maxFrameRate: 15, minFrameRate: 15 } } }; console.log(name + " registered in room " + room); let participant = new Participant(name); participants[name] = participant; let video = participant.getVideoElement(); const options = { localVideo: video, mediaConstraints: constraints, onicecandidate: participant.onIceCandidate.bind(participant) }; participant.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(options, function (error) { if (error) { return console.error(error); } this.generateOffer(participant.offerToReceiveVideo.bind(participant)); }); msg.data.forEach(receiveVideo); }
4、服務端回應各種websocket消息
org.kurento.tutorial.groupcall.CallHandler#handleTextMessage 信令處理的主要邏輯,就在這里:
@Override public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { final JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class); final UserSession user = registry.getBySession(session); if (user != null) { log.debug("Incoming message from user '{}': {}", user.getName(), jsonMessage); } else { log.debug("Incoming message from new user: {}", jsonMessage); } switch (jsonMessage.get("id").getAsString()) { case "joinRoom": joinRoom(jsonMessage, session); break; case "receiveVideoFrom": final String senderName = jsonMessage.get("sender").getAsString(); final UserSession sender = registry.getByName(senderName); final String sdpOffer = jsonMessage.get("sdpOffer").getAsString(); user.receiveVideoFrom(sender, sdpOffer); break; case "leaveRoom": leaveRoom(user); break; case "onIceCandidate": JsonObject candidate = jsonMessage.get("candidate").getAsJsonObject(); if (user != null) { IceCandidate cand = new IceCandidate(candidate.get("candidate").getAsString(), candidate.get("sdpMid").getAsString(), candidate.get("sdpMLineIndex").getAsInt()); user.addCandidate(cand, jsonMessage.get("name").getAsString()); } break; default: break; } }
其中user.receiveVideoFrom方法,就會回應SDP
public void receiveVideoFrom(UserSession sender, String sdpOffer) throws IOException { log.info("USER {}: connecting with {} in room {}", this.name, sender.getName(), this.roomName); log.trace("USER {}: SdpOffer for {} is {}", this.name, sender.getName(), sdpOffer); final String ipSdpAnswer = this.getEndpointForUser(sender).processOffer(sdpOffer); final JsonObject scParams = new JsonObject(); scParams.addProperty("id", "receiveVideoAnswer"); scParams.addProperty("name", sender.getName()); scParams.addProperty("sdpAnswer", ipSdpAnswer); log.trace("USER {}: SdpAnswer for {} is {}", this.name, sender.getName(), ipSdpAnswer); this.sendMessage(scParams); log.debug("gather candidates"); this.getEndpointForUser(sender).gatherCandidates(); }
SDP和ICE信息交換完成,就開始視頻通訊了。
參考文章:
https://doc-kurento.readthedocs.io/en/6.10.0/tutorials/java/tutorial-groupcall.html