FTP服务器海量小文件迁移备份可行解决方法
迁移环境1、一台linux FTP服务器大约有2000万个小文件,根目录大约有160万个文件夹。需要把FTP文件迁移到HCP对象存储服务器上。总大小大约5T左右。此方案最终效果。1、记录每个文件迁移情况。迁移是否成功是否失败。2、迁移速度每分钟4-5G左右,也就是大约一天左右可以迁移完成。迁移解决方案遍历根目录所有的文件夹,存到数据库,并对每个文件编一个主键ID;写程序多线程迁移;...
迁移环境
1、一台linux FTP服务器大约有2000万个小文件,根目录大约有160万个文件夹。需要把FTP文件迁移到HCP对象存储服务器上。总大小大约5T左右。
此方案最终效果。
1、记录每个文件迁移情况。迁移是否成功是否失败。
2、迁移速度每分钟4-5G左右,也就是大约一天左右可以迁移完成。
3、没有文件数量越大处理越慢的问题,2亿个文件处理仍然会是这个速度
迁移解决方案
- 遍历根目录所有的文件夹,存到数据库,并对每个文件编一个主键ID;
- 写程序多线程迁移;
迁移遇到的问题以及解决方案:
1、根目录有160万个文件夹,java 程序 遍历会出现内存堆栈溢出的情况,所以直接用以下命令,当然列出目录的命令有好几种,这一种是最好后续处理的。
ls -F | grep '/$' >/mnt/dir.tx
ls -F | grep '/$' >/mnt/file.txt
得到的格式如下
bank/
Desktop/
develop/
Documents/
用notepad++ 批量处理一下就好了。然后倒入数据库。
另外win下获取目录或者文件的方式分别如下
dir /B /a:d >directory.txt
dir /B /a:a >file.txt
此方案的JVAVA代码:
1、采用的多线程的方式
首先是调用多线程的main 函数,这地方根据需要自动定义线程个数
public static void main(String[] agrs) throws Exception {
System.out.print("-------------------");
Task1[] task = new Task1[2];
for (int i = 0; i < 1; i++) {
task[i] = new Task1(i);
task[i].start();
}
}
2、线程函数
package test;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import com.hitachivantara.hcp.common.ex.HCPException;
import com.hitachivantara.hcp.standard.body.HCPStandardClient;
import ctais.business.yxcx.htcobject.HCPClients;
import ctais.business.yxcx.htcobject.HcpObjectUtil;
import ctais.services.data.DataWindow;
import ctais.util.StringEx;
public class MultipleMigrationToHCP extends Thread {
// 线程个数
private int number;
// HCP链接信息,敏感信息,我就用* 代替,
String namespace = "*******";
String endpoint = "********";
String accessKey = "Z*****";
String secretKey = "7b76b3245********c5e8c9eab";
HCPStandardClient hcpClient = null;
FTPClient FTP = null;// 全局FTP
//这里是初始化HCP对象的,需要另外的类,这里就不再上传,相信做HCP对象存存储的都有
public MultipleMigrationToHCP(int number) throws HCPException {
this.number = number;
hcpClient = HCPClients.getInstance().getHCPClient(endpoint, namespace, accessKey, secretKey);
}
//多线程
public void run() {
try {
// 多线程迁移跟文件数据
// genwjdatamigration(this.number);
// 多线程迁移正常数据
// datamigration5(this.number);
// 迁移目录
zmluqy(this.number);
// 多系统核查
// hcpCheck(this.number);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 导文件 功能描述: <br>
* 〈功能详细描述〉
*
* @throws Exception
* @Author: zhangzdc
* @Date: 2018-11-26 下午4:29:42
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
public void genwjdatamigration(int thread) throws Exception {
String sql = "select wjmc,id from zhouqwa_YX_QYwj t where t.qyzt is null";
DataWindow dw = DataWindow.dynamicCreate(sql, true);
dw.retrieve();
long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数
// FTP 形式
FTPClient ftp = new FTPClient();
ftp.setDefaultTimeout(30000); // 设置默认超时时间
ftp.setDataTimeout(30000); // 设置数据超时时间
ftp.connect("0000.0000.20.95");
// 如果采用默认端口,可以使用ftp.connect(host)的方式直接连接FTP服务器
ftp.login("tax-arch", "TpArc=953");// 登录
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
}
InputStream inputstream = null;
StringBuffer filepath = new StringBuffer();
StringBuffer key = new StringBuffer();
for (int i = 0; i < totalRowCount; i++) {
try {
String mlmc = StringEx.sNull(dw.getItemAny(i, "wjmc"));
String id = StringEx.sNull(dw.getItemAny(i, "ID"));
// 赋值
filepath.delete(0, filepath.length());
filepath.append("/" + mlmc);
key.delete(0, key.length());
key.append("/dshistory/" + mlmc.replace("\\", "/"));
// 判断是否重复
if (hcpClient.doesObjectExist(key.toString())) {
updategenwjdatamigration(id, thread, 1);
continue;
}
inputstream = ftp.retrieveFileStream(filepath.toString());
hcpClient.putObject(key.toString(), inputstream);
inputstream.close();
ftp.completePendingCommand();
updategenwjdatamigration(id, thread, 1);
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println("错误");
continue;
} finally {
if (inputstream != null) {
inputstream.close();
}
}
}
}
public static void updategenwjdatamigration(String id, int thread, int rowss) throws Exception {
System.out.println("第" + thread + "线程+++++++++++++++++====" + id);
String sql = "update zhouqwa_YX_QYwj t set t.qyzt = 'Y',t.qyrq=sysdate where t.id = '" + id + "'";
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
public void datamigration5(int thread) throws Exception {
for (int aa = 0; aa < 1; aa++) {
// System.out.println("此线程" + thread + "第" + aa + "次第Id范围" + (aa *
// 10000 + thread * 80000) + "到" + ((aa + 1) * 10000 + thread *
// 80000));
// String sql =
// "select mlmc,id,sfqx from zhouqwa_yx_qyml t where sfqx is null and sfmu is null and "
// + (aa * 10000 + thread * 80000)
// + "<t.id and t.id<=" + ((aa + 1) * 10000 + thread * 80000) + "";
System.out.println("此线程" + thread + "第" + aa + "次第Id范围" + (thread * 3000 + 1600000) + "到" + ((thread + 1) * 3000 + 1600000));
String sql = "select mlmc,id,sfqx from zhouqwa_yx_qyml t where sfqx is null and sfmu is null ";
// continue;
DataWindow dw = DataWindow.dynamicCreate(sql, true);
dw.retrieve();
long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数
// FTP 形式
FTPClient ftp = new FTPClient();
ftp.setDefaultTimeout(30000); // 设置默认超时时间
ftp.setDataTimeout(30000); // 设置数据超时时间
ftp.connect("140.24.20.95");
// 如果采用默认端口,可以使用ftp.connect(host)的方式直接连接FTP服务器
ftp.login("tax-arch", "TpArc=953");// 登录
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
}
InputStream inputstream = null;
StringBuffer filepath = new StringBuffer();
StringBuffer key = new StringBuffer();
for (int i = 0; i < totalRowCount; i++) {
try {
String mlmc = StringEx.sNull(dw.getItemAny(i, "MLMC"));
String id = StringEx.sNull(dw.getItemAny(i, "ID"));
FTPFile[] remoteFiles = ftp.listFiles("/" + mlmc);
String[] files = new String[remoteFiles.length];
if (remoteFiles != null) {
for (int k = 0; k < remoteFiles.length; k++) {
if (remoteFiles[k] == null)
continue;
// 判断目录是不是子目录,如果是子目录,如果是目录则暂时跳过
if (remoteFiles[k].isDirectory()) {
// 更新目录文件
gxzmuuxx(id, mlmc);
continue;
}
files[k] = remoteFiles[k].getName();
}
}
for (int j = 0; j < files.length; j++) {
if (files[j] == null || files[j] == "") {
continue;
}
// 赋值
filepath.delete(0, filepath.length());
filepath.append("/" + mlmc + files[j]);
key.delete(0, key.length());
key.append("/dshistory/" + mlmc + files[j].replace("\\", "/"));
// 判断是否重复
if (hcpClient.doesObjectExist(key.toString())) {
System.out.println("cunzai" + j);
continue;
}
inputstream = ftp.retrieveFileStream(filepath.toString());
hcpClient.putObject(key.toString(), inputstream);
inputstream.close();
ftp.completePendingCommand();
}
updateendmuxx(id, thread, aa);
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println("错误");
continue;
} finally {
if (inputstream != null) {
inputstream.close();
}
}
}
}
}
/**
* 更新完成目录信息 功能描述: <br>
* 〈功能详细描述〉
*
* @param id
* @throws Exception
* Author: zhouqwa Date: 2018年11月18日 下午5:02:24 History:
*/
public static void updateendmuxx(String id, int thread, int i) throws Exception {
System.out.println("线程ID" + thread + "ID 为" + id);
String sql = "update zhouqwa_yx_qyml t set t.sfqx = 'Y',t.qyrq=sysdate where t.id = '" + id + "'";
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
public static void gxzmuuxx(String id, String mlmc) throws Exception {
System.out.println("" + id + ":" + mlmc + "有子目录+++++++++++++++");
String sql = "update zhouqwa_yx_qyml t set t.sfmu='Y' where t.id = '" + id + "'";
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
/**
* 更新目录文件 1、更新目录信息 2、更新完成西信息
*/
public static void updatemuxx(String id) throws Exception {
String sql = "update yx_qyml t set t.sfmu= 'Y' where t.id = '" + id + "'";
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
/**
* 数据迁移验证 功能描述: <br>
* 〈功能详细描述〉 Author: zhouqwa Date: 2018年11月15日 上午9:59:14 History:
*
* @throws HCPException
*/
public void hcpCheck(int thread) throws HCPException {
// System.out.println("此线程" + thread + "--Id范围" + (10000000 + thread *
// 1000000) + "到---"
// + ((99 + 1) * 11000 + 10000000 + thread * 1000000));
for (int aa = 0; aa < 1; aa++) {
// System.out.println("此线程" + thread + "--Id范围" + (aa * 11000 +
// 10000000 + thread * 1000000) + "到"
// + ((aa + 1) * 11000 + 10000000 + thread * 1000000));
try {
String namespace = "dzdanamespace";
String endpoint = "dzdatenant.osd.qdsw.tax";
String accessKey = "******";
String secretKey = "7b76b3245906ee27c5e8c9eab";
HCPStandardClient hcpClient = HCPClients.getInstance().getHCPClient(endpoint, namespace, accessKey, secretKey);
// String sql = "select t.id,t.filepath,t.sfhd,t.sfhd from ko_tmp_yx_syxx_ftppath_ds t where sfhd is null and hdcg is null and "
// + (aa * 11000 + 10000000 + thread * 1000000) + "<t.id and t.id<=" + ((aa + 1) * 11000 + 10000000 + thread * 1000000) + "";
String sql = "select t.id,t.filepath,t.sfhd,t.sfhd from ko_tmp_yx_syxx_ftppath_ds t where sfhd is null and hdcg is null ";
DataWindow dw = DataWindow.dynamicCreate(sql, true);
dw.retrieve();
long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数
// 判断当前页和总页数的比例
for (int i = 0; i < totalRowCount; i++) {
// 取出数据进行HCP上传,并更新
String id = StringEx.sNull(dw.getItemAny(i, "ID"));
String filepath = StringEx.sNull(dw.getItemAny(i, "FILEPATH"));
System.out.print("----" + filepath);
// 根据文件路径,定义上传HCP文件
if (hcpClient.doesObjectExist("/dshistory" + filepath)) {
// 更新完后更新数据库
updatehdwjxx(id, i, true);
} else {
updatehdwjxx(id, i, false);
}
}
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
public static void updatehdwjxx(String id, int i, boolean result) throws Exception {
// 判断是否更新成功
if (i % 10 == 0) {
System.out.println("处理++++++++++++++++++++++" + id + "状态:" + result);
}
String sql = "";
if (result) {
sql = "update ko_tmp_yx_syxx_ftppath_ds t set t.hdcg = 'Y' ,t.sfhd = 'Y' where t.id = '" + id + "'";
} else {
sql = "update ko_tmp_yx_syxx_ftppath_ds t set t.sfhd= 'Y' where t.id = '" + id + "'";
}
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
/**
* 上传到HCP中 1、测试上传 2、只测试验证
*/
public static String hcpResult(String filepath, String exec) {
String s1 = filepath.substring(filepath.lastIndexOf("\\") + 1);
boolean result = false;
String rtn = "1:";
try {
// 测试上传方法
if ("1".equals(exec)) {
String key = "test/" + s1 + "";
result = HcpObjectUtil.putObjectToHCP(filepath, key);
} else {
result = HcpObjectUtil.hcpCheck(filepath);
}
} catch (Exception e) {
rtn = "2:" + e.getMessage();
e.printStackTrace();
}
return rtn;
}
/**
* 更新文件信息数据库
*
* @throws Exception
*/
public static void updateYzWjxx(String syzj, String cjlsh, boolean result) throws Exception {
// 判断是否更新成功
String sql = "";
if (result) {
sql = "update YX_WJXXTEST t set t.YZYXBZ = 'Y' where t.sy_zj='" + syzj + "' and t.YX_CJLSH = '" + cjlsh + "'";
} else {
sql = "update YX_WJXXTEST t set t.YZYXBZ= 'N' where t.sy_zj='" + syzj + "' and t.YX_CJLSH = '" + cjlsh + "'";
}
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
/**
* 子目录迁移 功能描述: <br>
* 〈功能详细描述〉
*
* @throws Exception
* @Author: zhangzdc
* @Date: 2018-11-28 下午2:58:09
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
public void zmluqy(int rows) {
System.out.println("线程++++++++++++" + rows + "从" + rows * 1 + "到" + (rows + 1) * 1);
// String sql = " select * from ZHOUQWA_YX_zml t where t.id <" + ((rows
// + 1) * 1000) + " and t.id>" + rows * 1000
// + " and t.sfcg is null and t.zmlcs is null";
String sql = " select * from (select t.mlmc,id,rownum id2 from zhouqwa_yx_zml t where t.sfcg is null and t.zmlcs is null)aa where aa.id2>="
+ rows * 1 + "and aa.id2<" + (rows + 1) * 1;
DataWindow dw;
boolean flag = false;
try {
dw = DataWindow.dynamicCreate(sql, true);
dw.retrieve();
long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数
for (int i = 0; i < totalRowCount; i++) {
FTPClient ftp = new FTPClient();
ftp.setDefaultTimeout(30000); // 设置默认超时时间
ftp.setDataTimeout(30000); // 设置数据超时时间
ftp.connect("140.24.20.95");
// 如果采用默认端口,可以使用ftp.connect(host)的方式直接连接FTP服务器
ftp.login("tax-arch", "TpArc=953");// 登录
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
}
FTP = ftp;
String mlmc = StringEx.sNull(dw.getItemAny(i, "MLMC"));
String id = StringEx.sNull(dw.getItemAny(i, "ID"));
flag = mlqx("/errorfiles/" + mlmc + "/", id, 0, rows);
updatexx(id, flag,rows);
ftp.disconnect();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void updatexx(String id, boolean flag, int rows) throws Exception {
// 判断是否更新成功
String sql = "";
if (flag) {
sql = "update ZHOUQWA_YX_zml t set t.sfcg = 'Y' where t.id='" + id + "'";
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
}
private void updateten(String id) throws Exception {
String sql = "update ZHOUQWA_YX_zml t set t.zmlcs = '大于10' where t.id='" + id + "'";
DataWindow dw = DataWindow.dynamicCreate(sql);
dw.update(false);
}
private boolean mlqx(String mlmc, String id, int level, int thread) {
// 判断是不是目录,如果是目录直接迁移
if (level++ > 10) {
System.out.println("超过实10层++++++");
try {
updateten(id);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return false;
}
// 递归
try {
FTPFile[] remoteFiles = this.FTP.listFiles(mlmc);
if (remoteFiles != null) {
boolean[] flag = new boolean[remoteFiles.length];
for (int k = 0; k < remoteFiles.length; k++) {
if (remoteFiles[k] == null)
continue;
// 判断目录是不是子目录,如果是子目录,如果是目录则暂时跳过
if (remoteFiles[k].isDirectory()) {
flag[k] = mlqx(mlmc + remoteFiles[k].getName() + "/", id, level, thread);
} else {
// 上传文件
flag[k] = uploadFromFTPTOHCP(mlmc + remoteFiles[k].getName(), thread);
}
}
// 便利这个文件是否成功
for (boolean a : flag) {
if (a == false) {
return false;
}
}
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 上传文件 功能描述: <br>
* 〈功能详细描述〉
*
* @return
* @Author: zhangzdc
* @Date: 2018-12-3 上午10:29:42
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
private boolean uploadFromFTPTOHCP(String filepath, int thread) {
// 路径截取
StringBuffer key = new StringBuffer();
key.append("/dshistory/" + filepath.substring(12));
InputStream inputstream = null;
boolean flag;
try{
if (hcpClient.doesObjectExist(key.toString())) {
System.out.println("线程" + thread + "已经存在" + filepath.substring(12));
return true;
}
inputstream = FTP.retrieveFileStream(filepath.toString());
if (inputstream == null) {
flag = false;
} else {
hcpClient.putObject(key.toString(), inputstream);
// System.out.println("-----------------------一张");
flag = true;
}
inputstream.close();
this.FTP.completePendingCommand();
} catch (Exception e) {
System.out.println("++++++++++++++++++++++又错了");
System.out.println(e.getMessage());
flag = false;
} finally {
if (inputstream != null) {
try {
inputstream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return flag;
}
}
此方案好处
1、不怕断电,每当处理完一个文件夹,我就在数据库里记录下。下次在处理可以排除已经处理成功的文件。不会从头开始处理
2、不递归,处理速度快,不怕卡死等问题,不论多少文件,处理速度几乎恒定不变,不存在文件数量越大,处理越慢的问题。
最后说一点
不要迷信工具,我们用的日立的HCP云存储,他给的官方工具500万个几乎没大小的文件迁移了一天也没迁移完。500万个文件愣是给我搜索出了1000多万。最后几乎卡的就不动了
更多推荐
所有评论(0)