#!/usr/bin/env python3 """This script manage files detection for launch linter""" import os import re import json import collections.abc import yaml ROOT = "." CI_ROOT = ".tekton" def save_json(filename, data): """Save a Json file.""" print("saving to", filename, json.dumps(data)) with open(filename, "w", encoding="utf-8") as file: file.write(json.dumps(data)) def load_json(filename): """Load a json file.""" data = {} with open(filename, "r", encoding="utf-8") as file: data = json.loads(file.read()) return data def load_yaml(filename): """Load a file.""" docs = [] with open(filename, "r", encoding="utf-8") as file: try: data = yaml.safe_load_all(file) for doc in data: docs.append(doc) except yaml.constructor.ConstructorError: pass else: pass return docs def load_config(ci_root_dir): """Load the configuration from the configuration directory.""" ret = { "files": [], "languages": [ "markdown", "docker", "rust", "shell", "python", "yaml", "js", "make", ], "markdown": {"extentions": ["md"], "mdl-args": ["."]}, "docker": {"extentions": ["Dockerfile"]}, "rust": {"extentions": ["rs"]}, "shell": {"extentions": ["sh", "ksh"], "shellcheck-args": []}, "python": { "extentions": ["py"], "black-args": ["--check", "--diff"], "pylint-args": [], }, "yaml": { "extentions": ["yaml", "yml"], "detect": True, "ansible": {"enable": False}, "kube": {"enable": False, "kubelinter-args": [""]}, }, "js": { "extentions": ["ts", "js"], "files": ["package.json", "yarn.lock", "schema.prisma"], }, "make": {"files": ["Makefile"], "checkmake-args": []}, } if not os.path.isdir(ci_root_dir): return ret filtered_files = [ f for f in os.listdir(ci_root_dir) if os.path.isfile(os.path.join(ci_root_dir, f)) and re.match(".yaml$", f) ] if "auto-ci.yaml" in filtered_files: for doc in load_yaml(os.path.join(ci_root_dir, "auto-ci.yaml")): ret = {**ret, **doc} ret["files"] = filtered_files return ret def detect_files(config, root_dir): """ Detect files based on their extention """ ret = {} supported_extentions = [] supported_filename = [] for lang in config["languages"]: if "extentions" in config[lang]: supported_extentions.extend(config[lang]["extentions"]) if "files" in config[lang]: print("adding files", config[lang]) supported_filename.extend(config[lang]["files"]) for directory, _, file_list in os.walk(root_dir): for filename in file_list: if filename in supported_filename: if not filename in ret: ret[filename] = [] ret[filename].append(os.path.join(directory, filename)) ext = filename.split(".")[len(filename.split(".")) - 1] if ext in supported_extentions: if not ext in ret: ret[ext] = [] ret[ext].append(os.path.join(directory, filename)) return ret def get_images_name(dockerfiles, root_dir): """Generate the images names for the detected Dockerfile.""" ret = [] for file in dockerfiles: directory = os.path.dirname(file) if directory == root_dir: ret.append( "$(params.artifactory-url)/$(params.project-path):$(params.image-version)" ) else: ret.append( ( "$(params.artifactory-url)/$(params.project-path)" f"-{os.path.basename(directory)}:$(params.image-version)" ) ) return ret def append_key(obj, key, val): """ Append a value in {obj}[{key}], create the array if not existing. """ if not key in obj: obj[key] = [] obj[key].append(val) # def append_stage(to, key, val, files): def append_stage(obj, key, val): """ Append a value in {obj}[{key}], create the array if not existing. If the key-file is found in the files add a custom suffix """ if not key in obj: obj[key] = [] # Not possible right now # if "{basename}.yaml".format(basename=val) in files: # obj[key].append("{stage}-custom".format(stage=val)) # else: obj[key].append(val) # def set_js_stages(stages, config, files, root_dir): def set_js_stages(stages, files, root_dir): """ Add the stages for javascript code. """ if ( "package.json" in files and os.path.join(root_dir, "package.json") in files["package.json"] ): if ( "yarn.lock" in files and os.path.join(root_dir, "yarn.lock") in files["yarn.lock"] ): # append_stage(stages, "prepare", "prepare-yarn", config["files"]) append_stage(stages, "prepare", "prepare-yarn") else: # append_stage(stages, "prepare", "prepare-npm", config["files"]) append_stage(stages, "prepare", "prepare-npm") if ( "schema.prisma" in files and os.path.join(root_dir, "prisma", "schema.prisma") in files["schema.prisma"] ): # append_stage(stages, "prepare", "prepare-prisma", config["files"]) append_stage(stages, "prepare", "prepare-prisma") defs = load_json(os.path.join(root_dir, "package.json")) if "scripts" in defs and "lint" in defs["scripts"]: # append_stage(stages, "lint", "lint-javascript", config["files"]) append_stage(stages, "lint", "lint-javascript") if "scripts" in defs and "test" in defs["scripts"]: # append_stage(stages, "test", "test-javascript", config["files"]) append_stage(stages, "test", "test-javascript") def set_yaml_stages(stages, config, files): """Add the stages for yaml files.""" yamls = [] if "yaml" in files: yamls += files["yaml"] if "yml" in files: yamls += files["yml"] have_k8s = ( "kube" in config["yaml"] and "enable" in config["yaml"]["kube"] and config["yaml"]["kube"]["enable"] ) have_ansible = ( "ansible" in config["yaml"] and "enable" in config["yaml"]["ansible"] and config["yaml"]["ansible"]["enable"] ) should_detect = ( "detect" not in config["yaml"] or config["yaml"]["detect"] ) and not (have_k8s and have_ansible) if should_detect: # pylint: disable=too-many-nested-blocks for file in yamls: objs = load_yaml(file) for obj in objs: if obj is None: continue if isinstance(obj, collections.abc.Sequence): for item in obj: if "name" in item and ( "register" in item or "changed_when" in item or "loop_control" in item or "ansible.builtin.template" in item ): have_ansible = True elif "apiVersion" in obj: have_k8s = True # append_stage(stages, "lint", "lint-yaml", config["files"]) append_stage(stages, "lint", "lint-yaml") if have_k8s: # append_stage(stages, "lint", "lint-kube", config["files"]) append_stage(stages, "lint", "lint-kube") if have_ansible: # append_stage(stages, "lint", "lint-ansible", config["files"]) append_stage(stages, "lint", "lint-ansible") def get_results(config, files, root_dir): # pylint: disable=too-many-branches """ Generate the stages based on the configuration and detected files. """ stages = { "global": [], "prepare": [], "lint": [], "build": [], "test": [], "publish": [], } args = { "shellcheck-args": ( config["shell"]["shellcheck-args"] if "shellcheck-args" in config["shell"] else [] ), "checkmake-args": ( config["make"]["checkmake-args"] if "checkmake-args" in config["make"] else [] ), "kubelinter-args": ( config["yaml"]["kube"]["kubelinter-args"] if "kube" in config["yaml"] and "kubelinter-args" in config["yaml"]["kube"] else [] ), "mdl-args": ( config["markdown"]["mdl-args"] if "mdl-args" in config["markdown"] else ["."] ), "black-args": ( config["python"]["black-args"] if "black-args" in config["python"] else [] ), "pylint-args": ( config["python"]["pylint-args"] if "pylint-args" in config["python"] else [] ), } if "on-$(params.pipeline-type).yaml" in config["files"]: append_key(stages, "global", "$(params.pipeline-type)") return stages, args if "Dockerfile" in files: # append_stage(stages, "lint", "lint-docker", config["files"]) # append_stage(stages, "publish", "publish-docker", config["files"]) append_stage(stages, "lint", "lint-docker") append_stage(stages, "publish", "publish-docker") if "yaml" in files or "yml" in files: set_yaml_stages(stages, config, files) if "sh" in files: # append_stage(stages, "lint", "lint-shell", config["files"]) append_stage(stages, "lint", "lint-shell") args["shellcheck-args"].extend(files["sh"]) if "sh" in files: # append_stage(stages, "lint", "lint-shell", config["files"]) append_stage(stages, "lint", "lint-shell") args["shellcheck-args"].extend(files["sh"]) if "Makefile" in files: # append_stage(stages, "lint", "lint-make", config["files"]) append_stage(stages, "lint", "lint-make") args["checkmake-args"].extend(files["Makefile"]) if "md" in files: # append_stage(stages, "lint", "lint-md", config["files"]) append_stage(stages, "lint", "lint-md") if "rs" in files: # append_stage(stages, "lint", "lint-clippy", config["files"]) append_stage(stages, "lint", "lint-clippy") if "py" in files: # append_stage(stages, "lint", "lint-python", config["files"]) append_stage(stages, "lint", "lint-python") args["pylint-args"].extend(files["py"]) # append_stage(stages, "lint", "lint-black", config["files"]) append_stage(stages, "lint", "lint-black") args["black-args"].extend(files["py"]) if len([t for t in files["py"] if re.match(r"/test_", t) is not None]) > 0: # append_stage(stages, "test", "test-python", config["files"]) append_stage(stages, "test", "test-python") if "ts" in files or "js" in files: # set_js_stages(stages, config, files, root_dir) set_js_stages(stages, files, root_dir) for stage in ["prepare", "lint", "build", "test", "publish"]: if "{stage}-custom.yaml" in config["files"]: stages[stage].append("{stage}-custom") # Unsupported by tekton... yet :P # if len(stages[stage])>0: # append_stage(stages, "global", "on-{stage}".format(stage = stage), config["files"]) return stages, args def main(): """Main function""" config = load_config(CI_ROOT) files = detect_files(config, ROOT) stages, args = get_results(config, files, ROOT) save_json("$(results.stages-global.path)", stages["global"]) save_json("$(results.stages-prepare.path)", stages["prepare"]) save_json("$(results.stages-lint.path)", stages["lint"]) save_json("$(results.stages-build.path)", stages["build"]) save_json("$(results.stages-test.path)", stages["test"]) save_json("$(results.stages-publish.path)", stages["publish"]) save_json( "$(results.file-docker.path)", files["Dockerfile"] if "Dockerfile" in files else [], ) save_json( "$(results.images-name.path)", get_images_name(files["Dockerfile"] if "Dockerfile" in files else [], ROOT), ) save_json("$(results.shellcheck-args.path)", args["shellcheck-args"]) save_json("$(results.checkmake-args.path)", args["checkmake-args"]) save_json("$(results.black-args.path)", args["black-args"]) save_json("$(results.pylint-args.path)", args["pylint-args"]) save_json("$(results.kubelinter-args.path)", args["kubelinter-args"]) save_json("$(results.mdl-args.path)", args["mdl-args"]) if __name__ == "__main__": main()