1. kurento-composite-node-example例子的測試
原文鏈接:
https://github.com/subu1979/kurento-composite-conference-recording-nodejs
這是一個nodejs的例子,
$ git clone https://github.com/subu1979/kurento-composite-conference-recording-nodejs.git
$ cd kurento-composite-conference-recording-nodejs
$ npm install
$ npm start
啟動后無法工作;
2. kurento-composite-recording的測試
原文鏈接:
https://codedump.io/share/Slmk2efR1g4v/1/composite-recording-creates-empty-video
http://stackoverflow.com/questions/34823540/compoiste-recording-creates-empty-video
先確認基於kurento-media-server的6.4.0的kurento-one2one-call能正常工作;
然后修改CallMediaPipeline.java成如下:
/**
 * Builds the media pipeline for a one-to-one call and, for the composite test, additionally
 * routes both parties into a Composite hub and records from a third hub port into a WEBM file.
 *
 * @param kurento client used to create the media pipeline
 */
public CallMediaPipeline(KurentoClient kurento) {
try {
this.pipeline = kurento.createMediaPipeline();
/* The original
this.callerWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
this.calleeWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
this.callerWebRtcEp.connect(this.calleeWebRtcEp);
this.calleeWebRtcEp.connect(this.callerWebRtcEp);
*/
/* Added by Hank, for testing composite */
this.callerWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
this.calleeWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
// One Composite hub with three ports: two receive the parties' media, one feeds the recorder.
Composite composite = new Composite.Builder(this.pipeline).build();
HubPort hubPort1 = new HubPort.Builder(composite).build();
HubPort hubPort2 = new HubPort.Builder(composite).build();
HubPort hubPort3 = new HubPort.Builder(composite).build();
// Recorder writing to a fixed local file using the WEBM media profile.
RecorderEndpoint recorderEP =
new RecorderEndpoint.Builder(this.pipeline,
"file:////opt/PJT_kurento/kms-6.4.0/kurento-composite-implement/record-samples/recording.webm").withMediaProfile(MediaProfileSpecType.WEBM).build();
// The direct caller <-> callee connections of the original call are kept;
// each party is additionally connected to its own hub port.
this.callerWebRtcEp.connect(this.calleeWebRtcEp);
this.callerWebRtcEp.connect(hubPort1);
this.calleeWebRtcEp.connect(hubPort2);
this.calleeWebRtcEp.connect(this.callerWebRtcEp);
// hubPort3 is the port wired to the recorder; recording starts immediately.
hubPort3.connect(recorderEP);
recorderEP.record();
/* END */
} catch (Throwable t) {
// On any failure the pipeline (if it was created) is released; the Throwable is neither
// rethrown nor logged here, so the constructor still returns normally.
if (this.pipeline != null) {
pipeline.release();
}
}
}
運行后可以在目錄:
/opt/PJT_kurento/kms-6.4.0/kurento-composite-implement/record-samples/
下看到錄制的音頻與視頻文件: recording.webm
NOTE:
1).如果沒有看到文件的生成,則需要檢查該目錄的所有者是否與運行kurento-media-server的用戶相同,
不是的話,修改目錄的所有者即可。
2). 因為音頻的編碼是opus, 視頻的編碼是vp8,所以很多播放器是播放不了的,
用ffmpeg轉碼后可看;
代碼分析:
原始的pipeline結構為:
callerWebRtcEp -------> calleeWebRtcEp
callerWebRtcEp <------- calleeWebRtcEp
添加了Composite與recorder后pipeline結構為:
callerWebRtcEp -----------------|------------------>calleeWebRtcEp
                                V
                            HubPort1
                        ________|_______
                        |  composite   |-->HubPort3-->recorderEp
                        |______________|
                                ^
                            HubPort2
callerWebRtcEp <----------------|-------------------calleeWebRtcEp
3. 開發成caller顯示雙人的畫面
修改CallMediaPipeline.java代碼如下:
/**
 * Builds the media pipeline for a one-to-one call in which the caller is shown the Composite
 * output (both parties in one picture) while the callee still receives the caller directly.
 *
 * <p>No recorder is created in this variant: nothing is recorded here, so building a
 * RecorderEndpoint would only allocate an unused media element.
 *
 * @param kurento client used to create the media pipeline
 */
public CallMediaPipeline(KurentoClient kurento) {
  try {
    this.pipeline = kurento.createMediaPipeline();
    /* The original
    this.callerWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
    this.calleeWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
    this.callerWebRtcEp.connect(this.calleeWebRtcEp);
    this.calleeWebRtcEp.connect(this.callerWebRtcEp);
    */
    /* Added by Hank, for testing composite */
    this.callerWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
    this.calleeWebRtcEp = new WebRtcEndpoint.Builder(pipeline).build();
    // One Composite hub with three ports: two take the parties' media in, one is read back out.
    Composite composite = new Composite.Builder(this.pipeline).build();
    HubPort hubPort1 = new HubPort.Builder(composite).build();
    HubPort hubPort2 = new HubPort.Builder(composite).build();
    HubPort hubPort3 = new HubPort.Builder(composite).build();
    // The callee keeps receiving the caller directly.
    this.callerWebRtcEp.connect(this.calleeWebRtcEp);
    // Both parties feed the composite through their own hub port.
    this.callerWebRtcEp.connect(hubPort1);
    this.calleeWebRtcEp.connect(hubPort2);
    // The caller receives the hub output instead of the callee's direct stream.
    hubPort3.connect(this.callerWebRtcEp);
    /* END */
  } catch (Throwable t) {
    // On any failure the pipeline (if it was created) is released; the Throwable is neither
    // rethrown nor logged here, matching the original behaviour.
    if (this.pipeline != null) {
      pipeline.release();
    }
  }
}
添加了Composite后pipeline結構為:
callerWebRtcEp -----------------|------------------>calleeWebRtcEp
                                V
                            hubPort1
                        ________|_______
callerWebRtcEp<-hubport3|  composite   |
                        |______________|
                                ^
                            hubPort2<--------------calleeWebRtcEp
運行后的效果圖如下:
二、kurento-group-call添加composite功能
修改的文件有兩個,完整的代碼如下:
Room.java
- /*
- * (C) Copyright 2014 Kurento (http://kurento.org/)
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the GNU Lesser General Public License
- * (LGPL) version 2.1 which accompanies this distribution, and is available at
- * http://www.gnu.org/licenses/lgpl-2.1.html
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- */
- package org.kurento.tutorial.groupcall;
- import java.io.Closeable;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.List;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import javax.annotation.PreDestroy;
- import org.kurento.client.Continuation;
- import org.kurento.client.MediaPipeline;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.web.socket.WebSocketSession;
- import com.google.gson.JsonArray;
- import com.google.gson.JsonElement;
- import com.google.gson.JsonObject;
- import com.google.gson.JsonPrimitive;
- /* Added by Hank */
- import org.kurento.client.*;
- /* END */
- /**
-  * A group-call room. It keeps the participants by name, owns the room's media pipeline and a
-  * Composite hub created on it, and records from a dedicated hub port to a per-room WEBM file.
-  *
-  * @author Ivan Gracia (izanmail@gmail.com)
-  * @since 4.3.1
-  */
- public class Room implements Closeable {
-   private final Logger log = LoggerFactory.getLogger(Room.class);
-   private final ConcurrentMap<String, UserSession> participants = new ConcurrentHashMap<>();
-   private final MediaPipeline pipeline;
-   private final String name;
-   /* Added by Hank */
-   private final Composite composite;
-   // Created lazily in join(); stays null until the first participant joins.
-   private RecorderEndpoint recorderEndpoint;
-   private final HubPort compositeOutputHubport;
-   /**
-    * @return the hub port of the room's composite that is connected to the recorder
-    */
-   public HubPort getCompositeOutputHubport() {
-     return compositeOutputHubport;
-   }
-   /* END */
-   /**
-    * @return the room name
-    */
-   public String getName() {
-     return name;
-   }
-   /**
-    * Creates the room together with its Composite hub and the hub port used for recording.
-    *
-    * @param roomName name of the room; also used as the recording file name
-    * @param pipeline media pipeline on which all of the room's media elements are built
-    */
-   public Room(String roomName, MediaPipeline pipeline) {
-     this.name = roomName;
-     this.pipeline = pipeline;
-     /* Added by Hank */
-     this.composite = new Composite.Builder(this.pipeline).build();
-     this.compositeOutputHubport = new HubPort.Builder(composite).build();
-     /* END */
-     log.info("ROOM {} has been created", roomName);
-   }
-   @PreDestroy
-   private void shutdown() {
-     this.close();
-   }
-   /**
-    * Adds a participant to the room, starting the room recording on the first join.
-    *
-    * @param userName name of the joining user
-    * @param session web socket session used to signal that user
-    * @return the newly created session of the participant
-    * @throws IOException if the list of existing participants cannot be sent to the new user
-    */
-   public UserSession join(String userName, WebSocketSession session) throws IOException {
-     // The first placeholder is the room, not the user.
-     log.info("ROOM {}: adding participant {}", this.name, userName);
-     /* Added by Hank */
-     // Original
-     //final UserSession participant = new UserSession(userName, this.name, session, this.pipeline);
-     // Adding
-     final UserSession participant = new UserSession(userName, this.name, session,
-         this.pipeline, this.composite, this.compositeOutputHubport);
-     // The new participant is not in the map yet, so a size check cannot identify the first join;
-     // testing the recorder itself also guarantees that only one recorder is ever created.
-     if (this.recorderEndpoint == null) {
-       // first user join
-       log.info("ROOM {} : Start recording", getName());
-       // The media profile must match the .webm file being written.
-       this.recorderEndpoint = new RecorderEndpoint.Builder(pipeline,
-           "file:///opt/PJT_kurento/kms-6.4.0/kurento-composite-implement/record-samples/"+getName()+".webm")
-           .withMediaProfile(MediaProfileSpecType.WEBM)
-           .build();
-       compositeOutputHubport.connect(recorderEndpoint);
-       // NOTE(review): connecting the recorder back into the hub port looks unnecessary for
-       // recording; kept as in the original - confirm before removing.
-       recorderEndpoint.connect(compositeOutputHubport);
-       recorderEndpoint.record();
-     }
-     /* END */
-     joinRoom(participant);
-     participants.put(participant.getName(), participant);
-     sendParticipantNames(participant);
-     return participant;
-   }
-   /**
-    * Removes a user from the room, notifies the remaining participants and closes the session.
-    *
-    * @param user the leaving participant
-    * @throws IOException if closing the user's session fails
-    */
-   public void leave(UserSession user) throws IOException {
-     log.debug("PARTICIPANT {}: Leaving room {}", user.getName(), this.name);
-     this.removeParticipant(user.getName());
-     user.close();
-   }
-   /**
-    * Tells every current participant that a new participant arrived.
-    *
-    * @param newParticipant the participant that is joining
-    * @return names of the participants that were in the room before the new one
-    */
-   private Collection<String> joinRoom(UserSession newParticipant) throws IOException {
-     final JsonObject newParticipantMsg = new JsonObject();
-     newParticipantMsg.addProperty("id", "newParticipantArrived");
-     newParticipantMsg.addProperty("name", newParticipant.getName());
-     final List<String> participantsList = new ArrayList<>(participants.values().size());
-     log.debug("ROOM {}: notifying other participants of new participant {}", name,
-         newParticipant.getName());
-     for (final UserSession participant : participants.values()) {
-       try {
-         participant.sendMessage(newParticipantMsg);
-       } catch (final IOException e) {
-         // Best effort: one unreachable participant must not block the join.
-         log.debug("ROOM {}: participant {} could not be notified", name, participant.getName(), e);
-       }
-       participantsList.add(participant.getName());
-     }
-     return participantsList;
-   }
-   /**
-    * Drops a participant from the map and tells the remaining ones to stop receiving from it.
-    *
-    * @param name name of the participant that left
-    */
-   private void removeParticipant(String name) throws IOException {
-     participants.remove(name);
-     log.debug("ROOM {}: notifying all users that {} is leaving the room", this.name, name);
-     final List<String> unnotifiedParticipants = new ArrayList<>();
-     final JsonObject participantLeftJson = new JsonObject();
-     participantLeftJson.addProperty("id", "participantLeft");
-     participantLeftJson.addProperty("name", name);
-     for (final UserSession participant : participants.values()) {
-       try {
-         participant.cancelVideoFrom(name);
-         participant.sendMessage(participantLeftJson);
-       } catch (final IOException e) {
-         unnotifiedParticipants.add(participant.getName());
-       }
-     }
-     if (!unnotifiedParticipants.isEmpty()) {
-       log.debug("ROOM {}: The users {} could not be notified that {} left the room", this.name,
-           unnotifiedParticipants, name);
-     }
-   }
-   /**
-    * Sends the given user the names of all other participants of the room.
-    *
-    * @param user recipient of the list; its own name is left out
-    * @throws IOException if the message cannot be sent
-    */
-   public void sendParticipantNames(UserSession user) throws IOException {
-     final JsonArray participantsArray = new JsonArray();
-     for (final UserSession participant : this.getParticipants()) {
-       if (!participant.equals(user)) {
-         final JsonElement participantName = new JsonPrimitive(participant.getName());
-         participantsArray.add(participantName);
-       }
-     }
-     final JsonObject existingParticipantsMsg = new JsonObject();
-     existingParticipantsMsg.addProperty("id", "existingParticipants");
-     existingParticipantsMsg.add("data", participantsArray);
-     log.debug("PARTICIPANT {}: sending a list of {} participants", user.getName(),
-         participantsArray.size());
-     user.sendMessage(existingParticipantsMsg);
-   }
-   /**
-    * @return a live view of the sessions currently in the room
-    */
-   public Collection<UserSession> getParticipants() {
-     return participants.values();
-   }
-   /**
-    * @param name participant name
-    * @return the session registered under that name, or {@code null} if there is none
-    */
-   public UserSession getParticipant(String name) {
-     return participants.get(name);
-   }
-   /**
-    * Closes every participant session, empties the room and releases the media pipeline.
-    */
-   @Override
-   public void close() {
-     for (final UserSession user : participants.values()) {
-       try {
-         user.close();
-       } catch (IOException e) {
-         log.debug("ROOM {}: Could not invoke close on participant {}", this.name, user.getName(),
-             e);
-       }
-     }
-     participants.clear();
-     pipeline.release(new Continuation<Void>() {
-       @Override
-       public void onSuccess(Void result) throws Exception {
-         log.trace("ROOM {}: Released Pipeline", Room.this.name);
-       }
-       @Override
-       public void onError(Throwable cause) throws Exception {
-         // The value logged is the room name, so label it as such.
-         log.warn("ROOM {}: Could not release Pipeline", Room.this.name);
-       }
-     });
-     log.debug("Room {} closed", this.name);
-   }
- }
- // UserSession.java
- /*
- * (C) Copyright 2014 Kurento (http://kurento.org/)
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the GNU Lesser General Public License
- * (LGPL) version 2.1 which accompanies this distribution, and is available at
- * http://www.gnu.org/licenses/lgpl-2.1.html
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- */
- package org.kurento.tutorial.groupcall;
- import java.io.Closeable;
- import java.io.IOException;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import org.kurento.client.Continuation;
- import org.kurento.client.EventListener;
- import org.kurento.client.IceCandidate;
- import org.kurento.client.MediaPipeline;
- import org.kurento.client.OnIceCandidateEvent;
- import org.kurento.client.WebRtcEndpoint;
- /* Added by Hank */
- import org.kurento.client.*;
- /* END */
- import org.kurento.jsonrpc.JsonUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.web.socket.TextMessage;
- import org.springframework.web.socket.WebSocketSession;
- import com.google.gson.JsonObject;
- /**
-  * One participant of a room. It owns the participant's outgoing WebRTC endpoint, one incoming
-  * endpoint per remote participant it receives from, and its own hub port on the room's hub.
-  *
-  * @author Ivan Gracia (izanmail@gmail.com)
-  * @since 4.3.1
-  */
- public class UserSession implements Closeable {
-   private static final Logger log = LoggerFactory.getLogger(UserSession.class);
-   private final String name;
-   private final WebSocketSession session;
-   private final MediaPipeline pipeline;
-   private final String roomName;
-   private final WebRtcEndpoint outgoingMedia;
-   private final ConcurrentMap<String, WebRtcEndpoint> incomingMedia = new ConcurrentHashMap<>();
-   /* Added by Hank */
-   private final HubPort compositeInputHubPort;
-   private final HubPort compositeOutputHubPort;
-   /* END */
-   /* Added by Hank */
-   // Original
-   //public UserSession(final String name, String roomName, final WebSocketSession session,
-   // MediaPipeline pipeline) {
-   // New
-   /**
-    * Creates the session, its outgoing endpoint and a hub port of its own on the given hub.
-    *
-    * @param name participant name
-    * @param roomName name of the room the participant joins
-    * @param session web socket session used to signal the participant
-    * @param pipeline media pipeline on which the endpoints are built
-    * @param composite hub on which this participant's hub port is created
-    * @param compositeOutputHubPort the room's output hub port; only stored by this class
-    */
-   public UserSession(final String name, String roomName, final WebSocketSession session,
-       MediaPipeline pipeline, Hub composite, HubPort compositeOutputHubPort) {
-     /* END */
-     this.pipeline = pipeline;
-     this.name = name;
-     this.session = session;
-     this.roomName = roomName;
-     this.outgoingMedia = new WebRtcEndpoint.Builder(pipeline).build();
-     /* Added by Hank */
-     this.compositeInputHubPort = new HubPort.Builder(composite).build();
-     // The hub port and the outgoing endpoint are connected in both directions.
-     compositeInputHubPort.connect(outgoingMedia);
-     outgoingMedia.connect(compositeInputHubPort);
-     this.compositeOutputHubPort = compositeOutputHubPort;
-     /* END */
-     this.outgoingMedia.addOnIceCandidateListener(iceCandidateForwarder(name));
-   }
-   /**
-    * Builds a listener that forwards each gathered ICE candidate to this user's web socket,
-    * tagged with the name of the participant the endpoint belongs to.
-    */
-   private EventListener<OnIceCandidateEvent> iceCandidateForwarder(final String candidateOwner) {
-     return new EventListener<OnIceCandidateEvent>() {
-       @Override
-       public void onEvent(OnIceCandidateEvent event) {
-         JsonObject response = new JsonObject();
-         response.addProperty("id", "iceCandidate");
-         response.addProperty("name", candidateOwner);
-         response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
-         try {
-           synchronized (session) {
-             session.sendMessage(new TextMessage(response.toString()));
-           }
-         } catch (IOException e) {
-           log.debug(e.getMessage());
-         }
-       }
-     };
-   }
-   /**
-    * @return the endpoint carrying this participant's own media
-    */
-   public WebRtcEndpoint getOutgoingWebRtcPeer() {
-     return outgoingMedia;
-   }
-   /**
-    * @return the participant name
-    */
-   public String getName() {
-     return name;
-   }
-   /**
-    * @return the web socket session of this participant
-    */
-   public WebSocketSession getSession() {
-     return session;
-   }
-   /**
-    * The room to which the user is currently attending.
-    *
-    * @return The room
-    */
-   public String getRoomName() {
-     return this.roomName;
-   }
-   /**
-    * Processes an SDP offer for media coming from {@code sender}, sends the answer back to this
-    * user and starts ICE candidate gathering on the endpoint involved.
-    *
-    * @param sender participant whose media is requested (may be this user, for loopback)
-    * @param sdpOffer the SDP offer sent by this user's browser
-    * @throws IOException if the answer cannot be sent
-    */
-   public void receiveVideoFrom(UserSession sender, String sdpOffer) throws IOException {
-     log.info("USER {}: connecting with {} in room {}", this.name, sender.getName(), this.roomName);
-     log.trace("USER {}: SdpOffer for {} is {}", this.name, sender.getName(), sdpOffer);
-     // Resolve the endpoint once; resolving it again would only repeat the hub connection.
-     final WebRtcEndpoint endpoint = this.getEndpointForUser(sender);
-     final String ipSdpAnswer = endpoint.processOffer(sdpOffer);
-     final JsonObject scParams = new JsonObject();
-     scParams.addProperty("id", "receiveVideoAnswer");
-     scParams.addProperty("name", sender.getName());
-     scParams.addProperty("sdpAnswer", ipSdpAnswer);
-     log.trace("USER {}: SdpAnswer for {} is {}", this.name, sender.getName(), ipSdpAnswer);
-     this.sendMessage(scParams);
-     log.debug("gather candidates");
-     endpoint.gatherCandidates();
-   }
-   /**
-    * Returns the endpoint through which this user receives {@code sender}: the outgoing endpoint
-    * for loopback, otherwise a (possibly newly created) incoming endpoint fed from this user's
-    * hub port.
-    */
-   private WebRtcEndpoint getEndpointForUser(final UserSession sender) {
-     if (sender.getName().equals(name)) {
-       log.debug("PARTICIPANT {}: configuring loopback", this.name);
-       return outgoingMedia;
-     }
-     log.debug("PARTICIPANT {}: receiving video from {}", this.name, sender.getName());
-     WebRtcEndpoint incoming = incomingMedia.get(sender.getName());
-     if (incoming == null) {
-       log.debug("PARTICIPANT {}: creating new endpoint for {}", this.name, sender.getName());
-       incoming = new WebRtcEndpoint.Builder(pipeline).build();
-       incoming.addOnIceCandidateListener(iceCandidateForwarder(sender.getName()));
-       incomingMedia.put(sender.getName(), incoming);
-     }
-     log.debug("PARTICIPANT {}: obtained endpoint for {}", this.name, sender.getName());
-     /* Added by Hank */
-     // Original
-     //sender.getOutgoingWebRtcPeer().connect(incoming);
-     // New
-     this.compositeInputHubPort.connect(incoming);
-     /* END */
-     return incoming;
-   }
-   /**
-    * Stops receiving media from the given participant.
-    *
-    * @param sender participant whose media is no longer wanted
-    */
-   public void cancelVideoFrom(final UserSession sender) {
-     this.cancelVideoFrom(sender.getName());
-   }
-   /**
-    * Stops receiving media from the named participant and releases the matching endpoint.
-    * Does nothing when this user has no incoming endpoint for that participant.
-    *
-    * @param senderName name of the participant whose media is no longer wanted
-    */
-   public void cancelVideoFrom(final String senderName) {
-     log.debug("PARTICIPANT {}: canceling video reception from {}", this.name, senderName);
-     final WebRtcEndpoint incoming = incomingMedia.remove(senderName);
-     if (incoming == null) {
-       // The room calls this for every participant, including ones that never subscribed.
-       log.debug("PARTICIPANT {}: no endpoint registered for {}", this.name, senderName);
-       return;
-     }
-     log.debug("PARTICIPANT {}: removing endpoint for {}", this.name, senderName);
-     incoming.release(new Continuation<Void>() {
-       @Override
-       public void onSuccess(Void result) throws Exception {
-         log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", UserSession.this.name,
-             senderName);
-       }
-       @Override
-       public void onError(Throwable cause) throws Exception {
-         log.warn("PARTICIPANT {}: Could not release incoming EP for {}", UserSession.this.name,
-             senderName);
-       }
-     });
-   }
-   /**
-    * Releases every media element owned by this session: all incoming endpoints, the outgoing
-    * endpoint and this user's hub port. Releases are asynchronous; failures are only logged.
-    */
-   @Override
-   public void close() throws IOException {
-     log.debug("PARTICIPANT {}: Releasing resources", this.name);
-     for (final String remoteParticipantName : incomingMedia.keySet()) {
-       log.trace("PARTICIPANT {}: Released incoming EP for {}", this.name, remoteParticipantName);
-       final WebRtcEndpoint ep = this.incomingMedia.get(remoteParticipantName);
-       ep.release(new Continuation<Void>() {
-         @Override
-         public void onSuccess(Void result) throws Exception {
-           log.trace("PARTICIPANT {}: Released successfully incoming EP for {}",
-               UserSession.this.name, remoteParticipantName);
-         }
-         @Override
-         public void onError(Throwable cause) throws Exception {
-           log.warn("PARTICIPANT {}: Could not release incoming EP for {}", UserSession.this.name,
-               remoteParticipantName);
-         }
-       });
-     }
-     outgoingMedia.release(new Continuation<Void>() {
-       @Override
-       public void onSuccess(Void result) throws Exception {
-         log.trace("PARTICIPANT {}: Released outgoing EP", UserSession.this.name);
-       }
-       @Override
-       public void onError(Throwable cause) throws Exception {
-         log.warn("USER {}: Could not release outgoing EP", UserSession.this.name);
-       }
-     });
-     // This hub port was created by this session, so it is released here as well; otherwise it
-     // would stay attached to the hub until the whole pipeline is released.
-     compositeInputHubPort.release(new Continuation<Void>() {
-       @Override
-       public void onSuccess(Void result) throws Exception {
-         log.trace("PARTICIPANT {}: Released composite hub port", UserSession.this.name);
-       }
-       @Override
-       public void onError(Throwable cause) throws Exception {
-         log.warn("PARTICIPANT {}: Could not release composite hub port", UserSession.this.name);
-       }
-     });
-   }
-   /**
-    * Sends a JSON message to this user over its web socket session.
-    *
-    * @param message message to send
-    * @throws IOException if the message cannot be sent
-    */
-   public void sendMessage(JsonObject message) throws IOException {
-     log.debug("USER {}: Sending message {}", name, message);
-     // Sends on the same session are serialized on the session object.
-     synchronized (session) {
-       session.sendMessage(new TextMessage(message.toString()));
-     }
-   }
-   /**
-    * Hands an ICE candidate received from the browser to the endpoint it belongs to.
-    *
-    * @param candidate the remote ICE candidate
-    * @param name participant the candidate refers to; this user's own name selects the outgoing
-    *        endpoint, any other name the incoming endpoint for that participant (if it exists)
-    */
-   public void addCandidate(IceCandidate candidate, String name) {
-     if (this.name.compareTo(name) == 0) {
-       outgoingMedia.addIceCandidate(candidate);
-     } else {
-       WebRtcEndpoint webRtc = incomingMedia.get(name);
-       if (webRtc != null) {
-         webRtc.addIceCandidate(candidate);
-       }
-     }
-   }
-   /*
-    * (non-Javadoc)
-    *
-    * @see java.lang.Object#equals(java.lang.Object)
-    */
-   @Override
-   public boolean equals(Object obj) {
-     if (this == obj) {
-       return true;
-     }
-     // instanceof is already false for null.
-     if (!(obj instanceof UserSession)) {
-       return false;
-     }
-     UserSession other = (UserSession) obj;
-     // Two sessions are equal when both the user name and the room name match.
-     boolean eq = name.equals(other.name);
-     eq &= roomName.equals(other.roomName);
-     return eq;
-   }
-   /*
-    * (non-Javadoc)
-    *
-    * @see java.lang.Object#hashCode()
-    */
-   @Override
-   public int hashCode() {
-     int result = 1;
-     result = 31 * result + name.hashCode();
-     result = 31 * result + roomName.hashCode();
-     return result;
-   }
- }