HOME/Articles/

Java多线程爬虫及ElasticSearch的初步使用

Article Outline

本文主要记录了一个使用多线程爬取数据之后使用ES进行分析的一个练手项目的开发体验

<!--more-->

项目开发的一般原则

  • 使用GitHub + 主干/分支模型进行开发
  • 禁止直接push master分支
  • 所有的改动通过开分支,然后pull request进行合并(这样可以执行自动化代码检查和测试)
  • 提交内容不多余,且尽量做到没有本地依赖,使得其他使用者clone下来之后可以无障碍运行

初始化新项目的常用方法

  • mvn archetype:generate 可以使用maven提供的项目骨架去新搭建一个项目
  • 也可以通过IDEA的new project去创建

其实更多情况下是通过cp已有项目来新建项目的

pom文件配置

值得注意的是,pom.xml下一般要进行阿里云maven镜像的配置:

// pom.xml
    <!--阿里云镜像-->
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

完整的pom.xml这里可以看到

初步开发项目的源码及测试代码

本次爬取的是新浪的新闻网站https://sina.cnhttps://www.sina.com.cn,我们使用apache.httpcomponents进行爬取

项目代码目录为:src/main/java/com/github/DeeJay0921

测试代码的目录为:src/test/java/com/github/DeeJay0921

先初步的测试一下是否可以爬取到数据:

package com.github.DeeJay0921;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.DefaultProxyRoutePlanner;
import org.apache.http.util.EntityUtils;

import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException {
        HttpHost proxy = new HttpHost("10.30.6.49", 9090);
        DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
        CloseableHttpClient httpclient = HttpClients.custom()
                .setRoutePlanner(routePlanner)
                .build();
        // 上述方式创建的 httpclient 是因为需要通过内网代理才能访问外网 仅供笔者自身使用
        // CloseableHttpClient httpclient = HttpClients.createDefault(); 如无需要 直接使用本句创建的 httpclient 即可
        HttpGet httpGet = new HttpGet("http://sina.cn");
        try (CloseableHttpResponse response1 = httpclient.execute(httpGet)) {
            System.out.println(response1.getStatusLine());
            HttpEntity entity1 = response1.getEntity();
//            EntityUtils.consume(entity1);
            System.out.println("EntityUtils.toString(entity1) = " + EntityUtils.toString(entity1));
        }
    }
}

我们编写一个SmokeTest来进行测试刚才编写的代码是否可以通过冒烟测试:

package com.github.DeeJay0921;

import org.junit.jupiter.api.Test;

public class SmokeTest {
    @Test
    public void test() {
        System.out.println("This is smoke Test");
    }
}

<details> <summary> 解决使用公司内网,通过Java程序访问外网被forbidden的问题: </summary> 因为笔者所在公司为内网环境,只能依靠配置代理访问外部网络,但是在使用httpClient进行访问https://sina.cn时,被拦截了

首先我通过httpClient的文档,搜索到了代理配置

HttpHost proxy = new HttpHost("someproxy", 8080);
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
CloseableHttpClient httpclient = HttpClients.custom()
        .setRoutePlanner(routePlanner)
        .build();

而不是通过传统的CloseableHttpClient httpclient = HttpClients.createDefault();去创建httpclient实例

这样访问之后,开始报一个unable to find valid certification path to requested target的错误,一看就是证书出现的问题

先通过chrome访问https://sina.cn然后点击证书,复制到文件,导出.cer文件到本地

再打开本地jdk的文件夹,比如C:\Program Files\Java\jdk-10.0.1\lib\security下,通过终端运行:

keytool -import -file ./sinaCer.cer -keystore cacerts -alias server

其中./sinaCer.cer就是刚才导出的证书,默认密码为:changeit,输入密码后输入y进行确定即可。

至此,使用Java程序也能通过代理从内网访问外网进行爬取数据了。

</details>

配置circleci配置

circleCI文档

在项目的根目录创建一个名为.circleci的文件夹,并新建config.yml文件, 在CI的控制台去Add Projects,选择好项目和OS以及语言之后, 就可以直接Start Building去执行第一次构建了。

关于config.yml

CI的配置由三部分组成

  • version
  • jobs 要执行的job清单,集合中的键为job的名称,值是具体执行job的内容,如果你使用工作流(workflows),则job的名称在配置文件中必须唯一,如果你不使用 工作流(workflows),则必须包含名称为buildjob来作为用户提交代码时的默认job
  • workflow

比如说:

version: 2
jobs:
  test:
    docker:
      - image: circleci/openjdk:8u212-jdk-stretch
    steps:
      - checkout
      - restore_cache:
          key: DeeJay0921-{{ checksum "pom.xml" }}
      - run:
          name: Run Maven tests
          command: mvn clean test
      - save_cache: # saves the project dependencies
          paths:
            - ~/.m2
          key: DeeJay0921-{{ checksum "pom.xml" }}
workflows:
  version: 2
  default:
    jobs:
      - test

上述配置中jobs下只有一个名为testjob,这个名为testjob下有一些常用属性:

  • docker: 指定CI当前job 使用docker, 其值image是指 docker 所使用的镜像,比如说本例我们制定了JDK8, 必要时你可以同时指定多个镜像,比如你的项目需要依赖mysql 或者 redis。 第一个列出的容器为主容器,steps `都会在主容器中进行。
  • steps: 当前job要运行的 命令 (command) 列表

进行开发

到这里就要开始真正的开发了,记得先创建好自己的分支,不能在master下直接开发

确定算法

我们要做到从一个节点出发,遍历所有的节点

深度优先是优先访问层次更深的节点,而广度优先是优先访问完同一层次的所有节点,再去访问下一层次的节点

这里采用广度优先的思想

大概思路是维护一个链接池,每次从这个连接池里面去拿一个链接开始处理

首先做判断是否已经处理过该链接,如果没处理过那么访问链接得到页面,如果是非新闻页,那么直接跳过

如果是新闻页,将得到的信息存储在数据库,将新的页面得到的链接加入链接池,如此循环

开发出一个初步的算法

maven 生命周期 与 插件配置

详细的周期为:maven lifecycle

一般我们使用的就是Default Lifecycle,从validtatedeploy

当我们运行mvn xxx时,就会从这个生命周期开始,一直依次往下执行直到我们输入的指令,比如mvn test会一直从validate执行到test

并且,在默认情况下,这些生命周期未经指定的话,是什么逻辑都不执行的,所以需要绑定逻辑到生命周期上,这个就需要maven的插件

maven内置了一些插件,比如maven-comoile-plugin,将逻辑绑定到了compile周期上,这些插件也不会显示的声明在pom.xml里面,而这个绑定的逻辑,被称为一个goal

在test周期也有内置插件为maven-surefire-plugin,可以在pom里面显式的指定想要用那个版本

关于非内置插件,举个checkStyle的例子:

<plugin>
    <artifactId>maven-checkstyle-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <!--checkstyle文件配置路径-->
        <configLocation>${basedir}/.circleci/checkstyle.xml</configLocation>
        <includeTestSourceDirectory>true</includeTestSourceDirectory>
        <enableRulesSummary>false</enableRulesSummary>
    </configuration>
    <executions>
        <execution>
            <id>compile</id>
            <phase>compile</phase>
            <goals>
                <goal>check</goal>
            </goals>
        </execution>
    </executions>
    <dependencies>
        <dependency>
            <groupId>com.puppycrawl.tools</groupId>
            <artifactId>checkstyle</artifactId>
            <version>8.22</version>
        </dependency>
    </dependencies>
</plugin>

这是一个插件的配置,其中executions指定了一个名为goalgoal,且将其绑定到了compile周期。 如果一个maven生命周期绑定了多个goal的时候,谁先声明的就先执行谁

spotBugs插件引入

官方文档

使用maven将其引入,我们额外还得加个配置,使其在verify周期再执行:

<!--spotbugs-->
<plugin>
    <groupId>com.github.spotbugs</groupId>
    <artifactId>spotbugs-maven-plugin</artifactId>
    <version>3.1.12.2</version>
    <executions>
        <execution>
            <id>spotbugs</id>
            <phase>verify</phase>
            <goals>
                <goal>check</goal>
            </goals>
        </execution>
    </executions>
    <dependencies>
        <!-- overwrite dependency on spotbugs if you want to specify the version of spotbugs -->
        <dependency>
            <groupId>com.github.spotbugs</groupId>
            <artifactId>spotbugs</artifactId>
            <version>4.0.0-beta4</version>
        </dependency>
    </dependencies>
</plugin>

写一段会出bug的代码测试一下:

Integer i = null;
if (i == 1) {
    System.out.println("Test");
}

执行mvn verify, 会看到spotbugs会有提示Null pointer dereference of i in com.github.DeeJay0921.Main.main(String[])

引入H2数据库实现数据存储和断点续传

要在项目里引入数据库,将未处理以及已处理的连接池转为数据库存储,同时将爬取到的新闻信息也存储到数据库里。

如上分析,我们需要新建3张表:

  1. LINKS_TO_BE_PROCESSED
  2. LINKS_ALREADY_PROCESSED
  3. NEWS

1和2的表结构都很简单,直接存链接即可,针对存储新闻的表NEWS,暂定字段为id, Title, Content, URL, created_at, updated_at。

引入数据库之后,将之前的操作都改为存储到数据库中,代码如下

注意到这里我们仍然使用了一个内存中的List去缓存数据库中的连接池,可以进一步优化掉

使用flyway自动化管理数据库

随着版本迭代,数据库的结构也在不断的进行变更,比如字段的增删等,如果需要将这些变更维护起来,就需要flyway

我们可以使用flyway去使得我们的数据库的新建和初始化完全自动化

按照官方文档的约定,我们新建一个/main/resources/db/migration路径,

在下面按照其约定的命名规则,创建2个sql,一个名为V1__Create_tables.sql,作为新建表的sql,一个名为V2__Init_data.sql作为初始化数据的sql。代码在这里

然后运行mvn flyway:migrate即可自动化创建及初始化数据库的数据

抽离数据库操作方式为DAO

ORM(Object Relation Mapping)对象关系映射

因为每次都要手写JDBC真的很烦,我们可以通过ORM,将每张表映射到一个对象上

在引入MyBatis之前,先将目前的代码重构一遍,将数据库操作剥离出来,方便后面改写,这是重构之后的代码

重构之后的好处是,爬虫的逻辑和数据库的逻辑完全剥离开,可以写一个接口将数据库的操作多态化

之后如果访问数据库的方式要发生改变(比如说切换数据库,使用MyBatis等),可以不涉及修改爬虫的逻辑,新的访问逻辑只需要实现数据库对应的接口即可。

通过抽取公共逻辑为一个接口,这是重构的代码

改进数据库操作方式,抛弃JDBC,使用 MyBatis

官方文档

导入maven依赖后,我们首先还是在/src/main/resources下创建一个mybatis-config.xml,具体的路径为/src/main/resources/db/mybatis/mybatis-config.xml

<environments default="development">
    <environment id="development">
        <transactionManager type="JDBC"/>
        <dataSource type="POOLED">
            <property name="driver" value="org.h2.Driver"/>
            <property name="url" value="jdbc:h2:file:./news"/>
            <!--<property name="username" value="${username}"/>-->
            <!--<property name="password" value="${password}"/>-->
        </dataSource>
    </environment>
</environments>
<mappers>
    <!--映射关系文件-->
    <mapper resource="db/mybatis/myMapper.xml"/>
</mappers>

指定了映射文件的路径之后,就可以在Mapper.xml里面写SQL了,在外部直接使用session调用即可。

注意,在SqlSession session = sqlSessionFactory.openSession(true);的操作中,所有对数据库造成更新的操作,都应该将autoCommit这个参数置为true

这里是引入Mybatis之后的代码

将爬虫改为多线程的

经过上面的开发,爬虫基本可用,但是由于网络IO太慢了,需要将其改成多线程让CPU得到更多的利用。

我们直接将Crawler改为Thread的子类,通过外部调度即可。

这里要分析一下之前的代码:

    @Override
    public String getNextLinkThenDelete() throws SQLException {
        String link;
        // 这里的openSession 的参数autoCommit一定要为true,否则每次的删除就没有被提交到数据库
        try (SqlSession session = sqlSessionFactory.openSession(true)) {
            link = session.selectOne(
                    "com.github.DeeJay0921.mybatis.selectNextLink"); // 这边输入Mapper.xml里面的命名空间加Select语句的id
            if (link != null) {
                session.delete("com.github.DeeJay0921.mybatis.deleteLink", link);
            }
        }
        return link;
    }

在上述方法中,取一个link出来再删掉的操作很明显不是一个原子操作,在多线程情况下,会出现多次重复删除等操作,所以需要设置锁,简单点设为synchronized方法即可。

PS: 数据库操作天生就是线程安全的

改为多线程的代码

当数据规模大了之后

当数据库存储数据达到一定规模之后,会出现一些性能问题

先来写一些代码用来模拟生成一些百万级别的数据:

将代码修改为这样

在上例中,我们在news类里新增了createdAtupdatedAt, 在mybatis中,默认是不会将其转为snake_case形式的,所以需要配置mapUnderscoreToCamelCase这个属性为true 另外,在mybatis的配置中,settings要放到最前面,否则会报错,完整的配置列表在这

索引优化

create index

现在我们拥有了很大的数据库,要执行一些查询的时候,如果使用的是主键查询,即id等的时候,查询还是很快的,例如:

select * from NEWS where id=123

查询仍然很快

但是我们做一点小改动,我们将NEWS表的created_atupdated_at列后面的时分秒去掉, 即将2020-01-10 16:18:41.162189这种数据改为2020-01-10

update NEWS set created_at = date(created_at), updated_at = date(updated_at)

如果当前数据库不支持date()方法的话,可以执行:

update NEWS
set CREATED_AT = TO_CHAR(CREATED_AT, 'yyyy-MM-dd'),
    UPDATED_AT = TO_CHAR(UPDATED_AT, 'yyyy-MM-dd')

date()是sql内置函数, 可以将timestamp转为date, 也可以使用to_char

然后通过created_at作为索引执行查询操作,例如:

select * from NEWS where created_at = '2019-08-29'

这时候查询的动作就很慢了,

那么对于这种非主键的查询,我们可以给目标列建立一个索引,查看官方文档,在本例中,我们可以执行:

CREATE INDEX CREATED_AT_INDEX
ON NEWS (CREATED_AT)

其他参数都可以选择默认,然后执行一次,给表中的CREATED_AT都加上索引,等待都加好之后,可以执行:

show index from NEWS

可以看到建立的所有索引

再次执行:

select * from NEWS where created_at = '2019-08-29'

就可以看到查询速度显著提升了。

explain语句分析sql

另外可以使用explain来解释当前语句将会以怎样的方式被执行,比如当我们运行:

explain select *
        from NEWS
        WHERE CREATED_AT = '1970-01-01'

就可以分析该sql,找到可以优化的点

关于explain可以看这篇

联合索引

还可以为多个列增加联合索引,语法还是一样的:

CREATE INDEX CREATED_AT_AND_UPDATED_AT
ON NEWS (CREATED_AT, UPDATED_AT)
-- 一般的原则是,尽量修改原有的索引,然后再考虑新加索引

上述语句为CREATED_ATUPDATED_AT添加了联合索引CREATED_AT + UPDATED_AT,我们这次执行查询:

SELECT * FROM NEWS WHERE CREATED_AT = '1970-01-06' AND UPDATED_AT > '1970-01-04'

sql查询遵循最左匹配原则,上述语句中CREATED_AT = '1970-01-06' AND UPDATED_AT > '1970-01-04'

左边的CREATED_AT = '1970-01-06这个相等语句,可以匹配到联合索引CREATED_AT + UPDATED_AT的左边的CREATED_AT,所以查询会比较快,执行explain上述语句时,发现该语句typeref

但是当我们将sql写为:

SELECT * FROM NEWS WHERE CREATED_AT > '1970-01-05' AND UPDATED_AT = '1970-01-05'

时,左边语句没有匹配到索引,速度会明显降低。当我们执行explain上述语句时,也可以发现该语句typeALL

所以我们在实际开发中,要根据具体的业务需要去建立索引,且索引也不是越多越好

Elatsicsearch原理及初步使用

为什么需要Elatsicsearch

上面的例子都是针对索引优化的,但是如果想对于新闻的一些文本内容进行检索的话,sql是没有很好的支持的。

如果使用sql,我们只能写出像下面一样的sql:

SELECT * FROM NEWS WHERE CONTENT LIKE '%关键字%'

其检索速度非常的慢,因为数据库的长处在于对于非文本的一些数据索引检索。

而对于一些搜索引擎来讲,经常面临千亿万亿级别的文本搜索,这时候就需要Elasticseach

Elatsicsearch 的原理

Elatsicsearch采用了倒排索引,想象一个场景,给你一个'月'字,让你想出在你脑海中记忆着的所有包含'月'字的古诗词。

传统的数据库检索方式只能将脑海中的所有诗词都遍历一遍,然后判断是否contains('月'), 但是倒排索引采用了一种方法

拿《静夜思》来举例子,倒排索引指的是,将"床前明月光"这里的每个字都建立一个索引,指向《静夜思》这个标题,即 :

'床' --> 《静夜思》,
'前' --> 《静夜思》,
'明' --> 《静夜思》,
'月' --> 《静夜思》,
'光' --> 《静夜思》
// ...

这样建立了索引之后,如果有新的诗词比如《山居秋暝》,诗中有一句"明月松间照", 那么索引可以继续增加,

// ...
'月' --> [《静夜思》,《山居秋暝》]
// ...

那么这样一来,我们要根据'月'检索出所有脑海中的古诗词,就很快了,这就是Elasticsearch的原理倒排索引

使用Elasticsearch

关于Elasticsearch的一些基本使用,可以参见Elasticsearch: 权威指南

安装还是推荐docker安装,启动之后可以访问http://localhost:9200/?pretty即可看到ES的返回值了

传统的关系型数据库的结构: Databases --> Tables --> Rows --> Columns ES的结构:ES Cluster --> Indices --> Types(将被废弃) --> Documents --> Fields

首先还是需要向Elasticsearch中灌入数据,在文档中搜索Java Client来看相关操作,发现有Java High Level REST Client可以使用

采用的版本为7.3.1,文档在这里

最终找到了迁移指南

[初步MockData的代码(]https://github.com/DeeJay0921/multithread-crawler-demo/commit/817e20cb892b083c4ee89a39c51ccae5bae5a68c)

对于这种IO密集型的插入操作,可以考虑多开几个线程插入数据,效率会快一点,且同时可以使用ES的Bulk批量请求操作

插入好数据之后,可以按照基本的文档进行操作,Elasticsearch Getting Started

比方说直接访问http://localhost:9200/news/_search?pretty,发现就算数据量很庞大,ES也能很快的返回

可以按照关键字来进行搜索,例如:http://localhost:9200/news/_search?q=title:%E4%B9%A0%E8%BF%91%E5%B9%B3

还有一些条件搜索等,具体可以查看文档。

然后我们可以借助Elasticsearch实现一个简易的搜素引擎来搜索我们爬取到的数据, 搜索API可以见这里

简易的搜索引擎实现