zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

UDP发送、接收消息简单示例

消息 简单 示例 发送 UDP 接收
2023-09-11 14:16:24 时间

UDP是将数据打成数据包向对方发送,只关系是否发送成功,而不关心是否接收成功,传输速度快,但是可靠性低

最近在看nacos服务发现的源码,发现服务端在服务实例发生变化时,是通过UDP方式推送更新,所以写个简单的UDP发送、接收的示例。

客户端在查询服务实例的时候,如果提供 udp 端口,则 server 会创建 udpClient,当服务实例发生变化时,是通过UDP方式推送更新,为了保证客户端接收到消息,客户端在接收到消息后,会发送push-ack确认信息。

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UDPTest {
    public static final String QUIT = "quit";
    public static final String udpHost = "127.0.0.1";
    public static final int udpPort = 10088;

    public static void main(String[] args) throws Exception {
        Runnable t1 = createUDPSendThread();
        Runnable t2 = createUDPReceiveThread();

        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("upd-%d").setDaemon(true).build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10), factory);

        executor.submit(t1);
        executor.submit(t2);
        executor.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static Runnable createUDPSendThread() {
        return new Runnable() {
            @SneakyThrows
            @Override
            public void run() {

                DatagramSocket ds = new DatagramSocket(); // 此类表示用来发送和接收数据报包的套接字。
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                String line = null;
                while ((line = br.readLine()) != null) {
                    byte[] bytes = line.getBytes();
                    DatagramPacket dp = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(udpHost,udpPort)); // 数据包对象
                    ds.send(dp);
                    if (QUIT.equals(line)) {
                        System.out.println("udp server quit");
                        break;
                    }
                }
                ds.close();
            }
        };
    }

    private static Runnable createUDPReceiveThread() throws Exception {
        return new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                DatagramSocket ds = new DatagramSocket(udpPort);
                byte bytes[] = new byte[1024];
                DatagramPacket dp = new DatagramPacket(bytes, bytes.length);
                while (true) {
                    ds.receive(dp);
                    byte[] data = dp.getData();
                    String str = new String(data, 0, dp.getLength());
                    if (QUIT.equals(str)) {
                        System.out.println("udp client quit");
                        break;
                    }
                    printMsg("remote addr:%s,%s,msg:%s",dp.getSocketAddress(),dp.getAddress(),str);
                }
                ds.close();
            }
        };
    }

    private static void printMsg(String format,Object... args){
        System.out.println(String.format(format,args));
    }
}