迁移环境

1、一台linux FTP服务器大约有2000万个小文件,根目录大约有160万个文件夹。需要把FTP文件迁移到HCP对象存储服务器上。总大小大约5T左右。

此方案最终效果。

1、记录每个文件迁移情况。迁移是否成功是否失败。
2、迁移速度每分钟4-5G左右,也就是大约一天左右可以迁移完成。
3、没有文件数量越大处理越慢的问题,2亿个文件处理仍然会是这个速度

迁移解决方案

  1. 遍历根目录所有的文件夹,存到数据库,并对每个文件编一个主键ID;
  2. 写程序多线程迁移;

迁移遇到的问题以及解决方案:

1、根目录有160万个文件夹,java 程序 遍历会出现内存堆栈溢出的情况,所以直接用以下命令,当然列出目录的命令有好几种,这一种是最好后续处理的。

  ls -F | grep '/$'  >/mnt/dir.tx
  ls -F | grep '/$' >/mnt/file.txt

得到的格式如下

    bank/
  Desktop/
  develop/
 Documents/

用notepad++ 批量处理一下就好了。然后倒入数据库。
另外win下获取目录或者文件的方式分别如下

dir /B /a:d >directory.txt
dir /B /a:a >file.txt

此方案的JVAVA代码:

1、采用的多线程的方式
首先是调用多线程的main 函数,这地方根据需要自动定义线程个数

public static void main(String[] agrs) throws Exception {
		System.out.print("-------------------");
		Task1[] task = new Task1[2];
		for (int i = 0; i < 1; i++) {
			task[i] = new Task1(i);
			task[i].start();
		}
	}

2、线程函数

package test;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import com.hitachivantara.hcp.common.ex.HCPException;
import com.hitachivantara.hcp.standard.body.HCPStandardClient;

import ctais.business.yxcx.htcobject.HCPClients;
import ctais.business.yxcx.htcobject.HcpObjectUtil;
import ctais.services.data.DataWindow;
import ctais.util.StringEx;

public class MultipleMigrationToHCP extends Thread {

	// 线程个数
	private int number;
	// HCP链接信息,敏感信息,我就用* 代替,
	String namespace = "*******";
	String endpoint = "********";
	String accessKey = "Z*****";
	String secretKey = "7b76b3245********c5e8c9eab";
	HCPStandardClient hcpClient = null;
	FTPClient FTP = null;// 全局FTP

    //这里是初始化HCP对象的,需要另外的类,这里就不再上传,相信做HCP对象存存储的都有
	public MultipleMigrationToHCP(int number) throws HCPException {
		this.number = number;
		hcpClient = HCPClients.getInstance().getHCPClient(endpoint, namespace, accessKey, secretKey);
	}
	
	//多线程
	public void run() {
		try {
			// 多线程迁移跟文件数据
			// genwjdatamigration(this.number);

			// 多线程迁移正常数据
			// datamigration5(this.number);

			// 迁移目录
			zmluqy(this.number);

			// 多系统核查
			// hcpCheck(this.number);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/**
	 * 导文件 功能描述: <br>
	 * 〈功能详细描述〉
	 * 
	 * @throws Exception
	 * @Author: zhangzdc
	 * @Date: 2018-11-26 下午4:29:42
	 * @see [相关类/方法](可选)
	 * @since [产品/模块版本](可选)
	 */
	public void genwjdatamigration(int thread) throws Exception {
	    	String sql = "select wjmc,id from zhouqwa_YX_QYwj t where t.qyzt is null";
			DataWindow dw = DataWindow.dynamicCreate(sql, true);
			dw.retrieve();
		long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数

		// FTP 形式
			FTPClient ftp = new FTPClient();
		ftp.setDefaultTimeout(30000); // 设置默认超时时间
		ftp.setDataTimeout(30000); // 设置数据超时时间
			ftp.connect("0000.0000.20.95");
		// 如果采用默认端口,可以使用ftp.connect(host)的方式直接连接FTP服务器
		ftp.login("tax-arch", "TpArc=953");// 登录
			ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
			int reply = ftp.getReplyCode();
			if (!FTPReply.isPositiveCompletion(reply)) {
				ftp.disconnect();
			}

			InputStream inputstream = null;
			StringBuffer filepath = new StringBuffer();
			StringBuffer key = new StringBuffer();
			for (int i = 0; i < totalRowCount; i++) {
				try {
					String mlmc = StringEx.sNull(dw.getItemAny(i, "wjmc"));
					String id = StringEx.sNull(dw.getItemAny(i, "ID"));
				// 赋值
					filepath.delete(0, filepath.length());
					filepath.append("/" + mlmc);
					key.delete(0, key.length());
				key.append("/dshistory/" + mlmc.replace("\\", "/"));

				// 判断是否重复
					if (hcpClient.doesObjectExist(key.toString())) {
					updategenwjdatamigration(id, thread, 1);
						continue;
					}

					inputstream = ftp.retrieveFileStream(filepath.toString());
					hcpClient.putObject(key.toString(), inputstream);
					inputstream.close();
					ftp.completePendingCommand();
					updategenwjdatamigration(id, thread, 1);
				} catch (Exception e) {
					System.out.println(e.getMessage());
				System.out.println("错误");
					continue;
				} finally {
					if (inputstream != null) {
						inputstream.close();
					}
				}
			}
	}

	public static void updategenwjdatamigration(String id, int thread, int rowss) throws Exception {
		System.out.println("第" + thread + "线程+++++++++++++++++====" + id);
		String sql = "update zhouqwa_YX_QYwj t set t.qyzt = 'Y',t.qyrq=sysdate where  t.id = '" + id + "'";
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}


	public void datamigration5(int thread) throws Exception {

		for (int aa = 0; aa < 1; aa++) {
			// System.out.println("此线程" + thread + "第" + aa + "次第Id范围" + (aa *
			// 10000 + thread * 80000) + "到" + ((aa + 1) * 10000 + thread *
			// 80000));
			// String sql =
			// "select mlmc,id,sfqx from zhouqwa_yx_qyml t where sfqx is null and sfmu is null and "
			// + (aa * 10000 + thread * 80000)
			// + "<t.id and  t.id<=" + ((aa + 1) * 10000 + thread * 80000) + "";

			System.out.println("此线程" + thread + "第" + aa + "次第Id范围" + (thread * 3000 + 1600000) + "到" + ((thread + 1) * 3000 + 1600000));
			String sql = "select mlmc,id,sfqx from zhouqwa_yx_qyml t where sfqx is null and sfmu is null ";
			// continue;
			DataWindow dw = DataWindow.dynamicCreate(sql, true);
			dw.retrieve();
			long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数

			// FTP 形式
			FTPClient ftp = new FTPClient();
			ftp.setDefaultTimeout(30000); // 设置默认超时时间
			ftp.setDataTimeout(30000); // 设置数据超时时间
			ftp.connect("140.24.20.95");
			// 如果采用默认端口,可以使用ftp.connect(host)的方式直接连接FTP服务器
			ftp.login("tax-arch", "TpArc=953");// 登录
			ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
			int reply = ftp.getReplyCode();
			if (!FTPReply.isPositiveCompletion(reply)) {
				ftp.disconnect();
			}

			InputStream inputstream = null;
			StringBuffer filepath = new StringBuffer();
			StringBuffer key = new StringBuffer();
			for (int i = 0; i < totalRowCount; i++) {
				try {
					String mlmc = StringEx.sNull(dw.getItemAny(i, "MLMC"));
					String id = StringEx.sNull(dw.getItemAny(i, "ID"));

					FTPFile[] remoteFiles = ftp.listFiles("/" + mlmc);
					String[] files = new String[remoteFiles.length];
					if (remoteFiles != null) {
						for (int k = 0; k < remoteFiles.length; k++) {
							if (remoteFiles[k] == null)
								continue;
							// 判断目录是不是子目录,如果是子目录,如果是目录则暂时跳过
							if (remoteFiles[k].isDirectory()) {
								// 更新目录文件
								gxzmuuxx(id, mlmc);
								continue;
							}
							files[k] = remoteFiles[k].getName();
						}
					}

					for (int j = 0; j < files.length; j++) {

						if (files[j] == null || files[j] == "") {
							continue;
						}

						// 赋值
						filepath.delete(0, filepath.length());
						filepath.append("/" + mlmc + files[j]);
						key.delete(0, key.length());
						key.append("/dshistory/" + mlmc + files[j].replace("\\", "/"));

						// 判断是否重复
						if (hcpClient.doesObjectExist(key.toString())) {
							System.out.println("cunzai" + j);
							continue;
						}
						inputstream = ftp.retrieveFileStream(filepath.toString());
						hcpClient.putObject(key.toString(), inputstream);
						inputstream.close();
						ftp.completePendingCommand();
					}
					updateendmuxx(id, thread, aa);
				} catch (Exception e) {
					System.out.println(e.getMessage());
					System.out.println("错误");
					continue;
				} finally {
					if (inputstream != null) {
						inputstream.close();
					}
				}
			}

		}
	}
	

	/**
	 * 更新完成目录信息 功能描述: <br>
	 * 〈功能详细描述〉
	 * 
	 * @param id
	 * @throws Exception
	 *             Author: zhouqwa Date: 2018年11月18日 下午5:02:24 History:
	 */
	public static void updateendmuxx(String id, int thread, int i) throws Exception {
		System.out.println("线程ID" + thread + "ID 为" + id);
		String sql = "update zhouqwa_yx_qyml t set t.sfqx = 'Y',t.qyrq=sysdate where  t.id = '" + id + "'";
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}
	
	public static void gxzmuuxx(String id, String mlmc) throws Exception {
		System.out.println("" + id + ":" + mlmc + "有子目录+++++++++++++++");
		String sql = "update zhouqwa_yx_qyml t set t.sfmu='Y' where  t.id = '" + id + "'";
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}


	/**
	 * 更新目录文件 1、更新目录信息 2、更新完成西信息
	 */
	public static void updatemuxx(String id) throws Exception {
		String sql = "update yx_qyml t set t.sfmu= 'Y' where  t.id = '" + id + "'";
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}

	/**
	 * 数据迁移验证 功能描述: <br>
	 * 〈功能详细描述〉 Author: zhouqwa Date: 2018年11月15日 上午9:59:14 History:
	 * 
	 * @throws HCPException
	 */
	public void hcpCheck(int thread) throws HCPException {

		// System.out.println("此线程" + thread + "--Id范围" + (10000000 + thread *
		// 1000000) + "到---"
		// + ((99 + 1) * 11000 + 10000000 + thread * 1000000));
		for (int aa = 0; aa < 1; aa++) {
			// System.out.println("此线程" + thread + "--Id范围" + (aa * 11000 +
			// 10000000 + thread * 1000000) + "到"
			// + ((aa + 1) * 11000 + 10000000 + thread * 1000000));

			try {
				String namespace = "dzdanamespace";
				String endpoint = "dzdatenant.osd.qdsw.tax";
				String accessKey = "******";
				String secretKey = "7b76b3245906ee27c5e8c9eab";
				HCPStandardClient hcpClient = HCPClients.getInstance().getHCPClient(endpoint, namespace, accessKey, secretKey);
//				String sql = "select t.id,t.filepath,t.sfhd,t.sfhd from ko_tmp_yx_syxx_ftppath_ds t where sfhd is null and hdcg is null and "
//						+ (aa * 11000 + 10000000 + thread * 1000000) + "<t.id and  t.id<=" + ((aa + 1) * 11000 + 10000000 + thread * 1000000) + "";

				String sql = "select t.id,t.filepath,t.sfhd,t.sfhd from ko_tmp_yx_syxx_ftppath_ds t where sfhd is null and hdcg is null ";
				DataWindow dw = DataWindow.dynamicCreate(sql, true);
				dw.retrieve();
				long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数

				// 判断当前页和总页数的比例
				for (int i = 0; i < totalRowCount; i++) {
					// 取出数据进行HCP上传,并更新
					String id = StringEx.sNull(dw.getItemAny(i, "ID"));
					String filepath = StringEx.sNull(dw.getItemAny(i, "FILEPATH"));
					System.out.print("----" + filepath);
					// 根据文件路径,定义上传HCP文件
					if (hcpClient.doesObjectExist("/dshistory" + filepath)) {
						// 更新完后更新数据库
						updatehdwjxx(id, i, true);
					} else {
						updatehdwjxx(id, i, false);
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				continue;
			}
		}
	}

	public static void updatehdwjxx(String id, int i, boolean result) throws Exception {
		// 判断是否更新成功
		if (i % 10 == 0) {
			System.out.println("处理++++++++++++++++++++++" + id + "状态:" + result);
		}
		String sql = "";
		if (result) {
			sql = "update ko_tmp_yx_syxx_ftppath_ds t set t.hdcg = 'Y' ,t.sfhd = 'Y' where t.id = '" + id + "'";
		} else {
			sql = "update ko_tmp_yx_syxx_ftppath_ds t set t.sfhd= 'Y' where t.id = '" + id + "'";
		}
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}

	/**
	 * 上传到HCP中 1、测试上传 2、只测试验证
	 */
	public static String hcpResult(String filepath, String exec) {
		String s1 = filepath.substring(filepath.lastIndexOf("\\") + 1);
		boolean result = false;
		String rtn = "1:";
		try {
			// 测试上传方法
			if ("1".equals(exec)) {
				String key = "test/" + s1 + "";
				result = HcpObjectUtil.putObjectToHCP(filepath, key);
			} else {
				result = HcpObjectUtil.hcpCheck(filepath);
			}
		} catch (Exception e) {
			rtn = "2:" + e.getMessage();
			e.printStackTrace();
		}
		return rtn;
	}

	/**
	 * 更新文件信息数据库
	 * 
	 * @throws Exception
	 */
	public static void updateYzWjxx(String syzj, String cjlsh, boolean result) throws Exception {
		// 判断是否更新成功
		String sql = "";
		if (result) {
			sql = "update YX_WJXXTEST t set t.YZYXBZ = 'Y' where t.sy_zj='" + syzj + "' and t.YX_CJLSH = '" + cjlsh + "'";
		} else {
			sql = "update YX_WJXXTEST t set t.YZYXBZ= 'N' where t.sy_zj='" + syzj + "' and t.YX_CJLSH = '" + cjlsh + "'";
		}
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}
	
	/**
	 * 子目录迁移 功能描述: <br>
	 * 〈功能详细描述〉
	 * 
	 * @throws Exception
	 * @Author: zhangzdc
	 * @Date: 2018-11-28 下午2:58:09
	 * @see [相关类/方法](可选)
	 * @since [产品/模块版本](可选)
	 */
	public void zmluqy(int rows) {
		System.out.println("线程++++++++++++" + rows + "从" + rows * 1 + "到" + (rows + 1) * 1);
		// String sql = " select * from ZHOUQWA_YX_zml t where t.id <" + ((rows
		// + 1) * 1000) + " and t.id>" + rows * 1000
		// + " and t.sfcg is null and t.zmlcs is null";
		String sql = " select * from (select t.mlmc,id,rownum id2 from zhouqwa_yx_zml t where t.sfcg is null and t.zmlcs is null)aa where aa.id2>="
				+ rows * 1 + "and aa.id2<" + (rows + 1) * 1;
		DataWindow dw;
		boolean flag = false;
		try {
			dw = DataWindow.dynamicCreate(sql, true);
			dw.retrieve();
			long totalRowCount = dw.getRowCount();// 调用dw公共方法,取得总记录数
			for (int i = 0; i < totalRowCount; i++) {
				FTPClient ftp = new FTPClient();
				ftp.setDefaultTimeout(30000); // 设置默认超时时间
				ftp.setDataTimeout(30000); // 设置数据超时时间
				ftp.connect("140.24.20.95");
				// 如果采用默认端口,可以使用ftp.connect(host)的方式直接连接FTP服务器
				ftp.login("tax-arch", "TpArc=953");// 登录
				ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
				int reply = ftp.getReplyCode();
				if (!FTPReply.isPositiveCompletion(reply)) {
					ftp.disconnect();
				}
				FTP = ftp;

				String mlmc = StringEx.sNull(dw.getItemAny(i, "MLMC"));
				String id = StringEx.sNull(dw.getItemAny(i, "ID"));
				flag = mlqx("/errorfiles/" + mlmc + "/", id, 0, rows);
				updatexx(id, flag,rows);
				ftp.disconnect();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	private void updatexx(String id, boolean flag, int rows) throws Exception {
		// 判断是否更新成功
		String sql = "";
		if (flag) {
			sql = "update ZHOUQWA_YX_zml t set t.sfcg = 'Y' where t.id='" + id + "'";
			DataWindow dw = DataWindow.dynamicCreate(sql);
			dw.update(false);
		}
	}
	
	private void updateten(String id) throws Exception {
		String sql = "update ZHOUQWA_YX_zml t set t.zmlcs = '大于10' where t.id='" + id + "'";
		DataWindow dw = DataWindow.dynamicCreate(sql);
		dw.update(false);
	}

	private boolean mlqx(String mlmc, String id, int level, int thread) {
		// 判断是不是目录,如果是目录直接迁移
		if (level++ > 10) {
			System.out.println("超过实10层++++++");
			try {
				updateten(id);
			} catch (Exception e) {
				e.printStackTrace();
				return false;
			}
			return false;
		}

		// 递归
		try {
			FTPFile[] remoteFiles = this.FTP.listFiles(mlmc);
			if (remoteFiles != null) {
				boolean[] flag = new boolean[remoteFiles.length];
				for (int k = 0; k < remoteFiles.length; k++) {
					if (remoteFiles[k] == null)
						continue;
					// 判断目录是不是子目录,如果是子目录,如果是目录则暂时跳过
					if (remoteFiles[k].isDirectory()) {
						flag[k] = mlqx(mlmc + remoteFiles[k].getName() + "/", id, level, thread);
					} else {
						// 上传文件
						flag[k] = uploadFromFTPTOHCP(mlmc + remoteFiles[k].getName(), thread);
					}
				}

				// 便利这个文件是否成功
				for (boolean a : flag) {
					if (a == false) {
						return false;
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
		return true;
	}


	/**
	 * 上传文件 功能描述: <br>
	 * 〈功能详细描述〉
	 * 
	 * @return
	 * @Author: zhangzdc
	 * @Date: 2018-12-3 上午10:29:42
	 * @see [相关类/方法](可选)
	 * @since [产品/模块版本](可选)
	 */
	private boolean uploadFromFTPTOHCP(String filepath, int thread) {
		// 路径截取
		StringBuffer key = new StringBuffer();
		key.append("/dshistory/" + filepath.substring(12));
		InputStream inputstream = null;
		boolean flag;
		
		try{
			if (hcpClient.doesObjectExist(key.toString())) {
				System.out.println("线程" + thread + "已经存在" + filepath.substring(12));
				return true;
			}

			inputstream = FTP.retrieveFileStream(filepath.toString());
			if (inputstream == null) {
				flag = false;
			} else {
				hcpClient.putObject(key.toString(), inputstream);
				// System.out.println("-----------------------一张");
				flag = true;
			}
			inputstream.close();
			this.FTP.completePendingCommand();
		} catch (Exception e) {
			System.out.println("++++++++++++++++++++++又错了");
			System.out.println(e.getMessage());
			flag = false;
		} finally {
			if (inputstream != null) {
				try {
					inputstream.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
		}
		}
		
		return flag;
	}

}

此方案好处

1、不怕断电,每当处理完一个文件夹,我就在数据库里记录下。下次在处理可以排除已经处理成功的文件。不会从头开始处理
2、不递归,处理速度快,不怕卡死等问题,不论多少文件,处理速度几乎恒定不变,不存在文件数量越大,处理越慢的问题。

最后说一点

不要迷信工具,我们用的日立的HCP云存储,他给的官方工具500万个几乎没大小的文件迁移了一天也没迁移完。500万个文件愣是给我搜索出了1000多万。最后几乎卡的就不动了

Logo

更多推荐