← 返回首页
HBase基础教程(十二)
发表时间:2023-04-23 02:32:29
HBase的协处理器

HBase的协处理器。

1.HBase的协处理器

HBase的协处理器 (HBase Coprocessor),HBase协处理器受BigTable协处理器的启发,为用户提供类库和运行时环境,使得代码能够在HBase RegionServer和Master上运行,分为系统协处理器和表协处理器。

2.协处理器分类

3.协处理器插件

Observer

  1. RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
  2. MasterObserver:提供DDL类型的操作钩子。如创建、删除、修改数据表等。
  3. WALObserver:提供WAL相关操作钩子。

Observer应用场景

  1. 安全性:例如执行Get或Put操作前,通过preGet或prePut方法检查是否允许该操作。
  2. 引用完整性约束:HBase并不支持关系型数据库中的引用完整性约束概念,即通常所说的外键。我们可以使用协处理器增强这种约束。
  3. 二级索引:可以使用协处理器来维持一个二级索引。

  4. 终端(Endpoint):动态的终端有点类似关系数据库的存储过程。

Endpoint

  1. Endpoint是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。
  2. 调用接口,它们的实现代码会被目标Regionserver远程执行。
  3. 典型的案例:一个大Table有几百个Region,需要计算某列的平均值或者总和。

4.协处理器实例

pom.xml添加相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the HBase coprocessor demo; packages myhbase-coprocessor.jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>hbase</artifactId>
        <groupId>com.simoniu</groupId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <groupId>com.simoniu.hbase</groupId>
    <artifactId>hbase-coprocessor</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hbase-coprocessor</name>
    <description>hbase-coprocessor</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
        <hbase-common.version>1.2.4</hbase-common.version>
        <hbase-server.version>1.2.4</hbase-server.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- NOTE(review): hbase-client has no explicit version here (presumably managed
             by the parent POM), while hbase-common/hbase-server are pinned to 1.2.4
             below — confirm all three resolve to the same HBase version. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase-common.version}</version>
        </dependency>
        <!-- hbase-server provides BaseRegionObserver and the coprocessor SPI. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase-server.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Name of the packaged jar file -->
        <finalName>myhbase-coprocessor</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.simoniu.hbase.hbasecoprocessor.HbaseCoprocessorApplication</mainClass>
                    <!-- skip=true disables Spring Boot repackaging so the build ships a
                         plain jar; presumably required so HBase can load the classes as
                         a coprocessor — NOTE(review): confirm. -->
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/*
alter 'CoprocessorTest' ,'coprocessor'=>'hdfs://localhost:9000/coprocessor/myhbase-coprocessor.jar|com.simoniu.hbase.hbasecoprocessor.observer.MyRegionObserver|1001'
* */

public class MyRegionObserver extends BaseRegionObserver {

    /** Column family that both observed columns live in. */
    private final byte[] columnFamily = Bytes.toBytes("cf");
    /** Column whose puts are accumulated with the stored value. */
    private final byte[] countCol = Bytes.toBytes("countCol");
    /** Column that must never be deleted directly. */
    private final byte[] unDeleteCol = Bytes.toBytes("unDeleteCol");
    private RegionCoprocessorEnvironment environment;

    /** Called by the RegionServer before the region is opened. */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        environment = (RegionCoprocessorEnvironment) e;
    }

    /** Called by the RegionServer before the region is closed. */
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // Nothing to release.
    }

    /**
     * Accumulates {@code cf:countCol}: every Put of that column is rewritten so the
     * stored value becomes (previously stored value + newly put value).
     *
     * @throws IOException if the stored or incoming value is not a base-10 integer,
     *                     or if reading the existing row fails
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
                       Durability durability) throws IOException {
        if (!put.has(columnFamily, countCol)) {
            return;
        }

        // Read only cf:countCol — the original code fetched the whole row,
        // pulling back every column for no benefit.
        Get get = new Get(put.getRow()).addColumn(columnFamily, countCol);
        Result stored = e.getEnvironment().getRegion().get(get);
        int oldNum = 0;
        for (Cell cell : stored.rawCells()) {
            if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
                oldNum = parseIntValue(cell);
            }
        }

        // Value the client is putting now.
        int newNum = 0;
        for (Cell cell : put.get(columnFamily, countCol)) {
            if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
                newNum = parseIntValue(cell);
            }
        }

        // Append the summed cell to the Put. The client's original cell is still in
        // the Put, but this cell is added later and wins on read.
        put.addColumn(columnFamily, countCol, Bytes.toBytes(String.valueOf(oldNum + newNum)));
    }

    /**
     * Parses a cell value as a base-10 int, converting the unchecked
     * NumberFormatException (which the original code let escape) into the
     * IOException this coprocessor's hooks already declare.
     */
    private static int parseIntValue(Cell cell) throws IOException {
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException nfe) {
            throw new IOException("cf:countCol value is not an integer: " + value, nfe);
        }
    }

    /**
     * Protects {@code cf:unDeleteCol}:
     * deleting it directly is rejected with an IOException, and deleting
     * {@code cf:countCol} cascades the delete to {@code cf:unDeleteCol}.
     *
     * @throws IOException if the Delete targets cf:unDeleteCol directly
     */
    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
                          WALEdit edit,
                          Durability durability) throws IOException {
        // Ignore deletes that do not touch the cf family.
        List<Cell> cells = delete.getFamilyCellMap().get(columnFamily);
        if (cells == null || cells.isEmpty()) {
            return;
        }

        boolean deletesCountCol = false;
        for (Cell cell : cells) {
            byte[] qualifier = CellUtil.cloneQualifier(cell);
            if (Arrays.equals(qualifier, unDeleteCol)) {
                throw new IOException("can not delete unDel column");
            }
            if (Arrays.equals(qualifier, countCol)) {
                deletesCountCol = true;
            }
        }

        if (deletesCountCol) {
            delete.addColumn(columnFamily, unDeleteCol);
        }
    }
}

将打包生成的myhbase-coprocessor.jar,上传至linux服务器的 /root/projects/hbase/ 下。

使用hdfs 的copyFromLocal命令,把myhbase-coprocessor.jar上传至 /coprocessor/目录下。

#创建coprocessor文件夹
[root@hadoop-master bin]# ./hdfs dfs -mkdir -p /coprocessor

#使用hdfs 的copyFromLocal命令,把myhbase-coprocessor.jar上传至 /coprocessor/目录下。
[root@hadoop-master bin]# ./hdfs dfs -copyFromLocal /root/projects/hbase/myhbase-coprocessor.jar /coprocessor/

#查看是否上传成功
[root@hadoop-master bin]# ./hdfs dfs -ls /coprocessor
Found 1 items
-rw-r--r--   1 root supergroup      33531 2023-04-23 15:27 /coprocessor/myhbase-coprocessor.jar

使用hbase shell命令,创建CoprocessorTest 表。

[root@hadoop-master bin]# hbase shell

hbase(main):002:0> create 'CoprocessorTest','cf'
0 row(s) in 2.4980 seconds

使用put命令,添加两条记录。

hbase(main):014:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 5.0550 seconds

hbase(main):015:0> put 'CoprocessorTest','rowKey1','cf:unDeleteCol','true'
0 row(s) in 0.0370 seconds

hbase(main):014:0> put 'CoprocessorTest','rowKey2','cf:countCol',100
0 row(s) in 5.0550 seconds

hbase(main):015:0> put 'CoprocessorTest','rowKey2','cf:unDeleteCol','true'
0 row(s) in 0.0370 seconds

hbase(main):005:0> scan 'CoprocessorTest'
ROW                                          COLUMN+CELL                                                                                                                     
 rowKey1                                     column=cf:countCol, timestamp=1682238634717, value=10                                                                           
 rowKey1                                     column=cf:unDeleteCol, timestamp=1682238646247, value=true                                                                      
 rowKey2                                     column=cf:countCol, timestamp=1682237414998, value=100                                                                          
 rowKey2                                     column=cf:unDeleteCol, timestamp=1682237430291, value=true                                                                      
2 row(s) in 0.0270 seconds

给CoprocessorTest表添加Observer协处理器。


#首先禁用表
disable 'CoprocessorTest'

#添加Observer协处理器
alter 'CoprocessorTest' ,'coprocessor'=>'hdfs://localhost:9000/coprocessor/myhbase-coprocessor.jar|com.simoniu.hbase.hbasecoprocessor.observer.MyRegionObserver|1001'

#启用表
enable 'CoprocessorTest'

#观察CoprocessorTest是否加载了协处理器
desc 'CoprocessorTest'

观察countCol是否实现累加。

hbase(main):007:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 0.0190 seconds

hbase(main):008:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 0.0160 seconds

hbase(main):009:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 0.0100 seconds

hbase(main):010:0> scan 'CoprocessorTest'
ROW                                          COLUMN+CELL                                                                                                                     
 rowKey1                                     column=cf:countCol, timestamp=1682238898441, value=40                                                                           
 rowKey1                                     column=cf:unDeleteCol, timestamp=1682238646247, value=true                                                                      
 rowKey2                                     column=cf:countCol, timestamp=1682237414998, value=100                                                                          
 rowKey2                                     column=cf:unDeleteCol, timestamp=1682237430291, value=true                                                                      
2 row(s) in 0.0230 seconds

测试能否直接删除unDeleteCol字段。

hbase(main):012:0> delete 'CoprocessorTest', 'rowKey1','cf:unDeleteCol'

ERROR: java.io.IOException: can not delete unDel column
...

#测试删除countCol字段会级联删除unDeleteCol字段。
hbase(main):013:0> delete 'CoprocessorTest','rowKey1','cf:countCol'
0 row(s) in 0.0380 seconds

hbase(main):014:0> scan 'CoprocessorTest'
ROW                                          COLUMN+CELL                                                                                                                     
 rowKey2                                     column=cf:countCol, timestamp=1682237414998, value=100                                                                          
 rowKey2                                     column=cf:unDeleteCol, timestamp=1682237430291, value=true                                                                      
1 row(s) in 0.0200 seconds