Java优雅使用连接池连接SFTP进行文件上传下载 解决请求量大问题
# Java优雅使用连接池连接SFTP进行文件上传下载 解决请求量大问题> 使用FTP连接池降低资源消耗,提高响应速率## 为什么要使用连接池连接SFTP呢?在Java中使用连接池来连接SFTP(Secure File Transfer Protocol)工具的原因主要与性能、资源管理和效率有关。以下是一些关键原因:
·
Java优雅使用连接池连接SFTP进行文件上传下载 解决请求量大问题
使用FTP连接池降低资源消耗,提高响应速率
为什么要使用连接池连接SFTP呢?
在Java中使用连接池来连接SFTP(Secure File Transfer Protocol)工具的原因主要与性能、资源管理和效率有关。以下是一些关键原因:
- 资源管理: 创建和销毁线程是一个相对昂贵的操作,涉及操作系统资源的分配和释放。当有大量短暂的任务(如上传或下载文件到SFTP服务器)需要执行时,频繁地创建和销毁线程会显著增加系统的开销。线程池允许预先创建一定数量的线程并复用它们,从而减少这种开销。
- 并发控制: 线程池可以限制同时运行的线程数,这有助于防止系统过载。例如,如果你的应用程序尝试同时发起大量的SFTP连接,这可能会耗尽服务器的资源,导致其他服务受到影响。通过线程池,你可以设置最大线程数,确保系统在可控的负载下运行。
- 响应时间: 当任务到达时,线程池中的空闲线程可以立即处理任务,而不需要等待新线程的创建。这减少了任务的响应时间,并提高了整体的吞吐量。
- 易于管理: 线程池提供了一种机制来管理线程的生命周期,包括启动、终止以及任务的排队。这对于维护和调试应用程序非常有用,因为你可以控制线程的数量和行为。
- 异步操作: 使用线程池进行SFTP操作可以实现异步文件传输,这意味着你可以在等待文件传输完成的同时继续执行其他任务,提高了应用程序的整体效率。
- 扩展性: 线程池可以动态调整线程数量以适应不同的负载情况,这使得系统能够更好地应对变化的工作负载。
在Spring Boot这样的框架中,通常会推荐使用线程池来处理SFTP连接,因为框架本身支持配置线程池参数,如核心线程数、最大线程数、超时时间等,这使得SFTP操作更加高效且易于集成到整个应用架构中。
例如,使用Spring Boot的@Async
注解,你可以指定一个任务应该异步执行,并由框架管理的线程池来处理,这样可以确保SFTP操作不会阻塞应用程序的主线程。
使用
引入Maven依赖
<!-- apache ftp支持 -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.8.0</version>
</dependency>
<!-- apache 连接池支持 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
FTP配置文件
ftp:
# 服务器地址
host: xx.xxx.xx.xxx
# 端口号
port: 21
# 用户名
userName: xxx
# 密码
password: xxxxxxx
# 工作目录
workingDirectory: /root
# 编码
encoding: utf-8
#被动模式
passiveMode: true
#连接超时时间
clientTimeout: 30000
# 线程数
threaNum: 1
# 0=ASCII_FILE_TYPE(ASCII格式),1=EBCDIC_FILE_TYPE,2=LOCAL_FILE_TYPE(二进制文件)
transferFileType: 2
# 是否重命名
renameUploaded: true
# 重新连接时间
retryTimes: 1200
# 缓存大小
bufferSize: 8192
# 最大数
maxTotal: 10
# 最小空闲
minldle: 10
# 最大空闲
maxldle: 50
# 最大等待时间
maxWait: 30000
# 池对象耗尽之后是否阻塞,maxWait < 0 时一直等待
blockWhenExhausted: true
# 取对象时验证
testOnBorrow: true
# 回收验证
testOnReturn: true
# 创建时验证
testOnCreate: true
# 空闲验证
testWhileldle: false
# 后进先出
lifo: false
FTP配置类
import lombok.Data;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author wuzhenyong
* ClassName:FtpConfig.java
* date:2024-07-13 18:08
* Description: FTP配置类
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "ftp")
public class FtpConfig extends GenericObjectPoolConfig<FTPClient> {
/**
* FTP服务器地址
*/
private String host;
/**
* FTP服务器端口
*/
private Integer port;
/**
* FTP用户名
*/
private String userName;
/**
* FTP密码
*/
private String password;
/**
* FTP服务器根目录
*/
private String workingDirectory;
/**
* 传输编码
*/
String encoding;
/**
* 被动模式:在这种模式下,数据连接是由客户程序发起的
*/
boolean passiveMode;
/**
* 连接超时时间
*/
int clientTimeout;
/**
* 线程数
*/
int threaNum;
/**
* 0=ASCII_FILE_TYPE(ASCII格式),1=EBCDIC_FILE_TYPE,2=LOCAL_FILE_TYPE(二进制文件)
*/
int transferFileType;
/**
* 是否重命名
*/
boolean renameUploaded;
/**
* 重新连接时间
*/
int retryTimes;
/**
* 缓存大小
*/
int bufferSize;
/**
* 最大数
*/
int maxTotal;
/**
* 最小空闲
*/
int minldle;
/**
* 最大空闲
*/
int maxldle;
/**
* 最大等待时间
*/
int maxWait;
/**
* 池对象耗尽之后是否阻塞,maxWait < 0 时一直等待
*/
boolean blockWhenExhausted;
/**
* 取对象时验证
*/
boolean testOnBorrow;
/**
* 回收验证
*/
boolean testOnReturn;
/**
* 创建时验证
*/
boolean testOnCreate;
/**
* 空闲验证
*/
boolean testWhileldle;
/**
* 后进先出
*/
boolean lifo;
}
创建FTP连接工厂
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author wuzhenyong
* ClassName:FTPClientFactory.java
* date:2024-07-13 18:12
* Description: FTP连接工厂
*/
@Component
@Slf4j
public class FTPClientFactory implements PooledObjectFactory<FTPClient> {
/**
* 注入 ftp 连接配置
*/
@Autowired
FtpConfig config;
/**
* 创建连接到池中
*
* @return {@link PooledObject}<{@link FTPClient}>
* @throws Exception 异常
*/
@Override
public PooledObject<FTPClient> makeObject() throws Exception {
FTPClient ftpClient = new FTPClient();
ftpClient.setConnectTimeout(config.getClientTimeout());
ftpClient.connect(config.getHost(), config.getPort());
int reply = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftpClient.disconnect();
return null;
}
boolean success;
if (StringUtils.isBlank(config.getUserName())) {
success = ftpClient.login("anonymous", "anonymous");
} else {
success = ftpClient.login(config.getUserName(), config.getPassword());
}
if (!success) {
return null;
}
ftpClient.setFileType(config.getTransferFileType());
ftpClient.setBufferSize(1024);
ftpClient.setControlEncoding(config.getEncoding());
if (config.isPassiveMode()) {
ftpClient.enterLocalPassiveMode();
}
log.debug("创建ftp连接");
return new DefaultPooledObject<>(ftpClient);
}
/**
* 链接状态检查
*
* @param pool 线程池
* @return boolean
*/
@Override
public boolean validateObject(PooledObject<FTPClient> pool) {
FTPClient ftpClient = pool.getObject();
try {
return ftpClient != null && ftpClient.sendNoOp();
} catch (Exception e) {
return false;
}
}
/**
* 销毁连接,当连接池空闲数量达到上限时,调用此方法销毁连接
*
* @param pool 线程池
* @throws Exception 异常
*/
@Override
public void destroyObject(PooledObject<FTPClient> pool) throws Exception {
FTPClient ftpClient = pool.getObject();
if (ftpClient != null) {
try {
ftpClient.disconnect();
log.debug("销毁ftp连接");
} catch (Exception e) {
log.error("销毁ftpClient异常,error:", e.getMessage());
}
}
}
/**
* 钝化连接,是连接变为可用状态
*
* @param p 线程池
* @throws Exception 异常
*/
@Override
public void passivateObject(PooledObject<FTPClient> p) throws Exception{
FTPClient ftpClient = p.getObject();
try {
ftpClient.changeWorkingDirectory(config.getWorkingDirectory());
ftpClient.logout();
if (ftpClient.isConnected()) {
ftpClient.disconnect();
}
} catch (Exception e) {
throw new RuntimeException("Could not disconnect from server.", e);
}
}
/**
* 初始化连接
*
* @param pool 线程池
* @throws Exception 异常
*/
@Override
public void activateObject(PooledObject<FTPClient> pool) throws Exception {
FTPClient ftpClient = pool.getObject();
ftpClient.connect(config.getHost(),config.getPort());
ftpClient.login(config.getUserName(), config.getPassword());
ftpClient.setControlEncoding(config.getEncoding());
ftpClient.changeWorkingDirectory(config.getWorkingDirectory());
//设置上传文件类型为二进制,否则将无法打开文件
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
}
/**
* 获取 FTP 连接配置
*
* @return {@link FtpConfig}
*/
public FtpConfig getConfig(){
return config;
}
}
创建操作连接FTP相关API接口
import org.apache.commons.net.ftp.FTPClient;
/**
* @author wuzhenyong
* ClassName:FtpPoolService.java
* date:2024-07-13 18:14
* Description:
*/
public interface FtpPoolService {
/**
* 获取ftpClient
*
* @return {@link FTPClient}
*/
FTPClient borrowObject();
/**
* 归还ftpClient
*
* @param ftpClient ftp客户端
*/
void returnObject(FTPClient ftpClient);
/**
* 获取 ftp 配置信息
*
* @return {@link FtpConfig}
*/
FtpConfig getFtpPoolConfig();
}
创建操作连接FTP相关API接口实现类
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author wuzhenyong
* ClassName:FtpPoolServiceImpl.java
* date:2024-07-13 18:19
* Description:
*/
@Component
@Slf4j
public class FtpPoolServiceImpl implements FtpPoolService{
/**
* ftp 连接池生成
*/
private GenericObjectPool<FTPClient> pool;
/**
* ftp 客户端配置文件
*/
@Autowired
private FtpConfig config;
/**
* ftp 客户端工厂
*/
@Autowired
private FTPClientFactory factory;
/**
* 初始化线程池
*/
@PostConstruct
private void initPool () {
this.pool = new GenericObjectPool<FTPClient>(this.factory, this.config);
}
/**
* 获取ftpClient
*
* @return {@link FTPClient}
*/
@Override
public FTPClient borrowObject() {
if (this.pool != null) {
try {
return this.pool.borrowObject();
} catch (Exception e) {
log.error("获取 FTPClient 失败 ", e);
}
}
return null;
}
/**
* 归还ftpClient
*
* @param ftpClient ftp客户端
*/
@Override
public void returnObject(FTPClient ftpClient) {
if (this.pool != null && ftpClient != null) {
this.pool.returnObject(ftpClient);
}
}
/**
* 获取 ftp 配置信息
*
* @return {@link FtpConfig}
*/
@Override
public FtpConfig getFtpPoolConfig() {
return config;
}
}
使用方法
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
/**
* @author wuzhenyong
* ClassName:FtpUtil.java
* date:2024-07-13 18:24
* Description:
*/
@Component
@Slf4j
public class FtpUtil {
/**
* ftp 连接池
*/
@Autowired
private FtpPoolService ftpPoolService;
public static final String DIR_SPLIT = "/";
/**
* 上传文件
*
* @param sourcePath 上传目录
* @throws Exception 异常
*/
public void upload(String sourcePath) throws Exception {
// 从连接池获取客户端
FTPClient ftpClient = ftpPoolService.borrowObject();
ftpClient.enterLocalPassiveMode();
File file = new File(sourcePath);
if (!file.exists() || !file.isFile()) {
return;
}
// 中文目录处理存在问题, 转化为ftp能够识别中文的字符集
String remotePath;
try {
remotePath = new String(file.getName().getBytes(StandardCharsets.UTF_8), FTP.DEFAULT_CONTROL_ENCODING);
} catch (UnsupportedEncodingException e) {
remotePath = file.getName();
}
try (
InputStream inputStream = new FileInputStream(file);
OutputStream outputStream = ftpClient.storeFileStream(remotePath);
) {
byte[] buffer = new byte[2048];
int length;
while ((length = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, length);
outputStream.flush();
}
// 关闭流之后必须执行,否则下一个文件导致流为空
boolean complete = ftpClient.completePendingCommand();
if (complete) {
log.info("文件{}上传完成", remotePath);
} else {
log.error("文件{}上传失败", remotePath);
}
} catch (Exception e) {
log.error("文件上传异常", e);
} finally {
// 释放客户端资源
ftpPoolService.returnObject(ftpClient);
}
}
/**
* 文件下载
*
* @param ftpClient ftp连接客户端
* @param ftpPath ftp文件路径
*/
public byte[] download(FTPClient ftpClient,
String ftpPath,
String ftpFileName
) throws Exception {
if (ftpClient == null || ftpPath == null ) {
return null;
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
boolean isChange = ftpClient.changeWorkingDirectory(ftpPath);
log.info("文件地址{}监测:{}", ftpPath, isChange);
log.info("文件名称{}", ftpFileName);
//第三步:下载指定文件
ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE); //文件类型
boolean isRetrieve = ftpClient.retrieveFile(new String(ftpFileName.getBytes(StandardCharsets.UTF_8), FTP.DEFAULT_CONTROL_ENCODING), outputStream);
// 关闭流之后必须执行,否则下一个文件导致流为空
// boolean complete = ftpClient.completePendingCommand();
if (isRetrieve) {
// log.info("文件{}下载完成", ftpPath);
} else {
throw new RuntimeException("下载失败"+ftpPath);
};
return outputStream.toByteArray();
}
/**
* 下载文件到本地
*
* @param ftpPath FTP服务器文件目录
* @param ftpFileName 文件名称
* @param localPath 下载后的文件路径
* @return boolean
*/
public boolean download(String ftpPath, String ftpFileName, String localPath) {
FTPClient ftpClient = ftpPoolService.borrowObject();
OutputStream outputStream = null;
try {
FTPFile[] ftpFiles = ftpClient.listFiles(ftpPath, file -> file.isFile() && file.getName().equals(ftpFileName));
if (ftpFiles != null && ftpFiles.length > 0) {
FTPFile ftpFile = ftpFiles[0];
File localFile = new File(localPath + DIR_SPLIT + ftpFile.getName());
// 判断本地路径目录是否存在,不存在则创建
if (!localFile.getParentFile().exists()) {
localFile.getParentFile().mkdirs();
}
outputStream = Files.newOutputStream(localFile.toPath());
ftpClient.retrieveFile(ftpFile.getName(), outputStream);
log.info("fileName:{},size:{}", ftpFile.getName(), ftpFile.getSize());
log.info("下载文件成功...");
return true;
} else {
log.info("文件不存在,filePathname:{},", ftpPath + DIR_SPLIT + ftpFileName);
}
} catch (Exception e) {
log.error("下载文件失败...",e);
} finally {
try {
outputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
ftpPoolService.returnObject(ftpClient);
}
return false;
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献7条内容
所有评论(0)