zl程序教程

您现在的位置是:首页 >  其它

当前栏目

socket

socket
2023-09-11 14:16:16 时间

 

Socket套接字

Python提供socket.py标准库

AF Address Family

 

Server:

  • 创建Socket对象
  • 绑定IP地址和Port bind()方法
    IPv4地址必须是一个二元组 ('ipv4', Port);传入list会报错(TypeError)
  • 监听于bind()后的地址,listen()方法
  • 获取用于传输数据的Socket对象
    socket.accept() -> (socket object, address info)
    accept()方法阻塞等待client建立连接,返回一个新的Socket对象和客户端地址的二元组
    IPv4中是一个二元组(clientaddr,Port)
  1. 接收数据
    recv(bufsize[,flag]) 使用缓冲区接收数据
  2. 发送数据
    send(bytes)
  • fsa
import socket,logging,threading
# Log record layout: wall-clock time, thread id and thread name, then the message.
FORMAT = '%(asctime)s [%(thread)d %(threadName)s] %(message)s'
logging.basicConfig(format=FORMAT, datefmt='%H:%M:%S', level=logging.INFO)

class S:
    """Threaded TCP chat server.

    Every message received from one client is sent to all connected clients
    prefixed with ``ack ``. A client leaves by sending ``quit`` or by closing
    its connection.
    """

    # Seconds accept() may block before the accept loop re-checks the stop event.
    POLL_SECONDS = 0.5

    def __init__(self, ip='127.0.0.1', port=9999):
        """Create the (still unbound) listening socket for ``ip``/``port``."""
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # client address -> connected socket
        self.event = threading.Event()  # set once stop() has been called

    def start(self):
        """Bind, listen and launch the background accept thread."""
        self.sock.bind(self.addr)
        self.sock.listen()
        # The timeout lets the accept loop notice stop() without depending on
        # close() interrupting a blocked accept() in another thread.
        self.sock.settimeout(self.POLL_SECONDS)
        threading.Thread(target=self._accept, name='accept').start()

    def stop(self):
        """Ask all worker threads to finish and close every socket."""
        # Set the flag first so the loops exit as soon as their blocking call returns.
        self.event.set()
        # Iterate over a snapshot: receiver threads remove entries concurrently.
        for conn in list(self.clients.values()):
            self._disconnect(conn)
        self.sock.close()

    @staticmethod
    def _disconnect(conn):
        """Shut down and close one client connection."""
        try:
            # shutdown() before close() so a receiver blocked in recv() on this
            # connection sees end-of-stream promptly.
            conn.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # the peer is already gone; nothing left to shut down
        conn.close()

    def _accept(self):
        """Accept clients until stop() is called; one receiver thread per client."""
        while not self.event.is_set():
            try:
                conn, client = self.sock.accept()
            except socket.timeout:
                continue  # periodic wake-up to re-check the stop event
            except OSError as e:
                # The listening socket was closed (normally by stop()).
                logging.warning(e)
                break
            self.clients[client] = conn
            threading.Thread(target=self._recv, args=(conn, client), name='recv').start()

    def _recv(self, conn, client):
        """Relay messages from one client until it quits, disconnects or stop() runs.

        Args:
            conn: connected socket of the client.
            client: key under which the client is stored in ``self.clients``.
        """
        while not self.event.is_set():
            try:
                data = conn.recv(1024)
            except OSError as e:
                logging.warning(e)
                data = b''
            # errors='replace': bytes that are not valid UTF-8 must not kill the thread.
            text = data.decode(errors='replace')
            logging.warning(text)
            # recv() returns b'' once the peer has closed the connection; treating
            # that like an explicit quit avoids spinning on an endless empty read.
            if not data or text.strip() == 'quit':
                self.clients.pop(client, None)
                logging.error('{} quit'.format(conn))
                conn.close()
                break
            self._broadcast('ack {}'.format(text).encode())

    def _broadcast(self, payload):
        """Send ``payload`` (bytes) to every registered client, logging failures."""
        for peer in list(self.clients.values()):
            try:
                peer.sendall(payload)
            except OSError as e:
                # A broken peer is removed by its own receiver thread.
                logging.warning(e)

v = S()
v.start()

e = threading.Event()


def c():
    """Log the client table and the live threads every 3 seconds until ``e`` is set."""
    while True:
        if e.wait(3):
            return
        logging.critical(v.clients)
        logging.critical(threading.enumerate())


threading.Thread(target=c, daemon=True).start()

# Block on the console until the operator types "quit", then stop the server.
while input('>>').strip() != 'quit':
    pass
v.stop()

 

 

import socket,threading,logging
# Listening TCP socket on localhost:9999 plus the event that signals shutdown.
ip, port = '127.0.0.1', 9999
addr = (ip, port)
event = threading.Event()
sock = socket.socket()
sock.bind(addr)
sock.listen()

def accept(sock, event: threading.Event):
    """Serve exactly one client line by line, then shut the server down.

    Each received line is answered with ``ack <line>``. The conversation ends
    when the client sends ``quit``, closes its side of the connection, or an
    I/O error occurs. Afterwards the listening socket is closed and ``event``
    is set.

    Args:
        sock: listening socket to accept the single client from.
        event: set when this function is done, successfully or not.
    """
    try:
        conn, _ = sock.accept()
        # Both the connection and its file wrapper are closed on every exit path.
        with conn, conn.makefile(mode='rw') as stream:
            while True:
                line = stream.readline()
                logging.error(line)
                # readline() returns '' at end-of-stream; without this check a
                # disconnected client would make the loop spin forever.
                if not line or line.strip() == 'quit':
                    break
                stream.write('ack {}'.format(line))
                stream.flush()
    except (OSError, ValueError) as e:
        # OSError: connection problems; ValueError covers undecodable input.
        logging.error(e)
    finally:
        sock.close()
        logging.error('end')
        event.set()
worker = threading.Thread(target=accept, args=(sock, event))
worker.start()
# Log the listening socket every two seconds until the worker sets the event.
while True:
    if event.wait(2):
        break
    logging.error(sock)

 

makefile:

import logging,threading,socket
# Log record layout: wall-clock time, "thread id:thread name", then the message.
FORMAT = '%(asctime)s [%(thread)d:%(threadName)s] %(message)s'
logging.basicConfig(format=FORMAT, datefmt='%H:%M:%S', level=logging.WARNING)

class Server:
    """Threaded, line-oriented TCP chat server built on ``socket.makefile``.

    Every line received from one client is written to all connected clients
    as ``ack <line>``. A client leaves by sending ``q`` or by disconnecting.
    """

    # Seconds accept() may block before the accept loop re-checks the stop event.
    POLL_SECONDS = 0.5

    def __init__(self, ip='127.0.0.1', port=9999):
        """Create the (still unbound) listening socket for ``ip``/``port``."""
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.event = threading.Event()  # set once stop() has been called
        self.clients = {}  # client address -> text file wrapper of the connection
        self._conns = {}   # client address -> underlying connected socket

    def start(self):
        """Bind, listen and launch the background accept thread."""
        self.sock.bind(self.addr)
        self.sock.listen()
        # The timeout lets the accept loop notice stop() without depending on
        # close() interrupting a blocked accept() in another thread.
        self.sock.settimeout(self.POLL_SECONDS)
        threading.Thread(target=self.__accept, name='accept Thread').start()

    def stop(self):
        """Ask all worker threads to finish and close the listening socket.

        Client connections are only shut down here; each reader thread then
        sees end-of-stream and closes its own file and socket, so no file
        object is closed while another thread is still reading from it.
        """
        self.event.set()
        for conn in list(self._conns.values()):
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except OSError as e:
                logging.debug(e)  # peer already gone
        self.sock.close()

    def __accept(self):
        """Accept clients until stop() is called; one reader thread per client."""
        index = 1
        while not self.event.is_set():
            try:
                conn, client = self.sock.accept()
            except socket.timeout:
                continue  # periodic wake-up to re-check the stop event
            except OSError as e:
                # The listening socket was closed (normally by stop()).
                logging.error(e)
                break
            v = conn.makefile(mode='rw')
            self._conns[client] = conn
            self.clients[client] = v
            threading.Thread(target=self.__recv, args=(v, client),
                             name='recv Thread-{}'.format(index)).start()
            index += 1

    def __recv(self, v, client):
        """Relay lines from one client until it quits, disconnects or stop() runs.

        Args:
            v: text file wrapper of the client's connection.
            client: key under which the client is stored in ``self.clients``.
        """
        try:
            while not self.event.is_set():
                try:
                    data = v.readline()  # blocks until '\n' or end-of-stream
                except (OSError, ValueError) as e:
                    # OSError: connection problems. ValueError: the file was
                    # closed, or the input could not be decoded.
                    logging.error(e)
                    data = ''
                logging.error(data)
                # readline() returns '' at end-of-stream; treat it like 'q' so a
                # vanished client cannot keep this loop spinning.
                if not data or data.strip() == 'q':
                    logging.critical('quit !!!!!!!!!!!')
                    break
                self._broadcast('ack {}'.format(data))
        finally:
            self._drop(client, v)

    def _broadcast(self, text):
        """Write ``text`` to every registered client, logging failures."""
        for c in list(self.clients.values()):
            try:
                c.write(text)
                c.flush()
            except (OSError, ValueError) as e:
                # A broken or closed peer is removed by its own reader thread.
                logging.error(e)

    def _drop(self, client, v):
        """Forget ``client`` and release its file wrapper and socket."""
        # The dict is keyed by client address, not by the file object.
        self.clients.pop(client, None)
        conn = self._conns.pop(client, None)
        try:
            v.close()
        except (OSError, ValueError) as e:
            logging.error(e)  # pending output could not be flushed
        if conn is not None:
            conn.close()
s = Server()
s.start()
e = threading.Event()


def showThreads():
    """Log the list of live threads every 2 seconds until ``e`` is set."""
    while True:
        if e.wait(2):
            return
        logging.error(threading.enumerate())


threading.Thread(target=showThreads, name='showThreads', daemon=True).start()

# Block on the console until the operator types "quit", then stop the server.
while input('>>').strip() != 'quit':
    pass
s.stop()

 

UDP:

udp server编程

  1. 创建socket对象,socket.SOCK_DGRAM
  2. 绑定IP和Port,bind()
  3. 传输
    接收数据    socket.recvfrom(bufsize[,flags]) 获得一个二元组(bytes,address)
    发送数据    socket.sendto(bytes,address) 
  4. 释放资源

服务端加入ack和heartbeat机制

import datetime
import logging
import socket
import threading

class UdpServer:
    """UDP chat server with heartbeat-based client expiry.

    Any datagram registers its sender as a client. ``quit`` removes the sender,
    the heartbeat token only refreshes the sender's timestamp, and every other
    message is relayed to all clients seen within ``EXPIRY_SECONDS``.
    """

    # Seconds recvfrom() may block before the loop re-checks the stop event.
    POLL_SECONDS = 0.5
    # Clients silent for longer than this are dropped on the next broadcast.
    EXPIRY_SECONDS = 10
    HEARTBEAT = '\001\002'

    def __init__(self, ip='127.0.0.1', port=9988):
        """Create the (still unbound) datagram socket for ``ip``/``port``."""
        # UDP needs SOCK_DGRAM; socket.socket() alone would create a TCP socket.
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.addr = (ip, port)
        self.clients = {}  # client address -> timestamp of its last datagram
        self.event = threading.Event()  # set once stop() has been called

    def start(self):
        """Bind the socket and launch the background receiver thread."""
        self.sock.bind(self.addr)
        # The timeout lets the receiver notice stop() without depending on
        # close() interrupting a blocked recvfrom() in another thread.
        self.sock.settimeout(self.POLL_SECONDS)
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        """Receive and dispatch datagrams until stop() is called."""
        while not self.event.is_set():
            try:
                raw, client = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # periodic wake-up to re-check the stop event
            except OSError as e:
                logging.warning(e)
                continue
            try:
                data = raw.decode()
            except UnicodeDecodeError as e:
                # Ignore datagrams that are not valid text instead of crashing.
                logging.warning('{} sent undecodable data: {}'.format(client, e))
                continue

            text = data.strip()
            if text == 'quit':
                self.clients.pop(client, None)
                logging.info('{} leaving'.format(client))
                continue

            fugacious = datetime.datetime.now().timestamp()
            self.clients[client] = fugacious
            if text == self.HEARTBEAT:
                continue  # a heartbeat only refreshes the timestamp

            dope = '{} {} {}'.format(*client, data)
            logging.warning(dope)
            self._relay(dope.encode(), fugacious)

    def _relay(self, payload, now):
        """Send ``payload`` to every live client and forget the expired ones."""
        lostset = set()
        for c, stamp in list(self.clients.items()):
            if now - stamp > self.EXPIRY_SECONDS:
                lostset.add(c)
                continue
            try:
                # Deliver to the client itself, not to the server's own address.
                self.sock.sendto(payload, c)
            except OSError as e:
                logging.warning(e)
        for c in lostset:
            self.clients.pop(c, None)

    def stop(self):
        """Notify all clients, stop the receiver loop and close the socket."""
        self.event.set()
        for c in list(self.clients):
            try:
                self.sock.sendto(b'quit!!!!', c)
            except OSError as e:
                logging.warning(e)
        self.sock.close()

if __name__ == '__main__':
    us = UdpServer()
    us.start()
    # Event.wait() must be called on an Event instance; calling it on the
    # class passes 2 as ``self`` and raises.
    threading.Event().wait(2)
    
            
            
View Code

 

ack:

服务器发送消息给client,client收到后必须回应ack,表示收到了pdu,否则服务端从clients中移除这个客户端记录

heartbeat:

heartbeat,可以让客户端定时发送消息到server,如果超时没有发送,server移除客户端记录

 

 

udp client编程

  1. 创建socket对象,socket.SOCK_DGRAM
  2. 发送数据,socket.sendto(bytes,address)
  3. 释放资源

UdpClient:

 

import threading,socket,logging
# Root logger: DEBUG and above, each record prefixed with the wall-clock time.
logging.basicConfig(
    format='%(asctime)s %(message)s',
    datefmt='%H:%M:%S',
    level=logging.DEBUG,
)
class UdpClient:
    """UDP chat client that sends a heartbeat every 5 seconds.

    Constructing an instance connects the datagram socket to the server
    address and immediately starts the heartbeat and receiver threads.
    """

    # Seconds recvfrom() may block before the loop re-checks the stop event.
    POLL_SECONDS = 0.5

    def __init__(self, ip='127.0.0.1', port=9988):
        """Connect to the server at ``ip``/``port`` and start both threads."""
        self.addr = (ip, port)
        self.sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
        self.event = threading.Event()  # set once stop() has been called
        self.sock.connect(self.addr)
        # The timeout lets the receiver notice stop() without depending on
        # close() interrupting a blocked recvfrom() in another thread.
        self.sock.settimeout(self.POLL_SECONDS)
        threading.Thread(target=self.__heartbeat, name='heartbeat').start()
        threading.Thread(target=self.recv, name='recv').start()

    def __heartbeat(self):
        """Send the heartbeat token every 5 seconds until stop() is called."""
        while not self.event.wait(5):
            try:
                # The socket is connected to self.addr, so send() is enough.
                # NOTE(review): sendto() with an explicit address on a connected
                # datagram socket is rejected on some platforms - confirm.
                self.sock.send(b'\001\002')
            except OSError as e:
                # E.g. server unreachable; keep trying on the next tick.
                logging.warning(e)

    def send(self, dope: str):
        """Send ``dope`` to the server wrapped as ``To <ip>:<port> dope <dope>``.

        Raises:
            OSError: if the datagram cannot be sent.
        """
        tidings = 'To {}:{} dope {}'.format(*self.addr, dope)
        self.sock.send(tidings.encode())

    def recv(self):
        """Log every datagram received until stop() is called."""
        while not self.event.is_set():
            try:
                data, saddr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # periodic wake-up to re-check the stop event
            except OSError as e:
                logging.warning(e)
                continue
            # errors='replace': a malformed datagram must not kill the thread.
            logging.warning('recv {} from {}'.format(data.decode(errors='replace'), saddr))

    def stop(self):
        """Stop both worker threads and close the socket."""
        # Set the flag before closing so the threads exit instead of using a
        # closed socket.
        self.event.set()
        self.sock.close()

# Was misspelled "UdpClinet", which raised NameError at start-up.
uc = UdpClient()

# Forward every console line to the server; "quit" stops the client.
while True:
    data = input('>>').strip()
    if data == 'quit':
        uc.stop()
        break
    uc.send(data)
    

 

 

Udp是无连接协议,它基于以下假设:

  • 网络稳定
  • 消息不丢包
  • 包不乱序

但即使在LAN中,也可能丢包,包的到达不一定有序

DNS使用UDP:数据内容小,一个包就能包含整个pdu,不存在乱序问题;如果丢包,重新请求解析即可

 

改进版:

UdpServer:

import threading,logging,socket,datetime
# Root logger: INFO and above, each record prefixed with time and thread name.
logging.basicConfig(
    format='%(asctime)s %(threadName)s %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)

class UdpServer:
    """UDP chat server with heartbeat-based client expiry.

    Control tokens: the heartbeat token and ``comming`` register or refresh the
    sender, ``quit`` removes it. Any other message refreshes the sender and is
    relayed as ``ack <ip> <port>\\n<message>\\n`` to every client heard from
    within ``interval`` seconds; older clients are dropped.
    """

    # Seconds recvfrom() may block before the loop re-checks the stop event.
    POLL_SECONDS = 0.5

    def __init__(self, ip='127.0.0.1', port=9999, interval=6):
        """Create the (still unbound) datagram socket.

        Args:
            ip: address to bind to.
            port: port to bind to.
            interval: seconds of silence after which a client is dropped.
        """
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.addr = (ip, port)
        self.clients = {}  # client address -> timestamp of its last datagram
        self.interval = interval
        self.event = threading.Event()  # set once stop() has been called

    def start(self):
        """Bind the socket and launch the background receiver thread."""
        self.sock.bind(self.addr)
        # The timeout lets the receiver notice stop() without depending on
        # close() interrupting a blocked recvfrom() in another thread.
        self.sock.settimeout(self.POLL_SECONDS)
        threading.Thread(target=self.__recvfrom, name='udp recvfrom').start()

    def stop(self):
        """Notify all clients, stop the receiver loop and close the socket."""
        self.event.set()
        # Snapshot: the receiver thread may still be updating the dict.
        for c in list(self.clients):
            try:
                self.sock.sendto(b'UdpServer stop!', c)
            except OSError as e:
                logging.error(e)
        self.sock.close()

    def __recvfrom(self):
        """Receive and dispatch datagrams until stop() is called."""
        while not self.event.is_set():
            try:
                raw, client = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # periodic wake-up to re-check the stop event
            except OSError as e:
                # Raised when stop() closes the socket under us; the loop
                # condition then ends the thread.
                logging.critical(e)
                continue

            try:
                data = raw.decode().strip()
            except UnicodeDecodeError as e:
                # Ignore datagrams that are not valid text instead of crashing.
                logging.error('{} sent undecodable data: {}'.format(client, e))
                continue

            logging.critical('{} {} {}'.format(data, client, self.clients))
            fugacious = datetime.datetime.now().timestamp()

            if data == '\001\002':
                logging.error('heartbeat!!')
                self.clients[client] = fugacious
                continue
            elif data == 'comming':
                logging.error('{} newly come'.format(client))
                self.clients[client] = fugacious
                continue
            elif data == 'quit':
                logging.error('{} leaving'.format(client))
                self.clients.pop(client, 'default')
                continue

            self.clients[client] = fugacious
            msg = 'ack {} {}\n{}\n'.format(*client, data)
            self._relay(msg.encode(), fugacious)

    def _relay(self, payload, now):
        """Send ``payload`` to every live client and forget the expired ones."""
        expired = set()
        for c, ts in list(self.clients.items()):
            age = now - ts
            logging.debug('age of %s: %s', c, age)
            if age > self.interval:
                expired.add(c)
                continue
            try:
                self.sock.sendto(payload, c)
            except OSError as e:
                logging.error(e)
        for c in expired:
            self.clients.pop(c, None)



if __name__ == '__main__':
    vv = UdpServer()
    vv.start()
    # Block on the console until the operator types "quit", then stop the server.
    while input('>>').strip() != 'quit':
        pass
    vv.stop()
    # Pause for two seconds before the main thread ends.
    threading.Event().wait(2)

 

UdpClient:

import threading,logging,socket
class UdpClient:
    """UDP chat client that announces itself and sends periodic heartbeats."""

    # Seconds recvfrom() may block before the loop re-checks the stop event.
    POLL_SECONDS = 0.5

    def __init__(self, ip='127.0.0.1', port=9988, interval=5):
        """Create the datagram socket.

        Args:
            ip: server address.
            port: server port.
            interval: seconds between two heartbeats.
        """
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        # The timeout lets the receiver notice stop() without depending on
        # close() interrupting a blocked recvfrom() in another thread.
        self.sock.settimeout(self.POLL_SECONDS)
        self.addr = (ip, port)
        self.interval = interval
        self.event = threading.Event()  # set once stop() has been called

    def start(self):
        """Announce the client to the server and start both worker threads."""
        self.sendto('comming')
        threading.Thread(target=self.recvfrom, name='UdpClient recvfrom').start()
        threading.Thread(target=self.__heartbeat, name='heartbeat Thread', daemon=True).start()

    def recvfrom(self):
        """Log every datagram received until stop() is called."""
        while not self.event.is_set():
            try:
                data, saddr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # periodic wake-up to re-check the stop event
            except OSError as e:
                logging.critical(e)
                continue
            logging.error('{} {}'.format(data, saddr))

    def sendto(self, data='quit'):
        """Send ``data`` (text) to the server; the default says goodbye.

        Raises:
            OSError: if the datagram cannot be sent.
        """
        self.sock.sendto(data.encode(), self.addr)

    def __heartbeat(self):
        """Send the heartbeat token every ``interval`` seconds until stopped."""
        msg = '\001\002'.encode()
        while not self.event.wait(self.interval):
            try:
                self.sock.sendto(msg, self.addr)
            except OSError as e:
                # A failed heartbeat must not end the thread; retry next tick.
                logging.critical(e)

    def stop(self):
        """Stop both worker threads and close the socket."""
        # Set the flag before closing so the threads exit instead of using a
        # closed socket.
        self.event.set()
        self.sock.close()

if __name__ == '__main__':
    uc = UdpClient(port=9999)
    uc.start()
    # Forward console lines to the server until the operator types "quit".
    msg = input('>>').strip()
    while msg != 'quit':
        uc.sendto(msg)
        msg = input('>>').strip()
    # Say goodbye with the default 'quit' datagram, then shut down.
    uc.sendto()
    uc.stop()