java实现简单web容器(线程池)
线程池ThreadPool.javapackage webserver;import java.util.Collection;import java.util.Iterator;import java.util.Vector;/*** 线程池* @author ShaoJiang**/public class ThreadPool {
·
线程池ThreadPool.java
package webserver;
import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;
/**
* 线程池
* @author ShaoJiang
*
*/
public class ThreadPool {
protected int maxPoolSize;//最大线程数
protected int initPoolSize;//初始化线程数
protected Vector<UserThread> threads = new Vector<UserThread>();//存放线程的向量
protected boolean initialized = false;//线程池初始化状态
protected boolean hasIdleThread = false;
/**
* 初始化线程池
* @param maxPoolSize --最大线程数
* @param initPoolSize --初始化线程数
*/
public ThreadPool(int maxPoolSize, int initPoolSize) {
this.maxPoolSize = maxPoolSize;
this.initPoolSize = initPoolSize;
System.out.println("创建线程池...\r\n最大线程数为:"+this.maxPoolSize+"\r\n初始化线程数为:"+this.initPoolSize);
}
/**
* 线程池的初始化,创建活动线程添加到线程向量
*/
public void init() {
initialized = true;//设置线程池初始化状态为true
for (int i = 0; i < initPoolSize; i++) {
UserThread thread = new UserThread(this);//初始化用户线程
thread.start();//启动用户线程,此时线程处于等待状态
threads.add(thread);//将活动线程添加到线程池
}
System.out.println("初始化线程池...\r\n是否有闲置线程:"+hasIdleThread);
}
/**
* 设置最大连接数
* @param maxPoolSize
*/
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
if (maxPoolSize < getPoolSize())
setPoolSize(maxPoolSize);
}
/**
* 重设当前线程数 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
*
* @param size
*/
public void setPoolSize(int size) {
if (!initialized) {
initPoolSize = size;
return;
} else if (size > getPoolSize()) {
for (int i = getPoolSize(); i < size && i < maxPoolSize; i++) {
UserThread thread = new UserThread(this);
thread.start();
threads.add(thread);
}
} else if (size < getPoolSize()) {
while (getPoolSize() > size) {
UserThread th = (UserThread) threads.remove(0);
th.kill();
}
}
}
/**
* 获得线程池大小
* @return
*/
public int getPoolSize() {
return threads.size();
}
/**
* 提示有闲置线程
*/
protected void notifyForIdleThread() {
hasIdleThread = true;
}
protected boolean waitForIdleThread() {
hasIdleThread = false;
while (!hasIdleThread && getPoolSize() >= maxPoolSize) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
return false;
}
}
return true;
}
/**
* 获取闲置线程
* @return
*/
public synchronized UserThread getIdleThread() {
while (true) {
for (Iterator<UserThread> itr = threads.iterator(); itr.hasNext();) {
UserThread th = (UserThread) itr.next();
if (!th.isRunning())
return th;
}
if (getPoolSize() < maxPoolSize) {
UserThread thread = new UserThread(this);
thread.start();
threads.add(thread);
return thread;
}
if (waitForIdleThread() == false)
return null;
}
}
/**
* 获取一个闲置线程,执行任务
* @param task
*/
public void processTask(ThreadTask task) {
UserThread th = getIdleThread();
System.out.println("执行任务线程id:"+th.getId());
if (th != null) {
th.putTask(task);//添加任务
th.startTasks();//结束等待执行任务
}
System.out.println("任务执行完毕...\r\n线程池当前线程数:"+getPoolSize());
}
/**
* 获取一个闲置线程,执行一组任务
* @param tasks
*/
public void processTasksInSingleThread(ThreadTask[] tasks) {
UserThread th = getIdleThread();
if (th != null) {
th.putTasks(tasks);
th.startTasks();
}
}
/**
* 获取一个闲置线程,执行一组任务
* @param tasks
*/
public void processTasksInSingleThread(Collection<ThreadTask> tasks) {
UserThread th = getIdleThread();
if (th != null) {
th.putTasks(tasks);
th.startTasks();
}
}
}
用户线程UserThread.java
package webserver;
import java.util.Collection;
import java.util.Vector;
/**
* 用户线程
* @author ShaoJiang
*
*/
public class UserThread extends Thread {
protected Vector<ThreadTask> tasks = new Vector<ThreadTask>();//任务队列
protected boolean running = false;//控制线程运行状态
protected boolean stopped = false;//任务停止状态
protected boolean paused = false;//任务暂停状态
protected boolean killed = false;//当前线程是否被杀死
private ThreadPool pool;//线程池
public UserThread(ThreadPool pool) {
this.pool = pool;
}
/**
* 添加任务
* @param task
*/
public void putTask(ThreadTask task) {
tasks.add(task);
}
public void putTasks(ThreadTask[] tasks) {
for (int i = 0; i < tasks.length; i++)
this.tasks.add(tasks[i]);
}
public void putTasks(Collection<ThreadTask> tasks) {
this.tasks.addAll(tasks);
}
/**
* 移除队头任务
* @return
*/
protected ThreadTask popTask() {
if (tasks.size() > 0)
return (ThreadTask) tasks.remove(0);
else
return null;
}
/**
* 返回任务运行状态
* @return
*/
public boolean isRunning() {
return running;
}
/**
* 任务停止状态
*/
public void stopTasks() {
stopped = true;
}
public void stopTasksSync() {
stopTasks();
while (isRunning()) {
try {
sleep(5);
} catch (InterruptedException e) {
}
}
}
/**
* 暂停任务
*/
public void pauseTasks() {
paused = true;
}
public void pauseTasksSync() {
pauseTasks();
while (isRunning()) {
try {
sleep(5);
} catch (InterruptedException e) {
}
}
}
/**
* 杀死当前线程
*/
public void kill() {
if (!running)
interrupt();
else
killed = true;
}
public void killSync() {
kill();
while (isAlive()) {
try {
sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 启动任务,唤醒线程
*/
public synchronized void startTasks() {
running = true;
this.notify();
}
/**
* 运行线程执行任务
*/
public synchronized void run() {
try {
while (true) {
if (!running || tasks.size() == 0) {
pool.notifyForIdleThread();
this.wait();
} else {
ThreadTask task;
while ((task = popTask()) != null) {
task.run();
if (stopped) {
stopped = false;
if (tasks.size() > 0) {
tasks.clear();
System.out.println(Thread.currentThread()
.getId() + ": Tasks are stopped");
break;
}
}
if (paused) {
paused = false;
if (tasks.size() > 0) {
System.out.println(Thread.currentThread()
.getId() + ": Tasks are paused");
break;
}
}
}
running = false;
}
if (killed) {
killed = false;
break;
}
}
} catch (InterruptedException e) {
return;
}
}
}
任务接口ThreadTask.java
package webserver;
/**
* 线程任务接口
* @author ShaoJiang
*
*/
public interface ThreadTask {
public void run();
}
连接任务ConnectionTask .java
package webserver;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;
/**
* 连接任务
* @author ShaoJiang
*
*/
class ConnectionTask implements ThreadTask {
Socket client; // 连接Web浏览器的socket字
public ConnectionTask(Socket cl) {
client = cl;
}
@SuppressWarnings("deprecation")
public void run() {
try {
PrintStream outstream = new PrintStream(client.getOutputStream());
DataInputStream instream = new DataInputStream(client.getInputStream());
String inline = instream.readLine(); // 读取Web浏览器提交的请求信息,主要是这里,获取协议
System.out.println("Received:" + inline);
if (getrequest(inline)) { // 如果是GET请求
String filename = getFilename(inline);
File file = new File(filename);
System.out.println("File Path:"+file.getAbsolutePath());
if (file.exists()) {
// 若文件存在,则将文件送给Web浏览器
outstream.println("HTTP/1.0 200 OK");
outstream.println("MIME_version:1.0");
outstream.println("Content_Type:text/html");
int len = (int) file.length();
outstream.println("Content_Length:" + len);
outstream.println("");
sendFile(outstream, file); // 发送文件
outstream.flush();
} else {
// 文件不存在时
String notfound = "<html><head><title>Not Found</title></head><body><h1>Error 404-file not found</h1></body></html>";
outstream.println("HTTP/1.0 404 no found");
outstream.println("Content_Type:text/html");
outstream.println("Content_Length:" + notfound.length() + 2);
outstream.println("");
outstream.println(notfound);
outstream.flush();
}
}
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//获取请求类型是否为“GET”
boolean getrequest(String s) {
if (s.length() > 0) {
if (s.substring(0, 3).equalsIgnoreCase("GET")) {
return true;
}
}
return false;
}
//获取要访问的文件名
String getFilename(String s) {
String f = s.substring(s.indexOf(' ') + 1);
f = f.substring(0, f.indexOf(' '));
try {
if (f.charAt(0) == '/') {
f = f.substring(1);
}
} catch (StringIndexOutOfBoundsException e) {
System.out.println("Exception:" + e);
}
if (f.equals("")) {
f = "index.html";//设置默认首页
}
return f;
}
//把指定文件发送给Web浏览器
void sendFile(PrintStream outs, File file) {
try {
DataInputStream in = new DataInputStream(new FileInputStream(file));
int len = (int) file.length();
byte buf[] = new byte[len];
in.readFully(buf);
outs.write(buf, 0, len);
outs.flush();
in.close();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
测试程序:
package webserver;
import java.net.ServerSocket;
import java.net.Socket;
public class WebServer {
public static void main(String args[]) {
int PORT = 8070;
ServerSocket server = null;
Socket client = null;
ThreadPool pool = new ThreadPool(3, 2);
pool.init();
try {
server = new ServerSocket(PORT);
System.out.println("服务器正在监听端口:"+ server.getLocalPort());
while(true) {
client = server.accept(); // 接受客户机的连接请求
ThreadTask task = new ConnectionTask(client);
pool.processTask(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)