# Basic modules (imports)
try:
    from collections.abc import Iterable
except ImportError:  # fallback kept for interpreters without collections.abc
    from collections import Iterable

from nebula2.Config import Config
from nebula2.gclient.net import ConnectionPool
# Parameter class
class P:
    """Parameter bag that exposes a fixed list of names as attributes.

    Every name in ``attrs`` becomes an instance attribute. Names that are
    missing from ``vdict`` are set to ``None``.
    """

    def __init__(self, attrs, vdict):
        """Create one attribute per entry of ``attrs``.

        Args:
            attrs: iterable of attribute names to define.
            vdict: mapping that supplies the values; absent names yield None.
        """
        for attr in attrs:
            setattr(self, attr, vdict[attr] if attr in vdict else None)

    def __getattr__(self, item):
        """Reject access to any name that was not declared in ``attrs``.

        Raises:
            AttributeError: always. ``__getattr__`` only runs after normal
                lookup has failed, so ``item`` is an undeclared name.
        """
        # AttributeError (an Exception subclass) keeps ``except Exception``
        # callers working while letting hasattr() and getattr(obj, name,
        # default) behave normally instead of blowing up.
        raise AttributeError(f'{item} is not exits')
# Formatted output
class NebulaDataWrapper:
    """Flatten a result set into a single list of converted column values.

    ``data`` holds the values of every column of every record, in iteration
    order, in one flat list (rows are NOT kept as separate sub-lists).
    """

    # (type-check method, accessor method) pairs, probed in this order.
    # Empty and null columns are handled before this table is consulted.
    _CONVERTERS = (
        ('is_bool', 'as_bool'),
        ('is_int', 'as_int'),
        ('is_double', 'as_double'),
        ('is_string', 'as_string'),
        ('is_time', 'as_time'),
        ('is_date', 'as_date'),
        ('is_datetime', 'as_datetime'),
        ('is_list', 'as_list'),
        ('is_set', 'as_set'),
        ('is_map', 'as_map'),
        ('is_vertex', 'as_node'),
        ('is_edge', 'as_relationship'),
        ('is_path', 'as_path'),
    )

    # Placeholder stored for a column whose type matches none of the checks.
    _UNSUPPORTED = 'ERROR: Type unsupported'

    def __init__(self, resp):
        """Convert ``resp`` immediately and keep the result in ``self.data``."""
        self.data = self.print(resp)

    def __repr__(self):
        return str(self.data)

    def print(self, resp):
        """Return the flat list of converted values for every column in ``resp``.

        Args:
            resp: iterable of records, each an iterable of column wrappers
                offering the ``is_*`` / ``as_*`` methods used here.

        Returns:
            A list (possibly empty). A column of an unrecognised type is
            represented by the string ``'ERROR: Type unsupported'``.
        """
        output_table = []
        for record in resp:
            # The previous code bailed out with a bare ``return`` on the first
            # unsupported column, which threw away every value collected so
            # far and left ``data`` as None. Keep the marker and carry on.
            output_table.extend(self._convert(col) for col in record)
        return output_table

    @classmethod
    def _convert(cls, col):
        """Convert one column wrapper to the value its accessor returns."""
        if col.is_empty() or col.is_null():
            return None
        for check, accessor in cls._CONVERTERS:
            if getattr(col, check)():
                return getattr(col, accessor)()
        return cls._UNSUPPORTED
# Connection pool
class NebulaPool:
    """Owns a ``ConnectionPool`` and hands out sessions wrapped in ``NebulaHelper``."""

    def __init__(self, hosts, pool_size=5):
        """Create and initialise the underlying connection pool.

        Args:
            hosts: server addresses, passed straight to ``ConnectionPool.init``.
            pool_size: value assigned to ``Config.max_connection_pool_size``.

        Raises:
            AssertionError: if ``ConnectionPool.init`` returns a falsy value.
        """
        config = Config()
        config.max_connection_pool_size = pool_size
        self.connection_pool = ConnectionPool()
        # init() must run unconditionally: when it lived inside an ``assert``
        # the whole call disappeared under ``python -O``. AssertionError is
        # kept so existing handlers still match.
        if not self.connection_pool.init(hosts, config):
            raise AssertionError('failed to initialise the Nebula connection pool')

    def get_client(self, user='root', password='nebula'):
        """Open a session and return it wrapped in a ``NebulaHelper``.

        Args:
            user: account name; defaults to the previously hard-coded 'root'.
            password: account password; defaults to the previously
                hard-coded 'nebula'.

        Raises:
            AssertionError: if the pool returns ``None`` instead of a session.
        """
        client = self.connection_pool.get_session(user, password)
        if client is None:
            raise AssertionError('connection pool returned no session')
        return NebulaHelper(client)

    def release_client(self, nebula_helper):
        """Release the session held by ``nebula_helper``.

        Raises:
            AssertionError: if the helper no longer holds a session.
        """
        if nebula_helper.client is None:
            raise AssertionError('helper has no session to release')
        nebula_helper.client.release()
# CRUD wrappers
class NebulaSearch:
    """Read-side query helpers; intended to be customised per project."""

    def __init__(self, conn):
        """Keep ``conn``; only its ``execute(cmd)`` method is used here."""
        self.client = conn

    def subgraph(self, vertex, step):
        """Run ``GET SUBGRAPH`` from ``vertex`` for ``step`` steps.

        Returns:
            The flattened values produced by ``NebulaDataWrapper``.
        """
        cmd = f'''GET SUBGRAPH {step} STEPS FROM "{vertex}"'''
        return NebulaDataWrapper(self.client.execute(cmd)).data

    def fetch_prop_tag(self, cmd, tag_props):
        """Execute a fetch statement and map its values onto ``tag_props``.

        Args:
            cmd: complete statement to execute.
            tag_props: property names, in the order the statement yields them.

        Returns:
            A ``P`` whose attributes are named after ``tag_props``, or ``None``
            when the result has no non-null property value.
        """
        resp = self.client.execute(cmd)
        # The first flattened value is skipped (presumably the vertex id
        # column - confirm against the statement being run).
        values = (NebulaDataWrapper(resp).data or [])[1:]
        # Compare against None explicitly: a plain any() treated legitimate
        # falsy values (0, '', False) as "vertex not found".
        if not any(value is not None for value in values):
            return None
        return P(tag_props, dict(zip(tag_props, values)))

    def fetch_prop_edge(self): ...

    def go(self, cmd, tag_props=None):
        """Execute a traversal statement and optionally group its values.

        Args:
            cmd: complete statement to execute.
            tag_props: property names yielded after the leading id column.
                When empty or ``None`` the flattened values are returned as is.

        Returns:
            Without ``tag_props``: the flattened value list.
            With ``tag_props``: a list of ``P`` objects (attributes ``id`` plus
            each property), one per complete group of values, or ``None`` when
            the result is empty.
        """
        resp = self.client.execute(cmd)
        data = NebulaDataWrapper(resp).data
        if not tag_props:
            return data
        if not data:
            return None
        # Unpacking accepts any iterable of names, not only lists.
        columns = ['id', *tag_props]
        width = len(columns)
        # The wrapper flattens rows, so re-slice into fixed-width groups; a
        # trailing partial group is ignored.
        return [
            P(columns, dict(zip(columns, data[start:start + width])))
            for start in range(0, (len(data) // width) * width, width)
        ]
class NebulaHelper:
    """Convenience wrapper around a session for schema setup and inserts."""

    def __init__(self, client):
        """Keep ``client``; it must offer ``execute(cmd)``."""
        self.client = client
        self.bulk_size = 100  # not read anywhere inside this class

    def execute(self, cmd):
        """Run ``cmd`` and return the response, raising if it did not succeed.

        Raises:
            Exception: when ``resp.is_succeeded()`` is false. The server's
                error message is attached as the exception's ``__cause__``.
        """
        resp = self.client.execute(cmd)
        # Explicit check instead of ``assert`` so failures are still detected
        # under ``python -O``.
        if not resp.is_succeeded():
            raise Exception(f'此语句执行出错:{cmd}') from AssertionError(resp.error_msg())
        return resp

    def use_space(self, obj):
        """Select a space by name, or create one from a spec and select it.

        Args:
            obj: either the space name (``str``), or a mapping with a required
                ``'name'`` key and optional ``'partition_num'`` (default 10)
                and ``'replica_factor'`` (default 1) keys.

        Raises:
            AssertionError: if a mapping is given without a ``'name'`` key.
            Exception: if selecting an existing space by name fails.
        """
        if isinstance(obj, str):
            try:
                # Go through self.execute so an unsuccessful USE actually
                # reaches the handler below; the raw session call does not
                # check the response.
                self.execute(f'USE {obj};')
            except Exception:
                print(f'空间 {obj} 不存在')
                raise
            return

        if 'name' not in obj:
            raise AssertionError("space spec must contain a 'name' key")
        name = obj['name']
        pn = obj['partition_num'] if 'partition_num' in obj else 10
        rf = obj['replica_factor'] if 'replica_factor' in obj else 1
        # Sent through the raw session on purpose, so the result is not
        # checked. NOTE(review): the USE that immediately follows CREATE SPACE
        # may not succeed until the space is ready - confirm before switching
        # this to self.execute.
        self.client.execute(
            # create the space
            f'CREATE SPACE IF NOT EXISTS {name}(partition_num={pn}, replica_factor={rf}, vid_type=FIXED_STRING(64));'
            # select the space that was just created
            f'USE {name};'
        )

    def create_tag(self, tags):
        """Issue ``CREATE TAG IF NOT EXISTS`` for each definition in ``tags``.

        Args:
            tags: one definition string, or an iterable of them.
        """
        self._create_schema('TAG', tags)

    def create_edge(self, edges):
        """Issue ``CREATE EDGE IF NOT EXISTS`` for each definition in ``edges``.

        Args:
            edges: one definition string, or an iterable of them.
        """
        self._create_schema('EDGE', edges)

    def insert_vertex(self, vtex):
        """Insert vertices described by ``vtex``.

        Args:
            vtex: mapping with ``'name'`` (text placed after INSERT VERTEX) and
                ``'data'`` (one VALUES string, or an iterable of them).

        Raises:
            Exception: if ``'data'`` is neither a string nor iterable, or the
                statement does not succeed.
        """
        self._insert('VERTEX', vtex, '类型错误')

    def insert_edge(self, edge):
        """Insert edges described by ``edge``.

        Args:
            edge: mapping with ``'name'`` (text placed after INSERT EDGE) and
                ``'data'`` (one VALUES string, or an iterable of them).

        Raises:
            Exception: if ``'data'`` is neither a string nor iterable, or the
                statement does not succeed.
        """
        self._insert('EDGE', edge, '类型错误,')

    @property
    def search(self):
        """A new ``NebulaSearch`` bound to this helper's session."""
        return NebulaSearch(self.client)

    def sleep(self, n):
        """Block for ``n`` seconds."""
        import time
        time.sleep(n)

    def _create_schema(self, kind, definitions):
        """Send one combined CREATE <kind> IF NOT EXISTS statement."""
        if isinstance(definitions, str):
            # A bare string is itself iterable; without this it would be
            # split into one statement per character.
            definitions = [definitions]
        if not isinstance(definitions, Iterable):
            raise AssertionError('schema definitions must be iterable')
        statements = [f'CREATE {kind} IF NOT EXISTS {item};' for item in definitions]
        # Raw session call, as before: the response is not checked here.
        self.client.execute(''.join(statements))

    def _insert(self, kind, spec, type_error_msg):
        """Build and run an INSERT <kind> statement from ``spec``."""
        name = spec['name']
        data = spec['data']
        if isinstance(data, str):
            values = data
        elif isinstance(data, Iterable):
            # dict.fromkeys removes duplicates like set() did, but keeps the
            # caller's order so the generated statement is deterministic.
            values = ', '.join(dict.fromkeys(data))
        else:
            raise Exception(type_error_msg)
        self.execute(f'INSERT {kind} {name} VALUES ' + values)
# Usage examples
def fetch(nebula, *args, **kwargs):
    """Build a ``FETCH PROP ON`` statement and run it via ``nebula.search``.

    Keyword Args:
        vid: vertex id, placed inside double quotes in the statement.
        tag: tag name.
        tag_props: property names to yield (each rendered as ``tag.prop``).

    Returns:
        Whatever ``nebula.search.fetch_prop_tag`` returns for the statement.
    """
    p = P(['vid', 'tag', 'tag_props'], kwargs)
    head = f'FETCH PROP ON {p.tag} "{p.vid}" YIELD '
    yielded = [f'{p.tag}.{prop}' for prop in p.tag_props]
    statement = head + ', '.join(yielded)
    return nebula.search.fetch_prop_tag(statement, p.tag_props)
def go(nebula, *args, **kwargs):
    """Build a ``GO FROM ... OVER ...`` statement and run it via ``nebula.search``.

    Keyword Args:
        vid: start vertex id, placed inside double quotes in the statement.
        edge: edge type to traverse; ``edge._dst`` is always yielded first.
        tag: tag name used for the yielded destination properties.
        tag_props: optional property names, each yielded as ``$$.tag.prop``.
        limit: optional row limit; the ``| limit`` pipe is only added when
            a value is supplied.
        reversely: when truthy, adds ``reversely`` after the OVER clause.

    Returns:
        Whatever ``nebula.search.go`` returns for the statement.
    """
    p = P(['edge', 'vid', 'tag', 'tag_props', 'limit', 'reversely'], kwargs)

    # tag_props is optional downstream (NebulaSearch.go accepts None), so an
    # omitted value must not crash the loop that builds the YIELD clause.
    props = p.tag_props or []

    parts = [f'GO FROM "{p.vid}" OVER {p.edge}']
    if p.reversely:
        parts.append('reversely')
    yielded = [f'{p.edge}._dst'] + [f'$$.{p.tag}.{prop}' for prop in props]
    parts.append('YIELD ' + ', '.join(yielded))
    cmd = ' '.join(parts)

    # Without this guard an omitted limit rendered as "| limit None".
    if p.limit is not None:
        cmd += f' | limit {p.limit}'

    return nebula.search.go(cmd, p.tag_props)
# (Blog page residue, not part of the module: "0 comments" / "Leave a comment~")