Nebula的Python客户端封装

  • 分类:
  • 发表日期:2021-07-30 15:29:00
  • 最后修改:2021-07-30 15:29:00

基本模块

from nebula2.gclient.net import ConnectionPool
from nebula2.Config import Config
from collections import Iterable

 

参数类

class P:
    def __init__(self, attrs, vdict):
        for attr in attrs:
            value = None if attr not in vdict else vdict[attr]
            self.__setattr__(attr, value)

    def __getattr__(self, item):
        raise Exception(f'{item} is not exits')

 

格式化输出

class NebulaDataWrapper:
    def __init__(self, resp):
        # self.data = clean_data
        self.data = self.print(resp)

    def __repr__(self):
        return str(self.data)

    def print(self, resp):
        output_table = []
        for recode in resp:
            value_list = []
            for col in recode:
                if col.is_empty():
                    value_list.append(None)
                elif col.is_null():
                    value_list.append(None)
                elif col.is_bool():
                    value_list.append(col.as_bool())
                elif col.is_int():
                    value_list.append(col.as_int())
                elif col.is_double():
                    value_list.append(col.as_double())
                elif col.is_string():
                    value_list.append(col.as_string())
                elif col.is_time():
                    value_list.append(col.as_time())
                elif col.is_date():
                    value_list.append(col.as_date())
                elif col.is_datetime():
                    value_list.append(col.as_datetime())
                elif col.is_list():
                    value_list.append(col.as_list())
                elif col.is_set():
                    value_list.append(col.as_set())
                elif col.is_map():
                    value_list.append(col.as_map())
                elif col.is_vertex():
                    value_list.append(col.as_node())
                elif col.is_edge():
                    value_list.append(col.as_relationship())
                elif col.is_path():
                    value_list.append(col.as_path())
                else:
                    value_list.append('ERROR: Type unsupported')
                    return

            output_table.extend(value_list)

        return output_table

 

连接池

class NebulaPool:
    def __init__(self, hosts, pool_size=5):
        config = Config()
        config.max_connection_pool_size = pool_size
        self.connection_pool = ConnectionPool()
        assert self.connection_pool.init(hosts, config)
        # except Exception:
        #     import traceback
        #     print(traceback.format_exc())
        #     exit(1)

    def get_client(self):
        client = self.connection_pool.get_session('root', 'nebula')
        assert client is not None
        return NebulaHelper(client)

    def release_client(self, nebula_helper):
        assert nebula_helper.client is not None
        nebula_helper.client.release()

 

CRUD封装

class NebulaSearch:
    '''
    自行定制
    '''
    def __init__(self, conn):
        self.client = conn

    def subgraph(self, vertex, step):
        cmd = f'''GET SUBGRAPH {step} STEPS FROM "{vertex}"'''
        resp = self.client.execute(cmd)
        # data = []
        # if not resp.is_empty():
        #     for recode in resp:
        #         for col in recode:
        #             if col.is_list():
        #                 for col in col.as_list():
        #                     if col.is_vertex():
        #                         col = col.as_node()
        #                         data.append(str(col.get_id()))
        # print(data)
        return NebulaDataWrapper(resp).data

    def fetch_prop_tag(self, cmd, tag_props):
        resp = self.client.execute(cmd)
        data = NebulaDataWrapper(resp).data[1:]

        if any(data):
            return P(tag_props, dict(zip(tag_props, data)))
        else:
            return None

    def fetch_prop_edge(self): ...

    def go(self, cmd, tag_props=None):
        # print(cmd)
        resp = self.client.execute(cmd)
        if tag_props:
            tag_props = ['id'] + tag_props
            gn = len(tag_props)
            data = NebulaDataWrapper(resp).data
            if data:
                data = [P(tag_props, vdict) for vdict in [dict(zip(tag_props, data[gn*j:gn*(j+1)])) for j in range(len(data)//gn)]]
                return data
            else:
                return None
        else:
            return NebulaDataWrapper(resp).data


class NebulaHelper:
    def __init__(self, client):
        self.client = client
        self.bulk_size = 100

    def execute(self, cmd):
        try:
            resp = self.client.execute(cmd)
            assert resp.is_succeeded(), resp.error_msg()
        except AssertionError:
            # from traceback import format_exc
            # print(format_exc())
            raise Exception(f'此语句执行出错:{cmd}')
        else:
            return resp

    def use_space(self, obj):
        if isinstance(obj, str):
            try:
                self.client.execute(f'USE {obj};')
            except Exception as e:
                print(f'空间 {obj} 不存在')
                raise e
        else:
            assert 'name' in obj

            name = obj['name']
            default_params = {'replica_factor': 1, 'partition_num': 10}

            def get_other_params():
                dp = default_params.copy()
                for p, v in dp.items():
                    v = v if p not in obj else obj[p]
                    default_params.update({p:v})

            get_other_params()

            pn = default_params['partition_num']
            rf = default_params['replica_factor']

            self.client.execute(
                    # 创建数据库
                    f'CREATE SPACE IF NOT EXISTS {name}(partition_num={pn}, replica_factor={rf}, vid_type=FIXED_STRING(64));'
                    # 选择刚创建的数据库
                    f'USE {name};'
                )

    def create_tag(self, tags):
        cmds = []
        assert isinstance(tags, Iterable)
        for tag in tags:
            cmd = f'CREATE TAG IF NOT EXISTS {tag};'
            cmds.append(cmd)
        self.client.execute(''.join(cmds))

    def create_edge(self, edges):
        cmds = []
        assert isinstance(edges, Iterable)
        for edge in edges:
            cmd = f'CREATE EDGE IF NOT EXISTS {edge};'
            cmds.append(cmd)
        self.client.execute(''.join(cmds))

    def insert_vertex(self, vtex):
        name = vtex['name']
        data = vtex['data']

        cmd_head = f'INSERT VERTEX {name} VALUES '
        if isinstance(data, str):
            cmd_tail = data
        elif isinstance(data, Iterable):
            cmd_tail = ', '.join(set(data))
        else:
            raise Exception('类型错误')

        cmd = cmd_head + cmd_tail
        self.execute(cmd)

    def insert_edge(self, edge):
        name = edge['name']
        data = edge['data']

        cmd_head = f'INSERT EDGE {name} VALUES '
        if isinstance(data, str):
            cmd_tail = data
        elif isinstance(data, Iterable):
            cmd_tail = ', '.join(set(data))
        else:
            raise Exception('类型错误,')

        cmd = cmd_head + cmd_tail
        self.execute(cmd)

    @property
    def search(self):
        return NebulaSearch(self.client)

    def sleep(self, n):
        import time
        time.sleep(n)

 

使用事例

def fetch(nebula, *args, **kwargs):
    params = ['vid', 'tag', 'tag_props']
    p = P(params, kwargs)

    cmd_1 = f'FETCH PROP ON {p.tag} "{p.vid}" YIELD '
    cmd_2 = ', '.join([f'{p.tag}.{prop}' for prop in p.tag_props])
    cmd = cmd_1 + cmd_2

    return nebula.search.fetch_prop_tag(cmd, p.tag_props)

def go(nebula, *args, **kwargs):
    params = ['edge', 'vid', 'tag', 'tag_props', 'limit', 'reversely']
    p = P(params, kwargs)

    cmd_1 = f'GO FROM "{p.vid}" OVER {p.edge}'
    cmd_2 = f'YIELD {p.edge}._dst'
    for prop in p.tag_props:
        cmd_2 += f', $$.{p.tag}.{prop}'
    cmd_2 += f' | limit {p.limit}'
    cmd = f'{cmd_1} reversely {cmd_2}' if p.reversely else f'{cmd_1} {cmd_2}'

    return nebula.search.go(cmd, p.tag_props)

 

0 评论

大哥整点话呗~