《Ceph源码分析》——第3章,第2节Simple实现
本节书摘来自华章出版社《Ceph源码分析》一书中的第3章,第3.2节Simple实现,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看
3.2 Simple实现
Simple在Ceph里实现比较早,目前也比较稳定,是在生产环境中使用的网络通信模块。如其名字所示,实现相对比较简单。下面具体分析一下,Simple如何实现Ceph网络通信框架的各个模块。
3.2.1 SimpleMessager
类SimpleMessager实现了Messager接口。
class SimpleMessenger : public SimplePolicyMessenger {
Accepter accepter; //用于接受客户端的链接请求
DispatchQueue dispatch_queue; //接收到的请求的消息分发队列
bool did_bind; //是否绑定
__u32 global_seq;//生成全局的消息seq ceph_spinlock_t global_seq_lock;//用于保护global_seq
` //地址→pipe映射
ceph::unordered_map rank_pipe;
//正在处理的pipes
set accepting_pipes;
//所有的pipes
set pipes;
//准备释放的pipes
list pipe_reap_queue;`
//内部集群的协议版本
` int cluster_protocol;
}`
3.2.2 Accepter
类Accepter用来在Server端监听端口,接收链接,它继承了Thread类,本身是一个线程,来不断地监听Server的端口:
`class Accepter : public Thread {
SimpleMessenger *msgr;
bool done;
int listen_sd; //监听的端口
uint64_t nonce;
……
}
`
3.2.3 DispatchQueue
DispatchQueue类用于把接收到的请求保存在内部,通过其内部的线程,调用SimpleMessenger类注册的Dispatch类的处理函数来处理相应的消息:
class DispatchQueue { ...... mutable Mutex lock; Cond cond; class QueueItem { int type; ConnectionRef con; MessageRef m; ...... PrioritizedQueue QueueItem, uint64_t mqueue; //接收消息的优先队列 set pair double, Message* marrival; //接收到的消息集合 pair为(recv_time, message) map Message *, set pair double, Message* ::iterator marrival_map; //消息→所在集合位置的映射 };
其内部的mqueue为优先级队列,用来保存消息,marrival保存了接收到的消息。marrival_map保存消息在集合中的位置。
函数DispatchQueue::enqueue用来把接收到的消息添加到消息队列中,函数DispatchQueue::entry为线程的处理函数,用于处理消息。
3.2.4 Pipe
类Pipe实现了PipeConnection的接口,它实现了两个端口之间的类似管道的功能。
对于每一个pipe,内部都有一个Reader和一个Writer线程,分别用来处理这个Pipe有关的消息接收和请求的发送。线程DelayedDelivery用于故障注入测试:
class Pipe : public RefCountedObject { class Reader : public Thread { } reader_thread; //接收线程,用于接收数据 class Writer : public Thread { } writer_thread; //发送线程,用于发送数据 SimpleMessenger *msgr; // msgr的指针 uint64_t conn_id; //分配给Pipe自己唯一的id char *recv_buf; //接收缓存区 int recv_max_prefetch; //接收缓冲区一次预取的最大值 int recv_ofs; //接收的偏移量 int recv_len; //接收的长度 int sd; // pipe对应的socked fd struct iovec msgvec[IOV_MAX]; //发送消息的iovec结构 int port; //链接端口 int peer_type; //链接对方的类型 entity_addr_t peer_addr; //对方地址 Messenger::Policy policy; //策略 Mutex pipe_lock; int state; //当前链接的状态 atomic_t state_closed; //如果非0,那么状态为STATE_CLOSED PipeConnectionRef connection_state; //PipeConnection的引用 utime_t backoff; // backoff的时间
map int, list Message* out_q; //准备发送的消息优先队列 DispatchQueue *in_q; //接收消息的DispatchQueue list Message* sent; //要发送的消息 Cond cond; bool send_keepalive; bool send_keepalive_ack; utime_t keepalive_ack_stamp; bool halt_delivery; //如果Pipe队列消毁,停止增加 __u32 connect_seq, peer_global_seq; uint64_t out_seq; //发送消息的序列号 uint64_t in_seq, in_seq_acked; //接收到消息序号和ACK的序号 }
3.2.5 消息的发送
1)当发送一个消息时,首先要通过Messenger类,获取对应的Connection:
conn = messenger- get_connection(dest_server);
具体到SimpleMessenger的实现如下所示:
a)首先比较,如果dest.addr是my_inst.addr,就直接返回local_connection。
b)调用函数_lookup_pipe在已经存在的Pipe中查找。如果找到,就直接返回pipeConnectionRef;否则调用函数connect_rank新创建一个Pipe,并加入到msgr的register_pipe里。
2)当获得一个Connection之后,就可以调用Connection的发送函数来发送消息。
conn- send_message(m);
其最终调用了SimpleMessenger::submit_message函数:
a)如果Pipe不为空,并且状态不是Pipe::STATE_CLOSED状态,调用函数pipe→_send把发送的消息添加到out_q发送队列里,触发发送线程。
b)如果Pipe为空,就调用connect_rank创建Pipe,并把消息添加到out_q发送队列中。
3)发送线程writer把消息发送出去。通过步骤2,要发送的消息已经保存在相应Pipe的out_q队列里,并触发了发送线程。每个Pipe的Writer线程负责发送out_q的消息,其线程入口函数为Pipe::writer,实现功能:
a)调用函数_get_next_outgoing从out_q中获取消息。
b)调用函数write_message(header, footer, blist)把消息的header、footer、数据blist发送出去。
3.2.6 消息的接收
1)每个Pipe对应的线程Reader用于接收消息。入口函数为Pipe::reader,其功能如下:
a)判断当前的state,如果为STATE_ACCEPTING,就调用函数Pipe::accept来接收连接,如果不是STATE_CLOSED,并且不是STATE_CONNECTING状态,就接收消息。
b)先调用函数tcp_read来接收一个tag。
c)根据tag,来接收不同类型的消息如下所示:
`CEPH_MSGR_TAG_KEEPALIVE消息。
CEPH_MSGR_TAG_KEEPALIVE2,在CEPH_MSGR_TAG_KEEPALIVE的基础上,添加了时间。
CEPH_MSGR_TAG_KEEPALIVE2_ACK。
CEPH_MSGR_TAG_ACK。
CEPH_MSGR_TAG_MSG,这里才是接收的消息。
CEPH_MSGR_TAG_CLOSE。`
d)调用函数read_message来接收消息,当本函数返回后,就完成了接收消息。
2)调用函数in_q- fast_preprocess(m)预处理消息。
3)调用函数in_q- can_fast_dispatch(m),如果可以进行fast_dispatch,就in_q- fast_dispatch(m)处理。fast_dispatch并不把消息加入到mqueue里,而是直接调用msgr- ms_fast_dispatch函数,并最终调用注册的fast_dispatcher函数处理。
4)如果不能fast_dispatch,就调用函数in_q- enqueue(m, m- get_priority(), conn_id) 把接收到的消息加入到DispatchQueue的mqueue队列里,由DispatchQueue的分发线程调用ms_dispatch处理。
ms_fast_dispath和ms_dispatch两种处理的区别在于:ms_dispatch是由DispatchQueue的线程处理的,它是一个单线程;ms_fast_dispatch函数是由Pipe的接收线程直接调用处理的,因此性能比前者要好。
3.2.7 错误处理
网络模块复杂的功能是如何处理网络错误。无论是接收还是发送,会出现各种异常错误,包括返回异常错误码,接收数据的magic验证不一致,接收的数据的效验验证不一致,等等。错误的原因主要是由于网络本身的错误(物理链路等),或者字节跳变引起的。
目前错误处理的方法比较简单,处理流程如下:
1)关闭当前socket的连接。
2)重新建立一个socket连接。
3)重新发送没有接受到ACK应对的消息。
函数Pipe::fault用来处理错误:
1)调用shutdown_socket关闭pipe的socket。
2)调用函数requeue_sent把没有收到ACK的消息重新加入发送队列,当发送队列有请求时,发送线程会不断地尝试重新连接。
kube-proxy源码分析:深入浅出理解k8s的services工作原理 在进行k8s实践中, services 是经常碰到的资源对象,services 充当了 k8s 集群 pod 服务抽象的功能,为后端pod 提供了负载均衡和服务发现,那他到底是如何工作的呢,这里从 services 的具体实现 kube-proxy 出发解读 services 的工作机制。
源码分析Node的Cluster模块 ### 从源码分析Node的Cluster模块 前段时间,公司的洋彬哥老哥遇到一个问题,大概就是本机有个node的http服务器,但是每次请求这个服务器的端口返回的数据都报错,一看返回的数据根本不是http的报文格式,然后经过一番排查发现是另外一个服务器同时监听了http服务器的这个端口。这个时候洋彬老哥就很奇怪,为啥我这个端口明明使用了,却还是可以启动呢?这个时候我根据以前看libuv源码
Drill storage plugin实现原理分析 # Drill Storage Plugin介绍 Drill是一个交互式SQL查询引擎,官方默认支持的数据源有hive、hbase、kafka、kudu、mongo、opentsdb、jdbc等,其中jdbc storage plugin可以覆盖所有支持jdbc协议的数据源,如:mysql、oracle等关系型数据库。所有数据源的接入都是通过drill的storage plugin实
Kubernetes源码分析-Device Manager的初始化 Kubernetes引入的Devic Plugin,通过扩展机制实现支持GPU、FPGA、高性能 NIC、InfiniBand等各种设备的集成,而Device Manager正是Kubelet内负责Device Plugin交互和设备生命周期管理的模块,在[了解其基本设计](https://yq.aliyun.com/articles/498185)后,我们需要通过对Device Manager的
《Ceph源码分析》——第2章,第6节SafeTimer 本节书摘来自华章出版社《Ceph源码分析》一书中的第2章,第2.6节SafeTimer,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看 2.6 SafeTimer 类SafeTimer实现了定时器的功能,代码如下: class SafeTimer CephConte
《Ceph源码分析》——第2章,第4节Finisher 本节书摘来自华章出版社《Ceph源码分析》一书中的第2章,第2.4节Finisher,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看 2.4 Finisher 类Finisher用来完成回调函数Context的执行,其内部有一个FinisherThread线程来用于执行Conte
相关文章
- Android系统自带的android.util.Base64的实现源码
- Android开发之使用BroadcastReceiver实现开机自己主动启动(源码分享)
- BIND9源码分析之acl 的实现
- Tensorflow 源码算子以及架构实现分析
- 深入理解Spark:核心思想与源码分析. 3.8 TaskScheduler的启动
- spring-boot-admin源码分析及单机监控spring-boot-monitor的实现(一)
- JS axios cancelToken 是如何实现取消请求?稍有啰嗦但超有耐心的 axios 源码分析
- linux下如何源码安装expect
- Java FutureTask<V> 源码分析 Android上的实现
- SwiftUI 通过 ARKit 使用微笑控制 SwiftUI 视图(教程含源码)
- SwiftUI Swift 之正向地理编码与反向地理编码(教程含源码)
- SwiftUI Form 实现靠左忘记密码提示(教程含源码)
- SwiftUI macOS 3D教程之构建3D地球实现traceroute curl (教程含源码)
- SwiftUI 自定义悬浮Tabbar实现Navigation自动隐藏与显示(教程含源码)@escaping
- 毕业设计 特征熵值分析的网站分类系统实现(源码+论文)
- 【毕业设计_课程设计】基于深度学习与词嵌入的情感分析系统设计与实现(源码+论文)
- 【毕业设计_课程设计】基于Android和Flask的最炫广场舞APP设计与实现(源码+论文)
- 【毕业设计_课程设计】基于网络爬虫的新闻采集和订阅系统的设计与实现(源码+论文)
- 编译Tomcat9源码及tomcat乱码问题解决
- RT-Thread系列--内存池MEMPOOL源码分析
- 源码编译OpenJdk 8,Netbeans调试Java原子类在JVM中的实现(Ubuntu 16.04)
- Java多线程基础(一)---Thread API(join深度详解、源码分析和案例分析之代码实现,优雅关闭线程三种方式)