zl程序教程

您现在的位置是:首页 >  其他

当前栏目

3.FastDFS分布式的文件存储系统进阶API使用实践

文件分布式分布式API 使用 实践 进阶 存储系统
2023-06-13 09:13:35 时间

[TOC]

0x00 FastDFS API 使用实践

Java

描述: FastDFS 项目的开发者余庆大佬已经为我们Java API来对接FastDFS服务器。

Github 项目地址: https://github.com/happyfish100/fastdfs-client-java.git

操作流程:

Step 1.maven 源码构建

mvn clean install

# pom.xml 依赖添加
<dependency>
    <groupId>org.csource</groupId>
    <artifactId>fastdfs-client-java</artifactId>
    <version>1.27-SNAPSHOT</version>
</dependency>

Step 2.添加并配置文件 fdfs_client.conf ,其中参数此处不做过多的解析。

connect_timeout = 2
network_timeout = 30
charset = UTF-8
http.tracker_http_port = 8070
http.anti_steal_token = no
http.secret_key = WeiyiGee.top
tracker_server = 192.168.56.10:22122
tracker_server = 192.168.56.11:22122
  • Step 3.单例模式,无线程池;读取classpath下fdfs_client.conf配置文件
package com.aixin.tuna.fdfs;
import org.csource.fastdfs.*;
import java.sql.Connection;
import java.sql.SQLException;

/**
 * Created by dailin on 2018/7/11.
 */
public class FdfsUtil {

  static class Nested {
      private static TrackerServer trackerServer =null;
      private static StorageServer storageServer = null;
      private static StorageClient storageClient = null;

      static {
          try {
              ClientGlobal.init("fdfs_client.conf");
              TrackerClient tracker = new TrackerClient();
              trackerServer = tracker.getConnection();
              storageClient = new StorageClient(trackerServer, storageServer);
          }catch (Exception e) {
              e.printStackTrace();
          }

      }
  }

  //获取单例
  public  static StorageClient getStorageClient() {
    return Nested.storageClient;
  }
}

Step 4.实际使用测试。

package fdfs;

import java.io.FileOutputStream;
import java.io.OutputStream;

import com.aixin.tuna.fdfs.FdfsUtil;
import org.csource.common.NameValuePair;

import org.csource.fastdfs.FileInfo;
import org.csource.fastdfs.StorageClient;
import org.junit.Before;
import org.junit.Test;

public class TestFastDfs {
    private StorageClient storageClient;

    @Before
    public void init() {
        this.storageClient = FdfsUtil.getStorageClient();
    }

    /**
     * 测试文件上传,得到上传后的文件路径及文件名称
     */
    @Test
    public void testUpload() {
        try {
            String local_filename = "C:\\Users\\1\\Desktop\\1.png";  //本地文件路径

            String[] file = storageClient.upload_file(local_filename, "png", null);

            System.out.println(file.length);
            System.out.println("组名:" + file[0]);
            System.out.println("路径: " + file[1]);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 测试文件下载,根据文件所在group和文件路径信息访问文件
     */
    @Test
    public void testDownload() {
        try {
            String groupName = "group2";
            String filePath = "M00/00/00/wKg4C1tFegGAK7WZAADdeFFxlXA481.png";

            byte[] bytes = storageClient.download_file(groupName, filePath);
            String storePath = "C:\\Users\\1\\Desktop\\download1.png";

            OutputStream out = new FileOutputStream(storePath);
            out.write(bytes);
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取文件基本信息,根据文件所在group和文件路径信息访问文件
     */
    @Test
    public void testGetFileInfo() {

        try {
            String groupName = "group2";
            String filePath = "M00/00/00/wKg4C1tFegGAK7WZAADdeFFxlXA481.png";


            FileInfo file = storageClient.get_file_info(groupName, filePath);
            System.out.println("source_ip:" + file.getSourceIpAddr());
            System.out.println("file_size:" + file.getFileSize());
            System.out.println("upload_time:" + file.getCreateTimestamp());
            System.out.println(file.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testGetFileMatedata() {
        try {
            String groupName = "group2";
            String filePath = "M00/00/00/wKg4C1tFmmqATh8jAA-B0CmJB5A.tar.gz";

            NameValuePair nvps[] = storageClient.get_metadata(groupName, filePath);
            if (null != nvps && nvps.length > 0) {
                for (NameValuePair nvp : nvps) {
                    System.out.println(nvp.getName() + ":" + nvp.getValue());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 删除文件,根据文件所在group和文件路径信息访问文件
     */
    @Test
    public void testDelete() {

        try {
            String groupName = "group2";
            String filePath = "M00/00/00/wKg4C1tFlT6ASyqKAADdeFFxlXA080.png";
            int i = storageClient.delete_file(groupName, filePath);
            System.out.println(i == 0 ? "删除成功" : "删除失败:" + i);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Python

描述: 使用Python3来操作Fastdfs进行文件上传、删除、下载以信息的查看,此 Python 适用于 Fastdfs Ver 4.06 的 Python 接口。

项目官方地址:https://pypi.org/project/fastdfs-client-py3/

操作流程:

Step 1.安装 FastDFS 客户端的 Python 扩展包 fastdfs-client-py3 项目.

pip install fastdfs-client-py3
# pip3 install fastdfs-client-py3 -i  https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

Step 2.定义客户端扩展配置文件 fdfs_client.conf, 主要设置traker服务器的域名和端口.

connect_timeout = 30
network_timeout = 60
tracker_server = 10.10.107.225:22122
tracker_server = 10.10.107.226:22122
http.tracker_server_port = 8080

Step 3.新建一个Python工程, 导入 fdfs_client.client 模块, 实例化Fdfs_client类.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @Desc: Python3 操作 Fastdfs 分布式存储实践
# @Author: WeiyiGeek
# @Create: 2021年11月9日 10:20:40
# @Blog: https://weiyigeek.top

from fdfs_client.client import *

def main():
  # (1) 文件资源上传
  client  = Fdfs_client(get_tracker_conf('fdfs_client.conf'))
  ret1 = client.upload_by_filename('code.png')
  print("文件资源上传测试: \n",ret1)

  # (2) 文件内容追加上传
  # PS Y:\DevOps\分布式存储\FastDFS\Client> echo "Fastdfs Python fdfs_client" > a.txt
  # PS Y:\DevOps\分布式存储\FastDFS\Client> echo "append Test WeiyiGeek" > b.txt
  # # 表示该上传的是可被追加的文件a.txt到storage中
  ret2 = client.upload_appender_by_filename('a.txt')
  # # 表示将b.txt内容追加到以上传的Remote file中
  result = client.append_by_filename('b.txt',ret2['Remote file_id'])
  print("文件内容追加测试: \n",result)

  # (3) 上传指定文件的子文件 (未复现成功)
  ret3 = client.upload_by_filename('code_normal.png')
  result = client.upload_slave_by_filename('code_smaller.png',ret3['Remote file_id'],b'_small')
  print("上传指定文件的子文件测试: \n",result)

  # (4) 下载指定文件到本地
  ret4 = client.download_to_file('download.txt',b'group1/M00/00/00/Cgpr4WGKKJ2EI7xnAAAAALpwFI4945.txt')
  print("下载指定文件到本地测试: \n",ret4)

  # (5) 从storage中删除指定文件
  ret5 = client.delete_file(b'group1/M00/00/00/Cgpr4WGKJOiAXEJoAABbG2WiyDE715.png')
  print("从storage中删除指定文件: \n",ret5)

  # (6) 列出组中的所有存储服务器信息
  ret6 = client.list_all_groups()
  print("list_all_groups: ",ret6)
  for item in ret6['Groups']:
    print(item)

if __name__ == '__main__':
  main()

执行结果:

# {'Group name': b'group1', 'Remote file_id': b'group1/M00/00/00/Cgpr4WGKJOiAXEJoAABbG2WiyDE715.png', 'Status': 'Upload successed.', 'Local file name': 'code.png', 'Uploaded size': '22.78KB', 'Storage IP': b'10.10.107.226'}

# 下载指定文件到本地测试:
#  {'Remote file_id': b'group1/M00/00/00/Cgpr4WGKKJ2EI7xnAAAAALpwFI4945.txt', 'Content': 'download.txt', 'Download size': '106B', 'Storage IP': b'10.10.107.225'}

# 从storage中删除指定文件:
#  ('Delete file successed.', b'group1/M00/00/00/Cgpr4WGKJOiAXEJoAABbG2WiyDE715.png', b'10.10.107.225')

# list_all_groups:  {'Groups count': 1.0, 'Groups': [<fdfs_client.tracker_client.Group_info object at 0x014AFE30>]}
# Group information:
#         group name = b'group1'
#         total disk space = 95.96GB
#         disk free space = 78.17GB
#         trunk free space = 0MB
#         storage server count = 2
#         storage port = 23000
#         storage HTTP port = 8888
#         active server count = 2
#         current write server index = 1
#         store path count = 1
#         subdir count per path = 256
#         current trunk file id = 0

WeiyiGeek. fastdfs-client-py3使用说明

实践案例之在Flask中使用FastDFS 描述: 此时采用Python中的Flask包创建一个接受网站上传文件和返回上传文件索引id的服务,具体操作流程如下。

Step 1.准备一个前端上传页面,这是一个使用flask模板引擎的html文件

$vim index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>FastDFS Test</title>
</head>
<body>
    文件上传:<br>
    <form enctype="multipart/form-data" method="post" action="/upload">
        <input type="file" name="_file"/>
        <input type="submit" name="submit">
    </form>
    <br>
    <a href="{{ saved_file }}">{{saved_file_name }}</a>
    <br>
    {{ ret }}
</body>
</html>

Step 2.Flask 的主文件入口以及路由设置。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

from flask import Flask,render_template,request,send_file
from fdfs_client.client import  Fdfs_client,get_tracker_conf
import io

app = Flask(__name__)

def getFileExt(filename):
    return ('%s' % filename.split('.')[-1]).lower()

@app.route('/')
def index():
    return render_template('index.html')

# (1) 上传,通用下载,适用于图片等公开的文件
@app.route('/upload',methods=['GET','POST'])
def upload():
    # print(request.files['_file'])
    saved_file_name = request.files['_file'].filename
    client = Fdfs_client(get_tracker_conf('client.conf'))
    #二进制存储
    ret = client.upload_by_buffer(request.files['_file'].read(),getFileExt(saved_file_name))
    saved_file = 'http://weiyigeek.top:8888/'+ ret['Remote file_id']
    return render_template('index.html',ret=ret,saved_file=saved_file,saved_file_name=saved_file_name)

# (2) 下载,可在fastfds的storage中的配置文件配置只允许后端访问,适用于非公开的文件
@app.route('/download/',methods=['GET','POST'])
def download():
    file_id = request.args['file_id'].replace('\\\\','/')
    # print(file_id)
    client = Fdfs_client(get_tracker_conf('client.conf'))
    #二进制读取
    saved_file_buffer = client.download_to_buffer(file_id)
    return send_file(io.BytesIO(saved_file_buffer['Content']), attachment_filename=file_id,)


if __name__ == '__main__':
    app.run(debug=True)

0x01 FastDFS 基础命令与配置

1.FastDFS 客户端命令浅析

描述: 在Liunx编译安装好FastDFS后会在/usr/bin/目录存放fastdfs常用的客户端命令。

常用命令:

$ ls /usr/bin/fdfs_* | grep "/"
# (1) 上传一个文件到fastdfs服务器之中。
/usr/bin/fdfs_upload_file /etc/fdfs/client.conf QRcode.png
group1/M00/00/00/Cgpr4WGCfr-AbGHAAABGukk1gY0846.png

# (2) 从Fastdfs服务器之中下载文件到本地
# /usr/bin/fdfs_download_file <config_file> <file_id> [local_filename] [<download_offset> <download_bytes>]
/usr/bin/fdfs_download_file /etc/fdfs/client.conf group1/M00/00/00/Cgpr4WGCfr-AbGHAAABGukk1gY0846.png
$ ls
Cgpr4WGCfr-AbGHAAABGukk1gY0846.png  QRcode.png 

# (3) 查看指定文件在Fastdfs服务器中详细存储信息
/usr/bin/fdfs_file_info /etc/fdfs/client.conf group1/M00/00/00/Cgpr4WGCfr-AbGHAAABGukk1gY0846.png
GET FROM SERVER: false
file type: normal
source storage id: 0
source ip address: 10.10.107.225
file create timestamp: 2021-11-03 20:21:19
file size: 18106
file crc32: 1228243341 (0x4935818d)


# (4) 从Fastdfs服务器之中删除指定的文件
/usr/bin/fdfs_delete_file /etc/fdfs/client.conf group1/M00/00/00/Cgpr4WGCfr-AbGHAAABGukk1gY0846.png
find /home/fdfs/storage/data/ -name "Cgpr4WGCfr-*.png" -exec ls -alh {} \;


# (5) fdfs_upload_appender 表示要上传一个可以追加内容的文件,fdfs_append_file表示可以将内容追加到目标文件尾
echo "hello" > 1.txt
echo "FastDFS" > 2.txt
# 上传一个可以追加内容的文件
/usr/bin/fdfs_upload_appender /etc/fdfs/client.conf 1.txt
# 将内容追加到目标文件 group1/M00/00/00/Cgpr4mGCgE6ERChsAAAAADY6MCA574.txt 尾
/usr/bin/fdfs_append_file /etc/fdfs/client.conf group1/M00/00/00/Cgpr4mGCgE6ERChsAAAAADY6MCA574.txt 2.txt
curl http://10.10.107.226:8888/group1/M00/00/00/Cgpr4mGCgE6ERChsAAAAADY6MCA574.txt
  # hello
  # FastDFS


# (6) 显示当前所有可连接的Tracker Server状态以及相关的存储组信息、以及 Storage Server的管理。
/usr/bin/fdfs_monitor /etc/fdfs/client.conf 
/usr/bin/fdfs_monitor /etc/fdfs/storage.conf delete group2 192.168.56.11


# (7) 指定在FastDFS中 Storage存储的文件的crc32值获取
/usr/bin/fdfs_crc32 /etc/fdfs/client.conf group1/M00/00/00/Cgpr4WGCfr-AbGHAAABGukk1gY0846.png
ef916fcc

# (8) 注意:重新生成的文件将是普通文件!
# /usr/bin/fdfs_regenerate_filename <config_file> <appender_file_id>
/usr/bin/fdfs_regenerate_filename /etc/fdfs/client.conf group1/M00/00/00/Cgpr4mGCgE6ERChsAAAAADY6MCA574.txt
  # group1/M00/00/00/Cgpr4mGCgvqADLYiAAAADu1CHUo574.txt
find /home/fdfs/storage/data/ -name "*574.txt" -exec ls -alh {} \;
  # -rw-r--r-- 1 fdfs fdfs 14 Nov  3 20:28 /home/fdfs/storage/data/00/00/Cgpr4mGCgvqADLYiAAAADu1CHUo574.txt
cat /home/fdfs/storage/data/00/00/Cgpr4mGCgvqADLYiAAAADu1CHUo574.txt
  # hello
  # FastDFS


# (9) fdfs 服务器综合测试工具(Test 阶段)
# Usage: /usr/bin/fdfs_test <config_file> <operation>  # operation: upload, download, getmeta, setmeta, delete and query_servers
# Usage: /usr/bin/fdfs_test1 <config_file> <operation> # operation: upload, download, getmeta, setmeta, delete and query_servers
# # 简单使用
/usr/bin/fdfs_test /etc/fdfs/client.conf query_servers group1 group1/00/00/Cgpr4mGCgvqADLYiAAAADu1CHUo574.txt
# server list (1):
#   10.10.107.225:23000


# (9) fastdfs 向文件追加测试工具
Usage: /usr/bin/fdfs_appender_test <config_file> <local_filename> [FILE | BUFF | CALLBACK]
Usage: /usr/bin/fdfs_appender_test1 <config_file> <local_filename> [FILE | BUFF | CALLBACK]


# (n) FastDFS tracker 与 storage 服务器启动、停止的可执行二进制文件。
/usr/bin/fdfs_trackerd
/usr/bin/fdfs_storaged

2.FastDFS 服务端配置浅析

下述主要针对/etc/fdfs/storage.conf文件进行一个简单的介绍。

# 配置文件是否不生效,false 为生效
disabled = false
# 指定此 storage server 所在组(卷), 如果使用use_storage_id则必须在tracker.conf中设置为true,并且必须正确配置storage_id.conf。
group_name = group1
# 绑定此主机的地址
bind_addr = 10.10.107.225
# 如果在连接到其他服务器(此存储服务器作为客户端)时绑定此主机的地址,则为true,用于绑定由上述参数配置的地址:“bind_addr”为false,用于绑定此主机的任何地址
client_bind = true
# storage server 服务端口
port = 23000
# 连接超时时间
connect_timeout = 5
# 网络传输接收超时时间
network_timeout = 60
# 心跳间隔时间,单位为秒 (这里是指主动向 tracker server 发送心跳)
heart_beat_interval = 30
# 磁盘使用情况报告间隔(秒)default value is 300
stat_report_interval = 60
# Storage 数据和日志目录地址(根目录必须存在,子目录会自动生成) 建议此路径空间大于50GB
base_path = /home/fdfs/storage
# 服务器支持的最大并发连接数,一般设10240
max_connections = 10240
# 从网络接收/发送数据的缓冲区大小,建议使用256KB or 512KB,默认64
buff_size = 256KB
# 接受线程数
accept_threads = 1
# 工作线程数
work_threads = 4
# 如果磁盘读写分离,则对于混合读写为false,对于分离读写为true
disk_rw_separated = true
# 每个存储路径的磁盘读取器线程数,对于混合读/写,此参数可以为0
disk_reader_threads = 1
# 每个存储路径的磁盘写入程序线程数,对于混合读/写,此参数可以为0
disk_writer_threads = 1
# 如果没有要同步的条目,请在X毫秒后再次尝试读取binlog
sync_wait_msec = 50
# 同步文件后,usleep毫秒0连续同步(从不调用usleep)
sync_interval = 0
# 允许系统同步的时间段 (默认是全天) 。一般用于避免高峰同步产生一些问题而设定。
sync_start_time = 00:00
sync_end_time = 23:59
# 同步N个文件后写入标记文件
write_mark_file_freq = 500
# 磁盘恢复线程数
disk_recovery_threads = 3
# 存放文件时 storage server 支持多个路径, 此处配置存放文件的基路径数目,通常只配一个目录。
store_path_count = 1
# 逐一配置 store_path_count 个路径,索引号基于 0。(建议不和base_path设置的路径一样)
store_path0 = /home/fdfs/storage1
# store_path1 = /home/fdfs/storage2
# 存储文件时配置存放文件的目录个数,此处采用了两级目录则会在 store_path 下自动创建 N * N 个存放文件的子目录。
subdir_count_per_path = 256
# tracker_server 的列表 ,会主动连接 tracker_server,有多个 tracker server 时,每个 tracker server 写一行
tracker_server = 10.10.107.225:22122
# 支持域名方式
tracker_server = file.weiyigeek.top:22122
# 支持内网+外网
tracker_server = 10.10.107.227:22122,122.244.141.46:22122
# 日志运行级别
log_level = info
# 运行该程序的用户和组
run_by_group = fdfs
run_by_user =fdfs
# 允许那些主机进行通信连接,主机可以是主机名或ip地址,“*”(只有一个星号)表示匹配所有ip地址
# allow_hosts=10.0.1.[1-15,20] 、 host[01-08,20-25].domain.com 、192.168.5.64/26
allow_hosts = *
# 分发到数据路径的文件的模式
# 0: round robin(default)
# 1: random, distributted by hash code
file_distribute_path_mode = 0
# 当文件分配路径设置为0(循环)时有效,当写入的文件计数达到这个数字时,然后将使用下一个目录进行存储例如`00/00` 满100个时 => `00/01`
file_distribute_rotate_count = 100
# 何时写入大文件时调用fsync到磁盘
# 0: never call fsync
# other: call fsync when written bytes >= this bytes
fsync_after_written_bytes = 0
# 每隔几秒钟将日志buff同步到磁盘
sync_log_buff_interval = 1
# 每隔几秒钟将binlog buff/cache同步到磁盘
sync_binlog_buff_interval = 1
# 每隔几秒钟将存储统计信息同步到磁盘
sync_stat_file_interval = 300
# 线程堆栈大小,应>=512KB
thread_stack_size = 512KB
#作为上载文件的源服务器的优先级,此值越低,其上载优先级越高。选主
upload_priority = 10
# NIC别名前缀,如Linux中的eth,可以通过ifconfig-a查看用逗号分割的多个别名。
if_alias_prefix =
# 如果选中文件复制,当设置为true时,使用FastDHT存储文件索引
## 1 or yes: need check
## 0 or no: do not check
check_file_duplicate = 0
# 用于检查文件副本的文件签名方法
## hash: four 32 bits hash code
## md5: MD5 signature
file_signature_method = hash
# 用于存储文件索引(键值对)的命名空间,当check_file_duplicate为1时,必须设置此项
key_namespace = FastDFS
# 将keep_alive设置为1以启用与FastDHT服务器的持久连接
keep_alive = 0
# 安装设置FastDHT服务器必须开启文件包含
##include /etc/fdht/fdht_servers.conf
# 如果要访问日志,请输入日志
use_access_log = false
# 如果每天旋转访问日志
rotate_access_log = false
# 旋转访问日志时基,时间格式:小时:分钟小时从0到23,分钟从0到59
access_log_rotate_time = 00:00
# 如果使用gzip压缩旧的访问日志
compress_old_access_log = false
# 在前几天压缩访问日志
compress_access_log_days_before = 7
# 如果每天旋转错误日志
rotate_error_log = false
# 旋转错误日志时基,时间格式:小时:分钟
error_log_rotate_time = 00:00
# 如果使用gzip压缩旧的错误日志
compress_old_error_log = false
# 压缩前几天的错误日志
compress_error_log_days_before = 7
# 当日志文件超过此大小时,旋转访问日志,0表示从不按日志文件大小旋转日志文件
rotate_access_log_size = 0
# 当日志文件超过此大小时,旋转错误日志,0表示从不按日志文件大小旋转日志文件
rotate_error_log_size = 0
# 保留日志文件的天数,0表示不删除旧日志文件
log_file_keep_days = 0
# 如果同步文件时跳过无效记录
file_sync_skip_invalid_record = false
# 是否使用连接池
use_connection_pool = true
# 空闲时间超过此时间的连接将被关闭 second
connection_pool_max_idle_time = 3600
# 如果使用gzip压缩binlog文件
compress_binlog = true
# 尝试压缩binlog时间,时间格式:小时:分钟
compress_binlog_time = 01:30
# 检查存储路径的标记以防止混淆。
check_store_path_mark = true
# 如果域名为空,请使用此存储服务器的ip地址,否则此域名将出现在跟踪服务器重定向的url中
http.domain_name = file.weiyigeek.top
# Web 服务端口 (一般与nginx设置相同即可)
http.server_port = 8888