Python Pandas导出Hbase数据到dataframe

Python导出Hbase数据的思路:

  1. 使用happybase连接Hbase
  2. 使用table.scan()扫描数据,并将得到的数据整理为dataframe格式
  3. 将从Hbase中得到的bytes类型的数据解码为str类型的数据

示例代码

import happybase
import numpy as np
import pandas as pd


def create_table(table_name):
    """Create a fresh HBase table with three column families.

    If a table with the same name already exists it is disabled and
    dropped first, so the call always leaves an empty table behind.

    Args:
        table_name: Name of the table to (re)create, as ``str`` or ``bytes``.
    """
    connection = happybase.Connection()
    try:
        # Table names may come back as bytes (Python 3) or str; normalise
        # both sides to str so the membership test cannot silently miss an
        # existing table and make create_table() fail afterwards.
        wanted = (
            table_name.decode("utf8")
            if isinstance(table_name, bytes)
            else table_name
        )
        existing = {
            name.decode("utf8") if isinstance(name, bytes) else name
            for name in connection.tables()
        }
        if wanted in existing:
            connection.delete_table(table_name, disable=True)
        connection.create_table(
            table_name,
            {
                # One entry per column family, each with default options.
                "col_1": dict(),
                "col_2": dict(),
                "col_3": dict(),
            },
        )
    finally:
        # Release the Thrift connection even if one of the calls above raises.
        connection.close()


def generate_data(table_name, num_rows=20000):
    """Fill a table with randomly generated demo rows.

    Row ``i`` gets the key ``"row<i>"``. For each row a random integer
    ``r`` in ``[0, 10)`` is drawn, and the value ``"<r>"`` is written to the
    column ``c<r>`` of each of the three column families.

    Args:
        table_name: Name of an existing table with families
            ``col_1``, ``col_2`` and ``col_3``.
        num_rows: Number of rows to write (20000 by default).
    """
    connection = happybase.Connection()
    try:
        table = connection.table(table_name)
        # The batch sends accumulated mutations every ``batch_size`` puts and
        # flushes the remainder when the ``with`` block exits.
        with table.batch(batch_size=10) as batch_table:
            for i in range(num_rows):
                random_col = np.random.randint(0, 10)
                cell_value = "{}".format(random_col)
                batch_table.put("row{}".format(i), {
                    "col_1:c{}".format(random_col): cell_value,
                    "col_2:c{}".format(random_col): cell_value,
                    "col_3:c{}".format(random_col): cell_value,
                })
    finally:
        # Release the Thrift connection even if a write fails.
        connection.close()


def convert_string(value):
    """Decode a UTF-8 bytes value to ``str``; pass anything else through.

    Missing markers (``None``, ``NaN``) and values that are already ``str``
    are returned unchanged, so the function can be applied more than once
    to the same data without raising.

    Args:
        value: A single cell value or column label.

    Returns:
        The decoded ``str`` for bytes-like input, otherwise ``value`` itself.

    Raises:
        UnicodeDecodeError: If the bytes are not valid UTF-8.
    """
    if isinstance(value, (bytes, bytearray)):
        return value.decode("utf8")
    return value


def change_data_to_dataframe(table_name, limit=2000):
    """Scan an HBase table and return its rows as a pandas DataFrame.

    Row keys become the index and ``family:qualifier`` names become the
    columns. Keys, column names and cell values are decoded from UTF-8
    bytes to ``str``; cells absent from a row are left as missing values
    by the DataFrame constructor.

    Args:
        table_name: Name of the table to read.
        limit: Maximum number of rows to scan (2000 by default).

    Returns:
        A ``pandas.DataFrame`` with one row per scanned HBase row.
    """
    connection = happybase.Connection()
    try:
        table = connection.table(table_name)
        row_keys = []
        rows = []
        # Decode once while reading, so no per-cell pass over the finished
        # DataFrame is needed afterwards.
        for key, cells in table.scan(limit=limit):
            row_keys.append(key.decode("utf8"))
            rows.append({
                column.decode("utf8"): cell.decode("utf8")
                for column, cell in cells.items()
            })
    finally:
        # Release the Thrift connection even if the scan fails midway.
        connection.close()
    return pd.DataFrame(rows, index=row_keys)


def main():
    """Rebuild the demo table, fill it, and print the head of its DataFrame."""
    demo_table = "generate_table"
    create_table(demo_table)  # drop any old copy and create it again
    generate_data(demo_table)  # write the random demo rows
    frame = change_data_to_dataframe(demo_table)
    preview = frame.head()
    print(preview)


# Run the demo only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
经验分享 程序员 微信小程序 职场和发展