# tsuserver3, an Attorney Online server # # Copyright (C) 2016 argoneus # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import asyncio import random import time import yaml from server.exceptions import AreaError from server.evidence import EvidenceList class AreaManager: class Area: def __init__(self, area_id, server, name, background, bg_lock, evidence_mod = 'FFA', locking_allowed = False, iniswap_allowed = True, showname_changes_allowed = False, shouts_allowed = True): self.iniswap_allowed = iniswap_allowed self.clients = set() self.invite_list = {} self.id = area_id self.name = name self.background = background self.bg_lock = bg_lock self.server = server self.music_looper = None self.next_message_time = 0 self.hp_def = 10 self.hp_pro = 10 self.doc = 'No document.' self.status = 'IDLE' self.judgelog = [] self.current_music = '' self.current_music_player = '' self.current_music_player_ipid = -1 self.evi_list = EvidenceList() self.is_recording = False self.recorded_messages = [] self.evidence_mod = evidence_mod self.locking_allowed = locking_allowed self.showname_changes_allowed = showname_changes_allowed self.shouts_allowed = shouts_allowed self.owned = False self.cards = dict() """ #debug self.evidence_list.append(Evidence("WOW", "desc", "1.png")) self.evidence_list.append(Evidence("wewz", "desc2", "2.png")) self.evidence_list.append(Evidence("weeeeeew", "desc3", "3.png")) """ self.is_locked = False def new_client(self, client): self.clients.add(client) def remove_client(self, client): self.clients.remove(client) if client.is_cm: client.is_cm = False self.owned = False if self.is_locked: self.unlock() def unlock(self): self.is_locked = False self.invite_list = {} self.send_host_message('This area is open now.') def is_char_available(self, char_id): return char_id not in [x.char_id for x in self.clients] def get_rand_avail_char_id(self): avail_set = set(range(len(self.server.char_list))) - set([x.char_id for x in self.clients]) if len(avail_set) == 0: raise AreaError('No available characters.') return random.choice(tuple(avail_set)) def send_command(self, cmd, *args): for c in self.clients: c.send_command(cmd, *args) def send_host_message(self, msg): self.send_command('CT', self.server.config['hostname'], msg) def set_next_msg_delay(self, msg_length): delay = min(3000, 100 + 60 * msg_length) self.next_message_time = round(time.time() * 1000.0 + delay) def is_iniswap(self, client, anim1, anim2, char): if self.iniswap_allowed: return False if '..' in anim1 or '..' in anim2: return True for char_link in self.server.allowed_iniswaps: if client.get_char_name() in char_link and char in char_link: return False return True def play_music(self, name, cid, length=-1): self.send_command('MC', name, cid) if self.music_looper: self.music_looper.cancel() if length > 0: self.music_looper = asyncio.get_event_loop().call_later(length, lambda: self.play_music(name, -1, length)) def play_music_shownamed(self, name, cid, showname, length=-1): self.send_command('MC', name, cid, showname) if self.music_looper: self.music_looper.cancel() if length > 0: self.music_looper = asyncio.get_event_loop().call_later(length, lambda: self.play_music(name, -1, length)) def can_send_message(self, client): if self.is_locked and not client.is_mod and not client.ipid in self.invite_list: client.send_host_message('This is a locked area - ask the CM to speak.') return False return (time.time() * 1000.0 - self.next_message_time) > 0 def change_hp(self, side, val): if not 0 <= val <= 10: raise AreaError('Invalid penalty value.') if not 1 <= side <= 2: raise AreaError('Invalid penalty side.') if side == 1: self.hp_def = val elif side == 2: self.hp_pro = val self.send_command('HP', side, val) def change_background(self, bg): if bg.lower() not in (name.lower() for name in self.server.backgrounds): raise AreaError('Invalid background name.') self.background = bg self.send_command('BN', self.background) def change_status(self, value): allowed_values = ('idle', 'rp', 'casing', 'looking-for-players', 'lfp', 'recess', 'gaming') if value.lower() not in allowed_values: raise AreaError('Invalid status. Possible values: {}'.format(', '.join(allowed_values))) if value.lower() == 'lfp': value = 'looking-for-players' self.status = value.upper() def change_doc(self, doc='No document.'): self.doc = doc def add_to_judgelog(self, client, msg): if len(self.judgelog) >= 10: self.judgelog = self.judgelog[1:] self.judgelog.append('{} ({}) {}.'.format(client.get_char_name(), client.get_ip(), msg)) def add_music_playing(self, client, name): self.current_music_player = client.get_char_name() self.current_music_player_ipid = client.ipid self.current_music = name def add_music_playing_shownamed(self, client, showname, name): self.current_music_player = showname + " (" + client.get_char_name() + ")" self.current_music_player_ipid = client.ipid self.current_music = name def get_evidence_list(self, client): client.evi_list, evi_list = self.evi_list.create_evi_list(client) return evi_list def broadcast_evidence_list(self): """ LE#&&# """ for client in self.clients: client.send_command('LE', *self.get_evidence_list(client)) def get_abbreviation(self): if self.name.lower().startswith("courtroom"): return "CR" + self.name.split()[-1] elif self.name.lower().startswith("area"): return "A" + self.name.split()[-1] elif len(self.name.split()) > 1: return "".join(item[0].upper() for item in self.name.split()) elif len(self.name) > 3: return self.name[:3].upper() else: return self.name.upper() def __init__(self, server): self.server = server self.cur_id = 0 self.areas = [] self.load_areas() def load_areas(self): with open('config/areas.yaml', 'r') as chars: areas = yaml.load(chars) for item in areas: if 'evidence_mod' not in item: item['evidence_mod'] = 'FFA' if 'locking_allowed' not in item: item['locking_allowed'] = False if 'iniswap_allowed' not in item: item['iniswap_allowed'] = True if 'showname_changes_allowed' not in item: item['showname_changes_allowed'] = False if 'shouts_allowed' not in item: item['shouts_allowed'] = True self.areas.append( self.Area(self.cur_id, self.server, item['area'], item['background'], item['bglock'], item['evidence_mod'], item['locking_allowed'], item['iniswap_allowed'], item['showname_changes_allowed'], item['shouts_allowed'])) self.cur_id += 1 def default_area(self): return self.areas[0] def get_area_by_name(self, name): for area in self.areas: if area.name == name: return area raise AreaError('Area not found.') def get_area_by_id(self, num): for area in self.areas: if area.id == num: return area raise AreaError('Area not found.')