zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

boto3 - sqs - 发送/清除/接收/删除消息

消息 删除 发送 清除 接收
2023-09-27 14:29:08 时间
import json
import boto3

# 数据库连接/云服务 ============================================
AP_S3_AKI = 'xxx'
AP_S3_SAK = 'xxx'
AP_REGION_NAME = 'xxx'

# 队列URL
TEST_QUEUE_URL = 'https://sqs.xxx'


sqs = boto3.client('sqs', region_name=AP_REGION_NAME, aws_access_key_id=AP_S3_AKI, aws_secret_access_key=AP_S3_SAK)


def producter(msg):
    """
    发送消息
    :param msg:
    :return:
    """
    sqs.send_message(
        QueueUrl=TEST_QUEUE_URL,
        MessageBody=json.dumps(msg),
    )
    print(f"发送到消息队列 成功!msg_detail = {msg} ")


def del_queue():
    """
    清除消息队列
    :return:
    """
    sqs.purge_queue(QueueUrl=TEST_QUEUE_URL)


def receive_msg():
    """
    读取并删除队列消息
    :return:
    """
    try:
        response = sqs.receive_message(
            QueueUrl=TEST_QUEUE_URL,
            MaxNumberOfMessages=1,
            MessageAttributeNames=['All'],
            VisibilityTimeout=100,
            WaitTimeSeconds=20
        )
    except Exception as e:
        print(f'Receive message failed. | Exception: {e}')
        return

    if response.get('Messages'):
        msg = response.get('Messages')[0]
        print(f"msg = {msg}")
        handle = msg.get('ReceiptHandle')
        msg_info = json.loads(msg.get('Body'))
        print(f"msg_info = {msg_info}")

        try:
            # del_response = sqs.delete_message(
            #     QueueUrl=TEST_QUEUE_URL,
            #     ReceiptHandle=handle
            # )
            # print(f"del_response = {del_response}")
            pass
        except Exception as e:
            print(f'Receive cyberstar_info from cyberstar_queue failed. | Exception: {e}')


if __name__ == "__main__":
    """
        发送, 接收, 删除, 清除 sqs消息到消息队列 - TEST_QUEUE_URL 示例
    """
    # 发送
    msg = {"name": "xiaoming", "age": "18"}
    # producter(msg)

    # 清除
    # del_queue()

    # 接收
    receive_msg()