Add preprocessing step to remove watch_* operations in favor of list_*(watch=True)
This commit is contained in:
parent
9e2e8e414b
commit
869fd7b6b9
@ -13,13 +13,27 @@
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import operator
|
||||
import os.path
|
||||
import sys
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
# these four constants are shown as part of this example in []:
|
||||
# "[watch]Pod[List]" is the deprecated version of "[list]Pod?[watch]=True"
|
||||
WATCH_OP_PREFIX = "watch"
|
||||
WATCH_OP_SUFFIX = "List"
|
||||
LIST_OP_PREFIX = "list"
|
||||
WATCH_QUERY_PARAM_NAME = "watch"
|
||||
|
||||
|
||||
_ops = ['get', 'put', 'post', 'delete', 'options', 'head', 'patch']
|
||||
|
||||
|
||||
class PreprocessingException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _title(s):
|
||||
if len(s) == 0:
|
||||
return s
|
||||
@ -30,24 +44,72 @@ def _to_camel_case(s):
|
||||
return ''.join(_title(y) for y in s.split("_"))
|
||||
|
||||
|
||||
def iterate_through_operations(spec, func):
|
||||
def apply_func_to_spec_operations(spec, func, *params):
|
||||
"""Apply func to each operation in the spec.
|
||||
|
||||
:param spec: The OpenAPI spec to apply func to.
|
||||
:param func: the function to apply to the spec's operations. It should be
|
||||
a func(operation, parent) where operation will be each
|
||||
operation of the spec and parent would be the parent object of
|
||||
the given operation.
|
||||
If the return value of the func is True, then the operation
|
||||
will be deleted from the spec.
|
||||
"""
|
||||
for k, v in spec['paths'].iteritems():
|
||||
for op in _ops:
|
||||
if op in v:
|
||||
func(v[op])
|
||||
if op not in v:
|
||||
continue
|
||||
if func(v[op], v, *params):
|
||||
del v[op]
|
||||
|
||||
|
||||
def _has_property(prop_list, property_name):
|
||||
for prop in prop_list:
|
||||
if prop["name"] == property_name:
|
||||
return True
|
||||
|
||||
|
||||
def remove_watch_operations(op, parent, operation_ids):
|
||||
op_id = op['operationId']
|
||||
if not op_id.startswith(WATCH_OP_PREFIX):
|
||||
return
|
||||
list_id = (LIST_OP_PREFIX +
|
||||
op_id.replace(WATCH_OP_SUFFIX, "")[len(WATCH_OP_PREFIX):])
|
||||
if list_id not in operation_ids:
|
||||
raise PreprocessingException("Cannot find %s" % list_id)
|
||||
list_op = operation_ids[list_id]
|
||||
params = []
|
||||
if 'parameters' in list_op:
|
||||
params += list_op['parameters']
|
||||
if 'parameters' in parent:
|
||||
params += parent['parameters']
|
||||
if not _has_property(params, WATCH_QUERY_PARAM_NAME):
|
||||
raise PreprocessingException("%s has no watch query param" % list_id)
|
||||
return True
|
||||
|
||||
|
||||
def strip_tags_from_operation_id(operation, _):
|
||||
operation_id = operation['operationId']
|
||||
for t in operation['tags']:
|
||||
operation_id = operation_id.replace(_to_camel_case(t), '')
|
||||
operation['operationId'] = operation_id
|
||||
|
||||
|
||||
def process_swagger(infile, outfile):
|
||||
with open(infile, 'r') as f:
|
||||
spec = json.load(f, object_pairs_hook=OrderedDict)
|
||||
|
||||
def strip_tags_from_operation_id(operation):
|
||||
operation_id = operation['operationId']
|
||||
for t in operation['tags']:
|
||||
operation_id = operation_id.replace(_to_camel_case(t), '')
|
||||
operation['operationId'] = operation_id
|
||||
apply_func_to_spec_operations(spec, strip_tags_from_operation_id)
|
||||
|
||||
iterate_through_operations(spec, strip_tags_from_operation_id)
|
||||
operation_ids = {}
|
||||
apply_func_to_spec_operations(spec, lambda op, _: operator.setitem(
|
||||
operation_ids, op['operationId'], op))
|
||||
|
||||
try:
|
||||
apply_func_to_spec_operations(
|
||||
spec, remove_watch_operations, operation_ids)
|
||||
except PreprocessingException as e:
|
||||
print(e.message)
|
||||
|
||||
with open(outfile, 'w') as out:
|
||||
json.dump(spec, out, sort_keys=False, indent=2,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user