一.go-fastdfs简介

go-fastdfs是一个基于http协议的分布式文件系统,具备高性能、高可靠、无中心、免维护等优点。以下内容抄官网:
特点:

支持curl命令上传
支持浏览器上传
支持HTTP下载
支持多机自动同步
支持断点下载
支持配置自动生成
支持小文件自动合并(减少inode占用)
支持秒传
支持跨域访问
支持一键迁移
支持并行体验
支持断点续传(tus)
支持docker部署
支持自监控告警
支持图片缩放
支持google认证码
支持自定义认证
支持集群文件信息查看
使用通用HTTP协议
无需专用客户端(支持wget,curl等工具)
类fastdfs
高性能 (使用leveldb作为kv库)
高可靠(设计极其简单,使用成熟组件)
无中心设计(所有节点都可以同时读写)

优点:

无依赖(单一文件)
自动同步
失败自动修复
按天分目录方便维护
支持不同的场景
文件自动去重
支持目录自定义
支持保留原文件名
支持自动生成唯一文件名
支持浏览器上传
支持查看集群文件信息
支持集群监控邮件告警
支持小文件自动合并(减少inode占用)
支持秒传
支持图片缩放
支持google认证码
支持自定义认证
支持跨域访问
极低资源开销
支持断点续传(tus)
支持docker部署
支持一键迁移(从其他系统文件系统迁移过来)
支持并行体验(与现有的文件系统并行体验,确认OK再一键迁移)
支持token下载 token=md5(file_md5+timestamp)
运维简单,只有一个角色(不像fastdfs有三个角色Tracker Server,Storage Server,Client),配置自动生成
每个节点对等(简化运维)
所有节点都可以同时读写

二.集群安装(二进制)

2.1下载

如果只是简单的部署,可以直接采用二进制(编绎好的版本)直接安装方式。

  1. 编译好的版本下载1.4.2版本(写文章时这个版本是最新的)
    https://github.com/sjqzhang/go-fastdfs/releases
    在这里插入图片描述

2.2 安装

下载后,得到一个fileserver的二进制文件

##1.建好目录
mkdir gofastdfs

##2.将执行文件移到目录
mv fileserver gofastdfs

##3.修改执行权限
cd gofastdfs
chmod +x fileserver 

##4.运行一下,然后ctrl+c进行取消,目的是生成好配置文件
##开始目录下没有文件的,执行下面的命令后再取消,可以看到目录下的配置文件成功自动生成。
./fileserver

##5.修改集群配置文件
一般主要是修改集群的host和peers参数。
如果是三台机器,修改conf目录下的cfg.json即可,主要是peers配置,配置文件中说得特别清楚.
比如:"peers": ["http://192.168.56.101:8080","http://192.168.56.102:8080","http://192.168.56.103:8080"]
host参数修改成对应的各自服务器的IP,不要以127.0.0.1进行配置.
如果是一台机的单结点环境,不需要修改,其他配置可以看cfg.json,描述说明很细致。

## 6.启动集群
./fileserver &

查看进程是否存在
ps -ef | grep fileserver | grep -v grep

2.3 nginx配置

集群搭建好后,三个服务器都能进行服务,但还是建议通过nginx配置代理,提供统一的出口http地址,方便使用,配置好就可以使用新的统一地址如:http://192.168.56.1:8881/,这个地址提供了一个简单的浏览器测试表单进行文件上传测试

worker_processes  1;
events {
        worker_connections  1024;
}
http {
        include       mime.types;
        default_type  application/html;
        log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
        access_log  /var/log/nginx/access.log  main;
        error_log  /var/log/nginx/error.log  error;
        sendfile        on;
        keepalive_timeout  65;
        client_max_body_size 0; 
        proxy_redirect ~/big/upload/(.*) /big/upload/$1;  #继点续传一定要设置(注意)
        upstream go-fastdfs {
                server 192.168.56.101:8080;
                server 192.168.56.102:8080;
                server 192.168.56.103:8080;
                ip_hash;     #notice:very important(注意)
        }
        server {
                listen       8881;
                server_name  localhost;
                location / {
                    proxy_set_header Host $host; #notice:very important(注意)
                    proxy_set_header X-Real-IP $remote_addr; #notice:very important(注意)
                    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; #notice:very important(注意)
                    proxy_pass http://go-fastdfs;
                }

        }
}

三、API使用

https://gitee.com/sjqzhang/go-fastdfs/blob/master/doc/api.md
官网文档:https://sjqzhang.github.io/go-fastdfs/#character

Java代码中的例子用到了自己封装的的HttpsKit工具类,带连接池功能,需要的话可以看附件内容。

3.1文件统计

http://192.168.56.1:8881/group1/stat
具体请参阅示例代码(用浏览器访问http://192.168.56.1:8881(nginx访问)或http://192.168.56.101:8080(直接访问,换成具体的ip和端口))

Java代码:

String url = "http://192.168.56.1:8881/group1/stat";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool(); 

3.2文件上传

http://192.168.56.1:8881/group1/upload

参数:
file:上传的文件
scene:场景
output:输出
path:自定义路径

具体请参阅示例代码(用浏览器访问http://192.168.56.1:8881(nginx访问)或http://192.168.56.101:8080(直接访问,换成具体的ip和端口))

Java代码:

String url = "http://192.168.56.1:8881/group1/upload";
File sourceFile = new File("D:\\temp\\remotestcpserver_sock5.jar");
FileInputStream fin = new FileInputStream(sourceFile);
String resultStr = HttpsKit.postFormMultipart(url, fin, sourceFile.getName());
fin.close();
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool(); 

3.3文件秒传

http://192.168.56.1:8881/group1/upload

参数:
md5:文件的摘要
摘要算法要与cfg.json中配置的一样

具体请参阅示例代码(用浏览器访问http://192.168.56.1:8881(nginx访问)或http://192.168.56.101:8080(直接访问,换成具体的ip和端口))
Java代码:

String url = "http://192.168.56.1:8881/group1/upload?md5=cbad55d3e4fd02455e425a193205257b&output=json";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool();

3.4文件删除

http://192.168.56.1:8881/group1/delete

参数:
md5:文件的摘要(md5|sha1) 视配置定
path:文件路径
md5与path二选一
说明:md5或path都是上传文件时返回的信息,要以json方式返回才能看到(参阅浏览器上传)
http://192.168.56.1:8881/group1/delete?md5=430a71f5c5e093a105452819cc48cc9c

Java代码:

String url = "http://192.168.56.1:8881/group1/delete?md5=cbad55d3e4fd02455e425a193205257b";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool();

3.5文件信息

http://192.168.56.1:8881/group1/get_file_info

参数:
md5:文件的摘要(md5|sha1) 视配置定
path:文件路径
md5与path二选一
说明:md5或path都是上传文件时返回的信息,要以json方式返回才能看到(参阅浏览器上传)
例子:http://192.168.56.1:8881/group1/get_file_info?md5=430a71f5c5e093a105452819cc48cc9c

Java代码:

String url = "http://192.168.56.1:8881/group1/get_file_info?path=files/default/20220223/14/51/2/%E6%94%BF%E5%8A%A1%E9%A1%B9%E7%9B%AE%E4%BA%A4%E4%BB%98%E5%91%A8%E6%8A%A5-20220218.xlsx";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool();

3.6文件列表

http://192.168.56.1:8881/group1/list_dir

参数:
dir:要查看文件列表的目录名
例子:http://192.168.56.1:8881/group1/list_dir?dir=default

Java代码:

String url = "http://192.168.56.1:8881/group1/list_dir?dir=default";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool();

3.7修复统计信息

http://192.168.56.1:8881/group1/repair_stat

参数:
date:要修复的日期,格式如:20220223
例子:http://192.168.56.1:8881/group1/repair_stat?date=20220223

Java代码:

String url = "http://192.168.56.1:8881/group1/repair_stat?date=20220223";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool();

3.8同步失败修复

注:假如集群中有一台使用过程中挂掉了,在挂了的时间段内其他集群会写新的文件,但这台挂了的服务器重新启动后,文件不会自动同步(也有可能还没有到同步的时间,总之重新启动机器需需要进行修复),此时需要调用同步失败修复接口。
http://192.168.56.1:8881/group1/repair

参数:
force:是否强行修复(0|1)
例子:http://192.168.56.1:8881/group1/repair?force=1

Java代码:

String url = "http://192.168.56.1:8881/group1/repair?force=1";
String resultStr = HttpsKit.get(url);
System.out.println(resultStr);
//测试时调用完关闭连接池,正式环境中这一行代码去掉,利用连接池的重复使用特性。
HttpsKit.closeConnectionPool();

四、附件工具(HttpsKit)

pom.xml依赖:

		<!-- httpclient start -->
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
			<version>4.5.12</version>
		</dependency>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpmime</artifactId>
			<version>4.5.12</version>
		</dependency>
		<!-- httpclient end -->
	
		<!-- common-io start -->
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>2.7</version>
		</dependency>
		<!-- common-io end -->

		<!-- log start -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>

		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- log end -->

源码:

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.InputStreamBody;
import org.apache.http.entity.mime.content.StringBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpsKit {

	private static Logger logger = LoggerFactory.getLogger(HttpsKit.class);
	private static final int CONNECT_TIMEOUT = 10000;// 设置连接建立的超时时间为10000ms
	private static final int SOCKET_TIMEOUT = 30000; // 多少时间没有数据传输
	private static final int HttpIdelTimeout = 30000;//空闲时间
	private static final int HttpMonitorInterval = 10000;//多久检查一次
	private static final int MAX_CONN = 200; // 最大连接数
	private static final int Max_PRE_ROUTE = 200; //设置到路由的最大连接数,
	private static CloseableHttpClient httpClient; // 发送请求的客户端单例
	private static PoolingHttpClientConnectionManager manager; // 连接池管理类
	private static ScheduledExecutorService monitorExecutor;
	
	private final static Object syncLock = new Object(); // 相当于线程锁,用于线程安全
	
	private static RequestConfig requestConfig = RequestConfig.custom()
			.setConnectionRequestTimeout(CONNECT_TIMEOUT)
			.setConnectTimeout(CONNECT_TIMEOUT)
			.setSocketTimeout(SOCKET_TIMEOUT).build();

	private static CloseableHttpClient getHttpClient() {

		if (httpClient == null) {
			// 多线程下多个线程同时调用getHttpClient容易导致重复创建httpClient对象的问题,所以加上了同步锁
			synchronized (syncLock) {
				if (httpClient == null) {
					
					try {
						httpClient = createHttpClient();
					} catch (KeyManagementException e) {
						logger.error("error",e);
					} catch (NoSuchAlgorithmException e) {
						logger.error("error",e);
					} catch (KeyStoreException e) {
						logger.error("error",e);
					}
					
					// 开启监控线程,对异常和空闲线程进行关闭
					monitorExecutor = Executors.newScheduledThreadPool(1);
					monitorExecutor.scheduleAtFixedRate(new TimerTask() {
						@Override
						public void run() {
							
							// 关闭异常连接
							manager.closeExpiredConnections();
							
							// 关闭5s空闲的连接
							manager.closeIdleConnections(HttpIdelTimeout,TimeUnit.MILLISECONDS);
							
							logger.info(manager.getTotalStats().toString());
							//logger.info("close expired and idle for over "+HttpIdelTimeout+"ms connection");
						}
						
					}, HttpMonitorInterval, HttpMonitorInterval, TimeUnit.MILLISECONDS);
				}
			}
		}
		return httpClient;
	}

	/**
	 * 构建httpclient实例
	 * @return
	 * @throws KeyStoreException 
	 * @throws NoSuchAlgorithmException 
	 * @throws KeyManagementException 
	 */
	private static CloseableHttpClient createHttpClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
		
		SSLContextBuilder builder = new SSLContextBuilder();
        // 全部信任 不做身份鉴定
        builder.loadTrustMaterial(null, new TrustStrategy() {
            @Override
            public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
                return true;
            }
        });
		
		ConnectionSocketFactory plainSocketFactory = PlainConnectionSocketFactory.getSocketFactory();
		LayeredConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(builder.build(), NoopHostnameVerifier.INSTANCE);
		Registry<ConnectionSocketFactory> registry = RegistryBuilder
				.<ConnectionSocketFactory> create()
				.register("http", plainSocketFactory)
				.register("https", sslSocketFactory).build();

		manager = new PoolingHttpClientConnectionManager(registry);
		// 设置连接参数
		manager.setMaxTotal(MAX_CONN); // 最大连接数
		manager.setDefaultMaxPerRoute(Max_PRE_ROUTE); // 路由最大连接数

		// 请求失败时,进行请求重试
		HttpRequestRetryHandler handler = new HttpRequestRetryHandler() {
			
			@Override
			public boolean retryRequest(IOException e, int i,	HttpContext httpContext) {
				
				if (i > 3) {
					// 重试超过3次,放弃请求
					logger.error("retry has more than 3 time, give up request");
					return false;
				}
				if (e instanceof NoHttpResponseException) {
					// 服务器没有响应,可能是服务器断开了连接,应该重试
					logger.error("receive no response from server, retry");
					return true;
				}
				if (e instanceof SSLHandshakeException) {
					// SSL握手异常
					logger.error("SSL hand shake exception");
					return false;
				}
				if (e instanceof InterruptedIOException) {
					// 超时
					logger.error("InterruptedIOException");
					return false;
				}
				if (e instanceof UnknownHostException) {
					// 服务器不可达
					logger.error("server host unknown");
					return false;
				}
				if (e instanceof ConnectTimeoutException) {
					// 连接超时
					logger.error("Connection Time out");
					return false;
				}
				if (e instanceof SSLException) {
					logger.error("SSLException");
					return false;
				}

				HttpClientContext context = HttpClientContext.adapt(httpContext);
				HttpRequest request = context.getRequest();
				
				if (!(request instanceof HttpEntityEnclosingRequest)) {
					// 如果请求不是关闭连接的请求
					return true;
				}
				return false;
			}
		};

		CloseableHttpClient client = HttpClients.custom().setConnectionManager(manager).setRetryHandler(handler).build();
		return client;
	}
	
    public static String get(String url) {
    	return get(url, null);
    }

	public static String get(String url,Map<String,Object> headerParams) {
		
		HttpGet httpGet = new HttpGet(url);
		httpGet.setHeader("User-Agent","Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36");
		httpGet.setConfig(requestConfig);
		
		if(headerParams != null && headerParams.size()>0){
	    	for(String headerName : headerParams.keySet()) {
	    		httpGet.setHeader(headerName,headerParams.get(headerName)+"");
	        }
	    }
		
		CloseableHttpResponse response = null;
		InputStream in = null;

		String result = null;
		
		try {
			
			response = getHttpClient().execute(httpGet,HttpClientContext.create());
			
			HttpEntity entity = response.getEntity();
			if (entity != null) {
				in = entity.getContent();
				result = IOUtils.toString(in, "utf-8");
			}
			
		} catch (IOException e) {
			logger.error("error",e);
		} finally {
			try {
				if (in != null) in.close();
			} catch (IOException e) {
				logger.error("error",e);
			}
			
			try {
				if (response != null) response.close();
			} catch (IOException e) {
				logger.error("error",e);
			}
		}
		
		return result;
	}
	
	//文件上传的通用方法
	public static String postFormMultipart(String url,InputStream fin,String originalFilename) {
		
		HttpPost httppost = new HttpPost(url);
        
        httppost.setConfig(requestConfig);
        InputStreamBody bin = new InputStreamBody(fin, originalFilename);
        
        MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
        multipartEntityBuilder.addPart("file",bin);
        multipartEntityBuilder.addPart("scene", new StringBody("default",ContentType.TEXT_PLAIN));
        multipartEntityBuilder.addPart("output", new StringBody("json2",ContentType.TEXT_PLAIN));
        HttpEntity reqEntity = multipartEntityBuilder.build();
        
        httppost.setEntity(reqEntity);
		
		CloseableHttpResponse response = null;
		InputStream in = null;
		String result = null;
		try {
			
			response = getHttpClient().execute(httppost,HttpClientContext.create());
			
			HttpEntity entity = response.getEntity();
			if (entity != null) {
				in = entity.getContent();
				result = IOUtils.toString(in, "utf-8");
			}
			
		} catch (IOException e) {
			logger.error("error",e);
		} finally {
			try {
				if (in != null) in.close();
				if (response != null) response.close();
			} catch (IOException e) {
				logger.error("error",e);
			}
		}
		
		return result;
		
	}
	

	/**
	 * 关闭连接池
	 */
	public static void closeConnectionPool() {
		
		if(manager != null) manager.close();
		if(monitorExecutor != null) monitorExecutor.shutdown();
		try {if(httpClient != null) httpClient.close();} catch (IOException e) {logger.error("error",e);}
		
		manager = null;
		monitorExecutor = null;
		httpClient = null;
		
	}

	public static void main(String[] args) throws InterruptedException {
		
		String url = "http://www.baidu.com";
		System.out.println(HttpsKit.get(url));
		//关闭连接池,正式环境中这个不要关闭
		HttpsKit.closeConnectionPool();
		
	}
}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐