diff --git a/.gitignore b/.gitignore index 11c7573..37756f6 100755 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,6 @@ local_settings.py source/data/ *.db celerybeat-schedule.db +.vimrc.local +test* +note* diff --git a/README.md b/README.md index aa50a75..a02fcf4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,31 @@ # PyFlyDB - -PyFlyDB is a python native graph database that implements [cypher](http://www.opencypher.org/) query language. +## Имплементация на графова база данни (данните могат да бъдат обработване като граф) над езика __link__ Open Cypher. +### Базата притежава следните елементи +#### Данни: +##### Върхове - основна единица в базата. +###### Притежава: +* етикети (човек) +* свойства (име) +##### Ребра - насочена връзка между върхове. Притежава: +* етикет (познава) +* свойства (име) + +#### Функции върху данните: +##### Добавяне на върхове/ребра +##### Търсене на: +* на върхове по подадена част от качествата му (етикети, свойства) - `(a:Person {name: "Sam"})` +* на ребра - `(a)-[b:Loves]-(c)` +* подаване на подграф - +##### Търсене на връх +##### Търсене на подграф +##### Търсен + + +pattern filtering vs WHERE syntax + +допълнителни филтриране -- Return, Distinct, Order by, Where + +2 начина на съхранение + +оптимизации \ No newline at end of file diff --git a/pyfly_init.py b/pyfly_init.py new file mode 100644 index 0000000..16ebeb3 --- /dev/null +++ b/pyfly_init.py @@ -0,0 +1,33 @@ +import asyncio + +from src.communications_manager import SocketCommunicationsManager +from src.process_manager.process_manager import ProcessManager +from src.query_processor.query_processor import QueryProcessor + +loop = asyncio.get_event_loop() +loop.run_forever() + +# communication_manager = None +# query_engine = None +# plan_executor = None +# process_manager = None + +""" +Set up environment +""" + + +def init(): + # XXX + # storage_manager = StorageManager() + process_manager = ProcessManager() + query_processor = QueryProcessor(process_manager=process_manager, + storage_manager=None) #,storage_manager) + + communications_manager = SocketCommunicationsManager(query_processor) + # Initialize the main process + communications_manager.run() + + +if __name__ == '__main__': + init() diff --git a/pyfly_shell.py b/pyfly_shell.py new file mode 100644 index 0000000..6f9e45e --- /dev/null +++ b/pyfly_shell.py @@ -0,0 +1,55 @@ +# Echo client program +import socket + +# Cap response to 2048 bytes + +QUERY_END_SYMBOL = ';' + +PROMPT = '> ' + +HOST = 'localhost' # The remote host +PORT = 50003 # The same port as used by the server + + +def default_input_method(cur_query): + return input(PROMPT if not cur_query else '... ').strip() + + +class PyflyShell: + def __init__(self, host=HOST, port=PORT, input_method=default_input_method): + """""" + self.input_method = input_method + self.host = host + self.port = port + + def run(self): + print('Welcome to PyFlyDB shell. Make your queries.') + print('Use Ctrl+C to exit.') + print('Establishing connection') + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.connect((self.host, self.port)) + except OSError as e: + s.close() + print(e, 'Exiting ...') + print('Try restarting. ') + exit() + print('Connected!') + print('Enter your commands') + + # Prompt loop + while True: + query = [] + line = '' + while not line or line[-1] != QUERY_END_SYMBOL: + line = self.input_method(query) + query.append(line) + # Send the final query + s.sendall(bytearray(' '.join(query), encoding='utf-8')) + print('Waiting response ...') + data = s.recv(2048) + print('Received', repr(data)) + + +if __name__ == '__main__': + PyflyShell().run() diff --git a/pyflyd.py b/pyflyd.py new file mode 100755 index 0000000..ded9f00 --- /dev/null +++ b/pyflyd.py @@ -0,0 +1,109 @@ + +import argparse +import grp +import logging +import logging.handlers +import signal +import sys + +import daemon +import lockfile + +from src.communications_manager import SocketCommunicationsManager + +import pyfly_init +import os + +# Deafults +DATA_DIR = os.path.dirname(os.path.realpath(__file__)) # Script dir '/var/lib/pyfly' +LOG_FILENAME = "/tmp/pyfly.log" +LOG_LEVEL = logging.INFO + +PY_FLY_GRP = 'pyfly' + +# Define and parse command line arguments +parser = argparse.ArgumentParser(description="Runs the PyFly Graph database.") +parser.add_argument("-l", "--log", + help="file to write log to (default '" + LOG_FILENAME + "')") +parser.add_argument("--data-dir", dest='data_dir', + help="data directory (default '" + DATA_DIR + "')") + +# If the log file is specified on the command line then override the default +args = parser.parse_args() +if args.log: + LOG_FILENAME = args.log +if args.data_dir: + DATA_DIR = args.data_dir + +####################################################################### +# LOGGING # +####################################################################### +# Configure logging to log to a file, making a new file at midnight +# and keeping the last 3 day's data +# Give the logger a unique name (good practice) +logger = logging.getLogger(__name__) +# Set the log level to LOG_LEVEL +logger.setLevel(LOG_LEVEL) +# Make a handler that writes to a file, making a new file at midnight and +# keeping 3 backups +handler = logging.handlers.TimedRotatingFileHandler(LOG_FILENAME, + when="midnight", + backupCount=3) +# Format each log message like this +formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s') +# Attach the formatter to the handler +handler.setFormatter(formatter) +# Attach the handler to the logger +logger.addHandler(handler) + + +# Make a class we can use to capture stdout and sterr in the log +class MyLogger: + def __init__(self, logger, level): + """Needs a logger and a logger level.""" + self.logger = logger + self.level = level + + def write(self, message): + # Only log if there is a message (not just a new line) + if message.rstrip() != "": + self.logger.log(self.level, message.rstrip()) + + def flush(self): + pass + + +# Replace stdout with logging to file at INFO level +sys.stdout = MyLogger(logger, logging.INFO) +# Replace stderr with logging to file at ERROR level +sys.stderr = MyLogger(logger, logging.ERROR) + +####################################################################### +# DAEMON # +####################################################################### +print('Starting ...') +context = daemon.DaemonContext( + working_directory=DATA_DIR, + umask=0o002, + pidfile=lockfile.FileLock('/var/run/pyfly.pid'), +) + +# TODO +context.signal_map = { + signal.SIGTERM: exit, #program_cleanup, + signal.SIGHUP: 'terminate', + # signal.SIGUSR1: reload_program_config, +} + +mail_gid = grp.getgrnam(PY_FLY_GRP).gr_gid +context.gid = mail_gid + +# context.files_preserve = [important_file, interesting_file] + +# initial_program_setup() + +# run the daemon +with context: + print('Running ...') + pyfly_init.init() + # SocketCommunicationsManager().run() diff --git a/requirements.txt b/requirements.txt index b4cd666..3e8837d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ +daemon SQLAlchemy==1.0.13 diff --git a/setup.py b/setup.py index 5fbf0f5..9f0f0d5 100644 --- a/setup.py +++ b/setup.py @@ -1,11 +1,11 @@ - -m setuptools import setup, find_packages +from setuptools import setup, find_packages setup( name="pyfly", version="0.1", packages=find_packages(), - install_requires=['SQLAlchemy>=1.0.13'], + install_requires=['SQLAlchemy>=1.0.13', + 'python-daemon>=2.1.0'], author="alexnad", author_email="alexandernadjarian@gmail.com", description="native python graph database", @@ -13,4 +13,3 @@ keywords="graph database NoSQL databases", url="https://github.com/alexnad/PyFlyDB" ) - diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/communications_manager.py b/src/communications_manager.py new file mode 100644 index 0000000..185dbf4 --- /dev/null +++ b/src/communications_manager.py @@ -0,0 +1,98 @@ +import logging +import socket +import sys +from threading import Thread + +# TODO proper SIGUP handling (close connections ?) + +### SUPPORTS ### +# Support only UTF-8 messages +# A message must end on ; in order to be processed +# Parallel request processing +### + +logging.basicConfig( + level=logging.NOTSET, + format='%(threadName)10s %(name)18s: %(message)s', + stream=sys.stderr +) + +Logger = logging.getLogger('Communications') + +HOST = '' # Symbolic name meaning all available interfaces (lo, eth0, ...) +PORT = 50003 # Arbitrary non-privileged port +MAX_CONNECTIONS = 2 +QUERY_END_SYMBOL = ';' +UTF8 = 'utf-8' + + +# XXX testing +class DummyRepeaterProcessor: + def process(self, query): + Logger.debug(query) + return query + + +def is_request_end(data): + return data.strip()[-1] == QUERY_END_SYMBOL + + +class SocketCommunicationsManager: + def __init__(self, query_processor=DummyRepeaterProcessor): + self.processor = query_processor + self.connection_trds = [] + + # TODO rename + def run(self): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((HOST, PORT)) + Logger.info('Socket created: %s:%s', HOST or 'ALL', PORT) + s.listen(1) + while True: + Logger.info('Listening ...') + conn, addr = s.accept() + + t = Thread(target=self.connection_worker, args=(conn, addr)) + t.start() + self.connection_trds.append(t) + # TODO send signals to threads upon close + + # TODO is this needed ? + for t in self.connection_trds: + t.join() + + def connection_worker(self, conn, addr): + """The assigned worker for the established connection.""" + + def process_request(query): + Logger.debug('Processing request: %s', query) + result = self.processor.process(query[:-1]) + + print('Sending result ...') + conn.sendall(bytearray(result, encoding=UTF8)) + + Logger.info('Connections established: %s', conn) + with conn: + Logger.info('Connected by %s', addr) + query_builder = [] + while True: + data = str(conn.recv(1024), encoding=UTF8) + if not data: + # on connection close + break + query_builder.append(data) + Logger.debug('Data: %s', data) + # check and run + # executor + if is_request_end(data): + process_request(''.join(query_builder)) + + Logger.info('Connection closed %s', addr) + + # TODO what to do on connection close ? + def close_connection(self): + pass + + +if __name__ == '__main__': + SocketCommunicationsManager(DummyRepeaterProcessor()).run() diff --git a/src/lib/utils.py b/src/lib/utils.py new file mode 100644 index 0000000..0ca7bb1 --- /dev/null +++ b/src/lib/utils.py @@ -0,0 +1,40 @@ +import collections +from itertools import tee +from src.query_processor.query_ast import models + + +def ensure_tuple(value): + if isinstance(value, tuple): + return value + elif isinstance(value, collections.Iterable): + return tuple(value) + else: + return (value,) + + +def pairize(iterable): + """s -> (s0,s1), (s1,s2), (s2, s3), ...""" + pairs = [] + a = iter(iterable) + try: + while True: + pairs.append((next(a), next(a))) + except StopIteration: + return pairs + +def pairwise(iterable): + "s -> (s0,s1), (s1,s2), (s2, s3), ..." + a, b = tee(iterable) + next(b, None) + return zip(a, b) + + +def collect_identifiers(elems): + identifiers = set() + elems = ensure_tuple(elems) + for elem in elems: + if isinstance(elem, models.Identifier): + identifiers.add(elem) + elif hasattr(elem, 'get_identifiers'): + identifiers.update(elem.get_identifiers()) + return identifiers diff --git a/src/process_manager/process_manager.py b/src/process_manager/process_manager.py new file mode 100644 index 0000000..8a7a0aa --- /dev/null +++ b/src/process_manager/process_manager.py @@ -0,0 +1,57 @@ +import asyncio +import queue +from enum import Enum + +import concurrent.futures + +from src.query_processor.query_ast.plan import Operation + +# TODO set some state of operations +# TODO stop threads on exit ! + +DEFAULT_PROCESSES_LIMIT = 3 +QUEUE_MAX_SIZE = 30 +MAX_BLOCK_TIME = 60 # seconds + + +class TaskStatuses(Enum): + NEW = 'new' + WAITING = 'waiting' + RUNNING = 'running' + FINISHEd = 'finished' + + +class OperationTask: + def __init__(self, operation, future=None): + """ + An operation task. + Args: + operation: + future (Future): Whether a notification to be sent + """ + self.operation = operation + self.future = future + self.state = TaskStatuses.NEW + + +### MAIN ### + + +class ProcessManager(concurrent.futures.ThreadPoolExecutor): + def __init__(self, processes_limit=DEFAULT_PROCESSES_LIMIT): + super().__init__(processes_limit) + + def submit(self, operation): + """ + Schedules an operation for execution + Args: + operation (Operation): + + Returns: + Future: + """ + return super().submit(operation.method, *operation.args) + + def operation_worker(self): + # get task + pass diff --git a/src/query_processor/__init__.py b/src/query_processor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/query_processor/errors/syntax.py b/src/query_processor/errors/syntax.py new file mode 100644 index 0000000..7c9fe6c --- /dev/null +++ b/src/query_processor/errors/syntax.py @@ -0,0 +1,68 @@ +class InvalidSyntaxError(Exception): + def __init__(self, value): + """Base syntax error.""" + self.value = value + + +class UnsupportedClauseError(InvalidSyntaxError): + pass + + +class UnsupportedExpressionType(InvalidSyntaxError): + pass + + +class NumberOfOperandsError(InvalidSyntaxError): + pass + + +class InvalidOperationError(InvalidSyntaxError): + pass + + +class InvalidExpressionError(InvalidSyntaxError): + pass + + +class InvalidGraphExpressionError(InvalidExpressionError): + pass + + +class BadGraphExpressionElementError(InvalidGraphExpressionError): + pass + + +class InvalidNodeError(InvalidGraphExpressionError): + def __init__(self, value, msg): + InvalidGraphExpressionError.__init__(self, value) + self.msg = msg + + +class EmptyGraphPatternExpressionError(InvalidGraphExpressionError): + pass + + +class InvalidGraphExpressionPropertiesError(InvalidGraphExpressionError): + def __init__(self, value, msg=''): + InvalidGraphExpressionError.__init__(self, value) + self.msg = msg + + +class InvalidEdgeError(InvalidGraphExpressionError): + def __init__(self, value, msg=''): + InvalidGraphExpressionError.__init__(self, value) + self.msg = msg + + +class InvalidEdgeLabelError(InvalidEdgeError): + def __init__(self, value, msg): + InvalidEdgeError.__init__(self, value) + self.msg = msg + + +class InvalidLabelsCountError(InvalidGraphExpressionError): + pass + + +class InvalidOperatorExpression(InvalidExpressionError): + pass diff --git a/src/query_processor/plan_executor.py b/src/query_processor/plan_executor.py new file mode 100644 index 0000000..eecc060 --- /dev/null +++ b/src/query_processor/plan_executor.py @@ -0,0 +1,61 @@ + +class PlanExecutor: + + def __init__(self, storage_manager, execution_scheduler): + self.storage_manager = storage_manager + self.scheduler = execution_scheduler + + + @staticmethod + def _post_processors(): + '''Return, sort, ...''' + pass + + def execute(self, query_plan, *args): + # TODO + """ + Registers operation to the scheduler and waits for it's + result + Args: + query_plan (QueryPlan): + *args: + + Returns: + + """ + name_to_identifiers_map = query.identifiers + + def populate_post_queries(queries, population_data): + """ + Applies data from results of a query. (populates identifiers) + N.B. query param is altered + + TODO optimize -- collect needed fields for the result and search only + those in the db + Args: + queries (Query): + population_data (dict): + Returns: + None: It updates the input object + """ + def update_identifier_data(query, name, value): + if query.get_identifiers_map().get(key): + query.get_identifiers_map()[key] + + for key, value in population_data.iteritems(): + for query in queries: + update_identifier_data(query, key, value) + query.get_identifiers_map() + + # TODO execute atomically ?? + for operation in query_plan.operations: + future = self.scheduler.submit(operation) + + # on future ready: + # - populate results + # - execute next + # -- rework operation (as op may be split) + result = future.result() + populate_post_queries() + + diff --git a/src/query_processor/query_ast/__init__.py b/src/query_processor/query_ast/__init__.py new file mode 100644 index 0000000..dccc3ea --- /dev/null +++ b/src/query_processor/query_ast/__init__.py @@ -0,0 +1 @@ +__all__ = ["clauses", "expression", "operators", "models", "query", "plan"] diff --git a/src/query_processor/query_ast/clauses.py b/src/query_processor/query_ast/clauses.py new file mode 100644 index 0000000..56ea098 --- /dev/null +++ b/src/query_processor/query_ast/clauses.py @@ -0,0 +1,118 @@ +from src.query_processor.query_ast.expression import * +from src.query_processor.query_ast.models import * +from src.lib.printable import Printable + +""" +Either add some serious logic to items or generate them dynamicaly +(type) + +MAIN_CLAUSES = [ + 'MATCH', + 'MERGE', + # This matches or creates semantics by using + # indexes and locks. You can specify different + # operations in case of a MATCH (part of the + # pattern already existed) or on CREATE + # (pattern did not exist yet). + + 'CREATE UNIQUE', + 'CREATE', + 'SET', # This updates properties and labels on nodes + 'REMOVE', # +and/or relationships. + 'DELETE', # It deletes nodes and relationships + + 'PROFILE', +] + +SUB_CLAUSES = [ + 'RETURN', + 'WHERE', + 'WITH', + 'DISTINCT', + 'ORDER BY' +] +""" + +# TODO more heiracal Clauses +# - clause possible possitions (after, before, only in the end) +# ... + + +class Clause(Printable, IdentifierHolderMixin): + expression_type = None + + def __init__(self, expression): + """ + + Args: + expression (Expression): + """ + self.expression = expression + + def get_identifiers(self): + if isinstance(self.expression, IdentifierExpression): + return self.expression.get_identifiers() + else: + return () + + @classmethod + def get_expression_type(cls): + return cls.expression_type + + +class Match(Clause): + expression_type = GraphPatternExpression + + def __init__(self, expression): + super().__init__(expression) + + +class Create(Clause): + expression_type = GraphPatternExpression + + def __init__(self, expression): + super().__init__(expression) + + +class Where(Clause): + expression_type = OperatorExpression + + def __init__(self, expression): + super().__init__(expression) + + +class Return(Clause): + expression_type = GenericExpression + + def __init__(self, expression, props=()): + super().__init__(expression) + self.props = ensure_tuple(props) + + # raise InvalidArguments('Return needs at least one item') + + +MAIN_CLAUSES = [ + 'MATCH', + 'MERGE', + # This matches or creates semantics by using + # indexes and locks. You can specify different + # operations in case of a MATCH (part of the + # pattern already existed) or on CREATE + # (pattern did not exist yet). + + 'CREATE UNIQUE', + 'CREATE', + 'SET', # This updates properties and labels on nodes + 'REMOVE', # +and/or relationships. + 'DELETE', # It deletes nodes and relationships + + 'PROFILE', +] + +SUB_CLAUSES = [ + 'RETURN', + 'WHERE', + 'WITH', + 'DISTINCT', + 'ORDER BY' +] diff --git a/src/query_processor/query_ast/expression.py b/src/query_processor/query_ast/expression.py new file mode 100644 index 0000000..6f3a216 --- /dev/null +++ b/src/query_processor/query_ast/expression.py @@ -0,0 +1,65 @@ +from src.lib.utils import ensure_tuple +from src.lib.utils import collect_identifiers +from src.lib.printable import Printable +from src.query_processor.query_ast.models import * + + +class Expression(Printable): + """ + A where (and other elements?) expression handler + Contains a list of elements - variables, consts and operations + """ + + def __init__(self, elements): + self.elements = ensure_tuple(elements) + + def __repr__(self): + return '<' + type(self).__name__ + '>' + str(self.elements) + + def __eq__(self, other): + return self.elements == other.elements + + # TODO probably not ... + def validate_expression(self): + pass + + +class IdentifierExpression(Expression, IdentifierHolderMixin): + def __init__(self, elements, identifiers): + """""" + super().__init__(elements) + self.identifiers = identifiers + + def get_identifiers(self): + return self.identifiers + + +class SimpleGraphPatternExpression(IdentifierExpression): + def __init__(self, expr): + """ + + Args: + expr (List[Node|Edge]): + """ + # collect identifiers + super().__init__(expr, collect_identifiers(expr)) + + +class GraphPatternExpression(IdentifierExpression): + def __init__(self, simple_exprs): + """ + + Args: + simple_exprs (List[SimpleGraphPatternExpression]): + """ + super().__init__(simple_exprs, collect_identifiers(simple_exprs)) + + +class OperatorExpression(Expression): + def __init__(self, elements): + super().__init__(elements) + + +class GenericExpression(IdentifierExpression): + def __init__(self, elements): + super().__init__(elements, collect_identifiers(elements)) diff --git a/src/query_processor/query_ast/models.py b/src/query_processor/query_ast/models.py new file mode 100644 index 0000000..1bf1aef --- /dev/null +++ b/src/query_processor/query_ast/models.py @@ -0,0 +1,220 @@ +from src.lib.printable import Printable +from src.lib.utils import ensure_tuple + + +class PropertiesHolderMixin: + def properties_as_dict(self): + props = {} + for prop in self.properties: + props[prop.key] = prop.value + return props + + +class IdentifierHolderMixin: + def get_identifiers(self): + raise NotImplementedError() + + +class Literal: + """Defines a literal for an expression.""" + + def __init__(self, value): + """ + Args: + value: + """ + self._value = value + + def __repr__(self): + return self._value + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + +class Property(Printable): + def __init__(self, key, value): + """ + Args: + key: + value (str|number|Identifier): + """ + self.key = key + self.value = value + + def __repr__(self): + return '<' + self.key + ': ' + str(self.value) + '>' + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + +class Identifier: + """ + Defines an identifier for a query. It can be + """ + + def __init__(self, name=None, fields=(), value=None): + """ + Args: + letter: + value: + """ + self._name = name + self._value = value + self._fields = ensure_tuple(fields) + + @property + def value(self): + return self._value + + @property + def name(self): + return self._name + + def __str__(self): + return self._name + '.' + '.'.join(self._fields) + ' ' + \ + str(self._value) + + def __repr__(self): + return '<' + str(self._name) + ' ' + str(self._value) + '>' + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __hash__(self): + # XXX it's a quick fix -- __hash__ must be compatible with __eq__ + return id(self) + + +class Variable: + """Keeps data about referenced identifiers.""" + + def __init__(self, identifier, fields): + """ + Args: + name (str): + fields (List[str]): represents the variable properties + sequence -> a.b.c -> [b, c] + """ + self.id = identifier + self.fields = ensure_tuple(fields) + + +class Label: + def __init__(self, name=None): + self.name = name + + def __repr__(self): + return self.name + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + +class Edge(Printable, IdentifierHolderMixin, PropertiesHolderMixin): + """ + TODO: make it immutable + An edge: + - has label + - identifier - used to keep the name of the matched edge + - has [properties] + - direction - true/false + - nodeIn, nodeOut - in case a direction if given, the edge direction + is determined from the node sequence given + """ + + def __init__(self, node_in=None, node_out=None, label=None, + directed=False, identifier=None, properties=()): + """ + Args: + label: + node_in (Node): + node_out (Node): + directed (bool|str): left or right directed + variable (Identifier|Variable): + properties: + """ + self.__label = label + self.__properties = ensure_tuple(properties) + self.__node_in = node_in + self.__node_out = node_out + self.__directed = directed + self.identifier = identifier + + def __repr__(self): + return '[' + str(self.identifier) + ':' + \ + str(self.__label) + ' { ' + str(self.__properties) + ' } < ' + \ + str(self.__node_in) + ' > < ' + str(self.__node_out) + ' > - ' + \ + str(self.__directed) + ']' + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def is_directed(self): + return self.__directed + + def get_nodes(self): + """ + Get a directed node pair out - in (direction flag needed) + """ + return (self.__node_in, self.__node_out) + + def get_identifiers(self): + ids = [] + if self.identifier: + ids.append(self.identifier) + if self.__node_in and self.__node_in.get_identifiers(): + ids.append(*self.__node_in.get_identifiers()) + if self.__node_out and self.__node_out.get_identifiers(): + ids.append(*self.__node_out.get_identifiers()) + return ids + + @property + def label(self): + return self.__label + + @property + def properties(self): + return self.__properties + + +class ReturnEdge(Edge): + """ + A returned edge must have an identifier, if not + it shouldn't be returned. + """ + + def __init__(self, direction, label, nodeLeft, nodeRight, _id, variable, + properties): + Edge.__init__(self, label, properties) + self.__id = _id + + # TODO implement setters + + +class Node(Printable, IdentifierHolderMixin, PropertiesHolderMixin): + """ + TODO: make it immutable + A node: + - identifier -- used to define the result variable + - has label/s + - has [properties] + """ + + def __init__(self, labels=(), identifier=None, properties=()): + self.identifier = identifier + self.properties = ensure_tuple(properties) + self.labels = ensure_tuple(labels) + + def get_identifiers(self): + return [self.identifier] if self.identifier else [] + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + +class ReturnNode(Node): + def __init__(self, variable, _id, properties, labels=[]): + Node.__init__(self, variable, properties, labels) + self.__id = _id diff --git a/src/query_processor/query_ast/operators.py b/src/query_processor/query_ast/operators.py new file mode 100644 index 0000000..fd11d49 --- /dev/null +++ b/src/query_processor/query_ast/operators.py @@ -0,0 +1,42 @@ + +OPERATORS_BY_PRIORITY = ['OR', 'XOR', 'AND', 'OR', '>', '<', '<=', '>=', 'IN'] + +# TODO Needs redesign +class Operator: + ''' + Base operator class. + ''' + + def __init__(self, op, processor, priority, operands=0): + self.operation = op + self.priority = priority + self.processor = processor + self.operands = operands + + def execute(self, *args): + ''' + Pass the required number of operands to the + operator + ''' + return self.processor(*args) + +class Equals(Operator): + + """Defines operator =""" + + def __init__(self, op, processor, operands): + """TODO: to be defined1. + + Args: + op (TODO): TODO + processor (TODO): TODO + operands (TODO): TODO + + + """ + Operator.__init__(self) + + self._op = op + self._processor = processor + self._operands = operands + diff --git a/src/query_processor/query_ast/plan.py b/src/query_processor/query_ast/plan.py new file mode 100644 index 0000000..f59ed8b --- /dev/null +++ b/src/query_processor/query_ast/plan.py @@ -0,0 +1,39 @@ +""" +MATCH (a:b {c: 'd'}) RETURN a.b -> [find(Node, Identifier('a'), props] +""" + + +class Runnable: + def run(self): + raise NotImplementedError + + +class Operation(Runnable): + def __init__(self, method, *args): + """A wrapper for executor""" + self.method = method + self.args = args + + def run(self): + self.method(*self.args) + + +class QueryPlan: + def __init__(self, operations): + self.operations = operations + + +class LogicalQueryPlan(QueryPlan): + def __init__(self, query): + """ + + Args: + query (Query): + """ + pass + + +class PhysicalQueryPlan(QueryPlan): + def __init__(self, logical_query): + """""" + pass diff --git a/src/query_processor/query_ast/query.py b/src/query_processor/query_ast/query.py new file mode 100644 index 0000000..64dcb29 --- /dev/null +++ b/src/query_processor/query_ast/query.py @@ -0,0 +1,79 @@ +from src.lib.utils import ensure_tuple +from src.lib.utils import collect_identifiers + + +class SubQuery: + """Docstring for QueryModel. """ + + def __init__(self, clauses): + """ + Args: + clauses (List[Cluase]): + identifiers (List[Identifiers]): Keeps sub query identifiers for + faster lookup + """ + self._clauses = ensure_tuple(clauses) + + def get_identifiers(self): + return collect_identifiers(self._clauses) + + @property + def clauses(self): + return self._clauses + + def __repr__(self): + return str(self._clauses) + + def __eq__(self, other): + return self._clauses == other.clauses + + +class Query: + """The root of all evil.""" + + def __init__(self, queries): + """ + Defines a whole query. + + Keeps identifiers list for faster lookup. + Args: + queries (List[SubQuery]): + """ + self._queries = ensure_tuple(queries) + self.identifiers_map = Query.get_identifiers_map(queries) + + @staticmethod + def get_identifiers_map(sub_queries): + """ + Collects Identifiers from the sub-queries and generates a name-dict for + objects (same name may be present in multiple identifiers) + Args: + sub_queries (List[SubQuery]): + Returns: + dict: + """ + name_to_identifiers = {} + # collect identifiers lists from the subqueries + identifiers = [sub_query.get_identifiers() for + sub_query in + sub_queries] + identifiers = set().union(*identifiers) + + # now populate the map + for identifier in identifiers: + name = identifier.name + if name_to_identifiers.get(name): + name_to_identifiers[name].add(identifier) + else: + name_to_identifiers[name] = {identifier} + return name_to_identifiers + + @property + def sub_queries(self): + return self._queries + + def __repr__(self): + return str(self._queries) + + def __eq__(self, other): + return self._queries == other._queries diff --git a/src/query_processor/query_ast/utils.py b/src/query_processor/query_ast/utils.py new file mode 100644 index 0000000..498c1fa --- /dev/null +++ b/src/query_processor/query_ast/utils.py @@ -0,0 +1,28 @@ +from src.query_processor.query_ast.expression import * +from src.query_processor.query_ast.clauses import * +from src.query_processor.errors.syntax import UnsupportedClauseError + +# TODO rename file ... + +STR_TO_CLAUSE = { + 'match': Match, + 'create': Create, + 'return': Return, + 'where': Where +} + + +def get_clause_type(clause_str): + """ + Args: + clause_str (str): + Returns: + Clause|None: + """ + clause = clause_str.lower() + clause = STR_TO_CLAUSE.get(clause) + if clause: + return clause + else: + raise UnsupportedClauseError(clause_str) + diff --git a/src/query_processor/query_optimizer.py b/src/query_processor/query_optimizer.py new file mode 100644 index 0000000..8faee1e --- /dev/null +++ b/src/query_processor/query_optimizer.py @@ -0,0 +1,26 @@ + +class QueryOptimizer: + + """Docstring for QueryOptimizer. """ + + def __init__(self, query): + """TODO: to be defined1. + + :query: TODO + + """ + self._query = query + + @staticmethod + def optimize(query): + """ + Optimizes a passed query + + :query: This is a object used for dark magic by magitions with + names that must not be spoken. + + :returns: TODO + + """ + pass + diff --git a/src/query_processor/query_parser.py b/src/query_processor/query_parser.py new file mode 100644 index 0000000..ee46be7 --- /dev/null +++ b/src/query_processor/query_parser.py @@ -0,0 +1,609 @@ +import re +from enum import Enum + +import logging as Logger + +from src.query_processor.errors.syntax import * + +from src.query_processor.query_ast.operators import * +from src.query_processor.query_ast.query import * +from src.query_processor.query_ast.models import * +from src.query_processor.query_ast.clauses import * +from src.query_processor.query_ast.expression import * +from src.query_processor.query_ast.utils import * +from src.lib import utils + +''' +Support Notes: +- Nodes properties may have space between value and key +- Properties value may be - string, number, identifier +-- any string +-- float or integer +-- letters_and_numbers. ... +- Nodes may have more than 1 space between labels and properties +- Support whitespaces in id/labels ??? +- Case-sensitive labels and ids ?? +- Edges with properties + +- Only Edge search -> ()-[]-() + +- Support split by ',' expressions + + +- split to clauses (Match) +- split to sub-query (Where) -> Clause, expr, Cluase, expr ... +- Parse expression by clause + + +TODO -- +- RETURN - only in the end +- Set x.y = 10 +- WITH c, SUM(..) AS x +- x = (:A)-[]-(:B) -- pattern variables http://neo4j.com/docs/developer-manual/current/#_pattern_variables + + +-- support for properties +Variable fields: +- in node -> (var) +- Return, With, ... +- properties + +GraphExpressions +OperatorExpressions +IdExpressions + +''' +MATCH_NODE = '\(.*?\)' +MATCH_EDGE = '?' +PARSE_GRAPH_EXPRESSION = re.compile( + '(?P{})|(?P{})'.format(MATCH_NODE, MATCH_EDGE)) + + +# TODO ONE BIG VALIDATION REGEX + +class EdgeDirections(Enum): + left = 1 + right = 2 + + +# TODO flags -> convert to number; find all matches +# TODO use \w ?? or \S +IDENTIFIER_REGEX = re.compile('^(\w+)(?:|\s)?') +VARIABLE_REGEX = re.compile('\w+(\.\w+)*(?:|\s)?') + +# Match labels, without matching properties +LABELS_REGEX = re.compile('[\w:]*?:([\w_]+)(?:|\s)?') + +# TODO allowed_val_chars = '\w|\'' +PROPERTIES_BODY_REGEX = re.compile('{(.*?)}') +KEY = '(?P\w+)' +VAR = '(?P\w+(\.\w+)*)' +NUM = '(?P[\d.]+)' +STR = '("(?P(.+?))")' +# Matches item sequentianally by their type +PROPERTY_REGEX = re.compile('{}:\s*({}|{}|{}),?\s*'.format(KEY, NUM, VAR, STR)) + +BODY_REGEX = '(.*?)' +EDGE_BODY_REGEX = re.compile('?'.format(BODY_REGEX)) +NODE_BODY_REGEX = re.compile('\({}\)'.format(BODY_REGEX)) + + +# TODO FIX +# TODO caseInsensitive +def split_list(unsplitted, sep_list): + """ + Splits a string by list of separators + """ + # TODO make it case insensitive + splitted = re.split('\s*({})\s+'.format('|'.join(sep_list)), + unsplitted) + + if len(splitted) > 0 and not splitted[0]: + splitted = splitted[1:] + return splitted + + +# -- EXPRESSIONS -- + +MATCH_SPLITTERS = ['-'] +MULTI_ELEMENTS_SPLITTER = [','] + +''' + ** PARSER ** +''' + + +def check_valid_edge(raw_node): + # XXX + # TODO FIX + # XXX + error = None + + if error: + raise InvalidEdgeError(raw_node, error) + + +def check_valid_node(raw_node): + # XXX + # TODO FIX + # XXX + error = None + + if error: + raise InvalidNodeError(raw_node, error) + + +def get_properties(raw_elem): + """ + Gets properties; Parses to Number if possible. + Args: + raw_node (str): + + Returns: + List[Property]: + + """ + + def to_number(s): + """Tries to parse element to number.""" + + def is_int(n): + try: + int(n) + except ValueError: + return False + return float(n) == int(n) + + if is_int(s): + return int(s) + else: + try: + return float(s) + except ValueError: + return s + + # MATCH should work (match the beginning ...) + # Else raise error + # TODO or use findAll + # TODO check for {} brackets + properties = [] + raw_props = PROPERTIES_BODY_REGEX.search(raw_elem) + # get matched group + if raw_props and raw_props.group(1): + raw_props = raw_props.group(1) + Logger.debug('Processing properties: ', raw_props) + + # Use Named Groups to match elements + match = PROPERTY_REGEX.match(raw_props) + while match: + key = match.group('key').strip() + if match.group('var'): + # it's an identifier + var = match.group('var').split('.') + value = Identifier(name=var[0], + fields=var[1:]) + elif match.group('num'): + # try to parse it to string + value = to_number(match.group('num').strip()) + elif match.group('val'): + # it's just a string + value = to_number(match.group('val').strip()) + else: + raise InvalidGraphExpressionPropertiesError(raw_props) + + properties.append(Property(key=key, value=value)) + + match = PROPERTY_REGEX.match(raw_props, match.end()) + + return tuple(properties) + + +def get_labels(raw_elem, multi=True): + """ + ;id {};id:... {} + Args: + raw_elem (str): + multi (bool): + Returns: + List[Label]: + """ + matches = [] + # NOTE Use match to match the begging. + match = LABELS_REGEX.match(raw_elem) + while match: + matches.append(match.group(1)) + match = LABELS_REGEX.match(raw_elem, match.end()) + + # edge labels + if matches and (len(matches) > 1 and not multi): + raise InvalidLabelsCountError() + # Make to system labels + matches = tuple(Label(raw_label.strip()) for raw_label in matches) + if not multi: + # Is edge label + return matches[0] if matches else None + else: + return matches + +# ## TODO ONLY ONE INSTANCE +# def get_identifier_by_name(name): +# """Keep only one identifier instance by name.""" +# identifier = identifiers.get(name) +# if not identifier: +# identifier = Identifier(name=name) +# # add to existing +# identifiers[name] = identifier +# return identifier +# +# def get_variable(raw_elem): +# """ +# ;id {};id:... {} +# Args: +# raw_node (str): +# Returns: +# Variable|None: +# """ +# match = VARIABLE_REGEX.match(raw_elem) +# if match: +# match = match.group(0).split('.') +# id = get_identifier_by_name(match[0]) +# fields = match[1:] +# match = Variable(identifier=id, fields=fields ) +# return match + +def get_variable(raw_elem): + """ + Args: + raw_elem (str): + Returns: + List[str] + """ + match = VARIABLE_REGEX.match(raw_elem) + if match: + # separate the properties + match = match.group(0).split('.') + match = Variable(name=match[0], fields=match[1:]) + return match + + +def get_identifier(raw_elem): + """ + ;id {};id:... {} + Args: + raw_node (str): + Returns: + Identifier|None: + """ + match = VARIABLE_REGEX.match(raw_elem) + if match: + match = match.group(0).split('.') + match = Identifier(name=match[0], fields=match[1:]) + return match + + +def parse_node(raw_node): + """ + Node must follow the pattern: ([identifier][:label:label...] [{properties}]) + Args: + raw_node (str): (data) + Returns: + Node: + Raises: + InvalidNodeError: + """ + check_valid_node(raw_node) + raw_node_body = NODE_BODY_REGEX.match(raw_node) + if raw_node_body: + raw_node_body = raw_node_body.group(1).strip() + else: + raise InvalidEdgeError(raw_node) + + identifier = get_identifier(raw_node_body) + labels = get_labels(raw_node_body) + properties = get_properties(raw_node_body) + + return Node(identifier=identifier, labels=labels, properties=properties) + + +def parse_edge(raw_edge, node_left, node_right): + """ + Args: + raw_edge (str): -[]-, <-[]-, ... + node_left (Node): + node_right (Node): + Returns: + Edge: + """ + + def get_edge_direction(raw_edge): + # TODO raise on bad direction ... + if raw_edge[0] == '<': + return EdgeDirections.left + elif raw_edge[len(raw_edge) - 1] == '>': + return EdgeDirections.right + return None + + check_valid_edge(raw_edge) + raw_edge_body = EDGE_BODY_REGEX.match(raw_edge) + if raw_edge_body: + raw_edge_body = raw_edge_body.group(1).strip() + else: + raise InvalidEdgeError(raw_edge) + + identifier = get_identifier(raw_edge_body) + label = get_labels(raw_edge_body, False) + properties = get_properties(raw_edge_body) + direction = get_edge_direction(raw_edge) + + if direction == EdgeDirections.left: + # Swap edges to keep the proper ordering + # simplifies the code + node_left, node_right = node_right, node_left + + return Edge(identifier=identifier, + label=label, + properties=properties, + directed=bool(direction), + node_in=node_left, + node_out=node_right) + + +def parse_simple_graph_expr(raw_simple_expr): + """ + Expression must follow the pattern: + Node[Edge && Node ...] + Args: + raw_simple_expr (str): + Returns: + SimpleGraphPatternExpression: + + Raises: + InvalidGraphExpressionError: + + """ + + # split to items + # parse Node, parse Edge + def collect_elements(raw_simple_expr): + raw_nodes = [] + raw_edges = [] + + match = PARSE_GRAPH_EXPRESSION.match(raw_simple_expr) + while match: + if match.group('node'): + raw_nodes.append(match.group('node')) + elif match.group('edge'): + raw_edges.append(match.group('edge')) + else: + raise BadGraphExpressionElementError(match.group()) + match = PARSE_GRAPH_EXPRESSION.match(raw_simple_expr, match.end()) + + return {'nodes': raw_nodes, 'edges': raw_edges} + + raw_elements = collect_elements(raw_simple_expr) + + # TODO check number of nodeshttps://gist.github.com/mkaz/141394d9ee97bed99121 + + # Parse nodes + nodes = [parse_node(raw_node) for raw_node in raw_elements['nodes']] + + edges = [] + + # Parse edges + for raw_edge, edge_nodes in zip(raw_elements['edges'], + utils.pairwise(nodes)): + edges.append(parse_edge(raw_edge, edge_nodes[0], edge_nodes[1])) + + res = None + if edges: + res = tuple(edges) + elif nodes: + if len(nodes) > 1: + raise InvalidGraphExpressionError + res = nodes[0] + + return SimpleGraphPatternExpression(res) + +def optimize_identifiers(expr): + """Reuse same identifier objects for different elements.""" + return expr + +####################################################################### +# Expressions parsing # +####################################################################### + +def parse_generic_expression(raw_subexprs): + """ + Args: + raw_subexprs (List[str]): + Returns: + GenericExpression: + """ + + def parse_generic_subexpression(subexpession): + # TODO more cases + return get_identifier(subexpession) + + generic_subexpressions = \ + tuple(parse_generic_subexpression(expr) for expr in raw_subexprs) + return GenericExpression(generic_subexpressions) + + +def parse_graph_expression(simple_graph_exprs): + """ + ()-[]-(); (); (), ()-[]-() + Args: + simple_graph_exprs (List[str]): + Returns: + GraphPatternExpression: + + """ + # Split by , + # split by - + # parse elements as follows -> node, edge, node edge ... + # TODO MOVE TO SPLITTER + simple_graph_exprs = tuple(parse_simple_graph_expr(simple_expr) + for simple_expr in simple_graph_exprs) + + return GraphPatternExpression(simple_graph_exprs) + + +def parse_operator_expression(simple_graph_exprs): + # TODO + pass + + +def parse_expression(expression, expression_type): + """ + + Args: + expression (str): + expression_type (Expression): The type of expression. Note it represents + a class, not an instance. + + Returns: + Expression: the generated expression + + Raises: + InvalidSyntaxError: + """ + expression = expression.split(',') + + if expression_type == GraphPatternExpression: + parser = parse_graph_expression + elif expression_type == OperatorExpression: + parser = parse_operator_expression + elif expression_type == GenericExpression: + parser = parse_generic_expression + else: + raise UnsupportedExpressionType(expression_type) + + expression = optimize_identifiers(expression) + + return parser(expression) + + +def parse_clause(raw_clause): + """ + + Args: + raw_clause (List[srt]): [clause, expr] + + Returns: + Clause: The generated clause + """ + clause_str, expr = raw_clause + + clause_type = get_clause_type(clause_str) + + expr = parse_expression(expr, + clause_type.get_expression_type()) + return clause_type(expr) + + +class QueryParser: + """ + Creates a Query object out of a query string. + """ + + @staticmethod + def parse_query(query): + """ + Parses an incoming query + CREATE ... + MATCH ... + as follows: + * get subqueries -- [CREATE ..., MATCH, ...] + * parse each sub query - + * define operation + * extract expression + * extract sub commands (WHERE, RETURN) + * TODO -- optimize query + * run sub query + * result in identifiers + * process next items with results from first + + Args: + query (str): + + Returns: + Query: generated query + + """ + + def parse_sub_query(raw_sub_query): + """ + Args: + raw_sub_query (List[str]): + A list containing the sub-query elements, + e.g. ['Match', '(you)'], + ['Match', '(you)', 'Return', 'you.a'] + + Returns: + SubQuery: generated SubQuery + """ + + # Break to smaller parts with sub clauses - RETURN, WHERE + # List of: Clause, expressions (, separated) + clause = raw_sub_query[0] + subclauses_split = split_list(raw_sub_query[1], SUB_CLAUSES) + + # process expressions (of MATCH, WHERE, ... + # TODO expression type is defined by the clause it refers to -- use that cluase + subclauses = (parse_clause(clause) + for clause in + utils.pairize((clause, *subclauses_split))) + + return SubQuery(subclauses) + + # Process query by parts. + # Sub queries are defined by specific Clauses + + # TODO trailing spaces + sub_queries_str = utils.pairize(split_list(query, MAIN_CLAUSES)) + + sub_queries = [parse_sub_query(sub_query) for sub_query in + sub_queries_str] + # TODO parse to Query object ? + + # TODO - go through the sub_queries and apply matching variables + ##apply_variables(sub_queries) + + return Query(sub_queries) # TODO variables ??? + + +""" +Spliting + By main operation keyword - MATCH, CREATE ... + Subspliting: + CREATE -- ',' (),(), ()-[]->() + MATCH -- ','; '-' + -- split by, and then by - ... it must be: (node),[edge],(node)<,[edge], ... + -- ()-->() + -- ()--() + -- (a)-->()<--(b) -- path + -- ({x:1}) + -- (a)-[{b:3}]->(b) + -- (a)-[:A]-() + + WHERE e.name IN ['a', 'b'] AND e.b > 5 OR ... + """ + +# query -> main_parts -> +""" +CREATE (m:Person {name:'b'}) +MATCH n WHERE n.name='B' RETURN n.name +==> Split by main clauses +'MATCH (n), (a)-->(b)<--c<--(d), WHERE n.name RETURN n.name' +==> Split by sub clauses ((a n1 n2 n3)) +[ +['MATCH', (n)--()], +['WHERE', 'n.name=\'B\''], +['RETURN', 'n.name'] +] +-> [commands] -- [M, W, R], +[expressions] -- list, tree ? + + +list -- a > 5 AND b < 3 --> [and, >, a, 5, <, b, 3] +""" diff --git a/src/query_processor/query_processor.py b/src/query_processor/query_processor.py new file mode 100644 index 0000000..00f63eb --- /dev/null +++ b/src/query_processor/query_processor.py @@ -0,0 +1,27 @@ +from query_processor.plan_executor import PlanExecutor +from query_processor.query_rewriter import QueryRewriter +from src.query_processor.query_parser import QueryParser + + +class QueryProcessor: + def __init__(self, process_manager, storage_manager): + self.query_rewriter = QueryRewriter()#storage_manager) + self.plan_executor = PlanExecutor(execution_scheduler=process_manager) + + async def process(self, raw_query): + """ + Executes a passed query. Follows the steps: + - Parse query -> QueryModel + - Query rewrite + - Optimize query + - execute the query using the storage_api + Args: + query (str): + """ + parsed_query = QueryParser.parse_query(raw_query) + query_plan = self.query_rewriter.rewrite(parsed_query) + + # TODO query = QueryOptimizer.optimize(query) + + # plan executor + self.plan_executor.execute(query_plan) diff --git a/src/query_processor/query_rewriter.py b/src/query_processor/query_rewriter.py new file mode 100644 index 0000000..4b35faf --- /dev/null +++ b/src/query_processor/query_rewriter.py @@ -0,0 +1,41 @@ +from query_processor.query_ast.plan import QueryPlan + +class DummyStorageManager: + def find_node(self, identifier=None, *properties): + pass + + def find_edge(self, identifier=None, *properties): + pass + +clause_to_method = { + +} + + + + +class QueryRewriter: + + def __init__(self, storage_manager=None): + self.storage_manager = storage_manager + + def rewrite(self, query): + """ + + Args: + query (Query): + + find -> populate -> find for each -> ... + + Returns: + QueryPlan: + """ + # TODO + # identifiers -> to single instance + + for sub_query in query.sub_queries(): + sub_query + + return query + + diff --git a/src/query_processor/syntax_checker.py b/src/query_processor/syntax_checker.py new file mode 100644 index 0000000..056836b --- /dev/null +++ b/src/query_processor/syntax_checker.py @@ -0,0 +1,15 @@ + +def _check_clauses(): + pass +def _check_operators(): + pass +def _check_operat + +def sytax_check(query): + ''' + TODO + Some big regex to check for proper syntax + ''' + pass + +class