Files
2026-04-29 19:04:25 +02:00

562 lines
25 KiB
Python

#!/usr/bin/env python3
"""
Prawicowy Dashboard — Flask backend
Port 1234 | Static files + RSS proxy + Admin panel (/admin/) + Auth (/login)
"""
import os, json, time, threading, socket, subprocess
import xml.etree.ElementTree as ET
from datetime import datetime
from flask import (Flask, request, jsonify, redirect, url_for,
render_template_string, send_from_directory, abort, Response, flash)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (LoginManager, UserMixin, login_user, logout_user,
login_required, current_user)
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from werkzeug.security import generate_password_hash, check_password_hash
ROOT = os.path.dirname(os.path.abspath(__file__))
CACHE: dict = {}
CACHE_TTL = 600
FETCH_TIMEOUT = 12
app = Flask(__name__, template_folder=os.path.join(ROOT, 'templates'))
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'pd-secret-2024-change-me')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(ROOT, 'dashboard.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'auth_login'
login_manager.login_message = 'Wymagane logowanie administratora.'
# ══════════════════════════════════════════════════════════
# MODELS
# ══════════════════════════════════════════════════════════
class User(db.Model, UserMixin):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(200))
password_hash = db.Column(db.String(256), nullable=False, default='')
is_admin = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_login = db.Column(db.DateTime)
def set_password(self, pw): self.password_hash = generate_password_hash(pw)
def check_password(self, pw): return check_password_hash(self.password_hash, pw)
def __str__(self): return self.username
class RssSource(db.Model):
__tablename__ = 'rss_sources'
id = db.Column(db.Integer, primary_key=True)
group_id = db.Column(db.String(50), nullable=False, index=True)
name = db.Column(db.String(100), nullable=False)
url = db.Column(db.String(1000), nullable=False)
icon = db.Column(db.String(20), default='📰')
website = db.Column(db.String(500))
css_class = db.Column(db.String(50), default='src-republika')
dot_id = db.Column(db.String(50))
category = db.Column(db.String(20), default='main') # main | opposition
active = db.Column(db.Boolean, default=True)
order = db.Column(db.Integer, default=0)
def __str__(self): return f'{self.name} ({self.group_id})'
class Politician(db.Model):
__tablename__ = 'politicians'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
role = db.Column(db.String(200))
img_path = db.Column(db.String(500))
quote = db.Column(db.Text)
description = db.Column(db.Text)
party = db.Column(db.String(50))
ticker_visible = db.Column(db.Boolean, default=True)
sidebar_visible = db.Column(db.Boolean, default=False)
popup_key = db.Column(db.String(50))
order = db.Column(db.Integer, default=0)
active = db.Column(db.Boolean, default=True)
def __str__(self): return self.name
class PartyMember(db.Model):
__tablename__ = 'party_members'
id = db.Column(db.Integer, primary_key=True)
party_slug = db.Column(db.String(50), nullable=False, index=True)
section = db.Column(db.String(200), default='')
name = db.Column(db.String(100), nullable=False)
role = db.Column(db.String(200))
img_url = db.Column(db.String(1000))
initials = db.Column(db.String(10))
bg_color = db.Column(db.String(20))
description = db.Column(db.Text)
desc_type = db.Column(db.String(10), default='merit') # merit | fail
married = db.Column(db.Boolean, default=False)
children = db.Column(db.Integer, default=0)
religious = db.Column(db.Boolean, default=False)
is_leader = db.Column(db.Boolean, default=False)
order = db.Column(db.Integer, default=0)
active = db.Column(db.Boolean, default=True)
def __str__(self): return f'{self.name} ({self.party_slug})'
class SiteSetting(db.Model):
__tablename__ = 'site_settings'
id = db.Column(db.Integer, primary_key=True)
key = db.Column(db.String(100), unique=True, nullable=False)
value = db.Column(db.Text, default='')
description = db.Column(db.String(500))
def __str__(self): return self.key
@login_manager.user_loader
def load_user(uid): return db.session.get(User, int(uid))
# ══════════════════════════════════════════════════════════
# ADMIN VIEWS
# ══════════════════════════════════════════════════════════
class _Auth:
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('auth_login', next=request.url))
class UserAdmin(_Auth, ModelView):
column_labels = dict(username='Użytkownik', email='Email', is_admin='Admin',
created_at='Utworzony', last_login='Ostatnie logowanie')
column_list = ['username', 'email', 'is_admin', 'created_at', 'last_login']
column_searchable_list = ['username', 'email']
column_filters = ['is_admin']
form_excluded_columns = ['password_hash', 'created_at', 'last_login']
form_widget_args = {'email': {'style': 'width:100%'}}
def on_model_change(self, form, model, is_created):
pw = (form.new_password.data or '').strip() if hasattr(form, 'new_password') else ''
if pw:
model.set_password(pw)
elif is_created and not model.password_hash:
model.set_password('admin123')
def scaffold_form(self):
from wtforms import PasswordField
form_class = super().scaffold_form()
form_class.new_password = PasswordField('Nowe hasło (puste = bez zmiany)')
return form_class
class RssSourceAdmin(_Auth, ModelView):
column_labels = dict(group_id='Grupa', name='Nazwa', url='URL feed', icon='Ikona',
website='Strona www', css_class='Klasa CSS', dot_id='ID wskaźnika',
category='Kategoria', active='Aktywne', order='Kolejność')
column_list = ['name', 'group_id', 'url', 'icon', 'website', 'category', 'active', 'order']
column_searchable_list = ['name', 'url', 'group_id']
column_filters = ['group_id', 'active', 'category']
column_editable_list = ['active', 'order', 'name']
form_widget_args = {'url': {'style': 'width:100%'}, 'website': {'style': 'width:100%'}}
class PoliticianAdmin(_Auth, ModelView):
column_labels = dict(name='Imię i nazwisko', role='Stanowisko', img_path='Zdjęcie',
quote='Cytat (ticker)', description='Opis', party='Partia',
ticker_visible='Pasek górny', sidebar_visible='Sidebar',
popup_key='Klucz popup', order='Kolejność', active='Aktywny')
column_list = ['name', 'role', 'party', 'ticker_visible', 'sidebar_visible', 'order', 'active']
column_searchable_list = ['name', 'role', 'party']
column_filters = ['party', 'ticker_visible', 'sidebar_visible', 'active']
column_editable_list = ['ticker_visible', 'sidebar_visible', 'order', 'active']
form_widget_args = {'quote': {'rows': 3}, 'description': {'rows': 5},
'img_path': {'style': 'width:100%'}}
class PartyMemberAdmin(_Auth, ModelView):
column_labels = dict(party_slug='Partia', section='Sekcja', name='Imię i nazwisko',
role='Funkcja', img_url='URL zdjęcia', description='Opis',
desc_type='Typ (merit/fail)', is_leader='Lider',
order='Kolejność', active='Aktywny')
column_list = ['name', 'party_slug', 'section', 'role', 'desc_type', 'is_leader', 'order', 'active']
column_searchable_list = ['name', 'role', 'party_slug', 'section']
column_filters = ['party_slug', 'desc_type', 'is_leader', 'active']
column_editable_list = ['order', 'active', 'is_leader']
form_widget_args = {'description': {'rows': 6}, 'img_url': {'style': 'width:100%'}}
class SettingAdmin(_Auth, ModelView):
column_labels = dict(key='Klucz', value='Wartość', description='Opis')
column_list = ['key', 'value', 'description']
column_searchable_list = ['key', 'description']
column_editable_list = ['value']
form_widget_args = {'value': {'rows': 3, 'style': 'width:100%'},
'description': {'style': 'width:100%'}}
class HomeAdmin(_Auth, AdminIndexView):
@expose('/')
def index(self):
if not current_user.is_authenticated:
return redirect(url_for('auth_login'))
stats = dict(
users = User.query.count(),
sources = RssSource.query.filter_by(active=True).count(),
politicians = Politician.query.filter_by(active=True).count(),
party_members = PartyMember.query.filter_by(active=True).count(),
)
return self.render('admin/index.html', stats=stats)
admin_panel = Admin(
app,
name='🦅 Prawicowy Dashboard — Admin',
index_view=HomeAdmin(),
)
admin_panel.add_view(RssSourceAdmin( RssSource, db, name='📡 Źródła RSS', category='📰 Treści', endpoint='rss_source'))
admin_panel.add_view(PoliticianAdmin(Politician, db, name='🏛 Politycy (ticker)', category='📰 Treści', endpoint='politician'))
admin_panel.add_view(PartyMemberAdmin(PartyMember, db, name='⚡ Posłowie partii', category='📰 Treści', endpoint='party_member'))
admin_panel.add_view(SettingAdmin( SiteSetting, db, name='⚙️ Ustawienia', category='⚙️ System', endpoint='site_setting'))
admin_panel.add_view(UserAdmin( User, db, name='👤 Użytkownicy', category='⚙️ System', endpoint='user'))
# ══════════════════════════════════════════════════════════
# AUTH
# ══════════════════════════════════════════════════════════
LOGIN_HTML = '''<!DOCTYPE html>
<html lang="pl"><head>
<meta charset="UTF-8">
<title>Logowanie — Prawicowy Dashboard Admin</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<style>
*{box-sizing:border-box;margin:0;padding:0}
body{background:#0f1117;display:flex;align-items:center;justify-content:center;min-height:100vh;font-family:system-ui,sans-serif}
.card{background:#1a1d27;border:1px solid #2d3147;border-radius:16px;padding:40px 36px;width:100%;max-width:420px;box-shadow:0 20px 60px rgba(0,0,0,.6)}
.eagle{font-size:3rem;text-align:center;display:block;margin-bottom:14px}
h1{color:#fff;font-size:1.4rem;font-weight:900;text-align:center;margin-bottom:4px}
.sub{color:#64748b;font-size:.78rem;text-align:center;margin-bottom:28px}
label{display:block;color:#94a3b8;font-size:.78rem;font-weight:600;margin-bottom:5px;margin-top:16px}
input{width:100%;background:#0f1117;border:1px solid #2d3147;border-radius:8px;color:#fff;padding:10px 14px;font-size:.9rem;outline:none;transition:border-color .15s}
input:focus{border-color:#ef4444}
.btn{display:block;width:100%;margin-top:26px;padding:11px;background:#dc2626;color:#fff;border:none;border-radius:8px;font-size:.95rem;font-weight:700;cursor:pointer;transition:background .15s}
.btn:hover{background:#b91c1c}
.err{background:rgba(220,38,38,.15);border:1px solid rgba(220,38,38,.4);color:#fca5a5;border-radius:8px;padding:10px 14px;font-size:.82rem;margin-bottom:10px}
.back{display:block;text-align:center;margin-top:18px;color:#475569;font-size:.75rem;text-decoration:none}
.back:hover{color:#94a3b8}
</style></head>
<body><div class="card">
<span class="eagle">🦅</span>
<h1>Panel Administracyjny</h1>
<div class="sub">Prawicowy Dashboard</div>
{% if error %}<div class="err">{{ error }}</div>{% endif %}
<form method="POST">
<label>Użytkownik</label>
<input type="text" name="username" autofocus required>
<label>Hasło</label>
<input type="password" name="password" required>
<button class="btn" type="submit">Zaloguj się</button>
</form>
<a class="back" href="/">← Wróć do Dashboardu</a>
</div></body></html>'''
@app.route('/login', methods=['GET', 'POST'])
def auth_login():
if current_user.is_authenticated:
return redirect(url_for('admin.index'))
error = None
if request.method == 'POST':
u = User.query.filter_by(username=request.form.get('username', '').strip()).first()
if u and u.check_password(request.form.get('password', '')):
u.last_login = datetime.utcnow()
db.session.commit()
login_user(u)
return redirect(request.args.get('next') or url_for('admin.index'))
error = 'Nieprawidłowy użytkownik lub hasło.'
return render_template_string(LOGIN_HTML, error=error)
@app.route('/logout')
@login_required
def auth_logout():
logout_user()
return redirect(url_for('auth_login'))
# ══════════════════════════════════════════════════════════
# RSS FETCH / PARSE (ported from server.py)
# ══════════════════════════════════════════════════════════
def fetch_url(url: str) -> bytes:
now = time.time()
if url in CACHE and now - CACHE[url][0] < CACHE_TTL:
return CACHE[url][1]
result = subprocess.run(
['curl', '-sL', '--max-time', str(FETCH_TIMEOUT),
'-A', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'-H', 'Accept: application/rss+xml,application/xml,text/xml,*/*',
'-H', 'Accept-Language: pl-PL,pl;q=0.9',
'--compressed', url],
capture_output=True, timeout=FETCH_TIMEOUT + 3
)
if result.returncode != 0:
raise RuntimeError(f'curl {result.returncode}: {result.stderr.decode()[:80]}')
data = result.stdout
if not data:
raise RuntimeError('empty response')
CACHE[url] = (now, data)
return data
def _t(el, tag):
f = el.find(tag)
return (f.text or '').strip() if f is not None and f.text else ''
def strip_tags(html):
if not html: return ''
import re
return re.sub(r'<[^>]+>', '', html).replace('&amp;', '&').replace('&lt;', '<') \
.replace('&gt;', '>').replace('&nbsp;', ' ').replace('&#39;', "'").strip()
def extract_img(html):
if not html: return ''
import re
m = re.search(r'<img[^>]+src=["\']([^"\']+)["\']', html, re.I)
return m.group(1) if m else ''
def parse_rss(xml_bytes: bytes) -> list:
try:
root = ET.fromstring(xml_bytes)
except ET.ParseError:
text = xml_bytes.decode('utf-8', errors='replace').lstrip('')
root = ET.fromstring(text)
items = []
tag = root.tag.lower()
if 'rss' in tag or root.find('channel') is not None:
for item in root.iter('item'):
desc = _t(item, 'description') or _t(item, '{http://purl.org/rss/1.0/modules/content/}encoded')
thumb = ''
mt = item.find('{http://search.yahoo.com/mrss/}thumbnail')
if mt is not None: thumb = mt.get('url', '')
enc = item.find('enclosure')
if not thumb and enc is not None and 'image' in (enc.get('type') or ''):
thumb = enc.get('url', '')
if not thumb: thumb = extract_img(desc)
items.append({'title': strip_tags(_t(item, 'title')),
'link': _t(item, 'link') or _t(item, '{http://www.w3.org/2005/Atom}link'),
'summary': strip_tags(desc)[:300],
'date': _t(item, 'pubDate') or _t(item, '{http://purl.org/dc/elements/1.1/}date'),
'thumb': thumb})
elif 'feed' in tag:
for entry in root.iter('{http://www.w3.org/2005/Atom}entry'):
lel = entry.find('{http://www.w3.org/2005/Atom}link[@rel="alternate"]') \
or entry.find('{http://www.w3.org/2005/Atom}link')
desc = _t(entry, '{http://www.w3.org/2005/Atom}summary') \
or _t(entry, '{http://www.w3.org/2005/Atom}content')
items.append({'title': strip_tags(_t(entry, '{http://www.w3.org/2005/Atom}title')),
'link': lel.get('href', '') if lel is not None else '',
'summary': strip_tags(desc)[:300],
'date': _t(entry, '{http://www.w3.org/2005/Atom}updated') or
_t(entry, '{http://www.w3.org/2005/Atom}published'),
'thumb': extract_img(desc)})
return items[:30]
def fetch_group_urls(group_id: str, urls: list) -> dict:
for url in urls:
try:
data = fetch_url(url)
items = parse_rss(data)
if items:
return {'status': 'ok', 'source': group_id, 'items': items}
except Exception as e:
print(f'[{group_id}] {url}: {e}')
return {'status': 'error', 'source': group_id, 'items': []}
# ══════════════════════════════════════════════════════════
# API ROUTES
# ══════════════════════════════════════════════════════════
@app.route('/api/feeds')
def api_feeds():
qs_groups = request.args.get('groups', '')
req_ids = [g.strip() for g in qs_groups.split(',') if g.strip()]
q = RssSource.query.filter_by(active=True)
if req_ids:
q = q.filter(RssSource.group_id.in_(req_ids))
rows = q.order_by(RssSource.order).all()
# Build group_id → [urls] map
feeds: dict = {}
for r in rows:
feeds.setdefault(r.group_id, []).append(r.url)
results: dict = {}
lock = threading.Lock()
def worker(gid, urls):
res = fetch_group_urls(gid, urls)
with lock:
results[gid] = res
threads = [threading.Thread(target=worker, args=(g, u)) for g, u in feeds.items()]
for t in threads: t.start()
for t in threads: t.join(timeout=15)
return jsonify(results)
@app.route('/api/feed')
def api_feed():
url = request.args.get('url', '')
if not url:
return jsonify({'status': 'error', 'message': 'Missing url'}), 400
try:
data = fetch_url(url)
items = parse_rss(data)
return jsonify({'status': 'ok', 'items': items})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)})
@app.route('/api/sources')
def api_sources():
rows = RssSource.query.filter_by(active=True).order_by(RssSource.order).all()
seen, result = set(), []
for r in rows:
if r.group_id not in seen:
seen.add(r.group_id)
result.append({'id': r.group_id, 'group_id': r.group_id, 'name': r.name,
'icon': r.icon, 'website': r.website or '',
'css_class': r.css_class or 'src-republika',
'dot_id': r.dot_id or f'dot-{r.group_id}',
'category': r.category or 'main'})
return jsonify(result)
@app.route('/api/politicians')
def api_politicians():
q = Politician.query.filter_by(active=True)
if request.args.get('ticker', '1') == '1':
q = q.filter_by(ticker_visible=True)
if request.args.get('sidebar', '0') == '1':
q = q.filter_by(sidebar_visible=True)
pols = q.order_by(Politician.order).all()
return jsonify([{'id': p.id, 'name': p.name, 'role': p.role or '',
'img_path': p.img_path or '', 'quote': p.quote or '',
'description': p.description or '', 'party': p.party or ''}
for p in pols])
@app.route('/api/party/<slug>')
def api_party(slug):
members = (PartyMember.query
.filter_by(party_slug=slug, active=True)
.order_by(PartyMember.order).all())
if not members:
return jsonify({'error': 'not found'}), 404
leader = next((m for m in members if m.is_leader), None)
sections: dict = {}
for m in members:
if m.is_leader:
continue
sec = m.section or 'Inne'
sections.setdefault(sec, []).append({
'name': m.name, 'role': m.role or '', 'img': m.img_url or '',
'initials': m.initials or '', 'bg': m.bg_color or '',
'married': m.married, 'children': m.children, 'religious': m.religious,
'description': m.description or '', 'desc_type': m.desc_type or 'merit',
})
return jsonify({
'slug': slug,
'desc_type': leader.desc_type if leader else 'merit',
'leader': {
'name': leader.name, 'role': leader.role or '', 'img': leader.img_url or '',
'married': leader.married, 'children': leader.children, 'religious': leader.religious,
'description': leader.description or '', 'desc_type': leader.desc_type or 'merit',
} if leader else None,
'sections': [{'label': lbl, 'members': mems} for lbl, mems in sections.items()],
})
@app.route('/api/settings')
def api_settings():
return jsonify({s.key: s.value for s in SiteSetting.query.all()})
@app.route('/api/radio/maryja')
def api_radio_maryja():
RHOST, RPORT = '51.68.135.155', 80
def stream():
try:
sock = socket.create_connection((RHOST, RPORT), timeout=10)
sock.sendall(('GET /stream HTTP/1.0\r\nHost: 51.68.135.155\r\n'
'User-Agent: Mozilla/5.0\r\nIcy-MetaData: 0\r\n\r\n').encode())
buf = b''
while b'\r\n\r\n' not in buf:
chunk = sock.recv(1024)
if not chunk: return
buf += chunk
left = buf.split(b'\r\n\r\n', 1)
if len(left) > 1 and left[1]: yield left[1]
while True:
chunk = sock.recv(4096)
if not chunk: break
yield chunk
sock.close()
except Exception as e:
print(f'[radio] {e}')
return Response(stream(), mimetype='audio/aac',
headers={'Cache-Control': 'no-cache', 'Access-Control-Allow-Origin': '*'})
# ══════════════════════════════════════════════════════════
# STATIC FILES
# ══════════════════════════════════════════════════════════
@app.route('/')
def index():
return send_from_directory(ROOT, 'agregator.html')
@app.route('/<path:path>')
def static_files(path):
try:
return send_from_directory(ROOT, path)
except Exception:
abort(404)
# ══════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════
if __name__ == '__main__':
os.chdir(ROOT)
with app.app_context():
db.create_all()
# Create default admin if DB is empty
if not User.query.first():
u = User(username='admin', email='admin@dashboard.local', is_admin=True)
u.set_password('admin123')
db.session.add(u)
db.session.commit()
print('Utworzono domyślnego admina: admin / admin123')
print('Serwer uruchomiony na http://0.0.0.0:1234')
print('Panel admina: http://localhost:1234/admin/')
print('Logowanie: http://localhost:1234/login')
app.run(host='0.0.0.0', port=1234, debug=False, threaded=True)