Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,6 @@ local_settings.py
source/data/
*.db
celerybeat-schedule.db
.vimrc.local
test*
note*
31 changes: 29 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
# PyFlyDB

PyFlyDB is a python native graph database that implements [cypher](http://www.opencypher.org/) query language.
## Имплементация на графова база данни (данните могат да бъдат обработване като граф) над езика __link__ Open Cypher.

### Базата притежава следните елементи
#### Данни:
##### Върхове - основна единица в базата.
###### Притежава:
* етикети (човек)
* свойства (име)
##### Ребра - насочена връзка между върхове. Притежава:
* етикет (познава)
* свойства (име)

#### Функции върху данните:
##### Добавяне на върхове/ребра
##### Търсене на:
* на върхове по подадена част от качествата му (етикети, свойства) - `(a:Person {name: "Sam"})`
* на ребра - `(a)-[b:Loves]-(c)`
* подаване на подграф -
##### Търсене на връх
##### Търсене на подграф
##### Търсен


pattern filtering vs WHERE syntax

допълнителни филтриране -- Return, Distinct, Order by, Where

2 начина на съхранение

оптимизации
33 changes: 33 additions & 0 deletions pyfly_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

from src.communications_manager import SocketCommunicationsManager
from src.process_manager.process_manager import ProcessManager
from src.query_processor.query_processor import QueryProcessor

loop = asyncio.get_event_loop()
loop.run_forever()

# communication_manager = None
# query_engine = None
# plan_executor = None
# process_manager = None

"""
Set up environment
"""


def init():
# XXX
# storage_manager = StorageManager()
process_manager = ProcessManager()
query_processor = QueryProcessor(process_manager=process_manager,
storage_manager=None) #,storage_manager)

communications_manager = SocketCommunicationsManager(query_processor)
# Initialize the main process
communications_manager.run()


if __name__ == '__main__':
init()
55 changes: 55 additions & 0 deletions pyfly_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Echo client program
import socket

# Cap response to 2048 bytes

QUERY_END_SYMBOL = ';'

PROMPT = '> '

HOST = 'localhost' # The remote host
PORT = 50003 # The same port as used by the server


def default_input_method(cur_query):
return input(PROMPT if not cur_query else '... ').strip()


class PyflyShell:
def __init__(self, host=HOST, port=PORT, input_method=default_input_method):
""""""
self.input_method = input_method
self.host = host
self.port = port

def run(self):
print('Welcome to PyFlyDB shell. Make your queries.')
print('Use Ctrl+C to exit.')
print('Establishing connection')
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect((self.host, self.port))
except OSError as e:
s.close()
print(e, 'Exiting ...')
print('Try restarting. ')
exit()
print('Connected!')
print('Enter your commands')

# Prompt loop
while True:
query = []
line = ''
while not line or line[-1] != QUERY_END_SYMBOL:
line = self.input_method(query)
query.append(line)
# Send the final query
s.sendall(bytearray(' '.join(query), encoding='utf-8'))
print('Waiting response ...')
data = s.recv(2048)
print('Received', repr(data))


if __name__ == '__main__':
PyflyShell().run()
109 changes: 109 additions & 0 deletions pyflyd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@

import argparse
import grp
import logging
import logging.handlers
import signal
import sys

import daemon
import lockfile

from src.communications_manager import SocketCommunicationsManager

import pyfly_init
import os

# Deafults
DATA_DIR = os.path.dirname(os.path.realpath(__file__)) # Script dir '/var/lib/pyfly'
LOG_FILENAME = "/tmp/pyfly.log"
LOG_LEVEL = logging.INFO

PY_FLY_GRP = 'pyfly'

# Define and parse command line arguments
parser = argparse.ArgumentParser(description="Runs the PyFly Graph database.")
parser.add_argument("-l", "--log",
help="file to write log to (default '" + LOG_FILENAME + "')")
parser.add_argument("--data-dir", dest='data_dir',
help="data directory (default '" + DATA_DIR + "')")

# If the log file is specified on the command line then override the default
args = parser.parse_args()
if args.log:
LOG_FILENAME = args.log
if args.data_dir:
DATA_DIR = args.data_dir

#######################################################################
# LOGGING #
#######################################################################
# Configure logging to log to a file, making a new file at midnight
# and keeping the last 3 day's data
# Give the logger a unique name (good practice)
logger = logging.getLogger(__name__)
# Set the log level to LOG_LEVEL
logger.setLevel(LOG_LEVEL)
# Make a handler that writes to a file, making a new file at midnight and
# keeping 3 backups
handler = logging.handlers.TimedRotatingFileHandler(LOG_FILENAME,
when="midnight",
backupCount=3)
# Format each log message like this
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
# Attach the formatter to the handler
handler.setFormatter(formatter)
# Attach the handler to the logger
logger.addHandler(handler)


# Make a class we can use to capture stdout and sterr in the log
class MyLogger:
def __init__(self, logger, level):
"""Needs a logger and a logger level."""
self.logger = logger
self.level = level

def write(self, message):
# Only log if there is a message (not just a new line)
if message.rstrip() != "":
self.logger.log(self.level, message.rstrip())

def flush(self):
pass


# Replace stdout with logging to file at INFO level
sys.stdout = MyLogger(logger, logging.INFO)
# Replace stderr with logging to file at ERROR level
sys.stderr = MyLogger(logger, logging.ERROR)

#######################################################################
# DAEMON #
#######################################################################
print('Starting ...')
context = daemon.DaemonContext(
working_directory=DATA_DIR,
umask=0o002,
pidfile=lockfile.FileLock('/var/run/pyfly.pid'),
)

# TODO
context.signal_map = {
signal.SIGTERM: exit, #program_cleanup,
signal.SIGHUP: 'terminate',
# signal.SIGUSR1: reload_program_config,
}

mail_gid = grp.getgrnam(PY_FLY_GRP).gr_gid
context.gid = mail_gid

# context.files_preserve = [important_file, interesting_file]

# initial_program_setup()

# run the daemon
with context:
print('Running ...')
pyfly_init.init()
# SocketCommunicationsManager().run()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
daemon
SQLAlchemy==1.0.13
7 changes: 3 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@

m setuptools import setup, find_packages
from setuptools import setup, find_packages

setup(
name="pyfly",
version="0.1",
packages=find_packages(),
install_requires=['SQLAlchemy>=1.0.13'],
install_requires=['SQLAlchemy>=1.0.13',
'python-daemon>=2.1.0'],
author="alexnad",
author_email="alexandernadjarian@gmail.com",
description="native python graph database",
license="GNU",
keywords="graph database NoSQL databases",
url="https://github.com/alexnad/PyFlyDB"
)

Empty file added src/__init__.py
Empty file.
98 changes: 98 additions & 0 deletions src/communications_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import logging
import socket
import sys
from threading import Thread

# TODO proper SIGUP handling (close connections ?)

### SUPPORTS ###
# Support only UTF-8 messages
# A message must end on ; in order to be processed
# Parallel request processing
###

logging.basicConfig(
level=logging.NOTSET,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr
)

Logger = logging.getLogger('Communications')

HOST = '' # Symbolic name meaning all available interfaces (lo, eth0, ...)
PORT = 50003 # Arbitrary non-privileged port
MAX_CONNECTIONS = 2
QUERY_END_SYMBOL = ';'
UTF8 = 'utf-8'


# XXX testing
class DummyRepeaterProcessor:
def process(self, query):
Logger.debug(query)
return query


def is_request_end(data):
return data.strip()[-1] == QUERY_END_SYMBOL


class SocketCommunicationsManager:
def __init__(self, query_processor=DummyRepeaterProcessor):
self.processor = query_processor
self.connection_trds = []

# TODO rename
def run(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
Logger.info('Socket created: %s:%s', HOST or 'ALL', PORT)
s.listen(1)
while True:
Logger.info('Listening ...')
conn, addr = s.accept()

t = Thread(target=self.connection_worker, args=(conn, addr))
t.start()
self.connection_trds.append(t)
# TODO send signals to threads upon close

# TODO is this needed ?
for t in self.connection_trds:
t.join()

def connection_worker(self, conn, addr):
"""The assigned worker for the established connection."""

def process_request(query):
Logger.debug('Processing request: %s', query)
result = self.processor.process(query[:-1])

print('Sending result ...')
conn.sendall(bytearray(result, encoding=UTF8))

Logger.info('Connections established: %s', conn)
with conn:
Logger.info('Connected by %s', addr)
query_builder = []
while True:
data = str(conn.recv(1024), encoding=UTF8)
if not data:
# on connection close
break
query_builder.append(data)
Logger.debug('Data: %s', data)
# check and run
# executor
if is_request_end(data):
process_request(''.join(query_builder))

Logger.info('Connection closed %s', addr)

# TODO what to do on connection close ?
def close_connection(self):
pass


if __name__ == '__main__':
SocketCommunicationsManager(DummyRepeaterProcessor()).run()
Loading