Developing a Tool for Updating Solr Indexes
The Solr search server is deployed directly inside a web container. To manage the index from outside the server (creating, updating, deleting documents), you have to send the request data or commands to the Solr server process; the actual index changes are carried out inside the Solr server program, which naturally calls the Lucene API under the hood. To make it easy during day-to-day development to send index update requests to a Solr server at any time, observe how they are actually executed, or try out the latest Solr features, we built a simple little tool on top of Solr's bundled SimplePostTool, adding an interface for aggregating data sources.
Class Diagram
First, let's look at the class diagram and the relationships among the classes, as shown in the figure:
Coding and Implementation
The AbstractPostServer abstract class: this class represents an entity that communicates with the Solr search server. It implements the common logic for talking to the server over HTTP; subclasses can call these methods directly, while the concrete transport approach and data format are left to the subclass. The code of the abstract class is shown below:
package org.shirdrn.solr.tools;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract post tool, generic operations.
*
* @author shirdrn
* @date 2011-12-07
*/
public abstract class AbstractPostServer {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractPostServer.class);
protected URL solrUrl;
protected PostConfig postConfig;
protected HttpURLConnection httpConn;
protected int responseCode = HttpURLConnection.HTTP_UNAVAILABLE;
protected DataLoader dataLoader;
public AbstractPostServer(PostConfig postConfig, DataLoader dataLoader) {
super();
this.postConfig = postConfig;
this.dataLoader = dataLoader;
try {
this.solrUrl = new URL(postConfig.postUrl);
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
public abstract String getResponseMessage();
public int getServerResponseCode() {
return responseCode;
}
protected void post(InputStream data, Integer length, OutputStream output) throws IOException {
httpConn = (HttpURLConnection) solrUrl.openConnection();
httpConn.setRequestMethod(postConfig.postMethod);
httpConn.setDoOutput(true);
httpConn.setDoInput(true);
httpConn.setUseCaches(false);
httpConn.setAllowUserInteraction(false);
httpConn.setRequestProperty("Content-type", postConfig.contentType);
if (null != length) {
httpConn.setFixedLengthStreamingMode(length);
}
OutputStream out = httpConn.getOutputStream();
pipe(data, out);
if(out!=null) {
out.close();
}
InputStream in = null;
responseCode = httpConn.getResponseCode();
if (HttpURLConnection.HTTP_OK != responseCode) {
LOG.error("Solr server error: " + httpConn.getResponseCode() + " " + httpConn.getResponseMessage());
}
in = httpConn.getInputStream();
pipe(in, output);
if(httpConn!=null) {
httpConn.disconnect();
}
in.close();
}
private void pipe(InputStream dataIn, OutputStream dataOut) throws IOException {
byte[] buf = new byte[1024];
int read = 0;
while ((read = dataIn.read(buf)) >= 0) {
if (null != dataOut) {
dataOut.write(buf, 0, read);
}
}
if (null != dataOut) {
dataOut.flush();
}
}
protected InputStream stringToStream(String s) {
InputStream is = null;
try {
is = new ByteArrayInputStream(s.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return is;
}
public void setDataLoader(DataLoader dataLoader) {
this.dataLoader = dataLoader;
}
/**
* Solr post configuration, convenient usage.
*
* @author shirdrn
* @date 2011-12-07
*/
public static final class PostConfig implements Serializable {
private static final long serialVersionUID = 6389419734694067683L;
private String postUrl = "http://localhost:8080/solr/test/update";
private String postMethod = "POST";
private String contentType = "application/xml";
private int maxCommitCount = 100;
private String uniqueKey;
private Set<String> indexFieldSet = new HashSet<String>();
private Set<String> finalFieldSet = new HashSet<String>();
public PostConfig(String postUrl, String postMethod, String contentType, String uniqueKey, String[] indexFields, String[] finalFields, int maxCommitCount) {
super();
this.postUrl = (postUrl==null ? this.postUrl : postUrl);
this.postMethod = (postMethod==null ? this.postMethod : postMethod);
this.contentType = (contentType==null ? this.contentType : contentType);
this.uniqueKey = uniqueKey;
setIndexFieldSet(indexFields);
setFinalFieldSet(finalFields);
this.maxCommitCount = maxCommitCount;
}
public int getMaxCommitCount() {
return maxCommitCount;
}
public String getUniqueKey() {
return uniqueKey;
}
public Set<String> getIndexFieldSet() {
return indexFieldSet;
}
private void setIndexFieldSet(String[] indexFields) {
setFieldSet(indexFields, indexFieldSet);
}
private void setFinalFieldSet(String[] finalFields) {
setFieldSet(finalFields, finalFieldSet);
}
public Set<String> getFinalFieldSet() {
return finalFieldSet;
}
private void setFieldSet(String[] finalFields, Set<String> fieldSet) {
if(finalFields!=null) {
for(String field : finalFields) {
if(!field.isEmpty()) {
fieldSet.add(field.trim());
}
}
}
}
@Override
public boolean equals(Object obj) {
PostConfig other = (PostConfig)obj;
boolean isEquals =
postMethod.toLowerCase().equals(other.postMethod.toLowerCase())
&& contentType.toLowerCase().equals(other.contentType.toLowerCase())
&& postUrl.toLowerCase().equals(other.postUrl.toLowerCase())
&& maxCommitCount == other.maxCommitCount
&& indexFieldSet.equals(other.indexFieldSet)
&& finalFieldSet.equals(other.finalFieldSet);
return isEquals;
}
@Override
public String toString() {
StringBuffer config = new StringBuffer();
config.append("[postUrl=" + postUrl)
.append(", postMethod=" + postMethod)
.append(", contentType=" + contentType)
.append(", maxCommitCount=" + maxCommitCount)
.append(", indexFieldSet=" + indexFieldSet)
.append(", finalFieldSet=" + finalFieldSet)
.append("]");
return config.toString();
}
}
}
AbstractPostServer uses the DataLoader interface; a subclass only needs an implementation of DataLoader injected into it to be able to load data. The DataLoader interface is an abstraction over different data sources and is defined as follows:
package org.shirdrn.solr.tools;
import java.util.Map;
public interface DataLoader {
public Map<Object, Object> fetchOne();
public int getRecordCount();
}
The fetchOne method is called by subclasses of AbstractPostServer; each call fetches one record from the data source. At indexing time a record usually corresponds logically to one document (Document), such as an article or a piece of text. This implies that an implementation of DataLoader must provide an iterator over the data source: each call to fetchOne advances the iterator once (e.g., via its next method) to obtain one record, which the AbstractPostServer subclass then turns into a Lucene Document (in fact the conversion is done indirectly by Solr; we only lay out each field and its content).
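Before looking at a concrete database-backed implementation, here is a minimal sketch of an in-memory DataLoader that illustrates the iterator contract described above; the class name and the idea of backing it with a List are my own additions for illustration, not part of the tool:
package org.shirdrn.solr.tools;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * A minimal in-memory DataLoader sketch: each record is a field-name/value map,
 * and every call to fetchOne() advances the underlying iterator by one record.
 */
public class InMemoryDataLoader implements DataLoader {
    private final List<Map<Object, Object>> records;
    private final Iterator<Map<Object, Object>> iterator;
    public InMemoryDataLoader(List<Map<Object, Object>> records) {
        this.records = records;
        this.iterator = records.iterator();
    }
    @Override
    public Map<Object, Object> fetchOne() {
        // Return one record per call; null signals that the data source is exhausted.
        return iterator.hasNext() ? iterator.next() : null;
    }
    @Override
    public int getRecordCount() {
        return records.size();
    }
}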
Here is an implementation of the DataLoader interface whose data is stored in MongoDB; it corresponds to the MongoDataLoader class, whose code is shown below:
package org.shirdrn.solr.tools;
import java.io.Serializable;
import java.util.Map;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
/**
* Load data being indexed from Mongo DB.
*
* @author shirdrn
* @date 2011-12-07
*/
public class MongoDataLoader implements DataLoader {
private MongoConfig mongoConfig;
private DBCollection collection;
private Map<Object, Object> conditions;
private DBCursor cursor;
private int recordCount;
public MongoDataLoader(MongoConfig mongoConfig, Map<Object, Object> conditions) {
super();
this.mongoConfig = mongoConfig;
this.conditions = conditions;
initialize();
}
private void initialize() {
DBObject q = new BasicDBObject();
if(conditions!=null) {
q.putAll(conditions);
}
try {
if(collection==null) {
collection = MongoHelper.newHelper(mongoConfig).getCollection(mongoConfig.collectionName);
}
cursor = collection.find(q);
recordCount = cursor.size();
} catch (MongoException e) {
e.printStackTrace();
}
}
@SuppressWarnings("unchecked")
@Override
public Map<Object, Object> fetchOne() {
Map<Object, Object> m = null;
try {
if(cursor.hasNext()) {
m = cursor.next().toMap();
m.put("id", m.get("_id").toString());
} else {
cursor.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return m;
}
@Override
public int getRecordCount() {
return recordCount;
}
public static class MongoConfig implements Serializable {
private static final long serialVersionUID = -3028092758346115702L;
private String host;
private int port;
private String dbname;
private String collectionName;
public MongoConfig(String host, int port, String dbname, String collectionName) {
super();
this.host = host;
this.port = port;
this.dbname = dbname;
this.collectionName = collectionName;
}
@Override
public boolean equals(Object obj) {
MongoConfig other = (MongoConfig) obj;
return host.equals(other.host) && port==other.port
&& dbname.equals(other.dbname) && collectionName.equals(other.collectionName);
}
}
static class MongoHelper {
private static Mongo mongo;
private static MongoHelper helper;
private MongoConfig mongoConfig;
private MongoHelper(MongoConfig mongoConfig) {
super();
this.mongoConfig = mongoConfig;
}
public synchronized static MongoHelper newHelper(MongoConfig mongoConfig) {
try {
if(helper==null) {
helper = new MongoHelper(mongoConfig);
mongo = new Mongo(mongoConfig.host, mongoConfig.port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
if(mongo!=null) {
mongo.close();
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
return helper;
}
public DBCollection getCollection(String collectionName) {
DBCollection c = null;
try {
c = mongo.getDB(mongoConfig.dbname).getCollection(collectionName);
} catch (Exception e) {
e.printStackTrace();
}
return c;
}
}
}
For convenience, the MongoConfig class above encapsulates the MongoDB-related settings, and the MongoHelper class implements the operations for interacting with the MongoDB database. Once a DBCollection instance has been opened, a cursor (DBCursor) over the result set can be obtained from the query conditions, and the cursor is then used to iterate over all the records in the result set, i.e., the data to be indexed.
Before implementing a concrete AbstractPostServer that actually interacts with the Solr search server, let's look at which basic services it has to provide; these are defined in the PostService interface. PostService defines the most basic index-management operations, as shown below:
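As a quick usage sketch (the connection settings and collection name below are placeholders), the loader is driven the same way JsonPostServer drives it later: ask for the record count, then call fetchOne once per record.
import java.util.Map;
import org.shirdrn.solr.tools.MongoDataLoader.MongoConfig;
// ...
MongoConfig mongoConfig = new MongoConfig("localhost", 27017, "pagedb", "page"); // placeholder settings
DataLoader dataLoader = new MongoDataLoader(mongoConfig, null); // null conditions: load every document
for (int i = 0; i < dataLoader.getRecordCount(); i++) {
    Map<Object, Object> record = dataLoader.fetchOne();
    // record maps MongoDB field names to values, with the ObjectId also exposed as "id"
}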
package org.shirdrn.solr.tools;
import java.io.IOException;
import java.io.OutputStream;
public interface PostService {
public void commit(OutputStream output) throws IOException;
public void optimize(OutputStream output) throws IOException;
public void postUpdate(boolean autoOptimize) throws Exception;
public void postDelete(boolean autoOptimize) throws Exception;
public int getPostCount();
}
To implement a post server based on the JSON data format, we extend the AbstractPostServer abstract class and at the same time implement the PostService interface. The corresponding JsonPostServer implementation is shown below (an example of the JSON payload it assembles follows the class):
package org.shirdrn.solr.tools;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import net.sf.json.JSONObject;
/**
* Json style post server. Actually it represents
* a proxy server of Solr search server, and is responsible for
* managing indices, such as indexing, deleting some document, etc.
*
* @author shirdrn
* @date 2011-12-07
*/
public class JsonPostServer extends AbstractPostServer implements PostService {
private int postCount;
private String responseMessage = "OK";
public JsonPostServer(PostConfig postConfig) {
super(postConfig, null);
}
public JsonPostServer(PostConfig postConfig, DataLoader dataLoader) {
super(postConfig, dataLoader);
}
@Override
public void postUpdate(boolean autoOptimize) throws Exception {
try {
StringBuffer data = new StringBuffer("{");
for(int i=0; i<dataLoader.getRecordCount(); i++) {
Map<Object, Object> record = dataLoader.fetchOne();
JSONObject op = new JSONObject();
JSONObject doc = new JSONObject();
if(record!=null) {
Iterator<Entry<Object, Object>> iter = record.entrySet().iterator();
JSONObject jo = new JSONObject();
while(iter.hasNext()) {
Entry<Object, Object> entry = iter.next();
if(postConfig.getIndexFieldSet().contains(entry.getKey())) {
if(postConfig.getFinalFieldSet().contains(entry.getKey())) {
jo.put(entry.getKey(), entry.getValue().toString());
} else {
jo.put(entry.getKey(), purgeJsonSpecifiedCharacters(entry.getValue().toString()));
}
}
}
doc.put("doc", jo);
op.put("add", doc);
data.append(op.toString().substring(1, op.toString().length()-1));
increment(i+1, dataLoader.getRecordCount(), data);
}
}
if(autoOptimize) {
optimize(System.out);
}
} catch (IOException e) {
responseMessage = e.getMessage();
}
}
private void increment(int i, int recordCount, StringBuffer data) throws IOException {
++postCount;
if(i%postConfig.getMaxCommitCount()==0 || i==recordCount) {
data.append("}");
post(stringToStream(data.toString()), null, System.out);
commit(System.out);
if(i!=recordCount) {
data.delete(0, data.length());
data.append("{");
}
} else {
data.append(",");
}
}
private String purgeJsonSpecifiedCharacters(String data) {
StringBuffer buffer = new StringBuffer();
for (int i=0; i<data.length(); i++){
switch (data.charAt(i)){
case '\"':
case '\\':
case '/':
case '\b':
case '\f':
case '\n':
case '\r':
case '\t':
buffer.append(" ");
break;
default:
buffer.append(data.charAt(i));
}
}
return buffer.toString().trim();
}
@Override
public void postDelete(boolean autoOptimize) throws Exception {
try {
StringBuffer data = new StringBuffer("{");
for(int i=0; i<dataLoader.getRecordCount(); i++) {
Map<Object, Object> record = dataLoader.fetchOne();
JSONObject jo = new JSONObject();
JSONObject op = new JSONObject();
Iterator<Entry<Object, Object>> iter = record.entrySet().iterator();
Entry<Object, Object> entry = iter.next();
if(postConfig.getUniqueKey().equals(entry.getKey()) || entry.getKey().equals("query")) {
jo.put(entry.getKey(), entry.getValue());
op.put("delete", jo);
data.append(op.toString().substring(1, op.toString().length()-1));
increment(i+1, dataLoader.getRecordCount(), data);
}
}
if(autoOptimize) {
optimize(System.out);
}
} catch (IOException e) {
responseMessage = e.getMessage();
}
}
@Override
public void commit(OutputStream output) throws IOException {
JSONObject commit = new JSONObject();
commit.put("commit", new JSONObject());
post(stringToStream(commit.toString()), null, output);
LOG.debug("Commit done: " + commit.toString());
}
@Override
public void optimize(OutputStream output) throws IOException {
JSONObject optimizer = new JSONObject();
JSONObject jo = new JSONObject();
jo.put("waitFlush", false);
jo.put("waitSearcher", false);
optimizer.put("optimize", jo);
post(stringToStream(optimizer.toString()), null, output);
LOG.debug("Optimize done: " + optimizer.toString());
}
@Override
public String getResponseMessage() {
return responseMessage;
}
@Override
public int getPostCount() {
return postCount;
}
}
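For reference, when maxCommitCount is reached (or the last record has been processed), the request body that postUpdate assembles for Solr's JSON update handler looks roughly like the one below; the field values are made up, but the shape mirrors what the code builds (repeated "add" keys stripped of their outer braces and joined by commas), and commit() and optimize() send {"commit":{}} and {"optimize":{"waitFlush":false,"waitSearcher":false}} respectively as separate requests:
{"add":{"doc":{"id":"doc-1","title":"...","content":"...","pubdate":"2011-12-07","url":"http://example.org/1"}},
 "add":{"doc":{"id":"doc-2","title":"...","content":"...","pubdate":"2011-12-07","url":"http://example.org/2"}}}
Likewise, postDelete produces a body of the form {"delete":{"id":"..."},"delete":{"query":"..."}} depending on whether the record carries the unique key or a query.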
Test Cases
In the test cases we mainly exercise postUpdate and postDelete; the code is shown below:
package org.shirdrn.solr.tools;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import junit.framework.TestCase;
import org.shirdrn.solr.tools.AbstractPostServer.PostConfig;
import org.shirdrn.solr.tools.MongoDataLoader.MongoConfig;
/**
* Test post server based on JSON style data.
*
* @author shirdrn
* @date 2011-12-08
*/
public class TestJsonPostServer extends TestCase {
private PostService server;
@Override
protected void setUp() throws Exception {
PostConfig postConfig = new PostConfig(
"http://192.168.0.195:8080/solr35/core0/update",
"POST", "application/json",
"id",
new String[]{"id", "title", "content", "pubdate", "url"},
new String[] {"url"},
2);
server = new JsonPostServer(postConfig);
}
public void testPostUpdate() {
MongoConfig mongoConfig = new MongoConfig("192.168.0.195", 27017, "pagedb", "page");
DataLoader dataLoader = new MongoDataLoader(mongoConfig, null);
((AbstractPostServer)server).setDataLoader(dataLoader);
try {
server.postUpdate(true);
} catch (Exception e) {
e.printStackTrace();
}
assertEquals(5, server.getPostCount());
assertEquals("OK", ((AbstractPostServer)server).getResponseMessage());
assertEquals(200, ((AbstractPostServer)server).getServerResponseCode());
}
public void testPostDelete() {
final Map<Object, Object> conditions = new HashMap<Object, Object>();
conditions.put("id", "4eded53abf3bfa0014000002");
conditions.put("query", "title:孟加拉国");
final Iterator<Entry<Object, Object>> iter = conditions.entrySet().iterator();
((AbstractPostServer)server).setDataLoader(new DataLoader() {
@Override
public Map<Object, Object> fetchOne() {
Map<Object, Object> m = new HashMap<Object, Object>();
if(iter.hasNext()) {
Entry<Object, Object> entry = iter.next();
m.put(entry.getKey(), entry.getValue());
}
return m;
}
@Override
public int getRecordCount() {
return conditions.size();
}
});
try {
server.postDelete(false);
} catch (Exception e) {
e.printStackTrace();
}
assertEquals(2, server.getPostCount());
assertEquals("OK", ((AbstractPostServer)server).getResponseMessage());
assertEquals(200, ((AbstractPostServer)server).getServerResponseCode());
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
}
Appendix
Index Fields
The index fields defined in the Solr search server's schema.xml are shown below:
<?xml version="1.0" ?>
<schema name="example core zero" version="1.1">
<types>
<fieldtype name="string" class="solr.StrField" sortMissingLast="true" omitNorms="true" />
<fieldType name="text" class="solr.TextField" positionIncrementGap="100">
<analyzer type="index">
<tokenizer class="solr.SmartChineseSentenceTokenizerFactory" />
<filter class="solr.SmartChineseWordTokenFilterFactory" />
<filter class="solr.PositionFilterFactory" />
<filter class="solr.StandardFilterFactory" />
<filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt" enablePositionIncrements="true" />
</analyzer>
<analyzer type="query">
<tokenizer class="solr.SmartChineseSentenceTokenizerFactory" />
<filter class="solr.SmartChineseWordTokenFilterFactory" />
<filter class="solr.PositionFilterFactory" />
<filter class="solr.StandardFilterFactory" />
<filter class="solr.SynonymFilterFactory" synonyms="./synonyms.txt" ignoreCase="false" expand="true" />
</analyzer>
</fieldType>
</types>
<fields>
<field name="id" type="string" indexed="true" stored="true" multiValued="false" required="true" />
<field name="content" type="text" indexed="true" stored="true" multiValued="true" />
<field name="pubdate" type="string" indexed="true" stored="true" multiValued="false" />
<field name="title" type="text" indexed="true" stored="true" multiValued="true" />
<field name="url" type="string" indexed="true" stored="true" multiValued="false" />
</fields>
<uniqueKey>id</uniqueKey>
<defaultSearchField>title</defaultSearchField>
<solrQueryParser defaultOperator="OR" />
</schema>
The full contents of the schema.xml file used for testing are attached above.
Dependency JARs
commons-beanutils-1.7.0.jar
commons-collections-3.2.1.jar
commons-lang-2.4.jar
commons-logging-1.1.1.jar
ezmorph-1.0.6.jar
json-lib-2.4-jdk15.jar
mongo-2.5.3.jar
slf4j-api-1.5.5.jar
slf4j-jdk14-1.5.5.jar