-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
62 lines (50 loc) · 2.54 KB
/
server.py
File metadata and controls
62 lines (50 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from flask import Flask, Response, request, stream_with_context
from flask_cors import CORS
import asyncio
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from economic_indicators.NLPAnalysis import EI_NLPAnalysis
from earnings.NLPAnalysis import NLP as ER_NLPAnalysis
from social_media.NLPAnalysis import social_media_sentiment_analysis as SM_NLPAnalysis
from news.sentiment_analysis import sentiment_analysis_on_ticker as NA_NLPAnalysis
# WSGI application object; the /analyze route below registers itself on it.
app = Flask(__name__)
# Enable CORS handling for the whole app. No options are passed, so the
# flask-cors defaults apply.
# NOTE(review): confirm those defaults are acceptable outside local development.
CORS(app)
def _as_text(result):
    """Flatten an analysis result into a single string.

    Lists and tuples are joined with single spaces after converting each
    item with ``str``; ``None`` becomes an empty string; any other value is
    passed through ``str`` so a later ``str`` method call cannot fail.
    """
    if result is None:
        return ''
    if isinstance(result, (list, tuple)):
        return ' '.join(map(str, result))
    return str(result)


def _sse_event(message):
    """Wrap *message* in one Server-Sent Events ``data:`` frame.

    Every CRLF, CR and LF in the message is replaced by a space, because any
    of those would terminate the ``data:`` line early and corrupt the frame.
    The replacement is done here, outside an f-string expression, so the
    module also parses on interpreters older than Python 3.12.
    """
    flat = _as_text(message)
    for line_break in ('\r\n', '\r', '\n'):
        flat = flat.replace(line_break, ' ')
    return f"data: {flat}\n\n"


@app.route('/analyze/<ticker>')
def analyze(ticker):
    """Stream the analysis of *ticker* to the client as Server-Sent Events.

    The response body is produced lazily by the nested ``workflow``
    generator. It runs four analyses in a fixed order (social media,
    earnings, news, economic indicators), emitting a progress frame before
    each one and a labelled result frame after it.

    Args:
        ticker: The ``<ticker>`` segment of the request URL, passed unchanged
            to every analysis function.

    Returns:
        A streaming ``Response`` with content type ``text/event-stream``.
    """
    print(f"Received ticker: {ticker}")

    @stream_with_context
    def workflow():
        """Yield SSE frames for each analysis stage, or one error frame."""
        try:
            yield _sse_event("Starting analysis...")

            # Each analysis is driven to completion with asyncio.run, so the
            # stages execute strictly one after another.
            # Social Media Analysis
            yield _sse_event("Analyzing social media...")
            social_media = asyncio.run(SM_NLPAnalysis(ticker))
            yield _sse_event(f"socialMedia: {_as_text(social_media)}")

            # Earnings Analysis
            yield _sse_event("Analyzing earnings reports...")
            earnings = asyncio.run(ER_NLPAnalysis(ticker))
            yield _sse_event(f"earnings: {_as_text(earnings)}")

            # News Analysis
            yield _sse_event("Analyzing news articles...")
            # The result is unpacked as a pair: a mapping (``.items()`` is
            # called on it) and an iterable of sentiment values.
            news_summaries, news_sentiments = asyncio.run(NA_NLPAnalysis(ticker))
            news_summaries_str = ' '.join(
                f"{key}: {value}" for key, value in news_summaries.items()
            )
            yield _sse_event(f"newsSummaries: {news_summaries_str}")
            news_sentiments_str = ' '.join(map(str, news_sentiments))
            yield _sse_event(f"newsSentiments: {news_sentiments_str}")

            # Economic Indicators Analysis
            yield _sse_event("Analyzing economic indicators...")
            economic_indicators = asyncio.run(EI_NLPAnalysis(ticker))
            yield _sse_event(f"economicIndicators: {_as_text(economic_indicators)}")

            yield _sse_event("Analysis complete.")
        except Exception as e:
            # The stream has already started, so the HTTP status can no longer
            # change; report the failure in-band and end the stream.
            yield _sse_event(f"error: {e}")

    return Response(workflow(), content_type='text/event-stream')
def main():
    """Start the Flask development server for ``app`` with debug mode on."""
    # NOTE(review): debug=True enables Flask's interactive debugger and
    # reloader; confirm this entry point is only used for local development.
    app.run(debug=True)


if __name__ == '__main__':
    main()