
If you want to test the HBase Indexer with a higher throughput than the manual puts explained in the Tutorial, but don’t want to bother setting up your own schema & application, you will find the demo ingester tools useful.

如果你想以比教程中手工put更高的吞吐量来测试HBase Indexer,又不想费事搭建自己的schema和应用,可以使用demo数据导入(ingester)工具。

For more information on them, please see demo/README.md in the binary distribution.

更为详细的信息,可以查看发布包中的demo/README.md

The most basic indexer configuration only requires a table name and a single field. However, there are many configuration settings that can be used in an indexer configuration file to customize behavior.

最基础的索引器配置只需要一个表名和一个字段就可以。但是,索引器配置文件中还有很多配置项可以用来自定义索引器的行为。

<indexer table="mytable">
<field name="fieldname" value="columnfamily:qualifier" type="string"/>
</indexer>

全局索引器属性(Global indexer attributes)

The following is a list of attributes that can be set on the top-level element in an indexer configuration.

以下列出的属性可以在顶层<indexer>节点中配置。

table

The table attribute specifies the name of the HBase table to be indexed by the indexer. It is the only mandatory attribute in the indexer element.

table属性用于指定需要被索引器索引的HBase表名。这是indexer节点中唯一强制要求的属性。

mapping-type

The mapping-type attribute has two possible values: row, or column. This attribute specifies whether row-based or column-based indexing is to be performed.

mapping-type属性有两种值:row和column。这个属性用于指定是基于行还是基于列的索引方式。

Row-based indexing treats all data within a single HBase row as input for a single document in Solr. This is the kind of indexing that would be used for an HBase table that contains a separate entity in each row, e.g. a table containing users.

基于行索引方式把Hbase一行当中的所有数据当成Solr的一个文档输入。这种方式用于索引一行就表示一个独立的实体的Hbase表,比如包含用户的表。

Column-based indexing treats each HBase cell as input for a single document in Solr. This approach could be used for example in a messaging platform where a single user’s messages are all stored in a single row, with each message being stored in a separate cell.

基于列的索引方式把HBase的一个单元格(cell)当作Solr的一个文档输入。这种方式可以用于比如消息平台:一个用户的所有消息存在同一行中,而每条消息存在一个单独的单元格上。

The default mapping-type value is row.

默认的mapping-type是row。
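As an illustrative sketch (the table, column family, and field names below are made up for this example, not taken from the documentation), a column-mapped indexer for such a messaging table could look like this:

<indexer table="messages" mapping-type="column">
  <!-- every matching cell in the "msgs" column family becomes its own Solr document -->
  <field name="message_text" value="msgs:*" type="string"/>
</indexer>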

read-row

The read-row attribute has two possible values: dynamic, or never.

read-row属性有两种值:dynamic和never。

This attribute is only important when using row-based indexing. It specifies whether or not the indexer should re-read data from HBase in order to perform indexing.

这个属性只对基于行的索引方式有意义。它指定索引器为了完成索引是否需要从HBase重新读取数据。

When set to “dynamic”, the indexer will read the necessary data from a row if a partial update to the row is performed in HBase. In dynamic mode, the row will not be re-read if all data needed to perform indexing is included in the row update.

当被设置为“dynamic”时,如果HBase是一行数据的部分数据更新时,索引器会读取这一行中需要的数据。dynamic模式下,如果是一行的全部数据的更新,索引器不会重新读取Hbase的数据。

If this attribute is set to never, a row will never be re-read by the indexer.

如果这个属性被设置为never,索引器不会重读行数据。

The default setting is “dynamic”.

默认的设置是“dynamic”。

mapper

The mapper attribute allows the user to specify a custom mapper class that will create a Solr document from a HBase Result object. The mapper class must implement the com.ngdata.hbaseindexer.parse.ResultToSolrMapper interface.

mapper属性允许用户指定一个自定义的mapper类,用于从HBase的Result对象创建Solr文档。这个mapper类必须实现com.ngdata.hbaseindexer.parse.ResultToSolrMapper接口。

By default, the built-in com.ngdata.hbaseindexer.parse.DefaultResultToSolrMapper is used.

默认情况下,使用的是内置的com.ngdata.hbaseindexer.parse.DefaultResultToSolrMapper。

unique-key-formatter

The unique-key-formatter attribute specifies the name of the class used to format HBase row keys (as well as column families and column qualifiers) as text. A textual representation of these pieces of information is needed for indexing in Solr, as all data in Solr is textual, but row keys, column families, and column qualifiers are byte arrays.

unique-key-formatter指定了用于格式化Hbase rowkeys(列簇和列限定符也一样)为文本的类名。在Solr中索引需要这些信息的文本表示,因为Solr中的所有数据都是文本数据,但是行键、列族和列限定符是字节数组。

A unique-key-formatter class must implement the com.ngdata.hbaseindexer.uniquekey.UniqueKeyFormatter interface.

一个unique-key-formatter类必须实现com.ngdata.hbaseindexer.uniquekey.UniqueKeyFormatter接口。

The default value of this attribute is com.ngdata.hbaseindexer.uniquekey.StringUniqueKeyFormatter. The StringUniqueKey formatter simply treats row keys and other byte arrays as strings.

这个属性的默认值是com.ngdata.hbaseindexer.uniquekey.StringUniqueKeyFormatter。StringUniqueKeyFormatter只是简单的把rowkey或者其他字节数据当成字符串。

If your row keys, column families, or qualifiers can’t simply be used as strings, consider using the com.ngdata.hbaseindexer.uniquekey.HexUniqueKeyFormatter.

如果你的rowkey、列簇或者限定符不能简单地当作字符串使用,你就需要考虑使用com.ngdata.hbaseindexer.uniquekey.HexUniqueKeyFormatter。
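For example (a sketch only; the table name and field mapping are hypothetical), a table whose row keys are raw binary could declare the hex formatter like this:

<indexer table="binary-keyed-table"
         unique-key-formatter="com.ngdata.hbaseindexer.uniquekey.HexUniqueKeyFormatter">
  <field name="payload" value="cf:data" type="string"/>
</indexer>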

unique-key-field

This attribute specifies the name of the document identifier field used in Solr.

这个属性指定Solr中文档的标识字段。

The default value for this field is “id”.

默认值是“id”。

row-field

The row-field attribute specifies the name of the Solr field to be used for storing an HBase row key.

row-field属性指定了Solr中用于存储HBase rowkey的字段名。

This field is only important when doing column-based indexing. In order for the indexer to be able to delete all documents for a single row from the index, it needs to be able to find all documents for the row in Solr. When this attribute is populated in the indexer definition, its value is used as the name of a field in Solr to store the encoded row key.

这个字段只对基于列的索引方式才重要。索引器为了能够从索引中删除某一行对应的所有文档,它需要能够在Solr中找到该行的所有文档。在索引器定义中设置这个属性时,它的值将用作Solr中用于存储编码后行键的字段名。

By default, this attribute is empty, meaning that the row key is not stored in Solr. The consequence of this is that deleting a complete row or complete column family in HBase will not delete the indexed documents in Solr.

默认情况下,这个属性为空,这意味着rowkey不会被存储到Solr里面。这样的结果就是,在HBase中删除一整行或一整个列簇并不会删除Solr中已经索引的文档。
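A hedged sketch (the table and field names below are placeholders) of combining row-field with column-based indexing so that the indexer can find, and therefore delete, every Solr document belonging to a deleted HBase row:

<indexer table="messages" mapping-type="column" row-field="row_key_s">
  <field name="message_text" value="msgs:*" type="string"/>
</indexer>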

column-family-field

The column-family-field specifies the name of the Solr field to be used for storing the HBase column family name.

column-family-field指定Solr中用于存储HBase列簇名的字段名。

See the description of the row-field attribute for more information.

更多信息请参考row-field属性的说明。

By default, this attribute is empty, so the column-family name is not saved in Solr.

默认情况,这个属性为空,也就是说,列簇不会被Solr存储。

table-name-field

The table-name-field specifies the name of the Solr field to be used for storing the name of the HBase table where a record is stored.

table-name-field指定Solr中用于存储Hbase表名的字段名。

By default, this attribute is empty, so the name of the HBase table is not stored unless this setting is explicitly set in the indexer config.

默认情况,这个属性是空,也就是说Hbase的表名不会被存储除非在indexer节点中明确指定。

indexer内定义的节点(Elements within the indexer definition)

There are three types of elements that can be used within an indexer configuration; the two described below are <field> and <param>.

在indexer配置中可以使用三种类型的子节点,下文介绍其中的<field>和<param>。

The field element defines a single field to be indexed in Solr, as well as where its contents are to be taken from and interpreted from HBase. There are typically one or more fields listed in an indexer configuration – one for each Solr field to be stored.

field节点定义了要在Solr中索引的单个字段,以及它的内容从HBase的哪里获取、如何解析。索引器配置中通常会列出一个或多个field,每个要存储的Solr字段对应一个。

The field element has four attributes, listed below.

field节点有如下四个属性。

name

The name attribute specifies the name of a Solr field in which to store data. A field with a matching name should be defined in the Solr schema.

name属性指定了Solr中用于存储数据的字段名。Solr的schema中需要定义一个同名的字段。

The name attribute is mandatory.

name属性是必须的。

value

The value attribute specifies the data to be used from HBase for populating the field in Solr. It takes the form of a column family name and qualifier, separated by a colon.

value属性指定了HBase中用于填充Solr字段的数据。它采用列簇名和限定符的形式,用冒号分隔。

The qualifier portion can end in an asterisk, which is interpreted as a wildcard. In this case, all matching column-family and qualifier expressions will be used.

限定符部分可以星号结尾,该星号被解释为通配符。在这种情况下,将使用所有匹配的列族表达式和限定符表达式。

The following are examples of valid value attributes:

以下是有效值属性的示例:

  • mycolumnfamily:myqualifier
  • mycolumnfamily:my*
  • mycolumnfamily:*

source

The source attribute determines what portion of an HBase KeyValue will be used as indexing content.

Source属性确定HBASE KeyValue的哪个部分将用作索引内容。

It has two possible values: value and qualifier.

它有两个可能的值:value和qualifier。

When value is specified (which is the case by default), then the cell value is used as input for indexing.

当指定值时(默认情况下是这种情况),则使用单元格值作为索引的输入。

When qualifier is specified, then the column qualifier is used as input for indexing.

当指定限定符时,列限定符将用作索引的输入。

type

The type attribute defines the datatype of the content in HBase.

type属性定义HBASE中内容的数据类型。

Because all data is stored in HBase as byte arrays, but all content in Solr is indexed as text, a method for converting from byte arrays to the actual datatype is needed.

因为所有数据都以字节数组的形式存储在HBASE中,但是Solr中的所有内容都被索引为文本,所以需要一个方法将字节数组转换为实际的数据类型。

The value of this field can be one of any of the datatypes supported by the HBase Bytes class: int, long, string, boolean, float, double, short, or bigdecimal.

该属性的值可以是HBase Bytes类支持的任何数据类型之一:int、long、string、boolean、float、double、short或bigdecimal。

If the Bytes-based representation has not been used for storing data in HBase, the name of a custom class can be specified for this attribute. The custom class must implement the com.ngdata.hbaseindexer.parse.ByteArrayValueMapper interface.

如果没有使用基于Bytes的表示形式在HBASE中存储数据,则可以为该属性指定自定义类的名称。自定义类必须实现com.ngdata.hbaseindexer.parse.ByteArrayValueMapper接口。
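For instance, a field whose cells hold some custom binary encoding might reference such a mapper class by name (the class name and column below are purely hypothetical):

<field name="location" value="cf:geo" type="com.mycompany.GeoPointValueMapper"/>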

The <param> element defines a key-value pair that will be supplied to custom classes that implement the com.ngdata.hbaseindexer.Configurable interface. <param> elements can also be nested in a <field> element.

<param>节点定义了一个键值对,它将提供给实现com.ngdata.hbaseindexer.Configurable接口的自定义类。<param>元素也可以嵌套在<field>元素中。

The <param> element has two attributes: name and value. Both are mandatory.

<param>节点有两个属性:name和value。两者都是必须的。

配置样例(Example configuration)

The example configuration below demonstrates all elements and attributes that can be used to configure an indexer.

下面的示例配置演示了可用于配置索引器的所有元素和属性。

<!--
Do row-based indexing on table "table1", never re-reading updated content.
Store the unique document id in Solr field called "custom-id".
Additionally store the row key in a Solr field called "custom-row", and store the
column family in a Solr field called "custom-family".

Perform conversion of byte array keys using the class "com.mycompany.MyKeyFormatter".
-->
<indexer
table="table1"
mapping-type="row"
read-row="never"
unique-key-field="custom-id"
row-field="custom-row"
column-family-field="custom-family"
table-name-field="custom-table"
unique-key-formatter="com.mycompany.MyKeyFormatter"
>

<!-- A float-based field taken from any qualifier in the column family "colfam" -->
<field name="field1" value="colfam:*" source="qualifier" type="float"/>

<param name="globalKeyA" value="globalValueA"/>
<param name="globalKeyB" value="globalValueB"/>

</indexer>

参考

morphlines
morphlines-architecture

This page explains how to start doing basic indexing in HBase. Before following this tutorial, make sure that the HBase Indexer and other required software is installed and running as explained in the installation instructions.

这篇解释如何在Hbase中开始基础索引。在开始教程之前,请确保Hbase Indexer和其他需要的软件已经安装和运行。

At this point, HBase and Solr (in cloud mode) should be running, and the HBase Indexer should be unpacked in a directory (which we’ll call $INDEXER_HOME). For this tutorial, it is assumed that the default example index schema is being used by Solr (as explained on the installation page).

此时,HBase和Solr(云模式)应该已经在运行,并且HBase Indexer已经解压到一个目录中(这里我们称之为$INDEXER_HOME)。在这个教程中,我们假设Solr使用的是默认的example索引schema(就像安装页面中说明的那样)。

启动Hbase Indexer(Start the HBase Indexer daemon)

In a terminal, execute the following (assuming $INDEXER_HOME points to the directory where the hbase-indexer tar.gz distribution was unpacked).

在终端中,运行如下命令(假定$INDEXER_HOME指向hbase-indexer tar.gz解压的目录)。

cd $INDEXER_HOME
./bin/hbase-indexer server

创建需要索引的表(Create a table to be indexed in HBase)

In the HBase shell, create a table. For this tutorial, we’ll create a table named “indexdemo-user”, with a single column family named “info”. Note that the REPLICATION_SCOPE of the column family of the table must be set to 1.

在Hbase shell,创建一个表。在这里,我们创建一个“indexdemo-user”的表,有一个“info”的列簇。注意列簇的REPLICATION_SCOPE必须设置成1。

$ hbase shell
hbase> create 'indexdemo-user', { NAME => 'info', REPLICATION_SCOPE => '1' }
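If the table already exists without replication enabled, it can be altered afterwards, following the same pattern used later in this post:

hbase> disable 'indexdemo-user'
hbase> alter 'indexdemo-user', { NAME => 'info', REPLICATION_SCOPE => '1' }
hbase> enable 'indexdemo-user'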

添加索引器(Add an indexer)

Now we’ll create an indexer that will index the the indexdemo-user table as its contents are updated.

现在我们创建一个索引器,当indexdemo-user表的内容被更新时它会进行索引。

In your favourite text editor, create a new xml file called indexdemo-indexer.xml, with the following content:

用你喜欢的编辑器,创建一个新的xml文件,命名为indexdemo-indexer.xml,添加如下内容:

<?xml version="1.0"?>
<indexer table="indexdemo-user">
<field name="firstname_s" value="info:firstname"/>
<field name="lastname_s" value="info:lastname"/>
<field name="age_i" value="info:age" type="int"/>
</indexer>

The above file defines three pieces of information that will be used for indexing, how to interpret them, and how they will be stored in Solr.

上面的文件定义了三个索引字段的信息,如何解析,如何在Solr中存储。

Next, create an indexer based on the created indexer xml file.

接下来,基于xml文件创建一个索引器。

./bin/hbase-indexer add-indexer -n myindexer -c indexdemo-indexer.xml \
-cp solr.zk=localhost:2181/solr -cp solr.collection=collection1

Note that the above command assumes that ZooKeeper is running on localhost on port 2181, and that there is a Solr Core called “collection1” configured. If you are doing this tutorial on an existing HBase/Solr environment, you may need to use different settings.

注意,上面的命令假设ZooKeeper运行在本机的2181端口上,并且已经配置了一个名为“collection1”的Solr Core。如果你是在一个现有的HBase/Solr环境中运行本教程,可能需要使用不同的设置。

更新表内容(Update the table content)

In the HBase shell, try adding some data to the indexdemo-user table
在Hbase shell中,尝试给indexdemo-user添加一些数据。

hbase> put 'indexdemo-user', 'row1', 'info:firstname', 'John'
hbase> put 'indexdemo-user', 'row1', 'info:lastname', 'Smith'

After adding this data, take a look in Solr (i.e. http://localhost:8983/solr/#/collection1/query). You should see a single document in Solr that has the firstname_s field set to “John”, and the lastname_s field set to “Smith”.

添加数据之后,来看一下Solr(i.e. http://localhost:8983/solr/#/collection1/query),你应该能看到一个文档,这个文档的firstname_s字段值为“John”,同时lastname_s字段值为“Smith”。

Note If you don’t have autoCommit enabled in Solr, you won’t be able to see the updated contents immediately in Solr. The demo environment has autoCommit enabled for a commit every second.

注意:如果Solr没有开启autoCommit,你不会马上在Solr中看到更新的内容。演示环境已开启autoCommit,每秒自动提交一次。
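For reference, such an autoCommit setup is typically expressed in solrconfig.xml roughly as follows (a sketch; adjust it to your own Solr configuration):

<autoCommit>
  <maxTime>1000</maxTime>            <!-- hard commit roughly every second -->
  <openSearcher>true</openSearcher>  <!-- make committed documents visible to queries -->
</autoCommit>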

Now try updating the data you’ve just added

现在尝试更新你刚才添加的数据

hbase> put 'indexdemo-user', 'row1', 'info:firstname', 'Jim'

And now check the content in Solr. The document’s firstname_s field now contains the string “Jim”.

然后在Solr中检查内容。文档的firstname_s字段现在应该包含字符串“Jim”。

Finally, delete the row from HBase.

最后,删除Hbase的这条记录。

hbase> deleteall 'indexdemo-user', 'row1'

You can now verify that the data has been deleted from Solr.

你可以验证下Solr中的数据是不是已经删除了。
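One way to check (assuming the same local Solr instance and collection as above) is to query the collection and confirm that no documents are returned:

curl "http://localhost:8983/solr/collection1/select?q=*:*&wt=json"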

This page explains how to do a basic installation of the HBase Indexer on a single machine.

这篇会讲解如何在单机中进行Hbase Indexer的基础安装。

Before you start, make sure that you have the required software installed (they can all be running on single machine).

在开始之前,请确认你已经安装了所有需要的软件(它们都可以在单机上运行)。

获取Hbase Indexer(Get the HBase Indexer)

Check out the code and build the tar.gz distribution.

签出代码并生成tar.gz的发布包。

git clone git://github.com/NGDATA/hbase-indexer.git
mvn clean package -Pdist -DskipTests

Next, unpackage the tar.gz distribution (in the example below it is unpacked under your $HOME directory).

接下来,解压tar.gz发布包(这个例子中,他被解压到$HOME目录)。

tar zxvf hbase-indexer-dist/target/hbase-indexer-1.0-SNAPSHOT.tar.gz -C ~
cd ~/hbase-indexer-1.0-SNAPSHOT

配置Hbase Indexer(Configure HBase Indexer)

In the hbase-indexer directory, edit the file conf/hbase-indexer-site.xml and configure the ZooKeeper connection string (twice, once for hbase-indexer, and once for hbase; alternatively you can copy your hbase-site.xml to the conf directory).

在hbase-indexer目录中,编辑conf/hbase-indexer-site.xml文件并配置ZooKeeper的连接串(两处,一处用于hbase-indexer,一处用于HBase;或者你也可以把你的hbase-site.xml拷贝到conf目录)。

<property>
<name>hbaseindexer.zookeeper.connectstring</name>
<value>zookeeperhost</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>zookeeperhost</value>
</property>

If you have not defined JAVA_HOME globally, and the bin/hbase-indexer script complains that it can't find your Java, you can set JAVA_HOME in the script conf/hbase-indexer-env.sh.

如果你没有全局定义JAVA_HOME,bin/hbase-indexer脚本会提示找不到Java,这时你可以在conf/hbase-indexer-env.sh脚本中设置JAVA_HOME。
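For example, conf/hbase-indexer-env.sh could contain a line such as the following (the JDK path is just an example; use the one for your own environment):

export JAVA_HOME=/usr/lib/jvm/java-7-oracle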

配置Hbase(Configure HBase)

In order to use the HBase Indexer, replication must be enabled in HBase. There are also a number of other HBase settings that can be set to optimize the working of the HBase indexer.

为了使用HBase Indexer,必须开启HBase的复制(replication)功能。同时还有一些其他的HBase设置可以配置,以便优化HBase Indexer的运行。

Add the settings below to your hbase-site.xml configuration on all HBase region servers, and restart HBase.
在所有HBase region server的hbase-site.xml中添加如下配置,然后重启HBase。

<configuration>
<!-- SEP is basically replication, so enable it -->
<property>
<name>hbase.replication</name>
<value>true</value>
</property>
<!-- Source ratio of 100% makes sure that each SEP consumer is actually
used (otherwise, some can sit idle, especially with small clusters) -->
<property>
<name>replication.source.ratio</name>
<value>1.0</value>
</property>
<!-- Maximum number of hlog entries to replicate in one go. If this is
large, and a consumer takes a while to process the events, the
HBase rpc call will time out. -->
<property>
<name>replication.source.nb.capacity</name>
<value>1000</value>
</property>
<!-- A custom replication source that fixes a few things and adds
some functionality (doesn't interfere with normal replication
usage). -->
<property>
<name>replication.replicationsource.implementation</name>
<value>com.ngdata.sep.impl.SepReplicationSource</value>
</property>
</configuration>

添加Indexer jar到HBase(Add indexer jars to HBase)

The HBase Indexer includes two jar files that need to be in the classpath of HBase. Copy these from the lib directory of the unpacked hbase-indexer installation into the lib directory of HBase for each region server.

HBase Indexer包含两个需要放到HBase classpath中的jar文件。将解压后的hbase-indexer安装目录下lib中的这些文件,拷贝到每个region server的HBase lib目录中。

cp lib/hbase-sep-* $HBASE_HOME/lib

启动Solr(Start Solr)

Ensure that Solr is running. In general, it’s easiest to have Solr use the same ZooKeeper instance as HBase.

确保Solr已经运行。通常,很容易让Hbase和Solr共用相同的ZooKeeper。

Assuming that you’ve downloaded Solr 4.2.0 and you’re running ZooKeeper on the current machine, you can start up the base Solr in cloud mode using the example schema as follows:

假设你已经下载了Solr 4.2.0,并且ZooKeeper运行在当前机器上,你可以通过如下命令使用example schema在云模式下启动Solr:

cd $SOLR_HOME/example
java -Dbootstrap_confdir=./solr/collection1/conf -Dcollection.configName=myconf -DzkHost=localhost:2181/solr -jar start.jar

构建(Building the binary distribution)

Use the following command to build the binary distribution (.tar.gz & rpm):

使用如下命令构建生成二进制发布包(.tar.gz & rpm):

mvn -DskipTests -Pdist install

测试(Testing)

These steps assume a running HBase 0.94.x installation (preferably CDH 4.2), as well as a running Solr installation. For this example, the configured Solr schema will need to have a multi-valued field called “data”, as well as a unique key field called “id”.

以下这些步骤假定已经安装并运行了HBase 0.94.x(最好是CDH 4.2)和Solr。在这个例子中,所配置的Solr schema需要有一个名为“data”的多值字段,以及一个名为“id”的唯一键字段。

  1. Enable replication and other settings that are outlined in the hbase-sep demo instructions

  2. 按照hbase-sep demo instructions中的说明开启复制及其他设置。

  3. Unzip the binary distribution (instructions for creating the binary distribution are listed above).

  4. 解压发布包(生成发布包的方式已经在上文列出)。

    $ tar zxvf hbase_indexer.tar.gz
  5. Copy the hbase-sep jar files from the lib directory of the binary distribution into the lib directory of HBase.

  6. 将发布包下的lib目录的hbase-sep jar文件拷贝到Hbase的lib目录。

    $ cd hbase_indexer
    $ sudo cp lib/hbase-sep-* /usr/lib/hbase/lib
  7. Create a table in HBase that has replication enabled. For this example, we’ll create a table called “record” with a single column family called ‘data’.

  8. 创建一个开启复制集功能的表。这个例子中,我们会创建“record”表,有一个列簇“data”。

    hbase> create 'record', {NAME => 'data', REPLICATION_SCOPE => 1}
  9. Start the hbase-indexer server

  10. 启动hbase-indexer服务

    $ ./bin/hbase-indexer server
  11. Create an indexer definition. For this example, we’ll just index anything in the data column family into the “data” field in Solr. Save the below contents in a file called ‘sample.xml’.

  12. 创建索引器定义。在这个例子中,我们只是把data列簇中所有内容都放到Solr的“data”字段中进行索引。将以下内容保存为”sample.xml”。

    <?xml version="1.0"?>
    <indexer table="record">
    <field name="data" value="data:*" type="string"/>
    </indexer>
  13. Add the indexer definition to the indexer server. The following command assumes that the Solr ZooKeeper is running on the current host, and the name of the collection to be used for indexing is “core0”.

  14. 把索引器定义添加到indexer服务中。下面的命令假设Solr的ZooKeeper运行在当前主机上,并且用于索引的集合名为“core0”。

    $ ./bin/hbase-indexer add-indexer  -n sampleindex -c sample.xml --cp solr.collection=core0 
  15. Add some data to the record table in HBase. The data added to the data column family in the record table should show up in the Solr index.

  16. 向HBase的record表添加一些数据。添加到record表data列簇中的数据应该会出现在Solr的索引中。

    hbase> put 'record', 'row1', 'data:value', 'Test of HBase Indexer'

The following software is required for running the HBase Indexer:

运行Hbase Indexer需要下面的这些软件:

  • HBase 0.94.x
  • Solr 4.x in cloud mode
  • ZooKeeper 3.x (required by the two above packages)

All components can be run on a single machine, or they can be run on multiple machines on a cluster.

所有的组件可以运行在单机,也可运行在多台机器的集群上。

详情(Details)

HBase

HBase 0.94.x is the supported version of HBase for the HBase Indexer. It is recommended to use the version of HBase 0.94.2 that is bundled with Cloudera CDH 4.2. However, other versions of HBase 0.94.x may also work. CDH 4.2 is currently used for testing.

HBase 0.94.x是HBase Indexer支持的版本。推荐使用Cloudera CDH 4.2附带的HBase 0.94.2版本。不过,其他的0.94.x版本也可能正常工作。CDH 4.2是当前用于测试的版本。

HBase should be configured to use HDFS as its filesystem – HBase Indexer is not fully functional if the local filesystem implementation is used instead of HDFS.

HBase应配置HDFS作为它的文件系统;如果使用本地文件系统而不是HDFS,HBase Indexer将无法完整运行。

Solr

Solr 4.x is required for the HBase Indexer, and it must be configured to run in cloud mode. Solr 4.2.0 is currently used for development testing.

HBase Indexer需要Solr 4.x版本,并且必须配置为在云模式下运行。Solr 4.2.0是当前开发测试所使用的版本。

ZooKeeper

It is recommended that the version of ZooKeeper that is bundled in CDH 4.2 is used.

推荐使用和CDH4.2附带的ZooKeeper版本。

简介(Introduction)

The HBase Indexer project provides indexing (via Solr) for content stored in HBase. It provides a flexible and extensible way of defining indexing rules, and is designed to scale.

Hbase Indexer项目可以为存储在HBase中的内容提供Solr索引。它提供灵活的可扩展的方式来定义索引规则,且为大规模而生。

Indexing is performed asynchronously, so it does not impact write throughput on HBase. SolrCloud is used for storing the actual index in order to ensure scalability of the indexing.

索引是异步的,所以不会影响Hbase的写入吞吐量。SolrCloud用于存储实际的索引,以确保索引的可伸缩性。

开始使用Hbase Indexer(Getting started with the HBase Indexer)

  1. Make sure you’ve got the required software installed, as detailed on the Requirements page.

  2. Follow the Tutorial to get a feel for how to use the HBase Indexer.

  3. Customize your indexing setup as needed using the other reference documentation provided here.

  4. 确认所有要求的软件已经安装,具体内容在[要求]页。

  5. 按照教程来体验下如何使用Hbase Indexer。

  6. 通过提供的文档来调整索引的配置。

工作原理(How it works)

The HBase Indexer works by acting as an HBase replication sink. As updates are written to HBase region servers, they are “replicated” asynchronously to the HBase Indexer processes.

HBase Indexer是作为HBase复制(replication)的一个sink来工作的。当更新写入HBase的region server时,它们会被异步地“复制”到HBase Indexer进程。

The indexer analyzes incoming HBase mutation events, and where applicable it creates Solr documents and pushes them to SolrCloud servers.

Hbase Indexer分析从Hbase发送过来的变化事件,当合适的时候创建Solr文档并发送给SolrCloud服务器。

The indexed documents in Solr contain enough information to uniquely identify the HBase row that they are based on, allowing you to use Solr to search for content that is stored in HBase.

Solr文档维护了足够多的信息用于唯一标识一条Hbase记录,这样允许你可能通过Solr来检索Hbase内容。

HBase replication is based on reading the HBase log files, which are the precise source of truth of what is stored in HBase: there are no missing or extra events. In various cases, the log also contains all the information needed to index, so that no expensive random-read on HBase is necessary (see the read-row attribute in the Indexer Configuration).

HBase复制是通过读取HBase的日志文件来实现的,这些日志文件是HBase中存储内容的精确来源:没有遗漏也没有多余的事件。在许多情况下,日志也包含了索引所需要的全部信息,因此不需要对HBase进行昂贵的随机读取(参见Indexer配置中的read-row属性)。

HBase replication delivers (small) batches of events. HBase-indexer exploits this by avoiding double-indexing of the same row if it would have been updated twice in a short time frame, and as well will batch/buffer the updates towards Solr, which gives important performance gains. The updates are applied to Solr before confirming the processing of the events to HBase, so that no event loss is possible.

HBase复制会以(小)批量的方式分发事件。HBase Indexer利用这一点,避免对短时间内被更新两次的同一行做重复索引,同时也会对发往Solr的更新做批量/缓冲处理,这能带来重要的性能提升。更新会先应用到Solr,然后才向HBase确认事件已处理完成,因此不会丢失事件。

横向扩展(Horizontal scalability)

All information about indexers is stored in ZooKeeper. New indexer hosts can always be added to a cluster, in the same way that HBase regionservers can be added to to an HBase cluster.

所有关于索引器的信息都保存在ZooKeeper中。新的indexer主机随时可以添加到集群中,就像HBase的region server可以添加到HBase集群一样。

All indexing work for a single configured indexer is shared over all machines in the cluster. In this way, adding additional indexer nodes allows horizontal scaling.

同一索引配置的所有索引工作被分配给集群中的所有机器。这样说来,添加额外的索引节点就可以横向扩展。

自动的错误处理(Automatic failure handling)

The HBase replication system upon which the HBase Indexer is based is designed to handle hardware failures. Because the HBase Indexer is based on this system, it also benefits from the same ability to handle failures.

HBase Indexer所基于的HBase复制系统在设计上就考虑了硬件故障的处理。由于HBase Indexer基于该系统,它同样获益于这种故障处理能力。

In general, indexing nodes going down or Solr nodes going down will not result in any lost data in the HBase Indexer.

通常情况,索引节点或者Solr宕机不会导致Hbase Indexer中的数据丢失。

背景

公司使用HBase Indexer做二级索引,最近在做数据统计时发现数据存在缺失的情况。在网上查找时发现可能是HBase Indexer的一个BUG,详情看这里。大意是说修改read-row="never"或者修改源码。我们使用的组件不是完全开源的HBase Indexer,提供商做了部分调整,为了保险起见,自己还是做了一次测试,同时也是进一步了解HBase和HBase Indexer的机会。

验证方案

  1. 把ES的qc索引做为数据来源,数据量比较大
  2. 写测试程序从ES拉数据,100万条。
  3. 调整不同的read-row方式
  4. 修改程序验证部分更新。

    验证准备

    创建Hbase表

    create 'qc',{NAME =>'d', REPLICATION_SCOPE =>1}
    此处需要注意设置REPLICATION_SCOPE为1,第一次验证时未开启。如果未开启,可以进行如下操作:
    disable 'qc'
    alter 'qc',{NAME =>'d', REPLICATION_SCOPE =>1}
    enable 'qc'

    配置Hbase Indexer

    之前已经有配置,拷贝一份即可
    cd  /opt/morphline_config
    cp -a xyz.xml qc.xml
    cp -a xyz.conf qc.conf

修改qc.xml

<indexer table="qc" unique-key-field="rowkey"
unique-key-formatter="com.ngdata.hbaseindexer.uniquekey.StringUniqueKeyFormatter"
mapper="com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper" read-row="never">
<param name="morphlineFile" value="/opt/morphline_config/qc.conf" />
</indexer>

修改qc.conf

可以把文件下载回来修改

sz qc.conf

通过’rz’可以上传修改后的配置文件。

调整后的qc.conf

morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.morphline.**", "com.ngdata.**"]

commands : [
{
extractHBaseCells {
mappings : [
{
inputColumn : "d:_id"
outputField : "_id"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:code"
outputField : "code"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:errorCode"
outputField : "errorCode"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:errorMsg"
outputField : "errorMsg"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:eventNo"
outputField : "eventNo"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:eventTime"
outputField : "eventTime"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:orgCode"
outputField : "orgCode"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:patientId"
outputField : "patientId"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:receiveTime"
outputField : "receiveTime"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:rowKey"
outputField : "rowKey"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:table"
outputField : "table"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:value"
outputField : "value"
type : string
isAllowEmpty : true
source : value
},
{
inputColumn : "d:version"
outputField : "version"
type : string
isAllowEmpty : true
source : value
}
]
}
}

{ logTrace { format : "output record: {}", args : ["@{}"] } }
]
}
]

添加映射

hbase-indexer add-indexer -c qc.xml -n qc -z node1,node3,node2  -cp solr.zk=node1:2181,node3:2181,node2:2181/solr -cp solr.collection=qc

检查配置是否生效

hbase-indexer list-indexers -dump

删除映射

如果配置没有生效的情况,最好先删掉映射后重新添加。

hbase-indexer delete-indexer --name 'qc'

重新拉取Hbase数据

nohup hadoop jar /opt/hbase-indexer/latest/tools/hbase-indexer-mr-1.6-ngdata-job.jar  --conf /etc/hbase/conf/hbase-site.xml -D 'mapred.child.java.opts=-Xmx500m' --hbase-indexer-file /opt/morphline_config/qc.xml --zk-host node1/solr --collection qc  --reduce 0 &

配置Solr

修改配置

之前有配置的,可直接拷贝一份开始配置

cd /root
cp -a xyz qc

修改后的scheme.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<schema name="qc" version="1.5">
<field indexed="true" name="_version_" stored="true" type="long"/>
<field indexed="true" name="_root_" stored="false" type="string"/>
<field indexed="true" multiValued="false" name="_id" stored="true" type="string"/>
<field indexed="true" multiValued="false" name="rowkey" stored="true" type="string"/>
<field indexed="true" name="code" stored="false" type="string"/>
<field indexed="true" name="errorCode" stored="true" type="string"/>
<field indexed="true" name="errorMsg" stored="true" type="string"/>
<field indexed="true" name="eventNo" stored="true" type="string"/>
<field indexed="true" name="eventTime" stored="true" type="string"/>
<field indexed="true" name="orgCode" stored="true" type="string"/>
<field indexed="true" name="patientId" stored="true" type="string"/>
<field indexed="true" name="receiveTime" stored="false" type="string"/>
<field indexed="true" name="rowKey" stored="true" type="string"/>
<field indexed="true" name="table" stored="true" type="string"/>
<field indexed="true" name="value" stored="true" type="string"/>
<field indexed="true" name="version" stored="true" type="string"/>
<uniqueKey>rowkey</uniqueKey>
</schema>

此处注意配置rowkey字段,之前就因为没有rowkey导致创建索引失败。

上传配置

/opt/solr/latest/server/scripts/cloud-scripts/zkcli.sh -zkhost node1:2181/solr -cmd upconfig --confdir /root/qc/conf/ --confname qc

创建索引

/opt/solr/latest/bin/solr create_collection -c qc -d /root/qc/conf/ -n qc

修改ES配置

拉取测试数据时,提示只有10000的窗口数据,需要设置max_result_window,修改方法如下:

curl -XPOST 'http://xx:9200/qc/_close'
curl -XPUT 'http://xx:9200/qc/_settings?preserve_existing=true' -d '{"max_result_window" : "1000000"}'
curl -XGET 'http://xx:9200/qc/_settings?preserve_existing=true'
curl -XPOST 'http://xx:9200/qc/_open'

测试

read-row为never

  1. 测试数据为100万条,全量数据更新。
    数据测试,导入数据时出现服务连接问题,中间出现Hbase Indexer异常停止,重启后,数据能对上。
  2. 测试数据为100万条,部分数据更新。
    数据字段出现丢失情况,
  3. 验证配置字段顺序问题。

    清理Hbase数据

    truncate 'qc'

    清理Solr数据

    hdfs dfs -rm -r /solr/qc
    hdfs dfs -ls /solr
    验证的情况,与配置文件的顺序无关

验证重跑MapReduce

删除solr中qc的记录

<delete><query>*:*</query></delete>
<commit/>

重跑

nohup hadoop jar /opt/hbase-indexer/latest/tools/hbase-indexer-mr-1.6-ngdata-job.jar  --conf /etc/hbase/conf/hbase-site.xml -D 'mapred.child.java.opts=-Xmx500m' --hbase-indexer-file /opt/morphline_config/qc.xml --zk-host node1/solr --collection qc  --reduce 0 &

测试时发现可以正常拉取数据,正式线发现不能拉取到之前遗漏的数据。

解决方案

  1. 修改read-row为never
  2. 重新检查写入Hbase相关代码,确保数据是整条记录更新(即以合并旧数据的方式进行更新)。
     注意事项:采用read-row为never时,只会从WAL中获取数据去更新Solr,也就是说,如果数据只更新了一部分,Solr中也只会有最后更新的那部分数据。

  3. 通过写程序将缺失数据提取出来重新更新Hbase,该工作已让郑维协助处理,保持跟进。

参考链接

Lily HBase Indexer同步HBase二级索引到Solr丢失数据的问题分析

使用Notepad++有挺长一段时间了。因为打开速度快,经常会用它来编辑一些小文本,或者对文件做格式化。在一次升级后发现JSON Viewer插件不见了。
当时也没太在意,换了一台电脑用。最近因为查问题,需要格式化的JSON多了起来,就想查查到底是怎么回事,顺便记录下处理方式。

网上一搜,还有不少人碰到这个问题(notepad++ 64位添加Plugin Manager)。大意是64位版本已经不再提供Plugin Manager了,可以通过 https://github.com/bruderstein/nppPluginManager/releases 下载Plugin Manager。下载之后,用覆盖的方式粘贴plugins和updater文件夹,重新启动就可以。

启动之后提示,32位Notepad++不能运行64位插件。索性重新下载了个新版本的Notepad++64位,plugin manager已经改名为Plugin Admin了。下载我自己需要的插件,一切正常。

第六章 - 并发(Chapter 6 - Concurrency)

Go is often described as a concurrent-friendly language. The reason for this is that it provides a simple syntax over two powerful mechanisms: goroutines and channels.

Go常被描述为一种对并发友好的语言。这是因为它为两个强大的机制提供了简洁的语法支持:Go协程(goroutine)和通道(channel)。

Go协程(Goroutines)

A goroutine is similar to a thread, but it is scheduled by Go, not the OS. Code that runs in a goroutine can run concurrently with other code. Let’s look at an example:

一个Go协程和一个线程类似,只不过它是由Go而不是操作系统来调度的。在协程中运行的代码可以和其他代码并发执行。让我们看一个例子:

package main

import (
"fmt"
"time"
)

func main() {
fmt.Println("start")
go process()
time.Sleep(time.Millisecond * 10) // this is bad, don't do this! 这样不好,不能这么做!
fmt.Println("done")
}

func process() {
fmt.Println("processing")
}

There are a few interesting things going on here, but the most important is how we start a goroutine. We simply use the go keyword followed by the function we want to execute. If we just want to run a bit of code, such as the above, we can use an anonymous function. Do note that anonymous functions aren’t only used with goroutines, however.

这里有几个有趣的地方,但最重要的是我们如何启动一个Go协程。我们只是简单地使用go关键字,后面紧跟我们需要执行的函数。如果我们只是想运行一小段代码,比如上面的例子,我们可以使用匿名函数。但是记住,匿名函数并不只用于Go协程。

go func() {
fmt.Println("processing")
}()

Goroutines are easy to create and have little overhead. Multiple goroutines will end up running on the same underlying OS thread. This is often called an M:N threading model because we have M application threads (goroutines) running on N OS threads. The result is that a goroutine has a fraction of overhead (a few KB) than OS threads. On modern hardware, it’s possible to have millions of goroutines.

Go协程创建简单且开销很小。多个Go协程最终会运行在同一个底层系统线程上。这通常称为M:N线程模型,因为我们有M个应用线程(Go协程)运行在N个系统线程上。结果就是,一个Go协程的开销只有系统线程的很小一部分(几KB)。在现代的硬件上,可以创建数百万个Go协程。

Furthermore, the complexity of mapping and scheduling is hidden. We just say this code should run concurrently and let Go worry about making it happen.

此外,映射和调度的复杂性被隐藏了起来。我们只需要说这段代码要并发执行,然后让Go去操心怎么实现。

If we go back to our example, you’ll notice that we had to Sleep for a few milliseconds. That’s because the main process exits before the goroutine gets a chance to execute (the process doesn’t wait until all goroutines are finished before exiting). To solve this, we need to coordinate our code.

回到我们的例子中,你会注意到我们使用了Sleep等待了几毫秒。这是因为主进程会在协程有机会执行之前就退出(进程退出前并不会等待所有协程执行结束)。为了解决这个问题,我们需要协调我们的代码。
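As a hedged aside (the book coordinates goroutines with channels below; this is just an alternative from the standard library), the arbitrary Sleep could be replaced with a sync.WaitGroup:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // we will wait for one goroutine

	fmt.Println("start")
	go func() {
		defer wg.Done() // signal completion
		fmt.Println("processing")
	}()

	wg.Wait() // block until Done has been called
	fmt.Println("done")
}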

同步(Synchronization)

Creating goroutines is trivial, and they are so cheap that we can start many; however, concurrent code needs to be coordinated. To help with this problem, Go provides channels. Before we look at channels, I think it’s important to understand a little bit about the basics of concurrent programming.

创建Go协程是容易的,而且他们的开销很小,所以我们可以开启很多Go协程;但是并发代码需要协同。为了帮助我们解决这个问题,Go提供了通道。在我们继续通道之前,我觉得有必要先了解一些并发编程的基础知识。

Writing concurrent code requires that you pay specific attention to where and how you read and write values. In some ways, it’s like programming without a garbage collector – it requires that you think about your data from a new angle, always watchful for possible danger. Consider:

在编写并发执行的代码时,你需要特别注意在哪里以及如何读写一个值。在某些方面,这就像在没有垃圾回收器的情况下编程,需要你从一个新的角度去考虑你的数据,总是警惕着可能存在的危险。例如:

package main

import (
"fmt"
"time"
)

var counter = 0

func main() {
for i := 0; i < 2; i++ {
go incr()
}
time.Sleep(time.Millisecond * 10)
}

func incr() {
counter++
fmt.Println(counter)
}

What do you think the output will be?

你觉得输出的会是什么呢?

If you think the output is 1, 2 you’re both right and wrong. It’s true that if you run the above code, you’ll very likely get that output. However, the reality is that the behavior is undefined. Why? Because we potentially have multiple (two in this case) goroutines writing to the same variable, counter, at the same time. Or, just as bad, one goroutine would be reading counter while another writes to it.

如果你觉得输出是1和2,那你既对也错。如果你运行上面的代码,确实很有可能得到那样的输出。但是,实际上这个行为是未定义的。为什么?因为我们可能有多个(这里是2个)Go协程同时写同一个变量counter。或者同样糟糕的是,一个协程正在读counter,而另一个协程正在写counter。

Is that really a danger? Yes, absolutely. counter++ might seem like a simple line of code, but it actually gets broken down into multiple assembly statements – the exact nature is dependent on the platform that you’re running. It’s true that, in this example, the most likely case is things will run just fine. However, another possible outcome would be that they both see counter when its equal to 0 and you get an output of 1, 1. There are worse possibilities, such as system crashes or accessing an arbitrary piece of data and incrementing it!

这很危险吗?是的,绝对的。counter++看起来似乎只是一行简单的代码,但是实际上它会被拆分为多条汇编指令,具体依赖于你运行的平台。确实,在这个例子中,大多数情况下会运行良好。但是,另一种可能的结果是两个协程都在counter等于0时读到了它,于是你得到的输出是1, 1。还有更糟的可能,比如系统崩溃,或者访问到一块任意的数据并递增它!

The only concurrent thing you can safely do to a variable is to read from it. You can have as many readers as you want, but writes need to be synchronized. There are various ways to do this, including using some truly atomic operations that rely on special CPU instructions. However, the most common approach is to use a mutex:

在并发环境中,对一个变量唯一安全的操作就是读取它。无论你想要多少个读取者都可以,但是写操作必须是同步的。有几种方式可以实现,包括一些依赖特殊CPU指令的真正的原子操作。不过,最常见的方式就是用互斥锁:

package main

import (
"fmt"
"time"
"sync"
)

var (
counter = 0
lock sync.Mutex
)

func main() {
for i := 0; i < 2; i++ {
go incr()
}
time.Sleep(time.Millisecond * 10)
}

func incr() {
lock.Lock()
defer lock.Unlock()
counter++
fmt.Println(counter)
}

A mutex serializes access to the code under lock. The reason we simply define our lock as lock sync.Mutex is because the default value of a sync.Mutex is unlocked.

互斥锁将对锁保护代码的访问串行化。我们之所以可以简单地用lock sync.Mutex来定义锁,是因为sync.Mutex的默认值就是未加锁状态。

Seems simple enough? The example above is deceptive. There’s a whole class of serious bugs that can arise when doing concurrent programming. First of all, it isn’t always so obvious what code needs to be protected. While it might be tempting to use coarse locks (locks that cover a large amount of code), that undermines the very reason we’re doing concurrent programming in the first place. We generally want fine locks; else, we end up with a ten-lane highway that suddenly turns into a one-lane road.

看起来足够简单?上面的例子有欺骗性。做并发编程时,可能会出现一整类严重的bug。首先,哪些代码需要被保护通常并不那么明显。虽然使用粗粒度的锁(覆盖大量代码的锁)可能很诱人,但这恰恰破坏了我们一开始进行并发编程的初衷。我们通常需要细粒度的锁;否则,就好比一条十车道的高速公路突然变成了单车道。

The other problem has to do with deadlocks. With a single lock, this isn’t a problem, but if you’re using two or more locks around the same code, it’s dangerously easy to have situations where goroutineA holds lockA but needs access to lockB, while goroutineB holds lockB but needs access to lockA.

另一个问题是如何处理死锁。只有一个锁的时候,这不是问题,但是如果你在相同的代码中使用2个或者更多的锁,就很容易出现一种危险的情况,即协程A拥有锁lockA,想去访问锁lockB,同时协程B拥有lockB并需要访问锁lockA

It actually is possible to deadlock with a single lock, if we forget to release it. This isn’t as dangerous as a multi-lock deadlock (because those are really tough to spot), but just so you can see what happens, try running:

实际上,如果我们忘记释放锁,只用一个锁也有可能发生死锁。这不像多个锁引起的死锁那样危险(因为那种死锁真的很难发现),但为了让你看看会发生什么,可以试着运行下面的代码:

package main

import (
"time"
"sync"
)

var (
lock sync.Mutex
)

func main() {
go func() { lock.Lock() }()
time.Sleep(time.Millisecond * 10)
lock.Lock()
}

There’s more to concurrent programming than what we’ve seen so far. For one thing, there’s another common mutex called a read-write mutex. This exposes two locking functions: one to lock for reading and one to lock for writing. This distinction allows multiple simultaneous readers while ensuring that writing is exclusive. In Go, sync.RWMutex is such a lock. In addition to the Lock and Unlock methods of a sync.Mutex, it also exposes RLock and RUnlock methods; where R stands for Read. While read-write mutexes are commonly used, they place an additional burden on developers: we must now pay attention to not only when we’re accessing data, but also how.

并发编程的内容远不止我们目前看到的这些。比如,还有一种常见的互斥锁叫读写互斥锁。它提供两种加锁功能:一种用于读加锁,一种用于写加锁。这种区分允许多个读取者同时读取,同时保证写入是独占的。在Go中,sync.RWMutex就是这种锁。除了sync.Mutex的Lock和Unlock方法之外,它还提供了RLock和RUnlock方法,这里的R代表读(Read)。虽然读写锁很常用,但它们也给开发者带来了额外的负担:我们不但要关注何时访问数据,也要关注如何访问。
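A minimal sketch of what that looks like in practice (not an example from the book; the map and helper functions are made up for illustration):

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	data = map[string]int{}
	lock sync.RWMutex
)

func read(key string) int {
	lock.RLock() // many readers may hold the read lock at the same time
	defer lock.RUnlock()
	return data[key]
}

func write(key string, value int) {
	lock.Lock() // writers are exclusive
	defer lock.Unlock()
	data[key] = value
}

func main() {
	go write("a", 1)
	time.Sleep(time.Millisecond * 10)
	fmt.Println(read("a"))
}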

Furthermore, part of concurrent programming isn’t so much about serializing access across the narrowest possible piece of code; it’s also about coordinating multiple goroutines. For example, sleeping for 10 milliseconds isn’t a particularly elegant solution. What if a goroutine takes more than 10 milliseconds? What if it takes less and we’re just wasting cycles? Also, what if instead of just waiting for goroutines to finish, we want to tell one hey, I have new data for you to process!?

此外,并发编程也不仅仅是把尽可能小的一段代码的访问串行化,它还涉及协调多个Go协程。例如,休眠10毫秒并不是一种特别优雅的方案。如果一个Go协程运行的时间超过10毫秒呢?如果它用的时间更短,我们是不是在白白浪费时间?又或者,我们不只是想等待Go协程结束,而是想告诉某个协程:嗨,我有新的数据要给你处理!?

These are all things that are doable without channels. Certainly for simpler cases, I believe you should use primitives such as sync.Mutex and sync.RWMutex, but as we’ll see in the next section, channels aim at making concurrent programming cleaner and less error-prone.

所有的这些事在不使用通道的情况下也都是可以实现的。当然,对于更简单的例子,我认为你应该使用基本的功能例如sync.Mutexsync.RWMutex,但是在下一节我们将看到,通道的目的是为了使并发编程更清晰和不易出错。

通道(Channels)

The challenge with concurrent programming stems from sharing data. If your goroutines share no data, you needn’t worry about synchronizing them. That isn’t an option for all systems, however. In fact, many systems are built with the exact opposite goal in mind: to share data across multiple requests. An in-memory cache or a database, are good examples of this. This is becoming an increasingly common reality.

并发编程的挑战源于共享数据。如果你的Go协程之间不共享数据,你就不需要担心它们的同步。但并不是所有系统都能这样做。事实上,许多系统构建的目标恰恰相反:在多个请求之间共享数据。内存缓存或者数据库就是很好的例子。这种情况正变得越来越普遍。

Channels help make concurrent programming saner by taking shared data out of the picture. A channel is a communication pipe between goroutines which is used to pass data. In other words, a goroutine that has data can pass it to another goroutine via a channel. The result is that, at any point in time, only one goroutine has access to the data.

通道通过把共享数据排除在外,让并发编程更加清晰可靠。通道是Go协程之间用于传递数据的通信管道。换句话说,一个Go协程可以通过通道把数据传递给另一个Go协程。这样做的结果就是,在任何时间点,都只有一个Go协程可以访问这份数据。

A channel, like everything else, has a type. This is the type of data that we’ll be passing through our channel. For example, to create a channel which can be used to pass an integer around, we’d do:

通道和其他类型一样有类型。这个类型就是我们将在通道中传递的数据类型。例如,创建一个用来传递整数的通道,我们这样做:

c := make(chan int)

The type of this channel is chan int. Therefore, to pass this channel to a function, our signature looks like:

这个通道的类型是chan int。因此,将这个通道传递给一个函数时,函数签名看起来是这样:

func worker(c chan int) { ... }

Channels support two operations: receiving and sending. We send to a channel by doing:

通道支持2种操作:接收和发送。我们可以使用下面方式往通道发送数据:

CHANNEL <- DATA

and receive from one by doing

然后可以使用下面方式从通道接收数据:

VAR := <-CHANNEL

The arrow points in the direction that data flows. When sending, the data flows into the channel. When receiving, the data flows out of the channel.

箭头的方向就是数据的流动方向。当发送数据时,数据流入通道;当接收数据时,数据从通道流出。

The final thing to know before we look at our first example is that receiving and sending to and from a channel is blocking. That is, when we receive from a channel, execution of the goroutine won’t continue until data is available. Similarly, when we send to a channel, execution won’t continue until the data is received.

最后,在看第一个例子之前还需要知道:从通道接收数据和向通道发送数据都是阻塞的。也就是说,当我们从一个通道接收数据时,直到数据可用,Go协程才会继续执行。类似地,当我们向通道发送数据时,在数据被接收之前,Go协程也不会继续执行。
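A tiny sketch (not from the book) of what that blocking means in practice: sending on an unbuffered channel with nobody receiving blocks forever, and the Go runtime reports a deadlock.

package main

func main() {
	c := make(chan int)
	c <- 1 // no receiver: this send blocks forever ("all goroutines are asleep - deadlock!")
}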

Consider a system with incoming data that we want to handle in separate goroutines. This is a common requirement. If we did our data-intensive processing on the goroutine which accepts the incoming data, we’d risk timing out clients. First, we’ll write our worker. This could be a simple function, but I’ll make it part of a structure since we haven’t seen goroutines used like this before:

假设有这样一个系统,我们想用不同的协程来处理输入数据。这是一个常见的需求。如果我们在接收输入数据的那个Go协程里直接做数据密集型处理,就会有让客户端超时的风险。首先,我们来编写我们的worker。它本可以只是一个简单的函数,但我会把它做成结构体的一部分,因为我们之前还没有这样使用过Go协程:

type Worker struct {
id int
}

func (w Worker) process(c chan int) {
for {
data := <-c
fmt.Printf("worker %d got %d\n", w.id, data)
}
}

Our worker is simple. It waits until data is available then “processes” it. Dutifully, it does this in a loop, forever waiting for more data to process.

我们的处理器很简单。它会一直等待直到数据可用并“处理”它。它通过一个循环来实现,永久等待更多的数据来处理。

To use this, the first thing we’d do is start some workers:

为了使用上面的代码,我们首先要做的是启动一些处理器:

c := make(chan int)
for i := 0; i < 4; i++ {
worker := Worker{id: i}
go worker.process(c)
}

And then we can give them some work:

然后我们可以给他们一些工作:

for {
c <- rand.Int()
time.Sleep(time.Millisecond * 50)
}

Here’s the complete code to make it run:

下面是完整的可执行代码:

package main

import (
"fmt"
"time"
"math/rand"
)

func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
worker := &Worker{id: i}
go worker.process(c)
}

for {
c <- rand.Int()
time.Sleep(time.Millisecond * 50)
}
}

type Worker struct {
id int
}

func (w *Worker) process(c chan int) {
for {
data := <-c
fmt.Printf("worker %d got %d\n", w.id, data)
}
}

We don’t know which worker is going to get what data. What we do know, what Go guarantees, is that the data we send to a channel will only be received by a single receiver.

我们不知道哪个处理器将获得数据。我们所知道的是,Go确保了往一个通道发送数据时,仅有一个单独的接收器可以接受。

Notice that the only shared state is the channel, which we can safely receive from and send to concurrently. Channels provide all of the synchronization code we need and also ensure that, at any given time, only one goroutine has access to a specific piece of data.

需要指出的是通道是唯一的共享方式,通过通道我们可以并发安全的发送和接收数据。通道提供了我们需要的所有同步代码,并且也确保在任意的特定时刻只有一个Go协程可以访问一个特定的数据。

带缓存的通道(Buffered Channels)

Given the above code, what happens if we have more data coming in than we can handle? You can simulate this by changing the worker to sleep after it has received data:

在上面的代码中,如果输入的数据超过我们可以处理的数据会发生什么?你可以模拟这种场景,在处理器收到数据后执行time.Sleep

for {
data := <-c
fmt.Printf("worker %d got %d\n", w.id, data)
time.Sleep(time.Millisecond * 500)
}

What’s happening is that our main code, the one that accepts the user’s incoming data (which we just simulated with a random number generator) is blocking as it sends to the channel because no receiver is available.

会发生什么呢?我们的主代码,也就是接收用户输入数据(这里用一个随机数生成器模拟)的那部分,在向通道发送数据时会被阻塞,因为没有可用的接收者。

In cases where you need high guarantees that the data is being processed, you probably will want to start blocking the client. In other cases, you might be willing to loosen those guarantees. There are a few popular strategies to do this. The first is to buffer the data. If no worker is available, we want to temporarily store the data in some sort of queue. Channels have this buffering capability built-in. When we created our channel with make, we can give our channel a length:

如果你需要高度保证数据被处理,你可能会想让客户端阻塞。在其他情况下,你可能愿意放松这些保证。有几种常见的策略可以做到这一点。第一种是把数据缓冲起来。如果没有worker可用,我们希望把数据暂时存放在某种队列中。通道内置了这种缓冲能力。当我们使用make创建通道时,可以给通道指定一个长度:

c := make(chan int, 100)

You can make this change, but you’ll notice that the processing is still choppy. Buffered channels don’t add more capacity; they merely provide a queue for pending work and a good way to deal with a sudden spike. In our example, we’re continuously pushing more data than our workers can handle.

你可以做这样的修改,但你会注意到处理过程仍然不流畅。缓冲通道并没有增加处理能力;它们只是为待处理的工作提供了一个队列,是应对突发尖峰的好办法。在我们的示例中,我们持续推送的数据超出了worker能处理的量。

Nevertheless, we can get a sense that the buffered channel is, in fact, buffering by looking at the channel’s len:

尽管如此,事实上,我们可以查看通道的len,来了解到带缓存的通道的缓冲情况:

for {
c <- rand.Int()
fmt.Println(len(c))
time.Sleep(time.Millisecond * 50)
}

You can see that it grows and grows until it fills up, at which point sending to our channel start to block again.

你可以看到它的长度在不断增大,直到装满为止,此时,往通道发送的数据又开始被阻塞。

选择(Select)

Even with buffering, there comes a point where we need to start dropping messages. We can’t use up an infinite amount of memory hoping a worker frees up. For this, we use Go’s select.

即使有了缓冲,总会到某个时刻我们不得不开始丢弃消息。我们不能占用无限多的内存来指望某个worker空闲下来。为此,我们使用Go的select。

Syntactically, select looks a bit like a switch. With it, we can provide code for when the channel isn’t available to send to. First, let’s remove our channel’s buffering so that we can clearly see how select works:

在语法结构上,select看起来有点类似switch。通过select,我们能写出一些针对通道不可写情况下的代码。首先,让我们去掉我们通道的缓存,这样可以更清晰的看到select是如何工作的。

c := make(chan int)

Next, we change our for loop:

接下来,我们修改for循环:

for {
select {
case c <- rand.Int():
//optional code here
default:
//this can be left empty to silently drop the data
fmt.Println("dropped")
}
time.Sleep(time.Millisecond * 50)
}

We’re pushing out 20 messages per second, but our workers can only handle 10 per second; thus, half the messages get dropped.

我们每秒往通道中发送20个信息,但是我们的处理器每秒只能处理10个信息;因此,有一半的信息被丢弃。

This is only the start of what we can accomplish with select. A main purpose of select is to manage multiple channels. Given multiple channels, select will block until the first one becomes available. If no channel is available, default is executed if one is provided. A channel is randomly picked when multiple are available.

这仅仅只是我们使用select完成一些事的开始。使用select的最主要目的是通过它管理多个通道。给定多个通道,select将阻塞直到有一个通道可用。如果没有可用的通道,当提供了default语句时,执行该分支。当多个通道都可用时,选择其中的一个通道是随机的。

It’s hard to come up with a simple example that demonstrates this behavior as it’s a fairly advanced feature. The next section might help illustrate this though.

很难想出一个简单的例子来证明这种行为,因为这是一种高级特性。在下一小节可能有助于说明这个问题。

超时(Timeout)

We’ve looked at buffering messages as well as simply dropping them. Another popular option is to timeout. We’re willing to block for some time, but not forever. This is also something easy to achieve in Go. Admittedly, the syntax might be hard to follow but it’s such a neat and useful feature that I couldn’t leave it out.

我们已经学习了缓存消息和简单丢弃消息。另外一种比较流行的做法是使用超时。我们将阻塞一段时间,但不是一直阻塞。在Go中这很容易实现。老实说,这个语法有点难于接受,但是它是比较灵活和有用的特性,我基本不能没有它。

To block for a maximum amount of time, we can use the time.After function. Let’s look at it then try to peek beyond the magic. To use this, our sender becomes:

为了达到阻塞的最大时限,我们可以使用time.After函数。让我们看看它,并试着看出其中的魔法。为了使用这种方式,我们的发送器需要修改为:

for {
select {
case c <- rand.Int():
case <-time.After(time.Millisecond * 100):
fmt.Println("timed out")
}
time.Sleep(time.Millisecond * 50)
}

time.After returns a channel, so we can select from it. The channel is written to after the specified time expires. That’s it. There’s nothing more magical than that. If you’re curious, here’s what an implementation of after could look like:

time.After返回一个通道,所以我们可以对它使用select语法。当指定的时间到期时这个通道被写入。就是如此。没有其他更多的魔法了。如果你还是好奇,这里有一个after的实现:

func after(d time.Duration) chan bool {
c := make(chan bool)
go func() {
time.Sleep(d)
c <- true
}()
return c
}

Back to our select, there are a couple of things to play with. First, what happens if you add the default case back? Can you guess? Try it. If you aren’t sure what’s going on, remember that default fires immediately if no channel is available.

回到我们的select中来,还一些内容可以研究。首先,如果添加了default条件会发生会什么呢?你可以猜猜?试试。如果你不确定会发生什么,记住如果没有可用的通道default会立即被触发。

Also, time.After is a channel of type chan time.Time. In the above example, we simply discard the value that was sent to the channel. If you want though, you can receive it:

同时,time.After返回的通道类型是chan time.Time。上面的例子中,我们简单地丢弃了发送给通道的值。如果你想要,也可以这样接收它:

case t := <-time.After(time.Millisecond * 100):
fmt.Println("timed out at", t)

Pay close attention to our select. Notice that we’re sending to c but receiving from time.After. select works the same regardless of whether we’re receiving from, sending to, or any combination of channels:

请仔细观察我们的select。注意我们是向c发送数据,但从time.After接收数据。无论是从通道接收、向通道发送,还是二者的任意组合,select的工作方式都是一样的:

  • The first available channel is chosen.

  • If multiple channels are available, one is randomly picked.

  • If no channel is available, the default case is executed.

  • If there’s no default, select blocks.

  • 第一个可用的通道被选择。

  • 如果有多个通道可用,随机选择一个。

  • 如果没有通道可用,默认条件被执行。

  • 如果没有default条件,select会阻塞(见下面的示意代码)。
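A small sketch (not from the book) illustrating the random-pick rule: both sends below are ready by the time the select runs, so either branch may be chosen.

package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan int)
	c2 := make(chan int)

	go func() { c1 <- 1 }()
	go func() { c2 <- 2 }()
	time.Sleep(time.Millisecond * 10) // give both senders time to become ready

	select {
	case v := <-c1:
		fmt.Println("received from c1:", v)
	case v := <-c2:
		fmt.Println("received from c2:", v)
	}
}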

Finally, it’s common to see a select inside a for. Consider:

最后,select通常在for循环中使用。例如:

for {
select {
case data := <-c:
fmt.Printf("worker %d got %d\n", w.id, data)
case <-time.After(time.Millisecond * 10):
fmt.Println("Break time")
time.Sleep(time.Second)
}
}

继续之前(Before You Continue)

If you’re new to the world of concurrent programming, it might all seem rather overwhelming. It categorically demands considerably more attention and care. Go aims to make it easier.

如果你是并发编程领域的新手,这一切可能会让你觉得难以招架。并发编程确实需要投入更多的注意和细心。Go的目标就是让它变得更容易。

Goroutines effectively abstract what’s needed to run concurrent code. Channels help eliminate some serious bugs that can happen when data is shared by eliminating the sharing of data. This doesn’t just eliminate bugs, but it changes how one approaches concurrent programming. You start to think about concurrency with respect to message passing, rather than dangerous areas of code.

Go协程有效地抽象了运行并发代码所需要的东西。通道通过消除数据共享,帮助消除了数据共享时可能出现的一些严重Bug。这不仅仅是消除Bug,更是改变了进行并发编程的方式。你开始从消息传递的角度思考并发,而不是盯着危险的代码区域。

Having said that, I still make extensive use of the various synchronization primitives found in the sync and sync/atomic packages. I think it’s important to be comfortable with both. I encourage you to first focus on channels, but when you see a simple example that needs a short-lived lock, consider using a mutex or read-write mutex.

话虽如此,我仍然大量使用sync和sync/atomic包中的各种同步原语。我认为能自如地使用这两种方式都很重要。我鼓励你先专注于通道,但当你遇到一个只需要短暂持有锁的简单场景时,可以考虑使用互斥锁或读写互斥锁。
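As a hedged illustration of those primitives (a sketch, not an example from the book), the earlier counter could also be protected with sync/atomic instead of a mutex:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var counter int64

func main() {
	for i := 0; i < 2; i++ {
		go func() {
			// atomically increment the shared counter without a mutex
			atomic.AddInt64(&counter, 1)
		}()
	}
	time.Sleep(time.Millisecond * 10)
	fmt.Println(atomic.LoadInt64(&counter)) // prints 2
}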

结论(Conclusion)

I recently heard Go described as a boring language. Boring because it’s easy to learn, easy to write and, most importantly, easy to read. Perhaps, I did this reality a disservice. We did spend three chapters talking about types and how to declare variables after all.

我最近听到有人把Go描述为一门无聊的语言。无聊是因为它容易学、容易写,最重要的是,容易读。也许我这样说反而帮了倒忙,毕竟我们确实花了三章来讨论类型和如何声明变量。

If you have a background in a statically typed language, much of what we saw was probably, at best, a refresher. That Go makes pointers visible and that slices are thin wrappers around arrays probably isn’t overwhelming to seasoned Java or C# developers.

如果你有静态类型语言的背景,我们看到的大部分内容对你来说充其量只是复习。Go让指针显式可见、切片只是数组的轻量封装,这些对经验丰富的Java或C#开发人员来说并不算什么。

If you’ve mostly been making use of dynamic languages, you might feel a little different. It is a fair bit to learn. Not least of which is the various syntax around declaration and initialization. Despite being a fan of Go, I find that for all the progress towards simplicity, there’s something less than simple about it. Still, it comes down to some basic rules (like you can only declare variable once and := does declare the variable) and fundamental understanding (like new(X) or &X{} only allocate memory, but slices, maps and channels require more initialization and thus, make).

如果你之前主要使用动态语言,感受可能会有些不同。确实有不少东西要学,尤其是围绕声明和初始化的各种语法。虽然我是Go的粉丝,但我发现,尽管Go一直在朝着简洁努力,它也有一些不那么简单的地方。不过,归根结底还是一些基本规则(比如变量只能声明一次,以及:=确实会声明变量)和基本理解(比如new(X)或&X{}只是分配内存,而切片、映射和通道需要更多的初始化,因此要用make)。

Beyond this, Go gives us a simple but effective way to organize our code. Interfaces, return-based error handling, defer for resource management and a simple way to achieve composition.

除此之外,Go提供了一个简洁但又高效的方式来组织我们的代码。接口,基于返回值的错误处理,用于资源管理的defer和简单的实现组合。

Last but not least is the built-in support for concurrency. There’s little to say about goroutines other than they’re effective and simple (simple to use anyway). It’s a good abstraction. Channels are more complicated. I always think it’s important to understand basics before using high-level wrappers. I do think learning about concurrent programming without channels is useful. Still, channels are implemented in a way that, to me, doesn’t feel quite like a simple abstraction. They are almost their own fundamental building block. I say this because they change how you write and think about concurrent programming. Given how hard concurrent programming can be, that is definitely a good thing.

最后同样重要的是内置的并发支持。关于Go协程,除了它们高效和简单(至少用起来简单)之外没有太多可说的,这是一个很好的抽象。通道要更复杂一些。我始终认为,在使用高级封装之前先理解基础很重要,我也确实认为先学习不用通道的并发编程是有益的。不过,通道的实现方式在我看来并不太像一个简单的抽象,它们几乎是自成一体的基础构建块。我这么说是因为它们改变了你编写和思考并发程序的方式。考虑到并发编程有多难,这绝对是一件好事。