#!/usr/bin/env python3
"""
Prawicowy Dashboard — Flask backend
Port 1234 | Static files + RSS proxy + Admin panel (/admin/) + Auth (/login)
"""
import json
import os
import socket
import subprocess
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime

from flask import (Flask, request, jsonify, redirect, url_for,
                   render_template_string, send_from_directory, abort,
                   Response, flash)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (LoginManager, UserMixin, login_user, logout_user,
                         login_required, current_user)
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from werkzeug.security import generate_password_hash, check_password_hash

# Directory containing this file; the SQLite database, the templates folder
# and the statically served files are all resolved relative to it.
ROOT = os.path.dirname(os.path.abspath(__file__))

# In-memory feed cache used by fetch_url(): url -> (fetch timestamp, raw bytes).
CACHE: dict = {}
CACHE_TTL = 600      # seconds a cached response stays fresh
FETCH_TIMEOUT = 12   # seconds, passed to curl as --max-time

app = Flask(__name__, template_folder=os.path.join(ROOT, 'templates'))
# NOTE(review): the fallback key is hard-coded in the source. Anyone who knows
# it can forge session cookies, so SECRET_KEY should always be set in the
# environment for any deployment that is reachable by others.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'pd-secret-2024-change-me')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(ROOT, 'dashboard.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

login_manager = LoginManager(app)
# Endpoint that unauthenticated users are redirected to by @login_required.
login_manager.login_view = 'auth_login'
login_manager.login_message = 'Wymagane logowanie administratora.'
# ══════════════════════════════════════════════════════════
# MODELS
# ══════════════════════════════════════════════════════════

class User(db.Model, UserMixin):
    """Account that can log in through /login."""
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(200))
    password_hash = db.Column(db.String(256), nullable=False, default='')
    # NOTE(review): new rows are administrators unless explicitly unticked.
    is_admin = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime)

    def set_password(self, pw):
        """Store a salted hash of *pw*; the plain text is never persisted."""
        self.password_hash = generate_password_hash(pw)

    def check_password(self, pw):
        """Return True when *pw* matches the stored hash."""
        return check_password_hash(self.password_hash, pw)

    def __str__(self):
        return self.username


class RssSource(db.Model):
    """One feed URL; rows sharing a ``group_id`` are alternatives for one source."""
    __tablename__ = 'rss_sources'

    id = db.Column(db.Integer, primary_key=True)
    group_id = db.Column(db.String(50), nullable=False, index=True)
    name = db.Column(db.String(100), nullable=False)
    url = db.Column(db.String(1000), nullable=False)
    icon = db.Column(db.String(20), default='📰')
    website = db.Column(db.String(500))
    css_class = db.Column(db.String(50), default='src-republika')
    dot_id = db.Column(db.String(50))
    category = db.Column(db.String(20), default='main')  # main | opposition
    active = db.Column(db.Boolean, default=True)
    order = db.Column(db.Integer, default=0)

    def __str__(self):
        return f'{self.name} ({self.group_id})'


class Politician(db.Model):
    """Politician entry exposed through /api/politicians."""
    __tablename__ = 'politicians'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    role = db.Column(db.String(200))
    img_path = db.Column(db.String(500))
    quote = db.Column(db.Text)
    description = db.Column(db.Text)
    party = db.Column(db.String(50))
    ticker_visible = db.Column(db.Boolean, default=True)
    sidebar_visible = db.Column(db.Boolean, default=False)
    popup_key = db.Column(db.String(50))
    order = db.Column(db.Integer, default=0)
    active = db.Column(db.Boolean, default=True)

    def __str__(self):
        return self.name


class PartyMember(db.Model):
    """Member of a party, grouped by ``party_slug`` and ``section``."""
    __tablename__ = 'party_members'

    id = db.Column(db.Integer, primary_key=True)
    party_slug = db.Column(db.String(50), nullable=False, index=True)
    section = db.Column(db.String(200), default='')
    name = db.Column(db.String(100), nullable=False)
    role = db.Column(db.String(200))
    img_url = db.Column(db.String(1000))
    initials = db.Column(db.String(10))
    bg_color = db.Column(db.String(20))
    description = db.Column(db.Text)
    desc_type = db.Column(db.String(10), default='merit')  # merit | fail
    married = db.Column(db.Boolean, default=False)
    children = db.Column(db.Integer, default=0)
    religious = db.Column(db.Boolean, default=False)
    is_leader = db.Column(db.Boolean, default=False)
    order = db.Column(db.Integer, default=0)
    active = db.Column(db.Boolean, default=True)

    def __str__(self):
        return f'{self.name} ({self.party_slug})'


class SiteSetting(db.Model):
    """Free-form key/value setting returned by /api/settings."""
    __tablename__ = 'site_settings'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, default='')
    description = db.Column(db.String(500))

    def __str__(self):
        return self.key


@login_manager.user_loader
def load_user(uid):
    """Resolve the id stored in the session cookie to a ``User`` (or None).

    A malformed id yields None so a stale or tampered session is treated as
    "not logged in" instead of producing a server error.
    """
    try:
        return db.session.get(User, int(uid))
    except (TypeError, ValueError):
        return None


# ══════════════════════════════════════════════════════════
# ADMIN VIEWS
# ══════════════════════════════════════════════════════════

class _Auth:
    """Mixin restricting a Flask-Admin view to authenticated administrators."""

    def is_accessible(self):
        return current_user.is_authenticated and current_user.is_admin

    def inaccessible_callback(self, name, **kwargs):
        # Send the visitor to the login page and remember where they came from.
        return redirect(url_for('auth_login', next=request.url))


class UserAdmin(_Auth, ModelView):
    """Admin CRUD for users; the password is set through an extra form field."""
    column_labels = dict(username='Użytkownik', email='Email', is_admin='Admin',
                         created_at='Utworzony', last_login='Ostatnie logowanie')
    column_list = ['username', 'email', 'is_admin', 'created_at', 'last_login']
    column_searchable_list = ['username', 'email']
    column_filters = ['is_admin']
    form_excluded_columns = ['password_hash', 'created_at', 'last_login']
    form_widget_args = {'email': {'style': 'width:100%'}}

    def on_model_change(self, form, model, is_created):
        """Hash the password typed into ``new_password`` before saving.

        An empty field leaves an existing password untouched. Creating a user
        without a password is rejected: the previous behaviour silently gave
        such accounts the well-known password ``admin123``.

        Raises:
            wtforms.validators.ValidationError: a new user has no password.
                Flask-Admin shows the message as a form error.
        """
        from wtforms.validators import ValidationError

        pw = (form.new_password.data or '').strip() if hasattr(form, 'new_password') else ''
        if pw:
            model.set_password(pw)
        elif is_created and not model.password_hash:
            raise ValidationError('Hasło jest wymagane dla nowego użytkownika.')

    def scaffold_form(self):
        """Extend the generated form with a write-only ``new_password`` field."""
        from wtforms import PasswordField

        form_class = super().scaffold_form()
        form_class.new_password = PasswordField('Nowe hasło (puste = bez zmiany)')
        return form_class


class RssSourceAdmin(_Auth, ModelView):
    """Admin CRUD for RSS feed sources."""
    column_labels = dict(group_id='Grupa', name='Nazwa', url='URL feed', icon='Ikona',
                         website='Strona www', css_class='Klasa CSS',
                         dot_id='ID wskaźnika', category='Kategoria',
                         active='Aktywne', order='Kolejność')
    column_list = ['name', 'group_id', 'url', 'icon', 'website', 'category',
                   'active', 'order']
    column_searchable_list = ['name', 'url', 'group_id']
    column_filters = ['group_id', 'active', 'category']
    column_editable_list = ['active', 'order', 'name']
    form_widget_args = {'url': {'style': 'width:100%'},
                        'website': {'style': 'width:100%'}}


class PoliticianAdmin(_Auth, ModelView):
    """Admin CRUD for politicians."""
    column_labels = dict(name='Imię i nazwisko', role='Stanowisko', img_path='Zdjęcie',
                         quote='Cytat (ticker)', description='Opis', party='Partia',
                         ticker_visible='Pasek górny', sidebar_visible='Sidebar',
                         popup_key='Klucz popup', order='Kolejność', active='Aktywny')
    column_list = ['name', 'role', 'party', 'ticker_visible', 'sidebar_visible',
                   'order', 'active']
    column_searchable_list = ['name', 'role', 'party']
    column_filters = ['party', 'ticker_visible', 'sidebar_visible', 'active']
    column_editable_list = ['ticker_visible', 'sidebar_visible', 'order', 'active']
    form_widget_args = {'quote': {'rows': 3}, 'description': {'rows': 5},
                        'img_path': {'style': 'width:100%'}}


class PartyMemberAdmin(_Auth, ModelView):
    """Admin CRUD for party members."""
    column_labels = dict(party_slug='Partia', section='Sekcja', name='Imię i nazwisko',
                         role='Funkcja', img_url='URL zdjęcia', description='Opis',
                         desc_type='Typ (merit/fail)', is_leader='Lider',
                         order='Kolejność', active='Aktywny')
    column_list = ['name', 'party_slug', 'section', 'role', 'desc_type',
                   'is_leader', 'order', 'active']
    column_searchable_list = ['name', 'role', 'party_slug', 'section']
    column_filters = ['party_slug', 'desc_type', 'is_leader', 'active']
    column_editable_list = ['order', 'active', 'is_leader']
    form_widget_args = {'description': {'rows': 6},
                        'img_url': {'style': 'width:100%'}}


class SettingAdmin(_Auth, ModelView):
    """Admin CRUD for site settings."""
    column_labels = dict(key='Klucz', value='Wartość', description='Opis')
    column_list = ['key', 'value', 'description']
    column_searchable_list = ['key', 'description']
    column_editable_list = ['value']
    form_widget_args = {'value': {'rows': 3, 'style': 'width:100%'},
                        'description': {'style': 'width:100%'}}


class HomeAdmin(_Auth, AdminIndexView):
    """Admin landing page showing a few row counts."""

    @expose('/')
    def index(self):
        if not current_user.is_authenticated:
            return redirect(url_for('auth_login'))
        stats = dict(
            users=User.query.count(),
            sources=RssSource.query.filter_by(active=True).count(),
            politicians=Politician.query.filter_by(active=True).count(),
            party_members=PartyMember.query.filter_by(active=True).count(),
        )
        return self.render('admin/index.html', stats=stats)


admin_panel = Admin(
    app,
    name='🦅 Prawicowy Dashboard — Admin',
    index_view=HomeAdmin(),
)
admin_panel.add_view(RssSourceAdmin(
    RssSource, db, name='📡 Źródła RSS', category='📰 Treści', endpoint='rss_source'))
admin_panel.add_view(PoliticianAdmin(
    Politician, db, name='🏛 Politycy (ticker)', category='📰 Treści', endpoint='politician'))
admin_panel.add_view(PartyMemberAdmin(
    PartyMember, db, name='⚡ Posłowie partii', category='📰 Treści', endpoint='party_member'))
admin_panel.add_view(SettingAdmin(
    SiteSetting, db, name='⚙️ Ustawienia', category='⚙️ System', endpoint='site_setting'))
admin_panel.add_view(UserAdmin(
    User, db, name='👤 Użytkownicy', category='⚙️ System', endpoint='user'))


# ══════════════════════════════════════════════════════════
# AUTH
# ══════════════════════════════════════════════════════════

# Login page template. The form posts back to the current URL (no ``action``)
# so a ``?next=...`` query string survives the round trip; the field names
# match what auth_login() reads from request.form.
LOGIN_HTML = '''<!DOCTYPE html>
<html lang="pl">
<head>
<meta charset="utf-8">
<title>Logowanie — Prawicowy Dashboard Admin</title>
</head>
<body>
<div>🦅</div>
<h1>Panel Administracyjny</h1>
<p>Prawicowy Dashboard</p>
{% if error %}
<div class="error">{{ error }}</div>
{% endif %}
<form method="post">
<input type="text" name="username" placeholder="Użytkownik" required autofocus>
<input type="password" name="password" placeholder="Hasło" required>
<input type="submit" value="Zaloguj">
</form>
<a href="/">← Wróć do Dashboardu</a>
</body>
</html>
'''


def _is_safe_redirect(target):
    """Return True when *target* resolves to this same host over http(s).

    Prevents the ``next`` parameter from being used as an open redirect to an
    attacker-controlled site. Both relative paths and absolute URLs pointing
    back at this server are accepted.
    """
    from urllib.parse import urljoin, urlsplit

    if not target:
        return False
    here = urlsplit(request.host_url)
    dest = urlsplit(urljoin(request.host_url, target))
    return dest.scheme in ('http', 'https') and dest.netloc == here.netloc


@app.route('/login', methods=['GET', 'POST'])
def auth_login():
    """Render the login form (GET) or authenticate the posted credentials (POST).

    On success the user is redirected to ``next`` when it points at this
    site, otherwise to the admin index. On failure the form is re-rendered
    with an error message.
    """
    if current_user.is_authenticated:
        return redirect(url_for('admin.index'))
    error = None
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        u = User.query.filter_by(username=username).first()
        if u and u.check_password(request.form.get('password', '')):
            u.last_login = datetime.utcnow()
            db.session.commit()
            login_user(u)
            target = request.args.get('next')
            if not _is_safe_redirect(target):
                target = url_for('admin.index')
            return redirect(target)
        error = 'Nieprawidłowy użytkownik lub hasło.'
    return render_template_string(LOGIN_HTML, error=error)


@app.route('/logout')
@login_required
def auth_logout():
    """End the current session and go back to the login page."""
    logout_user()
    return redirect(url_for('auth_login'))


# ══════════════════════════════════════════════════════════
# RSS FETCH / PARSE (ported from server.py)
# ══════════════════════════════════════════════════════════

def fetch_url(url: str) -> bytes:
    """Download *url* with curl and return the raw body, cached for CACHE_TTL.

    Raises:
        ValueError: *url* starts with ``-`` and would be read by curl as a
            command-line option.
        RuntimeError: curl exited with an error or returned an empty body.
        subprocess.TimeoutExpired: curl did not finish in time.
    """
    if url.startswith('-'):
        raise ValueError('invalid url')

    now = time.time()
    cached = CACHE.get(url)
    if cached is not None and now - cached[0] < CACHE_TTL:
        return cached[1]

    result = subprocess.run(
        ['curl', '-sL', '--max-time', str(FETCH_TIMEOUT),
         # Only ever speak HTTP(S), also after redirects: without this curl
         # would happily read file:// and other schemes.
         '--proto', '=http,https', '--proto-redir', '=http,https',
         '-A', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
         '-H', 'Accept: application/rss+xml,application/xml,text/xml,*/*',
         '-H', 'Accept-Language: pl-PL,pl;q=0.9',
         '--compressed', url],
        capture_output=True, timeout=FETCH_TIMEOUT + 3
    )
    if result.returncode != 0:
        stderr = result.stderr.decode(errors='replace')[:80]
        raise RuntimeError(f'curl {result.returncode}: {stderr}')
    data = result.stdout
    if not data:
        raise RuntimeError('empty response')

    # Drop expired entries so the cache cannot grow without bound when many
    # distinct URLs are requested. list() snapshots the items first because
    # other request threads may be writing concurrently.
    for key, (stamp, _) in list(CACHE.items()):
        if now - stamp >= CACHE_TTL:
            CACHE.pop(key, None)
    CACHE[url] = (now, data)
    return data


def _t(el, tag):
    """Return the stripped text of child *tag* of *el*, or '' if absent/empty."""
    found = el.find(tag)
    if found is None or not found.text:
        return ''
    return found.text.strip()


def strip_tags(html):
    """Remove markup from *html* and decode HTML entities into plain text.

    Tags are removed before entities are decoded, so an escaped ``&lt;b&gt;``
    survives as the literal text ``<b>``. Non-breaking spaces become ordinary
    spaces. Returns '' for empty or None input.
    """
    if not html:
        return ''
    import re
    from html import unescape

    text = re.sub(r'<[^>]+>', '', html)
    return unescape(text).replace('\xa0', ' ').strip()


def extract_img(html):
    """Return the ``src`` of the first ``<img>`` tag in *html*, or ''."""
    if not html:
        return ''
    import re

    m = re.search(r'<img[^>]+src=["\']([^"\']+)["\']', html, re.I)
    return m.group(1) if m else ''


# XML namespaces used by the feed formats handled below.
_NS_ATOM = '{http://www.w3.org/2005/Atom}'
_NS_CONTENT = '{http://purl.org/rss/1.0/modules/content/}'
_NS_MEDIA = '{http://search.yahoo.com/mrss/}'
_NS_DC = '{http://purl.org/dc/elements/1.1/}'


def _rss_item_to_dict(item):
    """Convert one RSS ``<item>`` element into the API item dict."""
    desc = _t(item, 'description') or _t(item, _NS_CONTENT + 'encoded')

    # Thumbnail preference: media:thumbnail, then an image enclosure, then
    # the first <img> embedded in the description.
    thumb = ''
    media_thumb = item.find(_NS_MEDIA + 'thumbnail')
    if media_thumb is not None:
        thumb = media_thumb.get('url', '')
    enclosure = item.find('enclosure')
    if not thumb and enclosure is not None and 'image' in (enclosure.get('type') or ''):
        thumb = enclosure.get('url', '')
    if not thumb:
        thumb = extract_img(desc)

    link = _t(item, 'link')
    if not link:
        # atom:link carries its target in the href attribute, not as text.
        atom_link = item.find(_NS_ATOM + 'link')
        if atom_link is not None:
            link = atom_link.get('href', '') or (atom_link.text or '').strip()

    return {'title': strip_tags(_t(item, 'title')),
            'link': link,
            'summary': strip_tags(desc)[:300],
            'date': _t(item, 'pubDate') or _t(item, _NS_DC + 'date'),
            'thumb': thumb}


def _atom_entry_to_dict(entry):
    """Convert one Atom ``<entry>`` element into the API item dict."""
    # Explicit ``is None`` checks: an Element without children is falsy, so
    # ``find(...) or find(...)`` would discard a perfectly valid <link>.
    link_el = entry.find(_NS_ATOM + 'link[@rel="alternate"]')
    if link_el is None:
        link_el = entry.find(_NS_ATOM + 'link')
    desc = _t(entry, _NS_ATOM + 'summary') or _t(entry, _NS_ATOM + 'content')
    return {'title': strip_tags(_t(entry, _NS_ATOM + 'title')),
            'link': link_el.get('href', '') if link_el is not None else '',
            'summary': strip_tags(desc)[:300],
            'date': _t(entry, _NS_ATOM + 'updated') or _t(entry, _NS_ATOM + 'published'),
            'thumb': extract_img(desc)}


def parse_rss(xml_bytes: bytes) -> list:
    """Parse an RSS 2.0 or Atom document into at most 30 item dicts.

    Each dict has the keys ``title``, ``link``, ``summary`` (max 300 chars),
    ``date`` and ``thumb``. An unrecognised root element yields [].

    Raises:
        xml.etree.ElementTree.ParseError: the document is not well-formed
            even after the lenient second attempt.
    """
    # NOTE(review): feeds are untrusted input parsed with the stdlib XML
    # parser; consider a hardened parser if hostile feeds are a concern.
    try:
        root = ET.fromstring(xml_bytes)
    except ET.ParseError:
        # Retry leniently: decode with replacement characters and drop a
        # byte-order mark / whitespace that may precede the XML declaration.
        text = xml_bytes.decode('utf-8', errors='replace').lstrip('\ufeff \t\r\n')
        root = ET.fromstring(text)

    items = []
    tag = root.tag.lower()
    if 'rss' in tag or root.find('channel') is not None:
        items = [_rss_item_to_dict(item) for item in root.iter('item')]
    elif 'feed' in tag:
        items = [_atom_entry_to_dict(entry) for entry in root.iter(_NS_ATOM + 'entry')]
    return items[:30]


def fetch_group_urls(group_id: str, urls: list) -> dict:
    """Try *urls* in order and return the items of the first feed that has any.

    Failures are logged to stdout and the next URL is tried. The result is
    ``{'status': 'ok'|'error', 'source': group_id, 'items': [...]}``.
    """
    for url in urls:
        try:
            data = fetch_url(url)
            items = parse_rss(data)
            if items:
                return {'status': 'ok', 'source': group_id, 'items': items}
        except Exception as e:
            # Best effort: any failure just moves on to the next mirror.
            print(f'[{group_id}] {url}: {e}')
    return {'status': 'error', 'source': group_id, 'items': []}


# ══════════════════════════════════════════════════════════
# API ROUTES
# ══════════════════════════════════════════════════════════
@app.route('/api/feeds')
def api_feeds():
    """Fetch all active feed groups in parallel and return them keyed by group id.

    Query parameters:
        groups: optional comma-separated list of group ids to restrict to.

    Every selected group appears in the response. A group whose fetch did not
    finish within the overall 15 second budget is reported with
    ``status: 'error'`` and no items.
    """
    qs_groups = request.args.get('groups', '')
    req_ids = [g.strip() for g in qs_groups.split(',') if g.strip()]

    q = RssSource.query.filter_by(active=True)
    if req_ids:
        q = q.filter(RssSource.group_id.in_(req_ids))
    rows = q.order_by(RssSource.order).all()

    # Build group_id → [urls] map
    feeds: dict = {}
    for r in rows:
        feeds.setdefault(r.group_id, []).append(r.url)

    results: dict = {}
    lock = threading.Lock()

    def worker(gid, urls):
        res = fetch_group_urls(gid, urls)
        with lock:
            results[gid] = res

    threads = [threading.Thread(target=worker, args=(g, u), daemon=True)
               for g, u in feeds.items()]
    for t in threads:
        t.start()

    # One shared deadline: joining each thread with its own 15 s timeout could
    # make the request take 15 s multiplied by the number of groups.
    deadline = time.monotonic() + 15
    for t in threads:
        t.join(timeout=max(0.0, deadline - time.monotonic()))

    # Copy under the lock so a late worker cannot mutate the dict while it is
    # being serialised, then fill in the groups that did not finish in time.
    with lock:
        snapshot = dict(results)
    for gid in feeds:
        snapshot.setdefault(gid, {'status': 'error', 'source': gid, 'items': []})
    return jsonify(snapshot)


@app.route('/api/feed')
def api_feed():
    """Fetch and parse a single feed given by the ``url`` query parameter.

    Returns 400 when ``url`` is missing or is not an http(s) URL. Fetch and
    parse failures are reported in the JSON body as ``status: 'error'``.
    """
    url = request.args.get('url', '').strip()
    if not url:
        return jsonify({'status': 'error', 'message': 'Missing url'}), 400
    # The URL comes straight from the client and is handed to curl, so only
    # plain web URLs are accepted (no file://, no option-like values).
    # NOTE(review): this still lets callers reach hosts on the internal network.
    if not url.lower().startswith(('http://', 'https://')):
        return jsonify({'status': 'error', 'message': 'Unsupported url scheme'}), 400
    try:
        data = fetch_url(url)
        items = parse_rss(data)
        return jsonify({'status': 'ok', 'items': items})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})


@app.route('/api/sources')
def api_sources():
    """List active feed groups, one entry per ``group_id``.

    The first row of each group (by ``order``) supplies the display fields.
    """
    rows = RssSource.query.filter_by(active=True).order_by(RssSource.order).all()
    seen, result = set(), []
    for r in rows:
        if r.group_id in seen:
            continue
        seen.add(r.group_id)
        result.append({'id': r.group_id, 'group_id': r.group_id,
                       'name': r.name, 'icon': r.icon,
                       'website': r.website or '',
                       'css_class': r.css_class or 'src-republika',
                       'dot_id': r.dot_id or f'dot-{r.group_id}',
                       'category': r.category or 'main'})
    return jsonify(result)


@app.route('/api/politicians')
def api_politicians():
    """List active politicians.

    Query parameters:
        ticker: ``1`` (the default) keeps only rows with ``ticker_visible``.
        sidebar: ``1`` keeps only rows with ``sidebar_visible``.

    Both filters apply together when both are ``1``.
    """
    q = Politician.query.filter_by(active=True)
    if request.args.get('ticker', '1') == '1':
        q = q.filter_by(ticker_visible=True)
    if request.args.get('sidebar', '0') == '1':
        q = q.filter_by(sidebar_visible=True)
    pols = q.order_by(Politician.order).all()
    return jsonify([{'id': p.id, 'name': p.name, 'role': p.role or '',
                     'img_path': p.img_path or '', 'quote': p.quote or '',
                     'description': p.description or '', 'party': p.party or ''}
                    for p in pols])


@app.route('/api/party/<slug>')
def api_party(slug):
    """Return the leader and the sectioned member list of party *slug*.

    Responds with 404 when the party has no active members. Members without
    a section are grouped under ``'Inne'``; the leader is reported separately
    and is not repeated inside the sections.
    """
    members = (PartyMember.query
               .filter_by(party_slug=slug, active=True)
               .order_by(PartyMember.order).all())
    if not members:
        return jsonify({'error': 'not found'}), 404

    leader = next((m for m in members if m.is_leader), None)

    sections: dict = {}
    for m in members:
        if m.is_leader:
            continue
        sec = m.section or 'Inne'
        sections.setdefault(sec, []).append({
            'name': m.name, 'role': m.role or '',
            'img': m.img_url or '', 'initials': m.initials or '',
            'bg': m.bg_color or '',
            'married': m.married, 'children': m.children,
            'religious': m.religious,
            'description': m.description or '',
            'desc_type': m.desc_type or 'merit',
        })

    return jsonify({
        'slug': slug,
        'desc_type': leader.desc_type if leader else 'merit',
        'leader': {
            'name': leader.name, 'role': leader.role or '',
            'img': leader.img_url or '',
            'married': leader.married, 'children': leader.children,
            'religious': leader.religious,
            'description': leader.description or '',
            'desc_type': leader.desc_type or 'merit',
        } if leader else None,
        'sections': [{'label': lbl, 'members': mems}
                     for lbl, mems in sections.items()],
    })


@app.route('/api/settings')
def api_settings():
    """Return every site setting as a flat ``{key: value}`` object."""
    return jsonify({s.key: s.value for s in SiteSetting.query.all()})


@app.route('/api/radio/maryja')
def api_radio_maryja():
    """Relay the upstream audio stream to the client as ``audio/aac``.

    The upstream connection is opened lazily when the response body starts
    being consumed and is closed whenever streaming stops, including when
    the client disconnects mid-stream.
    """
    RHOST, RPORT = '51.68.135.155', 80

    def stream():
        try:
            # ``with`` guarantees the socket is closed even when the generator
            # is torn down early (client disconnect raises GeneratorExit,
            # which ``except Exception`` does not see).
            with socket.create_connection((RHOST, RPORT), timeout=10) as sock:
                sock.sendall((f'GET /stream HTTP/1.0\r\nHost: {RHOST}\r\n'
                              'User-Agent: Mozilla/5.0\r\nIcy-MetaData: 0\r\n\r\n').encode())
                # Read until the end of the upstream response headers.
                buf = b''
                while b'\r\n\r\n' not in buf:
                    chunk = sock.recv(1024)
                    if not chunk:
                        return
                    buf += chunk
                # Anything after the header terminator is already audio data.
                left = buf.split(b'\r\n\r\n', 1)
                if len(left) > 1 and left[1]:
                    yield left[1]
                while True:
                    chunk = sock.recv(4096)
                    if not chunk:
                        break
                    yield chunk
        except Exception as e:
            print(f'[radio] {e}')

    return Response(stream(), mimetype='audio/aac',
                    headers={'Cache-Control': 'no-cache',
                             'Access-Control-Allow-Origin': '*'})
# ══════════════════════════════════════════════════════════
# STATIC FILES
# ══════════════════════════════════════════════════════════

# File name endings that must never be served: ROOT also holds the SQLite
# database (password hashes) and the application source.
_PRIVATE_SUFFIXES = ('.py', '.pyc', '.db', '.db-journal', '.db-wal', '.db-shm',
                     '.sqlite', '.sqlite3')


def _is_private_path(path):
    """Return True for dot-files/directories and for database or source files."""
    parts = [p for p in path.replace('\\', '/').split('/') if p]
    if not parts:
        return False
    if any(p.startswith('.') for p in parts):
        return True
    return parts[-1].lower().endswith(_PRIVATE_SUFFIXES)


@app.route('/')
def index():
    """Serve the dashboard front page."""
    return send_from_directory(ROOT, 'agregator.html')


@app.route('/<path:path>')
def static_files(path):
    """Serve a file from the application directory, or respond with 404.

    Private files (database, Python source, dot-files) are reported as
    missing rather than served.
    """
    if _is_private_path(path):
        abort(404)
    try:
        # send_from_directory itself raises 404 for missing or unsafe paths.
        return send_from_directory(ROOT, path)
    except OSError:
        abort(404)


# ══════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════

if __name__ == '__main__':
    os.chdir(ROOT)
    with app.app_context():
        db.create_all()
        # Create default admin if DB is empty
        if not User.query.first():
            # ADMIN_PASSWORD lets a deployment avoid the well-known default.
            env_pw = os.environ.get('ADMIN_PASSWORD')
            u = User(username='admin', email='admin@dashboard.local', is_admin=True)
            u.set_password(env_pw or 'admin123')
            db.session.add(u)
            db.session.commit()
            if env_pw:
                print('Utworzono domyślnego admina: admin (hasło z ADMIN_PASSWORD)')
            else:
                print('Utworzono domyślnego admina: admin / admin123')
    print('Serwer uruchomiony na http://0.0.0.0:1234')
    print('Panel admina: http://localhost:1234/admin/')
    print('Logowanie:    http://localhost:1234/login')
    app.run(host='0.0.0.0', port=1234, debug=False, threaded=True)