使用Exchanger实现两个线程之间的数据交互
2023-09-14 08:59:40 时间
在看Jetty源码中的EndPointTest类,对EndPoint的测试,我的思路是:
1. 建立一个连接(创建ServerSocket实例,一般还会给定一个端口,其实可以bind(null)以让操作系统分配一个可用端口),新启动一个线程,在新线程中监听给定端口(调用accept方法)。
2. 发送客户端请求(创建一个Socket实例,并向该Socket写入请求数据)。
3. 在接收端读取数据,验证写入的请求和接收到的数据相同。
在以上流程实现中,accept方法返回的接收端Socket需要传给主线程,同时要保证使用该Socket是在accept方法返回之后,以我习惯,我会使用一个Lock或CountDownLatch:
private static class SocketHolder {
Socket socket;
}
@Test
public void levinOldWayTest() throws Exception {
final ServerSocket server = new ServerSocket(10240);
final CountDownLatch latch = new CountDownLatch(1);
final SocketHolder socketHolder = new SocketHolder();
new Thread() {
public void run() {
try {
socketHolder.socket = server.accept();
latch.countDown();
} catch(Exception ex) {
ex.printStackTrace();
}
}
}.start();
Socket socket = new Socket(server.getInetAddress(), server.getLocalPort());
socket.getOutputStream().write("My Test String".getBytes());
latch.await(5, TimeUnit.SECONDS);
byte[] receives = new byte[4096];
int length = socketHolder.socket.getInputStream().read(receives);
assertEquals("My Test String", new String(receives, 0, length));
socket.close();
socketHolder.socket.close();
server.close();
不知道有多少人也像我一样把这段代码写成这样?这里有两个问题:
1. ServerSocket的监听的端口不一定是可用的,类似测试代码我之前没有写过,我估计自己正真在写的时候应该会想到让操作系统动态分配。
2. 为了在两个线程中传递数据,这里首先创建了一个SocketHolder类,然后使用CountDownLatch,写起来好麻烦。为了简化这段代码,可以使用Exchanger,即当一个生产者线程准备好数据后可以通过Exchanger将数据传递给消费者,而消费者在生产者传递过来数据后就可以消费了,这里的数据就是Socket。
改进后的代码如下:
@Test
public void levinImprovedWayTest() throws Exception {
final ServerSocket server = new ServerSocket();
server.bind(null);
final Exchanger Socket exchanger = new Exchanger Socket
new Thread() {
public void run() {
try {
exchanger.exchange(server.accept());
} catch(Exception ex) {
ex.printStackTrace();
}
}
}.start();
Socket socket = new Socket(server.getInetAddress(), server.getLocalPort());
socket.getOutputStream().write("My Test String".getBytes());
Socket receiverSocket = exchanger.exchange(null, 5, TimeUnit.SECONDS);
byte[] receives = new byte[4096];
int length = receiverSocket.getInputStream().read(receives);
assertEquals("My Test String", new String(receives, 0, length));
socket.close();
receiverSocket.close();
server.close();
多线程之间通信及线程池 线程通信 应用场景:生产者和消费者问题 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止 如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓
java线程间通讯的几种方式 并发编程中,我们可能会遇到这样一个场景 A、B两个线程并行,但是我希望保证B线程在A线程执行完了后再执行 这个时候就需要线程间进行通讯 A执行完了后对B说一声,喂,我执行完了
1. 建立一个连接(创建ServerSocket实例,一般还会给定一个端口,其实可以bind(null)以让操作系统分配一个可用端口),新启动一个线程,在新线程中监听给定端口(调用accept方法)。
2. 发送客户端请求(创建一个Socket实例,并向该Socket写入请求数据)。
3. 在接收端读取数据,验证写入的请求和接收到的数据相同。
在以上流程实现中,accept方法返回的接收端Socket需要传给主线程,同时要保证使用该Socket是在accept方法返回之后,以我习惯,我会使用一个Lock或CountDownLatch:
private static class SocketHolder {
Socket socket;
}
@Test
public void levinOldWayTest() throws Exception {
final ServerSocket server = new ServerSocket(10240);
final CountDownLatch latch = new CountDownLatch(1);
final SocketHolder socketHolder = new SocketHolder();
new Thread() {
public void run() {
try {
socketHolder.socket = server.accept();
latch.countDown();
} catch(Exception ex) {
ex.printStackTrace();
}
}
}.start();
Socket socket = new Socket(server.getInetAddress(), server.getLocalPort());
socket.getOutputStream().write("My Test String".getBytes());
latch.await(5, TimeUnit.SECONDS);
byte[] receives = new byte[4096];
int length = socketHolder.socket.getInputStream().read(receives);
assertEquals("My Test String", new String(receives, 0, length));
socket.close();
socketHolder.socket.close();
server.close();
不知道有多少人也像我一样把这段代码写成这样?这里有两个问题:
1. ServerSocket的监听的端口不一定是可用的,类似测试代码我之前没有写过,我估计自己正真在写的时候应该会想到让操作系统动态分配。
2. 为了在两个线程中传递数据,这里首先创建了一个SocketHolder类,然后使用CountDownLatch,写起来好麻烦。为了简化这段代码,可以使用Exchanger,即当一个生产者线程准备好数据后可以通过Exchanger将数据传递给消费者,而消费者在生产者传递过来数据后就可以消费了,这里的数据就是Socket。
改进后的代码如下:
@Test
public void levinImprovedWayTest() throws Exception {
final ServerSocket server = new ServerSocket();
server.bind(null);
final Exchanger Socket exchanger = new Exchanger Socket
new Thread() {
public void run() {
try {
exchanger.exchange(server.accept());
} catch(Exception ex) {
ex.printStackTrace();
}
}
}.start();
Socket socket = new Socket(server.getInetAddress(), server.getLocalPort());
socket.getOutputStream().write("My Test String".getBytes());
Socket receiverSocket = exchanger.exchange(null, 5, TimeUnit.SECONDS);
byte[] receives = new byte[4096];
int length = receiverSocket.getInputStream().read(receives);
assertEquals("My Test String", new String(receives, 0, length));
socket.close();
receiverSocket.close();
server.close();
多线程之间通信及线程池 线程通信 应用场景:生产者和消费者问题 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止 如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓
java线程间通讯的几种方式 并发编程中,我们可能会遇到这样一个场景 A、B两个线程并行,但是我希望保证B线程在A线程执行完了后再执行 这个时候就需要线程间进行通讯 A执行完了后对B说一声,喂,我执行完了
相关文章
- 【黑马Android】(05)短信/查询和添加/内容观察者使用/子线程网络图片查看器和Handler消息处理器/html查看器/使用HttpURLConnection采用Post方式请求数据/开源项目
- Linux线程同步:条件变量
- Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失问题分析
- Android--Handler的使用方法:在子线程中更新界面
- 【完整代码】Exchanger两个线程交换数据代码示例
- Atitit usrqbg1821 Tls 线程本地存储(ThreadLocal Storage 规范标准化草案解决方案ThreadStatic
- 【项目实战】并发编程之Java集合框架中的一个线程安全的队列实现 ——BlockingQueue入门介绍
- 深入分析3种线程池执行任务的逻辑方法
- redis 加锁与解锁的详细总结,解决线程并发导致脏数据
- 【Linux 内核】线程调度示例一 ② ( 获取指定调度策略的最大和最小优先级 | 代码示例 )
- 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?
- Netty IO线程模型学习总结