Webmagic是Java中的一个爬虫开源框架,主要有四大核心组件,分别是:Downloader、PageProcessor、Scheduler、Pipeline,并有Spider进行管理。这四个组件分别对应了爬虫生命周期中的下载、处理、管理、持久化。同时还支持XPath、Jsoup、CSS选择器,方便我们对抓取的页面进行解析。

    Webmagic的源码可以从github上pull下来:https://github.com/code4craft/webmagic

    Webmagic的入门文档可以查看:http://webmagic.io/docs/zh/

    以下是一张从文档中截下来的Webmagic架构图:

     

    接下来将会以Webmagic中的一个例子,来跟踪一下webmagic运行流程的源码。这个例子位于webmagic-core这个核心模块中的us.codecraft.webmagic.processor.example包下,类名为GithubRepoPageProcessor,这是一个关于Github爬虫的代码。

    首先从main函数入手,它创建了一个Spider对象,GithubRepoPageProcessor对象是对抓去结果进行解析的类。addUrl() 函数可以添加我们需要爬去的连接,这个函数的参数是可变的,可以传入多个URL并使用逗号隔开。thread() 函数用于设置Spider线程的数量,表明抓取时启动的线程数,支持多线程并发抓取。run方法用于启动Spider,因为Spider类实现了Runnable接口。这里也可以调用Spider的start()方法用于启动Spider。

    public static void main(String[] args) {
        Spider.create(new GithubRepoPageProcessor()).addUrl("https://github.com/code4craft").thread(5).run();
    }
    thread()函数中首先会调用checkIfRunning()函数来检查Spider的运行状态,如果状态为已运行,那么将会抛出异常。在Spider类中定义了三个常量来表示爬虫的运行状态,分别是初始化、运行、停止。第三行中将参数复制给类变量,用于保存爬虫的线程数。

    public Spider thread(int threadNum) {
        checkIfRunning();
        this.threadNum = threadNum;
        if (threadNum <= 0) {
            throw new IllegalArgumentException("threadNum should be more than one!");
        }
        return this;
    }
    protected void checkIfRunning() {
        if (stat.get() == STAT_RUNNING) {
            throw new IllegalStateException("Spider is already running!");
        }
    }

    由上面checkIfRunning函数中可以看待,stat用来保存爬虫的状态,从它的定义中,我发现它的类型是AtomicInteger,这是JDK1.5之后,在java.util.concurrent.atomic包下新增的原子处理类,主要用于在多线程环境下保证数据操作的准确性,能保证并发访问下的线程安全。

    protected AtomicInteger stat = new AtomicInteger(STAT_INIT);

    protected final static int STAT_INIT = 0;

    protected final static int STAT_RUNNING = 1;

    protected final static int STAT_STOPPED = 2;

    接下来我们来看看run方法的运行流程,当线程启动时,就会调用run方法。第3行调用函数用于检测Spider的状态,第4行的函数用于初始化爬虫的一些组件,这些组件我们可以在创建Spider的时候进行设置,如果没有设置,那么该函数会使用默认的组件进行初始化,其中也包含了对线程池的初始化。

    第6行中用while循环,判断如果当前线程不中断,且Spider的状态为运行状态,也就是在checkRunningStat()方法中成功设置成了运行状态,那么就开始执行爬虫。第7行先从请求队列中拿出一个请求,Scheduler是用于管理爬虫请求的类,如果没有指定,Spider默认使用的是QueueScheduler,即基于内存的队列,其内部的数据结构是使用一个LinkedBlockingQueue来存放我们要爬取的请求。

    当从队列中取出的请求为null,则判断如果线程池的活跃线程数为0,且exitWhenComplete设置为了true,那么就退出while循环。exitWhenComplete是一个boolean类型的值,表示当没有新的爬虫请求时,是否退出。否则的话,将执行waitNewUrl()等待新的请求被加入队列,再继续执行。

    当Request不为null,则执行到第16行,创建一个匿名内部类,实现Runnable()接口,并添加到线程池中去执行。第20行,调用processRequest()函数,将会创建HttpClient去执行请求,并抓取界面进行解析等一系列操作。如果这个过程中没有发生异常,那么一次爬虫的生命周期就成功了,此时会执行第21行onSuccess()。这里涉及到了为Spider添加一个爬虫的监听器,当爬虫执行成功时,在onSuccess()中就会调用这个监听器中的onSuccess()方法,我们可以把对于爬虫执行成功做的操作,写在onSuccess()方法内。而当发生异常时,会执行onError()方法,实际上也是执行监听器中的onError()方法,我们可以对爬虫失败做一些处理。

    当跳出while循环时,执行第33行,将Spider的状态设置为停止,并进行资源的释放。

    @Override
    public void run() {
        checkRunningStat();
        initComponent();
        logger.info("Spider " + getUUID() + " started!");
        while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
            Request request = scheduler.poll(this);
            if (request == null) {
                if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
                    break;
                }
                // wait until new url added
                waitNewUrl();
            } else {
                final Request requestFinal = request;
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processRequest(requestFinal);
                            onSuccess(requestFinal);
                        } catch (Exception e) {
                            onError(requestFinal);
                            logger.error("process request " + requestFinal + " error", e);
                        } finally {
                            pageCount.incrementAndGet();
                            signalNewUrl();
                        }
                    }
                });
            }
        }
        stat.set(STAT_STOPPED);
        // release some resources
        if (destroyWhenExit) {
            close();
        }
    }

    在checkRunningStat()函数中,是一个while的死循环。第3行会得到当前Spider的状态,当状态为运行态时,会抛出异常,否则就设置当前状态为运行态,并退出循环。

    private void checkRunningStat() {
        while (true) {
            int statNow = stat.get();
            if (statNow == STAT_RUNNING) {
                throw new IllegalStateException("Spider is already running!");
            }
            if (stat.compareAndSet(statNow, STAT_RUNNING)) {
                break;
            }
        }
    }

    在initComponent()函数中,当你没有指定Downloader时,默认使用HttpClientDownloader,当你没有指定Pipeline时,默认使用ConsolePipeline。注意,一个Spider只对应有一个Downloader,一个Scheduler,一个PageProcessor,但可以有多个Pipeline,在Spider中使用List集合来保存多个的Pipeline。第9-15行是对线程池的初始化,在创建Spider的时候,可以使用setExecutorService(ExecutorService executorService)来设置我们自己的线程池,如果没有设置,那么Spider会给我们创建一个默认的线程池,即Executors.newFixedThreadPool(threadNum)。

    第16-21行,遍历初始设置的请求,添加到Scheduler请求队列当中,并清空初始请求。startRequest是一个Request列表,可以在创建Spider的时候,通过startRequest(List<Request> startRequests)函数进行设置。

    protected void initComponent() {
        if (downloader == null) {
            this.downloader = new HttpClientDownloader();
        }
        if (pipelines.isEmpty()) {
            pipelines.add(new ConsolePipeline());
        }
        downloader.setThread(threadNum);
        if (threadPool == null || threadPool.isShutdown()) {
            if (executorService != null && !executorService.isShutdown()) {
                threadPool = new CountableThreadPool(threadNum, executorService);
            } else {
                threadPool = new CountableThreadPool(threadNum);
            }
        }
        if (startRequests != null) {
            for (Request request : startRequests) {
                scheduler.push(request, this);
            }
            startRequests.clear();
        }
        startTime = new Date();
    }
    processRequest()是执行爬虫生命周期的函数,其参数为Request对象,封装了请求的相关信息,如URL,请求方式等。第二行调用download()方法去下载我们要抓去的界面,其内部实现是基于HttpClinet实现的,抓取返回一个Page对象,封装了抓取到的界面的信息。第12行调用了PageProcessor中的process()方法,此方法用于解析抓取的界面,并添加新的抓取请求,通常我们需要自己实现PageProcessor,在process()方法中写我们自己的解析代码。第14-18行,遍历所有的Pipeline,并执行process()方法,来对解析出的数据信息处理,可以打印也可以保存到数据库中。这部分的代码执行与否可以通过ResultItems类中的skip来控制,如果skip设置为true,那么则会执行Pipeline,如果skip设置为false,则不执行Pipeline。
    protected void processRequest(Request request) {
        Page page = downloader.download(request, this);
        if (page == null) {
            throw new RuntimeException("unaccpetable response status");
        }
        // for cycle retry
        if (page.isNeedCycleRetry()) {
            extractAndAddRequests(page, true);
            sleep(site.getRetrySleepTime());
            return;
        }
        pageProcessor.process(page);
        extractAndAddRequests(page, spawnUrl);
        if (!page.getResultItems().isSkip()) {
            for (Pipeline pipeline : pipelines) {
                pipeline.process(page.getResultItems(), this);
            }
        }
        //for proxy status management
        request.putExtra(Request.STATUS_CODE, page.getStatusCode());
        sleep(site.getSleepTime());
    }
    上图中第13行所调用的函数实现如下,我们在PageProcessor的process函数中对界面进行解析,可以将解析得到的新的URL添加到Page的targetRequests列表中。而下面代码的3-5行,则是遍历targetRequests列表,将所有的请求加入到Scheduler的请求队列当中,以此来让Spider继续进行爬取。

    protected void extractAndAddRequests(Page page, boolean spawnUrl) {
        if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
            for (Request request : page.getTargetRequests()) {
                addRequest(request);
            }
        }
    }

    private void addRequest(Request request) {
        if (site.getDomain() == null && request != null && request.getUrl() != null) {
            site.setDomain(UrlUtils.getDomain(request.getUrl()));
        }
        scheduler.push(request, this);
    }

    Site对象保存的是爬虫的一些配置信息,如请求头、Cookie、代理信息、字符编码、可以接收的服务器状态吗等。第3-6行,判断task如果不为空,那么就得到site。Task是一个接口,里面有两个方法:一个是getUUID(),另一个是getSite(),而我们的Spider就是实现了Task接口的类。第11-13行,当site不为null时,分别获得site中设置的信息,有可接收的状态码、字符编码、请求头,这些信息是我们在创建Site对象时,可以自己设置进去的。第14行,如果site为null,那么就创建一个包含状态码200的Set集合,表示默认只接收服务器返回的带有200状态码的响应。

    第23-28行,与获取代理有关。如果Site中有设置代理IP池,并且启用,那么则从代理IP池中获取一个代理Proxy对象,如果没有设置代理IP池,但Site中设置了HttpHost对象,则直接从Site中获取。第30行,调用getHttpUriRequest()来的一个Http请求对象,这个函数的具体实现我们之后会详细说明。第31行,通过getHttpClient()方法得到用于执行HTTP请求的HttpClient对象,并调用execute()执行请求,得到Response对象。第32-41行,获取服务器响应的状态码,判断这个状态码是否是当前Spider所支持的状态码,如果支持,则使用handleResponse()方法处理响应的界面,得到了封装响应界面的Page对象,如果不支持,则会打出警告日志。在finally中,会将代理IP归还给IP池,并关闭响应流,即EntityUtils.consume()。

    @Override
    public Page download(Request request, Task task) {
        Site site = null;
        if (task != null) {
            site = task.getSite();
        }
        Set<Integer> acceptStatCode;
        String charset = null;
        Map<String, String> headers = null;
        if (site != null) {
            acceptStatCode = site.getAcceptStatCode();
            charset = site.getCharset();
            headers = site.getHeaders();
        } else {
            acceptStatCode = Sets.newHashSet(200);
        }
        logger.info("downloading page {}", request.getUrl());
        CloseableHttpResponse httpResponse = null;
        int statusCode=0;
        try {
            HttpHost proxyHost = null;
            Proxy proxy = null; //TODO
            if (site.getHttpProxyPool() != null && site.getHttpProxyPool().isEnable()) {
                proxy = site.getHttpProxyFromPool();
                proxyHost = proxy.getHttpHost();
            } else if(site.getHttpProxy()!= null){
                proxyHost = site.getHttpProxy();
            }
            
            HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers, proxyHost);
            httpResponse = getHttpClient(site, proxy).execute(httpUriRequest);
            statusCode = httpResponse.getStatusLine().getStatusCode();
            request.putExtra(Request.STATUS_CODE, statusCode);
            if (statusAccept(acceptStatCode, statusCode)) {
                Page page = handleResponse(request, charset, httpResponse, task);
                onSuccess(request);
                return page;
            } else {
                logger.warn("code error " + statusCode + "\t" + request.getUrl());
                return null;
            }
        } catch (IOException e) {
            logger.warn("download page " + request.getUrl() + " error", e);
            if (site.getCycleRetryTimes() > 0) {
                return addToCycleRetry(request, site);
            }
            onError(request);
            return null;
        } finally {
        	request.putExtra(Request.STATUS_CODE, statusCode);
            if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
                site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Integer) request
                        .getExtra(Request.STATUS_CODE));
            }
            try {
                if (httpResponse != null) {
                    //ensure the connection is released back to pool
                    EntityUtils.consume(httpResponse.getEntity());
                }
            } catch (IOException e) {
                logger.warn("close response fail", e);
            }
        }
    }

    我们来看看是如何获取到HttpUriRequest对象的。getHttpUriRequest()函数有四个参数,第一个是Request对象,封装了请求的信息;第二个是Site对象,封装了Spider的配置信息;第三个是Map集合,保存了请求头;第四个是HttpHost对象,保存了代理的信息。第1行,调用selectRequestMethod() 来进行请求方式的选择,返回一个RequestBuilder对象,该对象是用于产生HttpUriRequest的。第22行,从request中获得请求的方式,然后进行判断,根据不同的请求方式构建出不同的RequestBuilder对象。这里要说的一个是,如果请求为POST,则需要获取表单参数,POST请求的表单参数是以key-value的形式封装在NameValuePair数组中的。需要特别注意的是,当我们使用Request对象的post请求时,如果要传递表单参数,则要将封装了表单参数NameValuePair数组,以key为nameValuePair的形式,加入到Request对象里的Map集合中。

    第3-7行,当请求头不为null时,则为RequestBuilder添加请求头。第8-12行,创建配置请求信息的对象,设置了连接超时时间、socket超时时间等信息。第13-16行,对代理信息进行设置。

    protected HttpUriRequest getHttpUriRequest(Request request, Site site, Map<String, String> headers,HttpHost proxy) {
        RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
        if (headers != null) {
            for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
                requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
            }
        }
        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                .setConnectionRequestTimeout(site.getTimeOut())
                .setSocketTimeout(site.getTimeOut())
                .setConnectTimeout(site.getTimeOut())
                .setCookieSpec(CookieSpecs.BEST_MATCH);
        if (proxy !=null) {
		requestConfigBuilder.setProxy(proxy);
		request.putExtra(Request.PROXY, proxy);
	}
        requestBuilder.setConfig(requestConfigBuilder.build());
        return requestBuilder.build();
    }

    protected RequestBuilder selectRequestMethod(Request request) {
        String method = request.getMethod();
        if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
            //default get
            return RequestBuilder.get();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
            RequestBuilder requestBuilder = RequestBuilder.post();
            NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
            if (nameValuePair != null && nameValuePair.length > 0) {
                requestBuilder.addParameters(nameValuePair);
            }
            return requestBuilder;
        } else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
            return RequestBuilder.head();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
            return RequestBuilder.put();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
            return RequestBuilder.delete();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
            return RequestBuilder.trace();
        }
        throw new IllegalArgumentException("Illegal HTTP Method " + method);
    }

    下面我们来看看如何产生HttpClient对象的,如第11行,是通过HttpClientGenerator对象中的getClient()方法来获取HttpClient对象。其内部实现就是使用HttpClientBuilder来生产HttpClient,需要设置代理IP的信息,以及用户名和密码。

    private CloseableHttpClient getHttpClient(Site site, Proxy proxy) {
        if (site == null) {
            return httpClientGenerator.getClient(null, proxy);
        }
        String domain = site.getDomain();
        CloseableHttpClient httpClient = httpClients.get(domain);
        if (httpClient == null) {
            synchronized (this) {
                httpClient = httpClients.get(domain);
                if (httpClient == null) {
                    httpClient = httpClientGenerator.getClient(site, proxy);
                    httpClients.put(domain, httpClient);
                }
            }
        }
        return httpClient;
    }

    得到了服务器的响应,就需要解析这些响应,handleResponse() 方法就实现了这个getContent()方法,用于获得服务器响应的HTML代码。这是设计到一个字符编码的问题,如果有在site中设置了charset,就使用这个charset,将服务器响应的byte字节数组转成String类型返回。否则,则需要调用getHtmlCharset() 根据响应内容获取字符编码,有一下几种方式:第一,解析响应头中Content-Type属性,Content-Type中设置的编码;第二,解析HTML中meta标签中的字符编码,例如

 <meta http-equiv="Content-Type" content="text ml; charset=UTF-8" /> ,该函数返回的是解析出的字符编码。
    第3行,会创建Page对象,将界面的内容封装到Page对象中,并返回。

    protected Page handleResponse(Request request, String charset, HttpResponse httpResponse, Task task) throws IOException {
        String content = getContent(charset, httpResponse);
        Page page = new Page();
        page.setRawText(content);
        page.setUrl(new PlainText(request.getUrl()));
        page.setRequest(request);
        page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
        return page;
    }

    protected String getContent(String charset, HttpResponse httpResponse) throws IOException {
        if (charset == null) {
            byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
            String htmlCharset = getHtmlCharset(httpResponse, contentBytes);
            if (htmlCharset != null) {
                return new String(contentBytes, htmlCharset);
            } else {
                logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
                return new String(contentBytes);
            }
        } else {
            return IOUtils.toString(httpResponse.getEntity().getContent(), charset);
        }
    }
    自此,一个简单example的爬虫运行流程就走完了,有些细节的地方,还需要自己慢慢去体会。

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐