Java优雅使用连接池连接SFTP进行文件上传下载 解决请求量大问题

使用FTP连接池降低资源消耗,提高响应速率

为什么要使用连接池连接SFTP呢?

在Java中使用连接池来连接SFTP(Secure File Transfer Protocol)工具的原因主要与性能、资源管理和效率有关。以下是一些关键原因:

  1. 资源管理: 创建和销毁线程是一个相对昂贵的操作,涉及操作系统资源的分配和释放。当有大量短暂的任务(如上传或下载文件到SFTP服务器)需要执行时,频繁地创建和销毁线程会显著增加系统的开销。线程池允许预先创建一定数量的线程并复用它们,从而减少这种开销。
  2. 并发控制: 线程池可以限制同时运行的线程数,这有助于防止系统过载。例如,如果你的应用程序尝试同时发起大量的SFTP连接,这可能会耗尽服务器的资源,导致其他服务受到影响。通过线程池,你可以设置最大线程数,确保系统在可控的负载下运行。
  3. 响应时间: 当任务到达时,线程池中的空闲线程可以立即处理任务,而不需要等待新线程的创建。这减少了任务的响应时间,并提高了整体的吞吐量。
  4. 易于管理: 线程池提供了一种机制来管理线程的生命周期,包括启动、终止以及任务的排队。这对于维护和调试应用程序非常有用,因为你可以控制线程的数量和行为。
  5. 异步操作: 使用线程池进行SFTP操作可以实现异步文件传输,这意味着你可以在等待文件传输完成的同时继续执行其他任务,提高了应用程序的整体效率。
  6. 扩展性: 线程池可以动态调整线程数量以适应不同的负载情况,这使得系统能够更好地应对变化的工作负载。

在Spring Boot这样的框架中,通常会推荐使用线程池来处理SFTP连接,因为框架本身支持配置线程池参数,如核心线程数、最大线程数、超时时间等,这使得SFTP操作更加高效且易于集成到整个应用架构中。

例如,使用Spring Boot的@Async注解,你可以指定一个任务应该异步执行,并由框架管理的线程池来处理,这样可以确保SFTP操作不会阻塞应用程序的主线程。

使用

引入Maven依赖

<!-- apache ftp支持 -->
<dependency>
  <groupId>commons-net</groupId>
  <artifactId>commons-net</artifactId>
  <version>3.8.0</version>
</dependency>
<!-- apache 连接池支持 -->
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.6.0</version>
</dependency>

FTP配置文件

ftp:
    # 服务器地址
    host: xx.xxx.xx.xxx
    # 端口号
    port: 21
    # 用户名
    userName: xxx
    # 密码
    password: xxxxxxx
    # 工作目录
    workingDirectory: /root
    # 编码
    encoding: utf-8
    #被动模式
    passiveMode: true
    #连接超时时间
    clientTimeout: 30000
    # 线程数
    threaNum: 1
    # 0=ASCII_FILE_TYPE(ASCII格式),1=EBCDIC_FILE_TYPE,2=LOCAL_FILE_TYPE(二进制文件)
    transferFileType: 2
    # 是否重命名
    renameUploaded: true
    # 重新连接时间
    retryTimes: 1200
    # 缓存大小
    bufferSize: 8192
    # 最大数
    maxTotal: 10
    # 最小空闲
    minldle: 10
    # 最大空闲
    maxldle: 50
    # 最大等待时间
    maxWait: 30000
    # 池对象耗尽之后是否阻塞,maxWait < 0 时一直等待
    blockWhenExhausted: true
    # 取对象时验证
    testOnBorrow: true
    # 回收验证
    testOnReturn: true
    # 创建时验证
    testOnCreate: true
    # 空闲验证
    testWhileldle: false
    # 后进先出
    lifo: false

FTP配置类


import lombok.Data;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author wuzhenyong
 * ClassName:FtpConfig.java
 * date:2024-07-13 18:08
 * Description: FTP配置类
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "ftp")
public class FtpConfig extends GenericObjectPoolConfig<FTPClient> {
    /**
     * FTP服务器地址
     */
    private String host;

    /**
     * FTP服务器端口
     */
    private Integer port;

    /**
     * FTP用户名
     */
    private String userName;

    /**
     * FTP密码
     */
    private String password;

    /**
     * FTP服务器根目录
     */
    private String workingDirectory;

    /**
     * 传输编码
     */
    String encoding;

    /**
     * 被动模式:在这种模式下,数据连接是由客户程序发起的
     */
    boolean passiveMode;

    /**
     * 连接超时时间
     */
    int clientTimeout;

    /**
     * 线程数
     */
    int threaNum;
    /**
     * 0=ASCII_FILE_TYPE(ASCII格式),1=EBCDIC_FILE_TYPE,2=LOCAL_FILE_TYPE(二进制文件)
     */
    int transferFileType;

    /**
     * 是否重命名
     */
    boolean renameUploaded;

    /**
     * 重新连接时间
     */
    int retryTimes;

    /**
     * 缓存大小
     */
    int bufferSize;

    /**
     * 最大数
     */
    int maxTotal;

    /**
     * 最小空闲
     */
    int minldle;

    /**
     * 最大空闲
     */
    int maxldle;

    /**
     * 最大等待时间
     */
    int maxWait;
    /**
     *  池对象耗尽之后是否阻塞,maxWait < 0 时一直等待
     */
    boolean blockWhenExhausted;
    /**
     * 取对象时验证
     */
    boolean testOnBorrow;
    /**
     * 回收验证
     */
    boolean testOnReturn;
    /**
     * 创建时验证
     */
    boolean testOnCreate;
    /**
     * 空闲验证
     */
    boolean testWhileldle;
    /**
     * 后进先出
     */
    boolean lifo;
}

创建FTP连接工厂


import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author wuzhenyong
 * ClassName:FTPClientFactory.java
 * date:2024-07-13 18:12
 * Description: FTP连接工厂
 */
@Component
@Slf4j
public class FTPClientFactory implements PooledObjectFactory<FTPClient> {
    /**
     * 注入 ftp 连接配置
     */
    @Autowired
    FtpConfig config;

    /**
     * 创建连接到池中
     *
     * @return {@link PooledObject}<{@link FTPClient}>
     * @throws Exception 异常
     */
    @Override
    public PooledObject<FTPClient> makeObject() throws Exception {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setConnectTimeout(config.getClientTimeout());
        ftpClient.connect(config.getHost(), config.getPort());
        int reply = ftpClient.getReplyCode();
        if (!FTPReply.isPositiveCompletion(reply)) {
            ftpClient.disconnect();
            return null;
        }
        boolean success;
        if (StringUtils.isBlank(config.getUserName())) {
            success = ftpClient.login("anonymous", "anonymous");
        } else {
            success = ftpClient.login(config.getUserName(), config.getPassword());
        }
        if (!success) {
            return null;
        }
        ftpClient.setFileType(config.getTransferFileType());
        ftpClient.setBufferSize(1024);
        ftpClient.setControlEncoding(config.getEncoding());
        if (config.isPassiveMode()) {
            ftpClient.enterLocalPassiveMode();
        }
        log.debug("创建ftp连接");
        return new DefaultPooledObject<>(ftpClient);
    }

    /**
     * 链接状态检查
     *
     * @param pool 线程池
     * @return boolean
     */
    @Override
    public boolean validateObject(PooledObject<FTPClient> pool) {
        FTPClient ftpClient = pool.getObject();
        try {
            return ftpClient != null && ftpClient.sendNoOp();
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 销毁连接,当连接池空闲数量达到上限时,调用此方法销毁连接
     *
     * @param pool 线程池
     * @throws Exception 异常
     */
    @Override
    public void destroyObject(PooledObject<FTPClient> pool) throws Exception {
        FTPClient ftpClient = pool.getObject();
        if (ftpClient != null) {
            try {
                ftpClient.disconnect();
                log.debug("销毁ftp连接");
            } catch (Exception e) {
                log.error("销毁ftpClient异常,error:", e.getMessage());
            }
        }
    }

    /**
     * 钝化连接,是连接变为可用状态
     *
     * @param p 线程池
     * @throws Exception 异常
     */
    @Override
    public void passivateObject(PooledObject<FTPClient> p) throws Exception{
        FTPClient ftpClient = p.getObject();
        try {
            ftpClient.changeWorkingDirectory(config.getWorkingDirectory());
            ftpClient.logout();
            if (ftpClient.isConnected()) {
                ftpClient.disconnect();
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not disconnect from server.", e);
        }
    }

    /**
     * 初始化连接
     *
     * @param pool 线程池
     * @throws Exception 异常
     */
    @Override
    public void activateObject(PooledObject<FTPClient> pool) throws Exception {
        FTPClient ftpClient = pool.getObject();
        ftpClient.connect(config.getHost(),config.getPort());
        ftpClient.login(config.getUserName(), config.getPassword());
        ftpClient.setControlEncoding(config.getEncoding());
        ftpClient.changeWorkingDirectory(config.getWorkingDirectory());
        //设置上传文件类型为二进制,否则将无法打开文件
        ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
    }

    /**
     * 获取 FTP 连接配置
     *
     * @return {@link FtpConfig}
     */
    public FtpConfig getConfig(){
        return config;
    }
}

创建操作连接FTP相关API接口

import org.apache.commons.net.ftp.FTPClient;
/**
 * @author wuzhenyong
 * ClassName:FtpPoolService.java
 * date:2024-07-13 18:14
 * Description:
 */
public interface FtpPoolService {
    /**
     * 获取ftpClient
     *
     * @return {@link FTPClient}
     */
    FTPClient borrowObject();

    /**
     * 归还ftpClient
     *
     * @param ftpClient ftp客户端
     */
    void returnObject(FTPClient ftpClient);

    /**
     * 获取 ftp 配置信息
     *
     * @return {@link FtpConfig}
     */
    FtpConfig getFtpPoolConfig();
}

创建操作连接FTP相关API接口实现类

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author wuzhenyong
 * ClassName:FtpPoolServiceImpl.java
 * date:2024-07-13 18:19
 * Description:
 */
@Component
@Slf4j
public class FtpPoolServiceImpl implements FtpPoolService{
    /**
     * ftp 连接池生成
     */
    private GenericObjectPool<FTPClient> pool;

    /**
     * ftp 客户端配置文件
     */
    @Autowired
    private FtpConfig config;

    /**
     * ftp 客户端工厂
     */
    @Autowired
    private FTPClientFactory factory;

    /**
     * 初始化线程池
     */
    @PostConstruct
    private void initPool () {
        this.pool = new GenericObjectPool<FTPClient>(this.factory, this.config);
    }

    /**
     * 获取ftpClient
     *
     * @return {@link FTPClient}
     */
    @Override
    public FTPClient borrowObject() {
        if (this.pool != null) {
            try {
                return this.pool.borrowObject();
            } catch (Exception e) {
                log.error("获取 FTPClient 失败 ", e);
            }
        }
        return null;
    }

    /**
     * 归还ftpClient
     *
     * @param ftpClient ftp客户端
     */
    @Override
    public void returnObject(FTPClient ftpClient) {
        if (this.pool != null && ftpClient != null) {
            this.pool.returnObject(ftpClient);
        }
    }

    /**
     * 获取 ftp 配置信息
     *
     * @return {@link FtpConfig}
     */
    @Override
    public FtpConfig getFtpPoolConfig() {
        return config;
    }
}

使用方法

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/**
 * @author wuzhenyong
 * ClassName:FtpUtil.java
 * date:2024-07-13 18:24
 * Description:
 */
@Component
@Slf4j
public class FtpUtil {
    /**
     * ftp 连接池
     */
    @Autowired
    private FtpPoolService ftpPoolService;

    public static final String DIR_SPLIT = "/";


    /**
     * 上传文件
     *
     * @param sourcePath 上传目录
     * @throws Exception 异常
     */
    public void upload(String sourcePath) throws Exception {
        // 从连接池获取客户端
        FTPClient ftpClient = ftpPoolService.borrowObject();
        ftpClient.enterLocalPassiveMode();
        File file = new File(sourcePath);
        if (!file.exists() || !file.isFile()) {
            return;
        }

        // 中文目录处理存在问题, 转化为ftp能够识别中文的字符集
        String remotePath;
        try {
            remotePath = new String(file.getName().getBytes(StandardCharsets.UTF_8), FTP.DEFAULT_CONTROL_ENCODING);
        } catch (UnsupportedEncodingException e) {
            remotePath = file.getName();
        }

        try (
                InputStream inputStream = new FileInputStream(file);
                OutputStream outputStream = ftpClient.storeFileStream(remotePath);
        ) {
            byte[] buffer = new byte[2048];
            int length;
            while ((length = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, length);
                outputStream.flush();
            }
            // 关闭流之后必须执行,否则下一个文件导致流为空
            boolean complete = ftpClient.completePendingCommand();
            if (complete) {
                log.info("文件{}上传完成", remotePath);
            } else {
                log.error("文件{}上传失败", remotePath);
            }
        } catch (Exception e) {
            log.error("文件上传异常", e);
        } finally {
            // 释放客户端资源
            ftpPoolService.returnObject(ftpClient);
        }
    }


    /**
     * 文件下载
     *
     * @param ftpClient ftp连接客户端
     * @param ftpPath   ftp文件路径
     */
    public  byte[] download(FTPClient ftpClient,
                                   String ftpPath,
                                   String ftpFileName
    ) throws Exception {
        if (ftpClient == null || ftpPath == null ) {
            return null;
        }
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        boolean isChange = ftpClient.changeWorkingDirectory(ftpPath);
        log.info("文件地址{}监测:{}", ftpPath, isChange);
        log.info("文件名称{}", ftpFileName);
        //第三步:下载指定文件
        ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE); //文件类型
        boolean isRetrieve = ftpClient.retrieveFile(new String(ftpFileName.getBytes(StandardCharsets.UTF_8), FTP.DEFAULT_CONTROL_ENCODING), outputStream);

        // 关闭流之后必须执行,否则下一个文件导致流为空
        // boolean complete = ftpClient.completePendingCommand();
        if (isRetrieve) {
            // log.info("文件{}下载完成", ftpPath);
        } else {
            throw new RuntimeException("下载失败"+ftpPath);
        };
        return outputStream.toByteArray();
    }

    /**
     * 下载文件到本地 
     *
     * @param ftpPath     FTP服务器文件目录 
     * @param ftpFileName 文件名称 
     * @param localPath   下载后的文件路径 
     * @return boolean
     */
    public boolean download(String ftpPath, String ftpFileName, String localPath) {
        FTPClient ftpClient = ftpPoolService.borrowObject();
        OutputStream outputStream = null;
        try {
            FTPFile[] ftpFiles = ftpClient.listFiles(ftpPath, file -> file.isFile() && file.getName().equals(ftpFileName));
            if (ftpFiles != null && ftpFiles.length > 0) {
                FTPFile ftpFile = ftpFiles[0];
                File localFile = new File(localPath + DIR_SPLIT + ftpFile.getName());
                // 判断本地路径目录是否存在,不存在则创建
                if (!localFile.getParentFile().exists()) {
                    localFile.getParentFile().mkdirs();
                }
                outputStream = Files.newOutputStream(localFile.toPath());
                ftpClient.retrieveFile(ftpFile.getName(), outputStream);

                log.info("fileName:{},size:{}", ftpFile.getName(), ftpFile.getSize());
                log.info("下载文件成功...");
                return true;
            } else {
                log.info("文件不存在,filePathname:{},", ftpPath + DIR_SPLIT + ftpFileName);
            }
        } catch (Exception e) {
            log.error("下载文件失败...",e);
        } finally {
            try {
                outputStream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            ftpPoolService.returnObject(ftpClient);
        }
        return false;
    }
}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐