一:介绍
把一部分计算也移动到数据的存放端;允许用户执行region级的操作;可以动态加载。
二:使用场景:
1、使用钩子来关联行修改操作来维护辅助索引,或维护一些数据间的引用完整性。
2.权限控制
三:coprocessor两大类:observer和endpoint介绍
3.1Observer
与触发器类似;
regionobserver处理数据修改事件,表region联系紧密;
MasterObserver集群级事件操作,管理或DDL类型操作;
WALObserver控制WAL。
3.2Endpoint
用户自定义操作添加到服务端添加一些远程过程调用动态拓展RPC协议;与RDBMS存储类似;
3.3Coprocessor类
所有的协处理器都必须实现这个接口,定义了协处理器的基本约定。
两个被应用于框架的枚举类:priority与state。
3.3.1Coprocessor.priority
定义的优先级
SYSTEM 最先被执行的协处理器;
USER 其他协处理器,按顺序执行;
3.3.2Coprocessor类接口提供的方法:
void start(CoprocessorEnviroment env)throws IOException;
void stop(CoprocessorEnviroment env)throws IOException;
3.3.3CoprocessorEnviroment
(协处理器实例一直保存在提供的环境中)
3.3.4Coprocessor.state
定义的状态;
uninstalled:协处理器最初的状态
installed:实例装载了他的环境参数
starting:开始工作
active:start方法调用,处于active状态
stopping:stop方法被调用之前stopped:
stop方法将控制权交给框架
3.3.5CoprocessorHost类:
维护协处理实例和他们专用环境
Coprocessor,CoprocessorEnviroment,CoprocessorHost形成协处理器类基础。
四:协处理器的加载:
1.从配置文件中加载(hbase-site.xml)
2.表描述符中加载HTableDescriptor.setValue()
五:RegionObserver类
region级别操作发生,会触发钩子函数
分为两类操作:region生命周期变化;客户端api调用
5.1RegionCoprocessorEnviroment
RegionObserver类的协处理器环境的实例;实现了CoprocessorEnviroment接口;
5.2ObserverContext类
特殊的上下文:提供访问当前系统环境入口;提供关键功能通知协处理器在毁掉函数完成时需要做什么;
5.3BaseRegionObserver类
所有用户实现监听类型协处理器基类,可以重载自己感兴趣的方法实现自己功能
六:MasterObserver类
处理master服务器的所有回调函数
6.1MasterCoprocessorEnviroment类
封装了一个masterobserver实例;实现了CoprocessorEnviroment接口;
6.2BaseMasterObserver类
扩展此类,实现自己功能;选择对应的pre.post方法;
七:endpoint:
hbase0.98之前:实现一个endpoint两个步骤:
1.拓展CoprocessorProtocol接口
给客户端提供自定义的rpc协议;
定义了客户端服务端的通信协议
2.拓展BaseEndpointCoprocessor类
hbase 0.98以上版本对endpoint的版本作了修改,修改后的使用:
(1)创建通信协议:
一个proto文件,使用protoc工具来生成协议类文件。这个文件需要在服务端及客户端存在。
proto文件:
option java_package = "org.myname.hbase.coprocessor.autogenerated";
option java_outer_classname = "Sum";
message SumRequest {
required string family = 1;
}
message SumResponse {
required int64 sum = 1 [default = 0];
}
service SumService {
rpc getSum(SumRequest)
returns (SumResponse);
}
生成协议类文件:
$ protoc --java_out=src ./sum.proto
(2)创建一个Service类,实现具体的业务逻辑
例如官网例子:
public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
.................
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// do nothing
}
@Override
public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback done) {
Scan scan = new Scan();
.............
try {
............
}
(3)创建表时指定使用这个EndPoint,或者是全局配置。
(4)创建一个Client类,调用这个RPC方法。
例如:
try{
Map results = table.coprocessorService(
Sum.SumService.class,
null, /* start key */
null,/* end key */
newBatch.Call() {
@Override
publicLongcall(Sum.SumService aggregate)throwsIOException{
BlockingRpcCallback rpcCallback =newBlockingRpcCallback<>();
aggregate.getSum(null, request, rpcCallback);
Sum.SumResponse response = rpcCallback.get();
returnresponse.hasSum() ? response.getSum() :0L;
}
} );