netty的Udp單播、組播、廣播實例+Java的Udp單播、組播、廣播實例


      網絡上缺乏netty的udp的單播、組播案例,經過一番學習總結之后終於把這兩個案例調通,下面把這兩個案例的代碼放在這里分享一下。

首先推薦博文:

http://colobu.com/2014/10/21/udp-and-unicast-multicast-broadcast-anycast/#Netty%E4%B8%8E%E5%8D%95%E6%92%AD%EF%BC%8C%E7%BB%84%E6%92%AD

netty的Udp單播、組播、廣播實例+Java的Udp單播、組播、廣播實例,

這些代碼實例可以直接到我的GitHub地址下載(https://github.com/Jethu1/netty_practice.git)。

 

1.單播的案例(包括TheMomentClient+TheMomentClientHandler+TheMomentServer+TheMomentServerHandler+SoketUtils五個類)

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import practice13_UdpBroadcast.SocketUtils;

/**
* A UDP broadcast client that asks for a quote of the moment (QOTM) to {@link TheMomentServer}.
*
* Inspired by <a href="http://docs.oracle.com/javase/tutorial/networking/datagrams/clientServer.html">the official
* Java tutorial</a>.
*/
public final class TheMomentClient {

static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));

public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.remoteAddress("127.0.0.1",PORT)
.handler(new TheMomentClientHandler());

Channel ch = b.bind(0).sync().channel();

// Broadcast the QOTM request to port 8080.
ch.writeAndFlush(new DatagramPacket(
Unpooled.copiedBuffer("QOTM?", CharsetUtil.UTF_8),
SocketUtils.socketAddress("127.0.0.1", PORT))).sync();

// TheMomentClientHandler will close the DatagramChannel when a
// response is received. If the channel is not closed within 5 seconds,
// print an error message and quit.
if (!ch.closeFuture().await(5000)) {
System.err.println("QOTM request timed out.");
}
} finally {
group.shutdownGracefully();
}
}
}

 

 
        
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package practice14_Udp_Unicast;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

public class TheMomentClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String response = msg.content().toString(CharsetUtil.UTF_8);
if (response.startsWith("QOTM: ")) {
System.out.println("Quote of the Moment: " + response.substring(6));
ctx.close();
}
System.out.println("client receive message from the server");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
 
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package practice14_Udp_Unicast;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

/**
* A UDP server that responds to the QOTM (quote of the moment) request to a {@link TheMomentClient}.
*
* Inspired by <a href="http://docs.oracle.com/javase/tutorial/networking/datagrams/clientServer.html">the official
* Java tutorial</a>.
*/
public final class TheMomentServer {

private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));

public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.localAddress(PORT)
.handler(new TheMomentServerHandler());

b.bind(PORT).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
}
 
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package practice14_Udp_Unicast;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

import java.util.Random;

public class TheMomentServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

private static final Random random = new Random();

// Quotes from Mohandas K. Gandhi:
private static final String[] quotes = {
"Where there is love there is life.",
"First they ignore you, then they laugh at you, then they fight you, then you win.",
"Be the change you want to see in the world.",
"The weak can never forgive. Forgiveness is the attribute of the strong.",
};

private static String nextQuote() {
int quoteId;
synchronized (random) {
quoteId = random.nextInt(quotes.length);
}
return quotes[quoteId];
}

@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
System.err.println(packet);
System.out.println("I receive your message");
if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) {
ctx.write(new DatagramPacket(
Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
// We don't close the channel because we can keep serving requests.
}
}

 

 

 

 

import io.netty.util.internal.PlatformDependent;

import java.io.IOException;
import java.net.*;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Enumeration;
public final class SocketUtils {

private SocketUtils() {
}

public static void connect(final Socket socket, final SocketAddress remoteAddress, final int timeout)
throws IOException {
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {

public Void run() throws IOException {
socket.connect(remoteAddress, timeout);
return null;
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}

public static void bind(final Socket socket, final SocketAddress bindpoint) throws IOException {
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {

public Void run() throws IOException {
socket.bind(bindpoint);
return null;
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}

public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {

public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}


public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {

public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}

/* public static void bind(final DatagramChannel networkChannel, final SocketAddress address) throws IOException {
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
networkChannel.bind(address);
return null;
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}*/

public static SocketAddress localSocketAddress(final ServerSocket socket) {
return AccessController.doPrivileged(new PrivilegedAction<SocketAddress>() {

public SocketAddress run() {
return socket.getLocalSocketAddress();
}
});
}

public static InetAddress addressByName(final String hostname) throws UnknownHostException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<InetAddress>() {

public InetAddress run() throws UnknownHostException {
return InetAddress.getByName(hostname);
}
});
} catch (PrivilegedActionException e) {
throw (UnknownHostException) e.getCause();
}
}

public static InetAddress[] allAddressesByName(final String hostname) throws UnknownHostException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<InetAddress[]>() {

public InetAddress[] run() throws UnknownHostException {
return InetAddress.getAllByName(hostname);
}
});
} catch (PrivilegedActionException e) {
throw (UnknownHostException) e.getCause();
}
}

public static InetSocketAddress socketAddress(final String hostname, final int port) {
return AccessController.doPrivileged(new PrivilegedAction<InetSocketAddress>() {

public InetSocketAddress run() {
return new InetSocketAddress(hostname, port);
}
});
}

public static Enumeration<InetAddress> addressesFromNetworkInterface(final NetworkInterface intf) {
return AccessController.doPrivileged(new PrivilegedAction<Enumeration<InetAddress>>() {

public Enumeration<InetAddress> run() {
return intf.getInetAddresses();
}
});
}

/* public static InetAddress loopbackAddress() {
return AccessController.doPrivileged(new PrivilegedAction<InetAddress>() {
@Override
public InetAddress run() {
if (PlatformDependent.javaVersion() >= 7) {
return InetAddress.getLoopbackAddress();
}
try {
return InetAddress.getByName(null);
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
}
});
}*/

/* public static byte[] hardwareAddressFromNetworkInterface(final NetworkInterface intf) throws SocketException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<byte[]>() {

public byte[] run() throws SocketException {
return intf.getHardwareAddress();
}
});
} catch (PrivilegedActionException e) {
throw (SocketException) e.getCause();
}
}
*/
}


2.netty udp組播的案例(包括MulticastClient+MulticastClientHandler+MultcastServer+MultcastServerHandler+SoketUtils五個類)
package practice12_Udp_Multicast;

/**
* Created by jet on 2017/6/14.
*/
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;

import java.net.*;
import java.util.Enumeration;

public class MulticastClient extends Thread {
private InetSocketAddress groupAddress;

public MulticastClient(InetSocketAddress groupAddress) {
this.groupAddress = groupAddress;
}
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
try {
// NetworkInterface ni = NetworkInterface.getByName("en1");
NetworkInterface ni = NetUtil.LOOPBACK_IF;
Enumeration<InetAddress> addresses = ni.getInetAddresses();
InetAddress localAddress = null;
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address instanceof Inet4Address){
localAddress = address;
}
}

Bootstrap b = new Bootstrap();
b.group(group)
.channelFactory(new ChannelFactory<NioDatagramChannel>() {

public NioDatagramChannel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
})
.localAddress(localAddress, groupAddress.getPort())
.option(ChannelOption.IP_MULTICAST_IF, ni)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
public void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline().addLast(new ClientMulticastHandler());
}
});

Channel ch = b.bind().sync().channel();
ch.writeAndFlush(new DatagramPacket(
Unpooled.copiedBuffer("QOTM?", CharsetUtil.UTF_8),
groupAddress)).sync();

ch.close().awaitUninterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
InetSocketAddress groupAddress = new InetSocketAddress("239.255.27.1", 1234);
new MulticastClient(groupAddress).run();
}
}



package practice12_Udp_Multicast;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

/**
* Created by jet on 2017/6/14.
*/
public class ClientMulticastHandler extends SimpleChannelInboundHandler<DatagramPacket> {

@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String response = msg.content().toString(CharsetUtil.UTF_8);
System.out.println("client receive message from server");
if (response.startsWith("QOTM: ")) {
System.out.println("Quote of the Moment: " + response.substring(6));
ctx.close();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}





package practice12_Udp_Multicast;

/**
* Created by jet on 2017/6/14.
*/
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.NetUtil;

import java.net.*;
import java.util.Enumeration;

public class MulticastServer extends Thread {
private InetSocketAddress groupAddress;

public MulticastServer(InetSocketAddress groupAddress) {
this.groupAddress = groupAddress;
}

public void run() {
EventLoopGroup group = new NioEventLoopGroup();
try {
// NetworkInterface ni = NetworkInterface.getByName("en1");
NetworkInterface ni = NetUtil.LOOPBACK_IF;
Enumeration<InetAddress> addresses = ni.getInetAddresses();
InetAddress localAddress = null;
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address instanceof Inet4Address){
localAddress = address;
}
}

Bootstrap b = new Bootstrap();
b.group(group)
.channelFactory(new ChannelFactory<NioDatagramChannel>() {

public NioDatagramChannel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
})
.localAddress(localAddress, groupAddress.getPort())
.option(ChannelOption.IP_MULTICAST_IF, ni)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
public void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline().addLast(new ServerMulticastHandler());
}
});

NioDatagramChannel ch = (NioDatagramChannel)b.bind(groupAddress.getPort()).sync().channel();
ch.joinGroup(groupAddress, ni).sync();
System.out.println("server");

ch.closeFuture().await();

} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
InetSocketAddress groupAddress = new InetSocketAddress("239.255.27.1", 1234);
new MulticastServer(groupAddress).run();
}
}




package practice12_Udp_Multicast;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

import java.util.Random;

/**
* Created by jet on 2017/6/14.
*/
public class ServerMulticastHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final Random random = new Random();

// Quotes from Mohandas K. Gandhi:
private static final String[] quotes = {
"Where there is love there is life.",
"First they ignore you, then they laugh at you, then they fight you, then you win.",
"Be the change you want to see in the world.",
"The weak can never forgive. Forgiveness is the attribute of the strong.",
};

private static String nextQuote() {
int quoteId;
synchronized (random) {
quoteId = random.nextInt(quotes.length);
}
return quotes[quoteId];
}

@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
System.err.println(packet);
System.out.println("The server receive message from client");
if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) {
System.out.println("the server write some info to client");
ctx.write(new DatagramPacket(
Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
// We don't close the channel because we can keep serving requests.
}
}
 
 










免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM