Clean up keygen.py
The code that generates system cert requests has been moved into PKIDeployer.
edewata committed Jul 19, 2023
1 parent 693aa62 commit e31e3b9
Showing 2 changed files with 325 additions and 330 deletions.
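--- a/base/server/python/pki/server/deployment/__init__.py
+++ b/base/server/python/pki/server/deployment/__init__.py
@@ -906,6 +906,330 @@ def get_cert_id(self, subsystem, tag):
 else:
 return tag
 
+def generate_ca_signing_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_ca_signing_csr_path')
+if not csr_path:
+return
+
+basic_constraints_ext = {
+'ca': True,
+'path_length': None,
+'critical': True
+}
+
+key_usage_ext = {
+'digitalSignature': True,
+'nonRepudiation': True,
+'certSigning': True,
+'crlSigning': True,
+'critical': True
+}
+
+# if specified, add generic CSR extension
+generic_exts = None
+
+if 'preop.cert.signing.ext.oid' in subsystem.config and \
+'preop.cert.signing.ext.data' in subsystem.config:
+
+data = subsystem.config['preop.cert.signing.ext.data']
+critical = subsystem.config['preop.cert.signing.ext.critical']
+
+generic_ext = {
+'oid': subsystem.config['preop.cert.signing.ext.oid'],
+'data': binascii.unhexlify(data),
+'critical': config.str2bool(critical)
+}
+
+generic_exts = [generic_ext]
+
+tag = 'signing'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(
+token=token,
+user=self.mdict.get('pki_user'),
+group=self.mdict.get('pki_group'),
+)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path,
+basic_constraints_ext=basic_constraints_ext,
+key_usage_ext=key_usage_ext,
+generic_exts=generic_exts,
+subject_key_id=self.configuration_file.req_ski,
+)
+
+finally:
+nssdb.close()
+
+def generate_kra_storage_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_storage_csr_path')
+if not csr_path:
+return
+
+key_usage_ext = {
+'digitalSignature': True,
+'nonRepudiation': True,
+'keyEncipherment': True,
+'dataEncipherment': True,
+'critical': True
+}
+
+extended_key_usage_ext = {
+'clientAuth': True
+}
+
+tag = 'storage'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(token)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path,
+key_usage_ext=key_usage_ext,
+extended_key_usage_ext=extended_key_usage_ext
+)
+
+finally:
+nssdb.close()
+
+def generate_kra_transport_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_transport_csr_path')
+if not csr_path:
+return
+
+key_usage_ext = {
+'digitalSignature': True,
+'nonRepudiation': True,
+'keyEncipherment': True,
+'dataEncipherment': True,
+'critical': True
+}
+
+extended_key_usage_ext = {
+'clientAuth': True
+}
+
+tag = 'transport'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(token)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path,
+key_usage_ext=key_usage_ext,
+extended_key_usage_ext=extended_key_usage_ext
+)
+
+finally:
+nssdb.close()
+
+def generate_ocsp_signing_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_ocsp_signing_csr_path')
+if not csr_path:
+return
+
+tag = 'signing'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(token)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path
+)
+
+finally:
+nssdb.close()
+
+def generate_sslserver_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_sslserver_csr_path')
+if not csr_path:
+return
+
+key_usage_ext = {
+'digitalSignature': True,
+'nonRepudiation': True,
+'keyEncipherment': True,
+'dataEncipherment': True,
+'critical': True
+}
+
+extended_key_usage_ext = {
+'serverAuth': True
+}
+
+tag = 'sslserver'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(token)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path,
+key_usage_ext=key_usage_ext,
+extended_key_usage_ext=extended_key_usage_ext
+)
+
+finally:
+nssdb.close()
+
+def generate_subsystem_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_subsystem_csr_path')
+if not csr_path:
+return
+
+key_usage_ext = {
+'digitalSignature': True,
+'nonRepudiation': True,
+'keyEncipherment': True,
+'dataEncipherment': True,
+'critical': True
+}
+
+extended_key_usage_ext = {
+'serverAuth': True,
+'clientAuth': True
+}
+
+tag = 'subsystem'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(token)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path,
+key_usage_ext=key_usage_ext,
+extended_key_usage_ext=extended_key_usage_ext
+)
+
+finally:
+nssdb.close()
+
+def generate_audit_signing_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_audit_signing_csr_path')
+if not csr_path:
+return
+
+key_usage_ext = {
+'digitalSignature': True,
+'nonRepudiation': True,
+'critical': True
+}
+
+tag = 'audit_signing'
+cert = subsystem.get_subsystem_cert(tag)
+token = pki.nssdb.normalize_token(cert['token'])
+
+if not token:
+token = self.mdict['pki_token_name']
+
+nssdb = subsystem.instance.open_nssdb(token)
+
+try:
+self.generate_csr(
+nssdb,
+subsystem,
+tag,
+csr_path,
+key_usage_ext=key_usage_ext
+)
+
+finally:
+nssdb.close()
+
+def generate_admin_request(self, subsystem):
+
+csr_path = self.mdict.get('pki_admin_csr_path')
+if not csr_path:
+return
+
+client_nssdb = pki.nssdb.NSSDatabase(
+directory=self.mdict['pki_client_database_dir'],
+password=self.mdict['pki_client_database_password'])
+
+try:
+self.generate_csr(
+client_nssdb,
+subsystem,
+'admin',
+csr_path
+)
+
+finally:
+client_nssdb.close()
+
+def generate_system_cert_requests(self, subsystem):
+
+if subsystem.name == 'ca':
+self.generate_ca_signing_request(subsystem)
+
+if subsystem.name == 'kra':
+self.generate_kra_storage_request(subsystem)
+self.generate_kra_transport_request(subsystem)
+
+if subsystem.name == 'ocsp':
+self.generate_ocsp_signing_request(subsystem)
+
+if subsystem.name in ['kra', 'ocsp', 'tks', 'tps']:
+self.generate_sslserver_request(subsystem)
+self.generate_subsystem_request(subsystem)
+self.generate_audit_signing_request(subsystem)
+self.generate_admin_request(subsystem)
+
 def import_system_cert_request(self, subsystem, tag):
 
 cert_id = self.get_cert_id(subsystem, tag)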
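Review bot: In generate_ca_signing_request the guard only checks that `preop.cert.signing.ext.oid` and `preop.cert.signing.ext.data` are present, but the body then indexes `subsystem.config['preop.cert.signing.ext.critical']` unconditionally, so a configuration that supplies the OID and data without the critical flag aborts CSR generation with a KeyError instead of emitting the extension. Either extend the guard with `'preop.cert.signing.ext.critical' in subsystem.config` or read it with a default, e.g. `critical = subsystem.config.get('preop.cert.signing.ext.critical', 'false')`, whichever matches how config.str2bool handles the absent case. Separately, this is the only one of the eight new methods that passes `user=self.mdict.get('pki_user')` and `group=self.mdict.get('pki_group')` to open_nssdb while the others call `open_nssdb(token)`; if those ownership arguments affect files NSS creates under the instance, the sibling methods presumably need them as well, and if the difference is deliberate a one-line comment saying why would stop the next reader from "fixing" it. The enclosing class starts outside the hunk.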