`

hbase代码

 
阅读更多

package com.sxt.hbase;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/*找到源码包的方式:
 * 方式1.我们可以从本地磁盘中加入源码包,这样做的坏处就是如果我们的项目拷贝到其他电脑了,这些jar包或者源码包就都不存在了。
 * 方式2.那我们就先把jar包或者源码包先拷贝到项目下,例如hbase-0.98.23-src是hbase的源码包,hadoop源码包hadoop-2.6.5-src。此方式的坏处就是项目变的比较大了
 * 方式3.jar包最好也这样,例如起个名字叫lib,然后在添加构建路径的时候选择lib下,而不是选择本地磁盘
 *
 * */


/*在插入的时候都是先转化成字节数组,是因为插入操作是在生成Hfile小文件。
在读取的时候也都是在读取字节数组,是因为我们要读这些小文件。读完之后我们自己在转化成字符串*/
public class HBaseDemo {//hbase开发
 
 Configuration conf;//hadoop中的配置文件
 HBaseAdmin admin;//hbase客户端,相当于数据库连接
 String TN="phone";//表名字符串
 HTable htable;
 byte[] family = "cf".getBytes();
 
 
 @Before
 public void bgein() throws Exception{
   conf=new Configuration();//hadoop的配置文件。(应该就是hdfs的客户端)
   //客户端的请求是通过zookeep的,所以只需要知道zookeep就行了。hbase.zookeeper.quorum是zookeep列表的意思
  conf.set("hbase.zookeeper.quorum", "node001");//这里写的是zookeep在哪台虚拟机上。因为客户端请求的时候只找zookeep,不管具体hdfs等。
  //conf.set("hbase.zookeeper.quorum", "note002,note003,note004");//hadoop的配置文件,加载不了hbase。因此需要我们自己设置。设置内容其实也是来自配置文件
   admin=new HBaseAdmin(conf);//相当于数据库连接
   htable=new HTable(conf,TN);//用类创建具体表的实例,具体表名为phone
  
  
 }
 
 @After
 public void end() throws Exception{
  if(admin!=null){
   admin.close();
  }
  
  if(htable!=null){
   htable.close();
  }
 }
 
 @Test
 public void createTb1() throws IOException {//创建表,创建表的时候要现有列族。
  if(admin.tableExists(TN)){//表是否存在
   admin.disableTable(TN);//先禁用,在删除
   admin.deleteTable(TN);
  }
  HTableDescriptor desc=new HTableDescriptor(TableName.valueOf(TN));//desc是表的描述,TableName类的valueof方法输入的是一个字符串,返回的是一个对象
  HColumnDescriptor cf=new HColumnDescriptor("cf");//创建列族
  cf.setInMemory(true);//设置列族信息:缓存数据是否存放到内存
  cf.setMaxVersions(1);//设置列族信息:最大版本数。默认就是1
  desc.addFamily(cf);//创建表的时候至少有一个列族
  admin.createTable(desc);//有了列族可以创建表了。在数据库中一个具体的表就相当于一个对象。该对象对应的类是HTable
  
  
 }
 
 
 @Test
 public void insertDB1() throws Exception {//put新增数据
  String rowkey="12300";
  Put put =new Put(rowkey.getBytes());//put对象相当于是一行数据。put进行新增数据。新增数据的时候先要指定rowkey。底层Put类中,没有无参的构造方法,几个构造方法中参数都包括rowkey
  
  
  /*
   * 根据add方法源码可以看出来:底层是先用列族建立一个list集合,集合中元素的类型是Cell。然后每一组列名和列值组成一个单元格cell对象。最后这些对象都加入到list中。其实就是把每列的值都加入到列族
   * 由于底层方法的参数都是字节数组,因此我们传参的时候也需要传递的都是数组public Put add(byte [] family, byte [] qualifier, long ts, byte [] value) 。其中ts是时间戳的意思,做版本用的,
   * 此方法的返回值是put类型的,也就是返回一条完整的数据。
   * HBase是Key Value的存储方式:底层有个KeyValue()方法.key就是列名,value就是列值
         */  
  put.add("cf".getBytes(), "name".getBytes(), "xiaoming".getBytes());//添加一列。需要列族、列名、列值
  put.add("cf".getBytes(), "sex".getBytes(), "man".getBytes());
  //上面两行就是配置一行数据中的每一列。当此行配置完整了之后就可以把这一行添加到表中了。
  
  htable.put(put);//将添加到缓冲区。如果缓冲区已经太大,发送缓冲区的集群(由于master提供了负载均衡,所以可以找到其他节点也会存着这个表的数据,但是由于此put都放在这个节点上了,因此它可以通过master传到其他节点)。此过程自动Flush(隐式刷写)

  
 }
 
 @Test
 public void getDB1() throws Exception{//查看一条数据
  String rowkey="123";
  Get get=new Get(rowkey.getBytes());//创建Get对象,根据rowkey查询。返回的是用此rowkey查询出来的一整行数据,相当于oracle中的select *from table where id=123
  //get.addColumn("cf".getBytes(), "sex".getBytes());//如果这里写sex,就是把get对象中只存sex属性,这样就会导致下面取name的时候空指针异常。这样add的目的是在查询的时候不要查询全部,而是查询指定的列
  Result rs = htable.get(get);
  Cell cell = rs.getColumnLatestCell("cf".getBytes(), "name".getBytes());//得到最后的版本
  System.out.println(new String(CellUtil.cloneValue(cell)));//CellUtil.cloneValue(cell))是单元格的值 
 }
 
 
 
 
 
 
 
 
 
 
 
 /**
  * hbase的核心是通过rowkey进行查询的,添加的时候也是先设置rowkey。
  * 通话详细单:(自己)手机号、对方手机号、日期、通话时长、主叫被叫类型
  * 1.查询某个月份的通话详单,时间降序
  * 2.查询某个手机号主叫类型的通话记录。
  *
  * 分析:
  * 1.rowkey的设计:需要日期时间戳和手机号。为了能方便找到具体人的通话记录,应该把手机号放在前面。
  * 因此:rowkey=手机号_时间戳
  * 2.数据在表里存的时候是按照字典序排序的,都是从小到大的升序排序的。为了达到降序排序,我们就要在存的时候
  * 就让他降序保存。
  * 因此rowkey=手机号_(Long.Max-时间戳)
  *
  */
 
 /**
  *
  * 生成测试数据:十个用户生成一百条数据
  * 生成测试数据:3个用户生成5数据
  */
 
 
 HTools t=new HTools();
 
 Random r=new Random();
 SimpleDateFormat sdf=new SimpleDateFormat("yyyyMMddHHmmss");
 
 @Test
 public void  insertDB2() throws Exception{//新增数据
  
  List<Put> puts=new ArrayList<Put>();
   for (int i = 0; i <3; i++) {
    String pnum=t.getPhoneNum("186");//自己手机号
   for(int j= 0; i <5; j++ ){
    String dnum=t.getPhoneNum("177");//对方手机号
    String datestr=t.getDate("2018");//通话(起始)时间
    String  length=r.nextInt(99)+"";   //通话时长
    String type=r.nextInt(2)+"";//主叫被叫类型。0和1来区分
    
    String rowkey=pnum+"-"+(Long.MAX_VALUE-sdf.parse(datestr).getTime());
    Put put=new Put(rowkey.getBytes());
    put.add(family, "dnum".getBytes(), dnum.getBytes());
    put.add(family, "date".getBytes(), datestr.getBytes());
    put.add(family, "length".getBytes(), length.getBytes());
    put.add(family, "type".getBytes(), type.getBytes());
    
    
    puts.add(put);
    
   }
  }
  
   htable.put(puts);
  
 }
 
 
 
 
 
 
 
 /**
  * 查询某个手机号  某个月份所有的通话记录
  * 范围
  * @throws Exception
  */
 @Test
 public void scanDB1() throws Exception {//全表扫描:范围查找(比过滤器更好)
  Scan scan = new Scan();
  
  String pnum = "18692739289_";

  String startRowkey = pnum + (Long.MAX_VALUE-sdf.parse("20181001000000").getTime());//起始位置和结束位置,即9月份数据
  String stopRowkey = pnum + (Long.MAX_VALUE-sdf.parse("20180901000000").getTime());
  
  scan.setStartRow(startRowkey.getBytes());
  scan.setStopRow(stopRowkey.getBytes());
  
  ResultScanner rss = htable.getScanner(scan);
  for (Result rs : rss) {
   System.out.print(new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "dnum".getBytes()))));
   System.out.print(" - " + new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "date".getBytes()))));
   System.out.print(" - " + new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "type".getBytes()))));
   System.out.println(" - " + new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "length".getBytes()))));
  }
 }
 
 
 
 
 
 /**
  * 查询某个手机号  所有的主叫type=1
  * 过滤器
  * @throws Exception
  */
 @Test
 public void scanDB2() throws Exception {//全表扫描:过滤查找
           Scan scan = new Scan();
  //FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);//过滤器集合中最少要满足一个

  FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);//过滤器集合中所有过滤器都要满足
  
  PrefixFilter filter1 = new PrefixFilter("18692739289".getBytes());//前缀过滤器
  list.addFilter(filter1);
  
  SingleColumnValueFilter filter2 = new SingleColumnValueFilter(family,
    "type".getBytes(), CompareOp.EQUAL, "1".getBytes());//键值过滤器,参数含义分别为列族名称、列名、比较规则、列值
  
  list.addFilter(filter2);
  
  scan.setFilter(list);
  
  ResultScanner rss = htable.getScanner(scan);
  for (Result rs : rss) {
   System.out.print(new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "dnum".getBytes()))));
   System.out.print(" - " + new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "date".getBytes()))));
   System.out.print(" - " + new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "type".getBytes()))));
   System.out.println(" - " + new String(CellUtil.cloneValue(rs.getColumnLatestCell(family, "length".getBytes()))));
  }
  
 }
 
 
 

 //////////////////////////////////////////////////////下面是使用protobuf形式
 
/* 
 * protobuf设计如下:Call.proto文件
 * package com.sxt.hbase;
 message callDetail
 {
    required string dnum = 1;
    required string date = 2;
    required string length = 3;
    required string type = 4;
 }
 message dayCallDetail
 {
    repeated callDetail callDetails = 1;
 }*/

 
 
 /**
  * 生成测试数据
  *
  * 十个用户   每天产生一百条通话记录
  */
 @Test
 public void insertDB3() throws Exception {
  
  List<Put> puts = new ArrayList<Put>();
  
  for (int i = 0; i < 10; i++) {
   String pnum = t.getPhoneNum("186");
   
   String day = "20180115";
   
   Call.dayCallDetail.Builder dayCall = Call.dayCallDetail.newBuilder();
   
   // 每个用户  这一天 产生的一百条通话记录
   for (int j = 0; j < 100; j++) {
    
    String dnum = t.getPhoneNum("177");//属性值
    String datestr = t.getDate2(day);//属性值
    String length = r.nextInt(99) + "";//属性值
    String type = r.nextInt(2) + "";//属性值
    
    Call.callDetail.Builder callDetail = Call.callDetail.newBuilder();
    callDetail.setDnum(dnum);//set方法设置值
    callDetail.setDate(datestr);//set方法设置值
    callDetail.setLength(length);//set方法设置值
    callDetail.setType(type);//set方法设置值
    
    dayCall.addCallDetails(callDetail);//往集合中添加元素,for循环到100之后才产生一个单元格
   }
   
   String rowkey = pnum + "_" + (Long.MAX_VALUE-sdf.parse("20180115000000").getTime());
   
   Put put = new Put(rowkey.getBytes());
   put.add(family, "call".getBytes(), dayCall.build().toByteArray());//dayCall.build().toByteArray()得到字节数组
   
   puts.add(put);//添加一条数据。
  }
  
  htable.put(puts);
 }
 
 
 /**
  * 查询某个手机号 一天的所有通话记录
  * 18697862438_9223370520909175807
  */
 @Test
 public void getDB2() throws Exception {
  String rowkey = "18697862438_9223370520909175807";
  Get get = new Get(rowkey.getBytes());
  get.addColumn("cf".getBytes(), "call".getBytes());
  
  Result rs = htable.get(get);
  Cell cell = rs.getColumnLatestCell("cf".getBytes(), "call".getBytes());
  
  Call.dayCallDetail dayCall = Call.dayCallDetail.parseFrom(CellUtil.cloneValue(cell));//parseFrom解析字节数据,进行反序列化。

  for(Call.callDetail call : dayCall.getCallDetailsList()) {
   System.out.println(call.getDate() + " - " + call.getDnum() + " - " + call.getType() + " - " + call.getLength());
  }
 }
 
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics