atrooney-online-2/server/area_manager.py
Cerapter 0156849cc2 BEGINNINGS! of the multi-CM system.
- Allows people to become CMs of multiple areas.
- Allows people to appoint CMs besides themselves using `/cm id]`.
- CMs can now freely leave the areas they CM in without losing CM
status.
- CMs that own an area, but aren't there, still show up in them when
using `/getarea` with the `[RCM]` = Remote Case Manager tag.
- CMs get all IC and OOC messages from their areas.
- CMs can use `/s` to send a message to all the areas they own, or `/a
[area_id]` to send a message only to a specific area. This is usable
both IC and OOC.
2018-09-18 19:51:20 +02:00

413 lines
16 KiB
Python

# tsuserver3, an Attorney Online server
#
# Copyright (C) 2016 argoneus <argoneuscze@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import random
import time
import yaml
from server.exceptions import AreaError
from server.evidence import EvidenceList
from enum import Enum
class AreaManager:
class Area:
def __init__(self, area_id, server, name, background, bg_lock, evidence_mod = 'FFA', locking_allowed = False, iniswap_allowed = True, showname_changes_allowed = False, shouts_allowed = True, jukebox = False, abbreviation = '', non_int_pres_only = False):
self.iniswap_allowed = iniswap_allowed
self.clients = set()
self.invite_list = {}
self.id = area_id
self.name = name
self.background = background
self.bg_lock = bg_lock
self.server = server
self.music_looper = None
self.next_message_time = 0
self.hp_def = 10
self.hp_pro = 10
self.doc = 'No document.'
self.status = 'IDLE'
self.judgelog = []
self.current_music = ''
self.current_music_player = ''
self.current_music_player_ipid = -1
self.evi_list = EvidenceList()
self.is_recording = False
self.recorded_messages = []
self.evidence_mod = evidence_mod
self.locking_allowed = locking_allowed
self.showname_changes_allowed = showname_changes_allowed
self.shouts_allowed = shouts_allowed
self.abbreviation = abbreviation
self.cards = dict()
"""
#debug
self.evidence_list.append(Evidence("WOW", "desc", "1.png"))
self.evidence_list.append(Evidence("wewz", "desc2", "2.png"))
self.evidence_list.append(Evidence("weeeeeew", "desc3", "3.png"))
"""
self.is_locked = self.Locked.FREE
self.blankposting_allowed = True
self.non_int_pres_only = non_int_pres_only
self.jukebox = jukebox
self.jukebox_votes = []
self.jukebox_prev_char_id = -1
self.owners = []
class Locked(Enum):
FREE = 1,
SPECTATABLE = 2,
LOCKED = 3
def new_client(self, client):
self.clients.add(client)
self.server.area_manager.send_arup_players()
def remove_client(self, client):
self.clients.remove(client)
if len(self.clients) == 0:
self.change_status('IDLE')
def unlock(self):
self.is_locked = self.Locked.FREE
self.blankposting_allowed = True
self.invite_list = {}
self.server.area_manager.send_arup_lock()
self.send_host_message('This area is open now.')
def spectator(self):
self.is_locked = self.Locked.SPECTATABLE
for i in self.clients:
self.invite_list[i.id] = None
for i in self.owners:
self.invite_list[i.id] = None
self.server.area_manager.send_arup_lock()
self.send_host_message('This area is spectatable now.')
def lock(self):
self.is_locked = self.Locked.LOCKED
for i in self.clients:
self.invite_list[i.id] = None
for i in self.owners:
self.invite_list[i.id] = None
self.server.area_manager.send_arup_lock()
self.send_host_message('This area is locked now.')
def is_char_available(self, char_id):
return char_id not in [x.char_id for x in self.clients]
def get_rand_avail_char_id(self):
avail_set = set(range(len(self.server.char_list))) - set([x.char_id for x in self.clients])
if len(avail_set) == 0:
raise AreaError('No available characters.')
return random.choice(tuple(avail_set))
def send_command(self, cmd, *args):
for c in self.clients:
c.send_command(cmd, *args)
def send_owner_command(self, cmd, *args):
for c in self.owners:
if not c in self.clients:
c.send_command(cmd, *args)
def send_host_message(self, msg):
self.send_command('CT', self.server.config['hostname'], msg, '1')
self.send_owner_command('CT', '[' + self.abbreviation + ']' + self.server.config['hostname'], msg, '1')
def set_next_msg_delay(self, msg_length):
delay = min(3000, 100 + 60 * msg_length)
self.next_message_time = round(time.time() * 1000.0 + delay)
def is_iniswap(self, client, anim1, anim2, char):
if self.iniswap_allowed:
return False
if '..' in anim1 or '..' in anim2:
return True
for char_link in self.server.allowed_iniswaps:
if client.get_char_name() in char_link and char in char_link:
return False
return True
def add_jukebox_vote(self, client, music_name, length=-1, showname=''):
if not self.jukebox:
return
if length <= 0:
self.remove_jukebox_vote(client, False)
else:
self.remove_jukebox_vote(client, True)
self.jukebox_votes.append(self.JukeboxVote(client, music_name, length, showname))
client.send_host_message('Your song was added to the jukebox.')
if len(self.jukebox_votes) == 1:
self.start_jukebox()
def remove_jukebox_vote(self, client, silent):
if not self.jukebox:
return
for current_vote in self.jukebox_votes:
if current_vote.client.id == client.id:
self.jukebox_votes.remove(current_vote)
if not silent:
client.send_host_message('You removed your song from the jukebox.')
def get_jukebox_picked(self):
if not self.jukebox:
return
if len(self.jukebox_votes) == 0:
return None
elif len(self.jukebox_votes) == 1:
return self.jukebox_votes[0]
else:
weighted_votes = []
for current_vote in self.jukebox_votes:
i = 0
while i < current_vote.chance:
weighted_votes.append(current_vote)
i += 1
return random.choice(weighted_votes)
def start_jukebox(self):
# There is a probability that the jukebox feature has been turned off since then,
# we should check that.
# We also do a check if we were the last to play a song, just in case.
if not self.jukebox:
if self.current_music_player == 'The Jukebox' and self.current_music_player_ipid == 'has no IPID':
self.current_music = ''
return
vote_picked = self.get_jukebox_picked()
if vote_picked is None:
self.current_music = ''
return
if vote_picked.client.char_id != self.jukebox_prev_char_id or vote_picked.name != self.current_music or len(self.jukebox_votes) > 1:
self.jukebox_prev_char_id = vote_picked.client.char_id
if vote_picked.showname == '':
self.send_command('MC', vote_picked.name, vote_picked.client.char_id)
else:
self.send_command('MC', vote_picked.name, vote_picked.client.char_id, vote_picked.showname)
else:
self.send_command('MC', vote_picked.name, -1)
self.current_music_player = 'The Jukebox'
self.current_music_player_ipid = 'has no IPID'
self.current_music = vote_picked.name
for current_vote in self.jukebox_votes:
# Choosing the same song will get your votes down to 0, too.
# Don't want the same song twice in a row!
if current_vote.name == vote_picked.name:
current_vote.chance = 0
else:
current_vote.chance += 1
if self.music_looper:
self.music_looper.cancel()
self.music_looper = asyncio.get_event_loop().call_later(vote_picked.length, lambda: self.start_jukebox())
def play_music(self, name, cid, length=-1):
self.send_command('MC', name, cid)
if self.music_looper:
self.music_looper.cancel()
if length > 0:
self.music_looper = asyncio.get_event_loop().call_later(length,
lambda: self.play_music(name, -1, length))
def play_music_shownamed(self, name, cid, showname, length=-1):
self.send_command('MC', name, cid, showname)
if self.music_looper:
self.music_looper.cancel()
if length > 0:
self.music_looper = asyncio.get_event_loop().call_later(length,
lambda: self.play_music(name, -1, length))
def can_send_message(self, client):
if self.cannot_ic_interact(client):
client.send_host_message('This is a locked area - ask the CM to speak.')
return False
return (time.time() * 1000.0 - self.next_message_time) > 0
def cannot_ic_interact(self, client):
return self.is_locked != self.Locked.FREE and not client.is_mod and not client.id in self.invite_list
def change_hp(self, side, val):
if not 0 <= val <= 10:
raise AreaError('Invalid penalty value.')
if not 1 <= side <= 2:
raise AreaError('Invalid penalty side.')
if side == 1:
self.hp_def = val
elif side == 2:
self.hp_pro = val
self.send_command('HP', side, val)
def change_background(self, bg):
if bg.lower() not in (name.lower() for name in self.server.backgrounds):
raise AreaError('Invalid background name.')
self.background = bg
self.send_command('BN', self.background)
def change_status(self, value):
allowed_values = ('idle', 'rp', 'casing', 'looking-for-players', 'lfp', 'recess', 'gaming')
if value.lower() not in allowed_values:
raise AreaError('Invalid status. Possible values: {}'.format(', '.join(allowed_values)))
if value.lower() == 'lfp':
value = 'looking-for-players'
self.status = value.upper()
self.server.area_manager.send_arup_status()
def change_doc(self, doc='No document.'):
self.doc = doc
def add_to_judgelog(self, client, msg):
if len(self.judgelog) >= 10:
self.judgelog = self.judgelog[1:]
self.judgelog.append('{} ({}) {}.'.format(client.get_char_name(), client.get_ip(), msg))
def add_music_playing(self, client, name):
self.current_music_player = client.get_char_name()
self.current_music_player_ipid = client.ipid
self.current_music = name
def add_music_playing_shownamed(self, client, showname, name):
self.current_music_player = showname + " (" + client.get_char_name() + ")"
self.current_music_player_ipid = client.ipid
self.current_music = name
def get_evidence_list(self, client):
client.evi_list, evi_list = self.evi_list.create_evi_list(client)
return evi_list
def broadcast_evidence_list(self):
"""
LE#<name>&<desc>&<img>#<name>
"""
for client in self.clients:
client.send_command('LE', *self.get_evidence_list(client))
def get_cms(self):
msg = ''
for i in self.owners:
msg = msg + '[' + str(i.id) + '] ' + i.get_char_name() + ', '
if len(msg) > 2:
msg = msg[:-2]
return msg
class JukeboxVote:
def __init__(self, client, name, length, showname):
self.client = client
self.name = name
self.length = length
self.chance = 1
self.showname = showname
def __init__(self, server):
self.server = server
self.cur_id = 0
self.areas = []
self.load_areas()
def load_areas(self):
with open('config/areas.yaml', 'r') as chars:
areas = yaml.load(chars)
for item in areas:
if 'evidence_mod' not in item:
item['evidence_mod'] = 'FFA'
if 'locking_allowed' not in item:
item['locking_allowed'] = False
if 'iniswap_allowed' not in item:
item['iniswap_allowed'] = True
if 'showname_changes_allowed' not in item:
item['showname_changes_allowed'] = False
if 'shouts_allowed' not in item:
item['shouts_allowed'] = True
if 'jukebox' not in item:
item['jukebox'] = False
if 'noninterrupting_pres' not in item:
item['noninterrupting_pres'] = False
if 'abbreviation' not in item:
item['abbreviation'] = self.get_generated_abbreviation(item['area'])
self.areas.append(
self.Area(self.cur_id, self.server, item['area'], item['background'], item['bglock'], item['evidence_mod'], item['locking_allowed'], item['iniswap_allowed'], item['showname_changes_allowed'], item['shouts_allowed'], item['jukebox'], item['abbreviation'], item['noninterrupting_pres']))
self.cur_id += 1
def default_area(self):
return self.areas[0]
def get_area_by_name(self, name):
for area in self.areas:
if area.name == name:
return area
raise AreaError('Area not found.')
def get_area_by_id(self, num):
for area in self.areas:
if area.id == num:
return area
raise AreaError('Area not found.')
def get_generated_abbreviation(self, name):
if name.lower().startswith("courtroom"):
return "CR" + name.split()[-1]
elif name.lower().startswith("area"):
return "A" + name.split()[-1]
elif len(name.split()) > 1:
return "".join(item[0].upper() for item in name.split())
elif len(name) > 3:
return name[:3].upper()
else:
return name.upper()
def send_remote_command(self, area_ids, cmd, *args):
for a_id in area_ids:
self.get_area_by_id(a_id).send_command(cmd, *args)
self.get_area_by_id(a_id).send_owner_command(cmd, *args)
def send_arup_players(self):
players_list = [0]
for area in self.areas:
players_list.append(len(area.clients))
self.server.send_arup(players_list)
def send_arup_status(self):
status_list = [1]
for area in self.areas:
status_list.append(area.status)
self.server.send_arup(status_list)
def send_arup_cms(self):
cms_list = [2]
for area in self.areas:
cm = 'FREE'
if len(area.owners) > 0:
cm = area.get_cms()
cms_list.append(cm)
self.server.send_arup(cms_list)
def send_arup_lock(self):
lock_list = [3]
for area in self.areas:
lock_list.append(area.is_locked.name)
self.server.send_arup(lock_list)