I am getting an error while sending messages to a Kafka topic in a Kerberized environment. Our cluster runs HDP 2.3.
I followed this guide: http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/
However, I can only send messages to the Kafka topic if I run kinit explicitly first. I also tried to perform the kinit step from a Java class, but that does not work either. The code is below:
package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        // Attempt the keytab login from code instead of running kinit manually.
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);
        if (!authStatus) {
            System.out.println("Authentication failed, try something else " + authStatus);
        } else {
            System.out.println("Authentication successful " + authStatus);
        }
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");
        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();
            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);
            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");
            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");
            System.out.println("After config prop -1");
            ProducerConfig config = new ProducerConfig(props);
            System.out.println("After config prop -2 config" + config);
            Producer<String, String> producer = new Producer<String, String>(config);
            System.out.println("After config prop -3");
            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                // Build a sample IP address; it is used as the message key.
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);
                System.out.println("After config prop -1 data" + data);
                producer.send(data);
            }
            producer.close();
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }
}
pom.xml (all dependencies were downloaded from the Hortonworks repo):
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.2.3.4.0-3485</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.2.3.4.0-3485</version>
    </dependency>
    <dependency>
        <groupId>org.jasypt</groupId>
        <artifactId>jasypt-spring31</artifactId>
        <version>1.9.2</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.1.2.3.4.0-3485</version>
    </dependency>
</dependencies>
Error, case 1: when I specify my own JAAS file (MyUser_Kafka_jass.conf):
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
Line 6: expected [controlFlag]
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
Line 6: expected [controlFlag]
at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)
MyUser_Kafka_jass.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
renewTicket=true
principal="ctadmin/[email protected]";
useKeyTab=true
serviceName="kafka"
keyTab="/etc/security/keytabs/ctadmin.keytab"
client=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/ctadmin.keytab"
storeKey=true
useTicketCache=true
serviceName="zookeeper"
principal="ctadmin/[email protected]";
};
Case 2: when I specify Kafka's own JAAS file:
Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
This works fine if I run kinit before starting the app; otherwise it throws the error above. I can't run kinit manually in my production environment, so if there is a way to do this from the app itself, please help me out. Please let me know if you need any more details.
Thanks:)
The error is caused by a stray semicolon in your JAAS file, as this part of the output shows:
Line 6: expected [controlFlag]
This line must not end with a semicolon:
principal="ctadmin/[email protected]";
A semicolon terminates the whole login module entry, so it may appear only after the last option, which in your KafkaClient section is:
client=true;
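To catch this kind of mistake before the producer starts, one option is to run the file through the JDK's own JAAS parser first. The following is only a minimal sketch: the class name JaasConfigCheck and its methods are hypothetical helpers, not part of Kafka or HDP. It relies on the standard "JavaLoginConfig" configuration type and java.security.URIParameter, and it only checks syntax; it does not verify that the keytab or principal is valid.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.Configuration;

// Hypothetical helper: checks whether the JDK can parse a JAAS login file.
public class JaasConfigCheck {

    /** Returns null if the JAAS parser accepts the file, otherwise the parser's message. */
    public static String parseError(Path jaasFile) {
        try {
            // "JavaLoginConfig" is the JDK's built-in file-based JAAS configuration type.
            Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
            return null;
        } catch (NoSuchAlgorithmException | RuntimeException e) {
            // A syntax error usually arrives wrapped, so prefer the cause if there is one.
            Throwable cause = (e.getCause() != null) ? e.getCause() : e;
            return String.valueOf(cause.getMessage());
        }
    }

    /** Convenience wrapper: writes the text to a temporary file and checks it. */
    public static String parseErrorOfText(String jaasText) {
        try {
            Path tmp = Files.createTempFile("jaas-check", ".conf");
            try {
                Files.write(tmp, jaasText.getBytes(StandardCharsets.UTF_8));
                return parseError(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new IllegalStateException("Could not write temporary JAAS file", e);
        }
    }

    public static void main(String[] args) {
        // Usage: java JaasConfigCheck /path/to/your_jaas.conf
        String error = parseError(Paths.get(args[0]));
        System.out.println(error == null ? "JAAS file parsed successfully" : error);
    }
}
```

Running it against a file with a semicolon in the middle of a login module entry should report a parse error similar to the "expected [controlFlag]" message above, while a file with a single semicolon after the last option should pass.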