Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hooks/playbooks/ceph.yml
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@

- name: Generate a cephx key
cephx_key:
cipher: "{{ cifmw_ceph_key_cipher | default('aes') }}"
register: cephx
no_log: "{{ cifmw_nolog | default(true) | bool }}"

Expand Down
56 changes: 46 additions & 10 deletions plugins/modules/cephx_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,53 @@
short_description: Generate a random CephX authentication key

description:
- Generate a random CephX authentication key and return it
- Generate a random CephX authentication key and return it.
- Supports AES-128 (default, type=1, 16-byte key) and AES-256k (type=2, 32-byte key) ciphers.

options:
cipher:
description:
- The cipher to use when generating the CephX key.
- Use C(aes) for AES-128 (16-byte key, 40-char base64, type=1). This is the default.
- Use C(aes256k) for AES-256k (32-byte key, 60-char base64, type=2).
type: str
default: aes
choices: [aes, aes256k]

author:
- John Fulton (@fultonj)
"""

EXAMPLES = r"""
- name: Generate a cephx key
- name: Generate a cephx key (AES-128, backward compatible default)
cifmw.general.cephx_key:
register: cephx

- name: Generate a cephx key with explicit AES-128 cipher
cifmw.general.cephx_key:
cipher: aes
register: cephx

- name: Generate a cephx key with AES-256k cipher
cifmw.general.cephx_key:
cipher: aes256k
register: cephx

- name: Show cephx key
debug:
msg: "{{ cephx.key }}"
"""

RETURN = r"""
key:
description: A random cephx authentication key
type: dict
description:
- A random CephX authentication key encoded as base64.
- AES-128 keys are 40 characters long (ending with ==).
- AES-256k keys are 60 characters long (ending with =).
type: str
returned: success
sample:
- KEY: AQC+vYNXgDAgAhAAc8UoYt+OTz5uhV7ItLdwUw==
- AQC+vYNXgDAgAhAAc8UoYt+OTz5uhV7ItLdwUw==
"""


Expand All @@ -47,21 +71,33 @@
import time


def __create_cephx_key():
def __create_cephx_key(cipher="aes"):
# NOTE(fultonj): Taken from
# https://github.com/ceph/ceph-deploy/blob/master/ceph_deploy/new.py#L21
key = os.urandom(16)
header = struct.pack("<hiih", 1, int(time.time()), 0, len(key))
if cipher == "aes256k":
key_type = 2
key = os.urandom(32)
else:
key_type = 1
key = os.urandom(16)
header = struct.pack("<hiih", key_type, int(time.time()), 0, len(key))
return base64.b64encode(header + key).decode("utf-8")


def main():
mod_args = {}
mod_args = {
"cipher": {
"type": "str",
"default": "aes",
"choices": ["aes", "aes256k"],
}
}
module = AnsibleModule(argument_spec=mod_args, supports_check_mode=False)

result = {"changed": False, "error": ""}

cephx_key = __create_cephx_key()
cipher = module.params["cipher"]
cephx_key = __create_cephx_key(cipher)
if not cephx_key:
result["msg"] = "Error: unable to create cephx key"
module.fail_json(**result)
Expand Down
136 changes: 136 additions & 0 deletions tests/unit/modules/test_cephx_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright: (c) 2026, Red Hat

# GNU General Public License v3.0+ (see COPYING or
# https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function

import base64
import struct
import unittest.mock # noqa: F401 — required so unittest.mock is loaded for utils.py setUp

from ansible_collections.cifmw.general.tests.unit.utils import (
AnsibleExitJson,
AnsibleFailJson,
ModuleBaseTestCase,
set_module_args,
)
from ansible_collections.cifmw.general.plugins.modules import cephx_key


class TestCephxKey(ModuleBaseTestCase):
"""Unit tests for the cephx_key Ansible module."""

def _decode_key(self, key_b64):
"""Decode a base64 CephX key and return (header_tuple, key_bytes)."""
raw = base64.b64decode(key_b64)
# Header is 12 bytes: struct.pack("<hiih", type, sec, nsec, key_len)
header = struct.unpack("<hiih", raw[:12])
key_bytes = raw[12:]
return header, key_bytes

def test_default_cipher_returns_aes128_key(self):
"""No args: default cipher produces a 28-byte (AES-128) encoded key."""
set_module_args({})
with self.assertRaises(AnsibleExitJson) as ctx:
cephx_key.main()
result = ctx.exception.args[0]
self.assertIn("key", result)
key_b64 = result["key"]
raw = base64.b64decode(key_b64)
self.assertEqual(len(raw), 28)
header, key_bytes = self._decode_key(key_b64)
# type=1, key_len=16
self.assertEqual(header[0], 1)
self.assertEqual(header[3], 16)
self.assertEqual(len(key_bytes), 16)

def test_aes_cipher_returns_aes128_key(self):
"""cipher=aes produces a 28-byte (AES-128) encoded key."""
set_module_args({"cipher": "aes"})
with self.assertRaises(AnsibleExitJson) as ctx:
cephx_key.main()
result = ctx.exception.args[0]
key_b64 = result["key"]
raw = base64.b64decode(key_b64)
self.assertEqual(len(raw), 28)
header, key_bytes = self._decode_key(key_b64)
self.assertEqual(header[0], 1)
self.assertEqual(header[3], 16)
self.assertEqual(len(key_bytes), 16)

def test_aes256k_cipher_returns_aes256k_key(self):
"""cipher=aes256k produces a 44-byte (AES-256k) encoded key."""
set_module_args({"cipher": "aes256k"})
with self.assertRaises(AnsibleExitJson) as ctx:
cephx_key.main()
result = ctx.exception.args[0]
key_b64 = result["key"]
raw = base64.b64decode(key_b64)
self.assertEqual(len(raw), 44)
header, key_bytes = self._decode_key(key_b64)
# type=2, key_len=32
self.assertEqual(header[0], 2)
self.assertEqual(header[3], 32)
self.assertEqual(len(key_bytes), 32)

def test_invalid_cipher_fails(self):
"""cipher=invalid raises AnsibleFailJson (AnsibleModule enforces choices)."""
set_module_args({"cipher": "invalid"})
with self.assertRaises(AnsibleFailJson) as ctx:
cephx_key.main()
result = ctx.exception.args[0]
self.assertTrue(result["failed"])

def test_aes_key_is_valid_base64(self):
"""AES-128 key is valid base64 and ends with ==."""
set_module_args({"cipher": "aes"})
with self.assertRaises(AnsibleExitJson) as ctx:
cephx_key.main()
key_b64 = ctx.exception.args[0]["key"]
# Should not raise
decoded = base64.b64decode(key_b64)
self.assertIsInstance(decoded, bytes)
self.assertTrue(
key_b64.endswith("=="), f"Expected == suffix, got: {key_b64[-2:]}"
)

def test_aes256k_key_is_valid_base64(self):
"""AES-256k key is valid base64 and ends with a single =."""
set_module_args({"cipher": "aes256k"})
with self.assertRaises(AnsibleExitJson) as ctx:
cephx_key.main()
key_b64 = ctx.exception.args[0]["key"]
decoded = base64.b64decode(key_b64)
self.assertIsInstance(decoded, bytes)
self.assertTrue(
key_b64.endswith("=") and not key_b64.endswith("=="),
f"Expected single = suffix, got: {key_b64[-2:]}",
)

def test_key_changes_on_each_call(self):
"""Two successive calls produce different keys (randomness check)."""
set_module_args({"cipher": "aes"})
with self.assertRaises(AnsibleExitJson) as ctx1:
cephx_key.main()
key1 = ctx1.exception.args[0]["key"]

set_module_args({"cipher": "aes"})
with self.assertRaises(AnsibleExitJson) as ctx2:
cephx_key.main()
key2 = ctx2.exception.args[0]["key"]

self.assertNotEqual(key1, key2)

def test_aes256k_key_changes_on_each_call(self):
"""Two successive aes256k calls produce different keys (randomness check)."""
set_module_args({"cipher": "aes256k"})
with self.assertRaises(AnsibleExitJson) as ctx1:
cephx_key.main()
key1 = ctx1.exception.args[0]["key"]

set_module_args({"cipher": "aes256k"})
with self.assertRaises(AnsibleExitJson) as ctx2:
cephx_key.main()
key2 = ctx2.exception.args[0]["key"]

self.assertNotEqual(key1, key2)
Loading