You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SteamModManager/toolkit_steammodmanager.py

228 lines
7.2 KiB
Python

2 years ago
# -*- coding: iso-8859-1 -*-
"""This application requests mods from the Steam workshop
and posts them to Discord webhooks.
Abgekupfert von: https://github.com/robojumper/steam_workshop_discord"""
import datetime
import hashlib
import json
import os
import re
import sys
import requests
import toml
from steam import webapi
import logging
from appdirs import AppDirs
from time import time
# Set to true to not post. Useful for migrating from an earlier version of the
# script without posting
# WEBapi Key = E1DB8AC0D6D2BA5461EEEF229FEB8A21
# Beispiel GameId (Dont Starve Together) = 322330
# Vorlage f<>r ben<65>tigte API Calls:
## Subscribe
#steamctl webapi call --method=POST IPublishedFileService.Subscribe list_type=1 publishedfileid=<workshop id>
#webapi call --method=POST IPublishedFileService.Subscribe list_type=1 publishedfileid=2798599672
## Unsubscribe
#steamctl webapi call --method=POST IPublishedFileService.Unsubscribe list_type=1 publishedfileid=<workshop id>
## Favorite
#steamctl webapi call --method=POST IPublishedFileService.Subscribe list_type=2 publishedfileid=<workshop id>
## Unfavorite
#steamctl webapi call --method=POST IPublishedFileService.Unsubscribe list_type=2 publishedfileid=<workshop id>
LOG = logging.getLogger(__name__)
_appdirs = AppDirs("SteamModManager")
def ensure_dir(path, mode=0o750):
dirpath = os.path.dirname(path)
if not os.path.exists(dirpath):
LOG.debug("Making dirs: %s", dirpath)
os.makedirs(dirpath, mode)
def normpath(path):
if os.sep == '/':
path = path.replace('\\', '/')
return os.path.normpath(path)
class FileBase(object):
_root_path = None
def __init__(self, relpath, mode='r'):
self.mode = mode
self.relpath = relpath
self.path = normpath(os.path.join(self._root_path, relpath))
self.filename = os.path.basename(self.path)
def __repr__(self):
return "%s(%r, mode=%r)" % (
self.__class__.__name__,
self.relpath,
self.mode,
)
def exists(self):
return os.path.exists(self.path)
def mkdir(self):
ensure_dir(self.path, 0o700)
def older_than(self,seconds=0, minutes=0, hours=0, days=0):
delta = seconds + (minutes*60) + (hours*3600) + (days*86400)
ts = os.path.getmtime(self.path)
return ts + delta > time()
def open(self, mode):
LOG.debug("Opening file (%s): %s", mode, self.path)
self.mkdir()
return open(self.path, mode)
def read_text(self):
if self.exists():
with self.open('r') as fp:
return fp.read()
def write_text(self, data):
with self.open('w') as fp:
fp.write(data)
def read_json(self):
if self.exists():
with self.open('r') as fp:
return json.load(fp)
def write_json(self, data, pretty=True):
with self.open('w') as fp:
if pretty:
json.dump(data, fp, indent=4, sort_keys=True)
else:
json.dump(data, fp)
def remove(self):
LOG.debug("Removing file: %s", self.path)
if self.exists():
os.remove(self.path)
def secure_remove(self):
LOG.debug("Securely removing file: %s", self.path)
if self.exists():
with open(self.path, 'r+b') as fp:
size = fp.seek(0, 2)
fp.seek(0)
chunk = b'0' * 4096
while fp.tell() + 4096 < size:
fp.write(chunk)
fp.write(chunk[:max(size - fp.tell(), 0)])
fp.flush()
os.fsync(fp.fileno())
os.remove(self.path)
def __enter__(self):
self._fp = self.open(self.mode)
return self._fp
def __exit__(self, exc_type, exc_value, traceback):
self._fp.close()
class UserDataFile(FileBase):
_root_path = _appdirs.user_data_dir
class UserCacheFile(FileBase):
_root_path = _appdirs.user_cache_dir
CONFIG = toml.load(open('config.toml'))
WEBKEY = CONFIG["config"]["webkey"]
class Namespace:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def subscribeToMod(args):
# load key=value pairs. Stuff thats start with [ is a list, so parse as json
try:
params = {k: (json.loads(v) if v[0:1] == '[' else v) for k, v in args.params}
except Exception as exp:
LOG.error("Error parsing params: %s", str(exp))
return 1 # error
apicall = webapi.get
version = args.version or 1
if args.method == 'POST':
apicall = webapi.post
webapi_map = {}
# load cache webapi_interfaces if available
for interface in (UserCacheFile('webapi_interfaces.json').read_json() or {}):
for method in interface['methods']:
key = "{}.{}".format(interface['name'], method['name'])
if key not in webapi_map or webapi_map[key][1] < method['version']:
webapi_map[key] = method['httpmethod'], method['version']
# if --method or --version are unset, take them the cache
# This will the call POST if needed with specifying explicity
# This will prever the highest version of a method
if args.endpoint in webapi_map:
if args.method is None:
if webapi_map[args.endpoint][0] == 'POST':
apicall = webapi.post
if args.version is None:
version = webapi_map[args.endpoint][1]
# drop reserved words. these have special meaning for steam.webapi
for reserved in ('key', 'format', 'raw', 'http_timeout', 'apihost', 'https'):
params.pop(reserved, None)
# load key if available
params.setdefault('key', WEBKEY)
if args.format !='text':
params['format'] = 'json' if args.format == 'json_line' else args.format
params['raw'] = True
try:
interface, method = args.endpoint.split('.', 1)
resp = apicall(interface, method, version, params=params)
except Exception as exp:
LOG.error("%s failed: %s", args.endpoint, str(exp))
if getattr(exp, 'response', None):
LOG.error("Response body: %s", exp.response.text)
return 1 # error
# by default we print json, other formats are shown as returned from api
if args.format == 'json':
json.dump(json.loads(resp.rstrip('\n\t\x00 ')), sys.stdout, indent=4, sort_keys=True)
print('')
else:
print(resp)
version = 1
if __name__ == "__main__":
ns = Namespace(versions_report=None,
log_level='info',
anonymous=False,
user=None,
command='webapi',
#_cmd_func='steamctl.commands.webapi.cmds:cmd_webapi_call',
subcommand='call',
apikey=None,
format='json',
method='POST',
version=None,
endpoint='IPublishedFileService.Unsubscribe',
params=[['list_type', '1'], ['publishedfileid', '2798599672']])
subscribeToMod(ns)