HBase的协处理器。
1.HBase的协处理器
HBase的协处理器 (HBase Coprocessor),HBase协处理器受BigTable协处理器的启发,为用户提供类库和运行时环境,使得用户代码能够直接在HBase RegionServer和Master上运行, 分为系统协处理器和表协处理器。
2.协处理器分类
3.协处理器插件
Observer 1. RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、 Delete、Scan等。 2. MasterObserver:提供DDL类型的操作钩子。如创建、删除、修改 数据表等。 3. WALObserver:提供WAL相关操作钩子。
Observer应用场景
二级索引:可以使用协处理器来维持一个二级索引。
终端(Endpoint):动态的终端有点类似关系数据库的存储过程。
Endpoint
4.协处理器实例
pom.xml添加相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits dependency management (e.g. hbase-client, spring-boot starters
     declared below without a <version>) from the aggregator POM one level up. -->
<parent>
<artifactId>hbase</artifactId>
<groupId>com.simoniu</groupId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.simoniu.hbase</groupId>
<artifactId>hbase-coprocessor</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hbase-coprocessor</name>
<description>hbase-coprocessor</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
<!-- HBase 1.2.x: the coprocessor below extends BaseRegionObserver,
     which only exists in the 1.x API line. -->
<hbase-common.version>1.2.4</hbase-common.version>
<hbase-server.version>1.2.4</hbase-server.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- hbase-client version presumably managed by the parent POM - verify. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase-common.version}</version>
</dependency>
<!-- hbase-server provides the coprocessor/RegionObserver APIs. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase-server.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<!-- Name of the packaged jar (the one uploaded to HDFS for loading). -->
<finalName>myhbase-coprocessor</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- repackage is skipped: the jar must stay a plain library jar so the
     RegionServer classloader can load the coprocessor class directly. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.simoniu.hbase.hbasecoprocessor.HbaseCoprocessorApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/*
 * Attach this observer to a table with:
alter 'CoprocessorTest' ,'coprocessor'=>'hdfs://localhost:9000/coprocessor/myhbase-coprocessor.jar|com.simoniu.hbase.hbasecoprocessor.observer.MyRegionObserver|1001'
 */
public class MyRegionObserver extends BaseRegionObserver {

    // Column family all hooks operate on.
    private byte[] columnFamily = Bytes.toBytes("cf");
    // Column treated as a counter: each Put adds to the stored value.
    private byte[] countCol = Bytes.toBytes("countCol");
    // Column that may not be deleted directly; removed only when countCol is deleted.
    private byte[] unDeleteCol = Bytes.toBytes("unDeleteCol");
    private RegionCoprocessorEnvironment environment;

    /** Called by the RegionServer before the region is opened. */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        environment = (RegionCoprocessorEnvironment) e;
    }

    /** Called by the RegionServer before the region is closed. */
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // Nothing to release.
    }

    /**
     * Turns cf:countCol into an accumulator: before each Put, the value already
     * stored in the region is read and added to the incoming value, and the
     * Put is rewritten to carry the running total.
     *
     * @throws IOException if the region read fails, or a stored/incoming
     *     cf:countCol value is not a decimal integer
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
                       Durability durability) throws IOException {
        if (!put.has(columnFamily, countCol)) {
            return;
        }
        // Fetch only the cf:countCol cell of this row. (The original issued
        // new Get(row) with no column restriction, reading the whole row.)
        Get get = new Get(put.getRow()).addColumn(columnFamily, countCol);
        Result rs = e.getEnvironment().getRegion().get(get);
        int oldNum = 0;
        for (Cell cell : rs.rawCells()) {
            if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
                oldNum = parseIntValue(cell);
            }
        }
        // Value carried by the incoming Put.
        int newNum = 0;
        for (Cell cell : put.get(columnFamily, countCol)) {
            if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
                newNum = parseIntValue(cell);
            }
        }
        // Append the summed value to the Put; the later-added cell for the
        // same column supersedes the client's original cell.
        put.addColumn(columnFamily, countCol, Bytes.toBytes(String.valueOf(oldNum + newNum)));
    }

    /**
     * Parses a cell value as a decimal int.
     *
     * <p>Surfaced as a checked IOException rather than letting the unchecked
     * NumberFormatException escape: an uncaught runtime exception thrown from a
     * coprocessor hook can abort the RegionServer in HBase 1.x.
     */
    private int parseIntValue(Cell cell) throws IOException {
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException ex) {
            throw new IOException("cf:countCol value is not an integer: " + value, ex);
        }
    }

    /**
     * Protects cf:unDeleteCol: deleting it directly is rejected, and deleting
     * cf:countCol cascades the delete to cf:unDeleteCol in the same mutation.
     *
     * @throws IOException if the delete explicitly targets cf:unDeleteCol
     */
    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
                          WALEdit edit,
                          Durability durability) throws IOException {
        // Only cells addressed to the protected family are relevant.
        List<Cell> cells = delete.getFamilyCellMap().get(columnFamily);
        if (cells == null || cells.isEmpty()) {
            return;
        }
        boolean deletesCountCol = false;
        for (Cell cell : cells) {
            byte[] qualifier = CellUtil.cloneQualifier(cell);
            if (Arrays.equals(qualifier, unDeleteCol)) {
                throw new IOException("can not delete unDel column");
            }
            if (Arrays.equals(qualifier, countCol)) {
                deletesCountCol = true;
            }
        }
        if (deletesCountCol) {
            delete.addColumn(columnFamily, unDeleteCol);
        }
    }
}
将打包生成的myhbase-coprocessor.jar,上传至linux服务器的 /root/projects/hbase/ 下。
使用hdfs 的copyFromLocal命令,把myhbase-coprocessor.jar上传至 /coprocessor/目录下。
#创建coprocessor文件夹
[root@hadoop-master bin]# ./hdfs dfs -mkdir -p /coprocessor
#使用hdfs 的copyFromLocal命令,把myhbase-coprocessor.jar上传至 /coprocessor/目录下。
[root@hadoop-master bin]# ./hdfs dfs -copyFromLocal /root/projects/hbase/myhbase-coprocessor.jar /coprocessor/
#查看是否上传成功
[root@hadoop-master bin]# ./hdfs dfs -ls /coprocessor
Found 1 items
-rw-r--r-- 1 root supergroup 33531 2023-04-23 15:27 /coprocessor/myhbase-coprocessor.jar
使用hbase shell命令,创建CoprocessorTest 表。
[root@hadoop-master bin]# hbase shell
hbase(main):002:0> create 'CoprocessorTest','cf'
0 row(s) in 2.4980 seconds
使用put命令,添加两条记录。
hbase(main):014:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 5.0550 seconds
hbase(main):015:0> put 'CoprocessorTest','rowKey1','cf:unDeleteCol','true'
0 row(s) in 0.0370 seconds
hbase(main):014:0> put 'CoprocessorTest','rowKey2','cf:countCol',100
0 row(s) in 5.0550 seconds
hbase(main):015:0> put 'CoprocessorTest','rowKey2','cf:unDeleteCol','true'
0 row(s) in 0.0370 seconds
hbase(main):005:0> scan 'CoprocessorTest'
ROW COLUMN+CELL
rowKey1 column=cf:countCol, timestamp=1682238634717, value=10
rowKey1 column=cf:unDeleteCol, timestamp=1682238646247, value=true
rowKey2 column=cf:countCol, timestamp=1682237414998, value=100
rowKey2 column=cf:unDeleteCol, timestamp=1682237430291, value=true
2 row(s) in 0.0270 seconds
给CoprocessorTest表添加Observer协处理器。
#首先禁用表
disable 'CoprocessorTest'
#添加Observer协处理器
alter 'CoprocessorTest' ,'coprocessor'=>'hdfs://localhost:9000/coprocessor/myhbase-coprocessor.jar|com.simoniu.hbase.hbasecoprocessor.observer.MyRegionObserver|1001'
#启用表
enable 'CoprocessorTest'
#观察CoprocessorTest是否加载了协处理器
desc 'CoprocessorTest'
观察countCol是否实现累加。
hbase(main):007:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 0.0190 seconds
hbase(main):008:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 0.0160 seconds
hbase(main):009:0> put 'CoprocessorTest','rowKey1','cf:countCol',10
0 row(s) in 0.0100 seconds
hbase(main):010:0> scan 'CoprocessorTest'
ROW COLUMN+CELL
rowKey1 column=cf:countCol, timestamp=1682238898441, value=40
rowKey1 column=cf:unDeleteCol, timestamp=1682238646247, value=true
rowKey2 column=cf:countCol, timestamp=1682237414998, value=100
rowKey2 column=cf:unDeleteCol, timestamp=1682237430291, value=true
2 row(s) in 0.0230 seconds
测试能否直接删除unDeleteCol字段。
hbase(main):012:0> delete 'CoprocessorTest', 'rowKey1','cf:unDeleteCol'
ERROR: java.io.IOException: can not delete unDel column
...
#测试删除countCol字段会级联删除unDeleteCol字段。
hbase(main):013:0> delete 'CoprocessorTest','rowKey1','cf:countCol'
0 row(s) in 0.0380 seconds
hbase(main):014:0> scan 'CoprocessorTest'
ROW COLUMN+CELL
rowKey2 column=cf:countCol, timestamp=1682237414998, value=100
rowKey2 column=cf:unDeleteCol, timestamp=1682237430291, value=true
1 row(s) in 0.0200 seconds