solr入门之多线程操作solr中索引字段的解决
涉及的问题:建索引时有一个字段是该词语出现的次数,这个字段是放在solr里的 而我用的是多线程来进行全量导入的,这里就涉及到了多线程问题多个线程操作同一个变量时如何处理?我是这样子做的 :首先将变量本地话--分布式就放到大容器中,我这里仅仅使用了一个map来存词和次数的关系映射变量本地化后就是多线程的解决了--锁的设置-我仅仅是在操作时加了一个锁来解决这
·
涉及的问题:
建索引时有一个字段是该词语出现的次数,这个字段是放在solr里的
而我用的是多线程来进行全量导入的,这里就涉及到了多线程问题
多个线程操作同一个变量时如何处理?
我是这样子做的 :
首先将变量本地话--分布式就放到大容器中,我这里仅仅使用了一个map来存
词和次数的关系映射
变量本地化后就是多线程的解决了--锁的设置-我仅仅是在操作时加了一个锁来解决这个问题
这样做后总体上应该能解决变量的问题了
最后还有一个线程顺序问题要解决下
当 最后一个提交索引时 获取的索引不一定是正确的索引
当两个线程中都有此字段时 都获取到了正确的结果 但是 最后提交那个
不一定是正确的最大的那个 所以还是存在一点点的误差的
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import cn.com.mx.gome.search.core.util.PreIndexThreadPool;
import cn.com.mx.gome.search.core.util.prop.PropUtils;
import cn.com.mx.gome.suggest.cache.WordCountCache;
import cn.com.mx.gome.suggest.component.FullIndexProcessContainer;
import cn.com.mx.gome.suggest.component.FullIndexSuggestProcess;
import cn.com.mx.gome.suggest.constant.Const;
import cn.com.mx.gome.suggest.domian.BaseSuggestIndex;
import cn.com.mx.gome.suggest.service.FullIndexSuggestService;
import cn.com.mx.gome.suggest.solr.SolrServiceWrapper;
/**
* 搜索建议 索引库导入业务层
* @author songqinghu
*
*/
@Service("fullIndexSuggestServiceImpl")
public class FullIndexSuggestServiceImpl extends FullIndexServiceImpl implements FullIndexSuggestService {
private Logger logger = LoggerFactory.getLogger(FullIndexSuggestServiceImpl.class);
@Resource //容器类
private FullIndexProcessContainer fullIndexProcessContainer;
@Value("${maxImumPoolSize}")//最大线程数目---通用可以提取到父类中
private int maxImumPoolSize;
private int skip=0;//开始的角标
private int limit=1000;//默认的步距
private int rows=0;//建立索引的总数目
private long maxNum =0;
//可回收线程池
private ExecutorService threadPool = Executors.newCachedThreadPool();
//solr连接
@Resource
private SolrServiceWrapper solrServiceWrapperImpl;
private SolrClient suggestClient;
@PostConstruct
private void getClient(){
suggestClient = solrServiceWrapperImpl.getCollection(getCollectionName());;
}
@Override
public String getCollectionName() {
return "meixin_suggest";
}
/**
* 搜索建议全量索引建立 具体实现方法
* 这个方法我想这样设计:
* 能够动态的添加和删除导入的过程类进入 多线程导入
* 做一个 集合类容器来完成类的组装操作 方法通过接口的方式来进行统一的 处理过程控制关键点
*
* 1.先做一个圈子的索引建议词导入--只导入圈子的名称
* ---从数据源中获取到所有的圈子名称--对圈子名称进行处理拼音加入---组装成bean 提交到solr中--> ok 最粗糙版本 后续可以优化下
* 2.商品搜索推荐词 导入---分析讨论下 (暂时不在--无非三个来源 数据库 已有索引库 还有人工导入干预也是进数据库 这里要不要再设置一个插拔式的数据源接口呢?
* 先做个正规途径的数据获取流程
*/
@Override
public int index(SolrClient client ,boolean morethread) {
logger.warn("{} 开始全量导入",new Date());
try {
//每次进入全量导入方法 对计数器清零处理
skip = 0;
rows = 0;
limit = PropUtils.getInstance().getInt(Const.INDEX_FULL_LIMIT_GROUP, 1000);
logger.warn("时间{},初始化变量: skip:{},rows:{},limit{} ",new Date(),skip,rows,limit);
WordCountCache.clear();//清理本地词语次数缓存
//容器中获取种类
List<FullIndexSuggestProcess> processContainer = fullIndexProcessContainer.getProcessContainer();
for (FullIndexSuggestProcess fullprocess : processContainer) {
logger.warn("时间{},导入{}索引 ",new Date(),fullprocess.getClass().getName());
//多线程索引建立过程
//1.获取此时最大索引数目--通用方法
maxNum = fullprocess.getMaxNum();
//2.组建任务线程队列
ArrayList<Future<Integer>> futureTasks = new ArrayList<Future<Integer>>();
//3.向任务队列中添加任务到线程或者索引数终止
for (int i = 0; i < maxImumPoolSize; i++) {
try {
if(!fullprocess.isEnd(skip,limit,maxNum)){//考虑兼容各种数据库问题---mongo 当相等时 虽然开启了线程但是取不到数据--可以考虑容错
break;
}else{
futureTasks.add(getFuture(client,fullprocess));
}
} catch (Exception e) {
logger.error("FullIndexSuggestServiceImpl addfuture : ",e);
}
}
//4.循环任务队列 持续添加任务到索引导入结束
while(futureTasks.size()>0){
ArrayList<Future<Integer>> tmpFutureTasks = new ArrayList<Future<Integer>>();
for (Future<Integer> future : futureTasks) {
if(!future.isDone()){//该线程未执行结束 --添加到任务队列中继续执行
tmpFutureTasks.add(future);
}else{
//rows +=future.get();//统计索引数量--没必要统计 数目是错的 中间可能会覆盖
if(fullprocess.isEnd(skip,limit,maxNum)){
tmpFutureTasks.add(getFuture(client,fullprocess));
}
}
}
futureTasks = tmpFutureTasks;
Thread.sleep(500);
}
//一个全量导入结束
}
//全部的全量导入结束--这里开启异步线程池对索引词语出现的次数进行重写
threadPool.execute(new Runnable() {
@Override
public void run() {
List<List<SolrInputDocument>> list = WordCountCache.getDocs();
logger.warn("时间{},开始修改词频 ,循环次数{}",new Date(),list.size());
try {
for (List<SolrInputDocument> docs : list) {
suggestClient.add(docs);
}
} catch (SolrServerException |IOException e) {
logger.error(" index threadPool.execute : " + e);
}
}
});
} catch (Exception e) {
logger.error("FullIndexSuggestServiceImpl ERROR:", e);
}
return WordCountCache.getRows();
}
/**
*
* @描述:通用任务组装类
* @param client solr客户端
* @param fullprocess 数据获取
* @return
* @return Future<Integer>
* @exception
* @createTime:2016年3月24日
* @author: songqinghu
*/
private Future<Integer> getFuture(SolrClient client, FullIndexSuggestProcess fullprocess) {
//任务类
IndexTask indexTask = new IndexTask();
//设置参数
indexTask.setParameters(skip, limit, client, fullprocess);
//提交开启任务---线程池的书写
Future<Integer> future = PreIndexThreadPool.getPool().submit(indexTask);
//变更控制变量--这里也要做成通用型的--都设置到各子的方法中??
skip = skip + limit;
//返回任务封装
return future;
}
private static class IndexTask implements Callable<Integer>{
private int skip; //开始 坐标
private int limit; //步距
private SolrClient client; //客户端
private FullIndexSuggestProcess fullprocess; //未知数据源
/**
*
* @描述:设置类中用到的参数
* @return void
* @exception
* @createTime:2016年3月24日
* @author: songqinghu
*/
public void setParameters(int skip,int limit,SolrClient client,FullIndexSuggestProcess fullprocess){
this.skip = skip;
this.limit = limit;
this.client = client;
this.fullprocess = fullprocess;
}
@Override
public Integer call() throws Exception {
//1 获取原始资源数据 对资源数据进行处理----->拼音处理--工具类-->组装返回的beans类---->组装类
List<BaseSuggestIndex> beans = fullprocess.getBeans(skip, limit);
//2 判断返回的创建成功的索引数目
if(beans !=null && beans.size()>0){
//3 //提交
client.addBeans(beans);
return beans.size();
}else{
return 0;
}
}
}
}
索引导入类
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Resource;
import javax.management.RuntimeErrorException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import cn.com.mx.gome.search.core.util.MD5Utils;
import cn.com.mx.gome.search.core.util.Pinyin4jUtil;
import cn.com.mx.gome.search.core.util.SetToStringArrUtils;
import cn.com.mx.gome.search.quote.digger.solrbean.ProductBean;
import cn.com.mx.gome.suggest.cache.WordCountCache;
import cn.com.mx.gome.suggest.domian.BaseSuggestIndex;
import cn.com.mx.gome.suggest.solr.SolrServiceWrapper;
/**
* 商品类 搜索建议推荐词导入
* @author songqinghu
*
*/
@Component("fullIndexItemSuggestProcess")
public class FullIndexItemSuggestProcess implements FullIndexSuggestProcess {
private Logger logger = LoggerFactory.getLogger(FullIndexItemSuggestProcess.class);
//这里注入一个工厂来生产指定的数据源获取类来获取不同的数据源类--先放着
//solr连接
@Resource
private SolrServiceWrapper solrServiceWrapperImpl;
@Value("${SORL_PRODUCT_NAME}") //从 properties 文件中注入 solr连接的名称
private String solrProductName;
@Value("${SORL_SUGGEST_NAME}")
private String solrSuggestName;
private SolrClient itemClient;
private SolrClient suggestClient;
@Override
public List<BaseSuggestIndex> getBeans(int skip, int limit) {//这里在传一个 参数用来确定数据导入方式
Map<String, Integer> words ;
if(false){
//words = getDataBySQL(skip, limit); //从原始数据库中获取
}else {
words= getDataBySolr(skip, limit);//从solr中获取
}
List<BaseSuggestIndex> assembleBeans = AssembleBeans(words);
return assembleBeans;
}
/**
*
* @描述:从solr索引库中获取数据
* @param skip 开始
* @param limit 步距
* @return
* @return List<Product>
* @exception
* @createTime:2016年3月25日
* @author: songqinghu
*/
private Map<String,Integer> getDataBySolr(int skip, int limit) {
//查询指定范围内的数据
SolrQuery query = new SolrQuery();
query.set("q", "*:*");
query.setStart(skip);
query.setRows(limit);
QueryResponse response;
Map<String,Integer> words = null;
try {
response = itemClient.query(query);
logger.info("skip="+skip+",limit="+limit+",获取到索引数 : "+response.getResults().getNumFound());
List<ProductBean> productBeans = response.getBeans(ProductBean.class);
//数据处理部分---我要将获取到的数据中的 需要的三个字段的汉字 然后进行记录出现次数和去重复
words = suggestCollect(productBeans);
}catch (SolrServerException | IOException e) {
logger.error("FullIndexItemSuggestProcess getDataBySolr :" + e);
}
return words;
}
//将获取到的数据中的 需要的三个字段的汉字 然后进行记录出现次数和去重复
private Map<String, Integer> suggestCollect(List<ProductBean> productBeans) {
Map<String, Integer> words = new HashMap<String,Integer>();
for (ProductBean productBean : productBeans) { //这里需要考虑词语切分问题---不太会处理
String name = productBean.getName(); //商品名称
mapOperation(words, name);
List<String> cateNames = productBean.getCateName();//类目名称
for (String cateName : cateNames) {
mapOperation(words, cateName);
}
String spuBrand = productBean.getSpuBrand();//品牌名称
mapOperation(words, spuBrand);
}
return words;
}
/**
* 对map中数据进行操作
*/
private void mapOperation(Map<String, Integer> words,String name){
if(words.containsKey(name)){ //存在就加一
Integer count = words.get(name);
words.put(name, count+1);
}else{//不存在就存入
words.put(name, 1);
}
}
//对原始查询后的集合数据进行拆分
public List<BaseSuggestIndex> AssembleBeans(Map<String,Integer> words){
ArrayList<BaseSuggestIndex> baseSuggestIndexs = new ArrayList<BaseSuggestIndex>();
Set<Entry<String, Integer>> entrySet = words.entrySet();
for (Entry<String, Integer> entry : entrySet) {
BaseSuggestIndex assembleBean = AssembleBean(entry);
baseSuggestIndexs.add(assembleBean);
}
return baseSuggestIndexs;
}
//组装单个 solr文档对象
public BaseSuggestIndex AssembleBean(Entry<String, Integer> entry){
BaseSuggestIndex baseIndex = new BaseSuggestIndex();
String word = entry.getKey();
Set<String> shortpy = Pinyin4jUtil.converterToFirstSpellToSet(word);
Set<String> allpy = Pinyin4jUtil.converterToSpellToSet(word);
baseIndex.setWord(word);
baseIndex.setShort_py(SetToStringArrUtils.convertToStringArr(shortpy));
baseIndex.setAll_py(SetToStringArrUtils.convertToStringArr(allpy));
//这里需要设置一下 使用md5加密算法 来保证 每个字符串对应的id唯一 涉及到分类问题 可能会出现重复添加分类
String id = getId(word);
baseIndex.setSuggestId(id);
baseIndex.setType("product");
baseIndex.setCreateTime(new Date().getTime());
//这里需要特别注意了--涉及到多线程问题了 ---当查询已经存在的词语的时候 查到次数加上当前的次数---
//存在问题--如何处理线程顺序保证多个线程是有序的操作呢?---存入映射关系 结束后再次进行次数更新?
//这里必须要处理下 和实际的数据差距太大了了!!!!!!!!!!!!!
logger.info("词名: "+word + " 本轮次数 :"+ entry.getValue());
baseIndex.setCount(getCount4Word(id, entry.getValue()));
return baseIndex;
}
/**
*
* @描述:获取该词语的数量加上目前的词语数量---首次从索引库中获取 后缓存到本地中 以后从本地中获取
* 这里还是不太准确 虽然保证了词语出现次数 通过加锁 但是线程提交时是无法进行控制的 还是存在误差的
* @param id
* @param count 本轮词语出现的次数
* @return
* @return Integer
* @exception
* @createTime:2016年3月25日
* @author: songqinghu
*/
private Integer getCount4Word(String id,Integer count ){
return WordCountCache.putCount(id, count);
}
/**
*
* @描述:通过数据库来获取数据源
* @param skip
* @param limit
* @return
* @return List<Product>
* @exception
* @createTime:2016年3月25日
* @author: songqinghu
*/
private Map<String, Integer> getDataBySQL(int skip, int limit) {
return null;
}
@Override
public long getMaxNum() {//这里也需要根据数据源来配置下
if(itemClient == null){
itemClient = solrServiceWrapperImpl.getCollection(solrProductName);
}
SolrQuery query = new SolrQuery();
query.set("q", "*:*");
long maxNum = 0;
try {
maxNum = itemClient.query(query).getResults().getNumFound();
} catch (SolrServerException | IOException e) {
logger.error("FullIndexItemSuggestProcess getMaxNum :" + e);
}
logger.info(new Date()+" 最大值: "+maxNum);
return maxNum;
}
@Override
public boolean isEnd(int skip, int limit, long maxNum) {
//开始坐标要是小于 最大数量就继续---要不要事实更新呢?时时更新吧
return skip < getMaxNum() ? true : false;
}
/**
* 获取词语对应的索引id
*/
@Override
public String getId(String word) {
if(word ==null){
throw new RuntimeException("id不能为空");
}
return "product_"+MD5Utils.MD5(word);
}
}
本地容器类
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* 容器类 --对solr中涉及的词语出现的词语进行本地缓存 和 同步处理
* 分布式时可以考虑 放入 共享容器中
* @author songqinghu
*
*/
import java.util.concurrent.ConcurrentHashMap;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.com.mx.gome.suggest.domian.BaseSuggestIndex;
public class WordCountCache {
private static Logger logger = LoggerFactory.getLogger(WordCountCache.class);
/**
* 容器 放入word的md5加密后的 和次数
*/
private static ConcurrentHashMap<String, Integer> wordCountMap = new ConcurrentHashMap<String,Integer>();
private static Object lock = new Object();
/**
*
* @描述:设置词语的次数 ---设置锁处理
* @param word 词语(加密后的)id
* @param count 次数
* @return void
* @exception
* @createTime:2016年3月28日
* @author: songqinghu
*/
public static Integer putCount(String id,Integer count){
logger.warn("WordCountCache 线程: "+ Thread.currentThread().getName() +" id: "+id+" count: "+count);
synchronized (lock) {
Integer result = wordCountMap.get(id);
if(result !=null){ //存在此key 加上已经缓存的---不存在 直接存这次的
count = result + count;
}
wordCountMap.put(id, count);
return wordCountMap.get(id);
}
}
/**
*
* @描述:是否缓存到了本地中
* @param id
* @return
* @return boolean
* @exception
* @createTime:2016年3月28日
* @author: songqinghu
*/
public static boolean containsKey(String id){
if(wordCountMap.containsKey(id)){
return true;
}
return false;
}
/**
*
* @描述:清空本地的 缓存---全量结束 和 清理全量索引的时候
* @return
* @return boolean
* @exception
* @createTime:2016年3月28日
* @author: songqinghu
*/
public static boolean clear(){
wordCountMap.clear();
return true;
}
/**
*
* @描述:当全量索引导入结束后 获取缓存的词频 进行词频重写--后清空
* 这里还要优化下 当索引量很多 不能一次全部都提交了--设置为添加到定量的list中
* @return
* @return List<SolrInputDocument>
* @exception
* @createTime:2016年3月28日
* @author: songqinghu
*/
public static List<List<SolrInputDocument>> getDocs(){
//文档容器
List<List<SolrInputDocument>> list = new ArrayList<List<SolrInputDocument>>();
ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();//测试一下
Set<Entry<String, Integer>> entrySet = wordCountMap.entrySet();
int i =0;
for (Entry<String, Integer> entry : entrySet) {
String id = entry.getKey();
Integer count = entry.getValue();
SolrInputDocument doc = new SolrInputDocument();
doc.setField(BaseSuggestIndex.Fd.suggestId.name(),id);//这里的硬编码可以设置成类的形式
Map<String, Integer> counts = new HashMap<String, Integer>(1);
counts.put("set", count);
doc.setField(BaseSuggestIndex.Fd.count.name(), counts);
docs.add(doc);
i++;
if(i>=1000){//为1000 单次修改文档数目设置为1000--总量最后一次进不来 的处理
ArrayList<SolrInputDocument> tmp = new ArrayList<SolrInputDocument>();
tmp.addAll(docs);
list.add(tmp);
i=0;
docs.clear();
}
}
if(i!=0){//最后一次没有进入判断里
list.add(docs);
}
return list;
}
/**
*
* @描述:获取总数量--id就为总的索引数
* @return
* @return Integer
* @exception
* @createTime:2016年3月28日
* @author: songqinghu
*/
public static Integer getRows(){
return wordCountMap.size();
}
}
更多推荐
已为社区贡献3条内容
所有评论(0)